Spring Cloud微服务使用webSocket的方法

网友投稿 871 2022-10-17


Spring Cloud微服务使用webSocket的方法

webSocket

webSocket长连接是一种在单个tcp连接上进行全双工通信的协议,允许双向数据推送。一般微服务提供的restful API只是对前端请求做出相应。使用webSocket可以实现后端主动向前端推送消息。

网关配置

spring cloud 的网关组件有zuul和getway

getway

base:

config:

nacos:

nacoshost: localhost

port: 8848

spring:

application:

name: gateway

main:

allow-bean-definition-overriding: true

cloud:

nacos:

discovery:

server-addr: ${base.config.nacos.nacoshost}:${base.config.nacos.port}

gateway:

discovery:

locator:

enabled: true

routes:

# websocket

- id: CLOUD-WEBSOCKET

uri: lb:ws://cloud-websocket

predicates:

- Path=/cloud-websocket/**

server:

port: 8888

配置网关的时候注意添加ws协议。

zuul

zuul只能管理http请求,不推荐使用zuul管理websocket连接,推荐直连。

服务端

添加maven依赖

org.springframework.boot

spring-boot-starter-websocket

添加webSocket 配置

@Configuration

@EnableWebSocket

public class WebsocketConfiguration implements WebSocketConfigurer {

@Override

public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

// webSocket通道

// 指定处理器和路径

registry.addHandler(new WebSocketHandler(), "/websocket")

// 指定自定义拦截器

.addInterceptors(new WebSocketInterceptor())

// 允许跨域

.setAllowedOrigins("*");

// sockjs通道

registry.addHandler(new WebSocketHandler(), "/sock-js")

.addInterceptors(new WebSocketInterceptor())

.setAllowedOrigins("*")

// 开启sockJs支持

.withSockJS();

}

}

添加处理器

package com.auexpress.cloud.handler;

import com.alibaba.fastjson.JSONObject;

import org.apache.commons.lang3.StringUtils;

import org.springframework.web.socket.*;

import org.springframework.web.socket.handler.AbstractWebSocketHandler;

import java.io.IOException;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

/**

* @Description

* @ClassName WebSocketHandler

* @Author HYSong

* @date 2020.04.14 10:08

*/

public class WebSocketHandler extends AbstractWebSocketHandler {

/**

* 存储sessionId和webSocketSession

* 需要注意的是,webSocketSession没有提供无参构造,不能进行序列化,也就不能通过redis存储

* 在分布式系统中,要想别的办法实现webSocketSession共享

*/

private static Map sessionMap = new ConcurrentHashMap<>();

private static Map userMap = new ConcurrentHashMap<>();

/**

* webSocket连接创建后调用

*/

@Override

public void afterConnectionEstablished(WebSocketSession session) {

// 获取参数

String user = String.valueOf(session.getAttributes().get("user"));

userMap.put(user, session.getId());

sessionMap.put(session.getId(), session);

}

/**

* 接收到消息会调用

*/

@Override

public void handleMessage(WebSocketSession session, WebSocketMessage> message) throws Exception {

JSONObject jsonObject = JSONObject.parseObject(message.getPayload().toString());

String content = jsonObject.getString("content");

String targetAdminId = jsonObject.getString("targetId");

if("0".equals(targetAdminId)){

// 推送给所有人

userMap.forEach((key,value)->{

try {

this.sendMessage(key,content);

} catch (IOException e) {

e.printStackTrace();

}

});

}else{

sendMessage("1", content);

}

}

/**

* 连接出错会调用

*/

@Override

public void handleTransportError(WebSocketSession session, Throwable exception) {

sessionMap.remove(session.getId());

}

/**

* 连接关闭会调用

*/

@Override

public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {

sessionMap.remove(session.getId());

}

@Override

public boolean supportsPartialMessages() {

return false;

}

/**

* 后端发送消息

*/

public void sendMessage(String user, String message) throws IOException {

String sessionId = userMap.get(user);

if (StringUtils.isEmpty(sessionId)refhTYN) {

return;

}

WebSocketSession session = sessionMap.get(sessionId);

if (session == null) {

return;

}

session.sendMessage(new TextMessage(message));

}

}

添加拦截器

package com.auexpress.cloud.interceptor;

import org.springframework.http.server.ServerHttpRequest;

import org.springframework.http.server.ServerHttpResponse;

import org.springframework.http.server.ServletServerHttpRequest;

import org.springframework.web.socket.WebSocketHandler;

import org.springframework.web.socket.server.HandshakeInterceptor;

import java.util.Map;

/**

* @Description

* @ClassName WebSocketInterceptor

* @Author HYSong

* @date 2020.04.14 10:09

*/

public class WebSocketInterceptor implements HandshakeInterceptor {

/**

* handler处理前调用,attributes属性最终在WebSocketSession里,

* 可能通过webSocketSession.getAttributes().get(key值)获得

*/

@Override

public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) {

if (request instanceof ServletServerHttpRequest) {

ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;

// 获取请求路径携带的参数

String user = serverHttpRequest.getServletRequest().getParameter("user");

attributes.put("user", user);

return true;

} else {

return false;

}

}

@Override

public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:网络工程师成长日记167-并没有立即发大财的岗位
下一篇:网络osi七层复习,未复习整理完,后续补齐
相关文章

 发表评论

暂时没有评论,来抢沙发吧~