springboot websocket集群(stomp协议)连接时候传递参数

网友投稿 898 2023-01-02


springboot websocket集群(stomp协议)连接时候传递参数

最近在公司项目中接到个需求。就是后台跟前端浏览器要保持长连接,后台主动往前台推数据。

网上查了下,websocket stomp协议处理这个很简单。尤其是跟springboot 集成。

但是由于开始是单机玩的,很顺利。

但是后面部署到生产搞集群的话,就会出问题了。

假如集群两个节点,浏览器A与节点A建立连接,A节点发的消息浏览器A节点肯定能收到。但是B节点由于没有跟浏览器A建立连接。B节点发的消息浏览器就收不到了。

网上也查了好多,但是没有一个说的很清楚的,也很多都是理论层面的。

还有很多思路都是通过session获取信息的。但是这都不是我需要的。我需要的是从前台传递参数,连接的时候每个节点保存下。然后通过SimpleUserRegistry.getUser获取。

话不多说,直接上代码。

var WEB_SOCKET = {

topic : "",

url : "",

stompClient : null,

connect : function(url, topic, callback,userid) {

this.url = url;

this.topic = topic;

var socket = new SockJS(url); //连接SockJS的endpoint名称为"endpointOyzc"

WEB_SOCKET.stompClient = Stomp.over(socket);//使用STMOP子协议的WebSocket客户端

WEB_SOCKET.stompClient.connect({userid:userid},function(frame){//连接WebSocket服务端

// console.log('Connected:' + frame);

//通过stompClient.subscribe订阅/topic/getResponse 目标(destination)发送的消息

WEB_SOCKET.stompClient.subscribe(topic, callback);

});

}

};

这是响应的前端代码。只需要引入两个js。调用new SockJS(url) 就代表跟服务器建立连接了。

@Configuration

//注解开启使用STOMP协议来传输基于代理(message broker)的消息,这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样

@EnableWebSocketMessageBroker

public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

@Autowired

private GetHeaderParamInterceptor getHeaderParamInterceptor;

@Override

//注册STOMP协议的节点(endpoint),并映射指定的url

public void registerStompEndpoints(StompEndpointRegistry registry) {

//注册一个STOMP的endpoint,并指定使用SockJS协议

registry.addEndpoint("/endpointOyzc")

.setAllowedOrigins("*")

.withSockJS();

/* registry.addEndpoint("/endpointOyzc")

.setAllowedOrigins("*")

.setHandshakeHandler(xlHandshakeHandler)

.withSockJS();*/

}

@Override

//配置消息代理(Message Broker)

public void configureMessageBroker(MessageBrokerRegistry registry) {

//点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理

registry.enableSimpleBroker("/topic", "/user");

// 全局使用的消息前缀(客户端订阅路径上会体现出来)

//registry.setApplicationDestinationPrefixes("/app");

//点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/

registry.setUserDestinationPrefix("/user");

}

/**

* 采用自定义拦截器,获取connect时候传递的参数

*

* @param registration

*/

@Override

public void configureClientInboundChannel(ChannelRegistration registration) {

registration.interceptors(getHeaderParamInterceptor);

}

}

注:上面的endpointOyzc就是前端的url。后面注册端点,前台链接。

然后注意下configureClientInboundChannel这个方法,这个方法里面注入拦截器就是为了链接时候接收参数的。

/**

* @author : hao

* @description : websocket建立链接的时候获取headeri里认证的参数拦截器。

* @time : 2019/7/3 20:42

*/

@Component

public class GetHeaderParamInterceptor extends ChannelInterceptorAdapter {

@Override

public Message> preSend(Message> message, MessageChannel channel) {

StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

if (StompCommand.CONNECT.equals(accessor.getCommand())) {

Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);

if (raw instanceof Map) {

Object name = ((Map) raw).get("userid");

if (name instanceof LinkedList) {

// 设置当前访问的认证用户

accessor.setUser(new JqxxPrincipal(((LinkedList) name).get(0).toString()));

}

}

}

return message;

}

}

/**

* @author : hao

* @description : 自定义的java.security.Principal

* @time : 2019/7/3 20:42

*/

public class JqxxPrincipal implements Principal {

private String loginName;

public JqxxPrincipal(String loginName) {

this.loginName = loginName;

}

@Override

public String getName() {

return loginName;

}

}

这样就存入的前台传的参数。

后台发消息的时候怎么发呢?

/**

* @author : hao

* @description : websocket发送代理,负责发送消息

* @time : 2019/7/4 11:01

*/

@Component

@Slf4j

public class WebsocketSendProxy {

@Autowired

private SimpMessagingTemplate template;

@Autowired

private SimpUserRegistry userRegistry;

@Resource(name = "redisServiceImpl")

private RedisService redisService;

@Value("spring.redis.message.topic-name")

private String topicName;

public void sendMsg(RedisWebsocketMsg redisWebsocketMsg) {

SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());

log.info("发送消息前获取接收方为{},根据Registry获取本节点上这个用户{}", redisWebsocketMsg.getReceiver(), simpUser);

if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {

//2. 获取WebSocket客户端的订阅地址

WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());

if (channelEnum != null) {

//3. 给WebSocket客户端发送消息

template.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());

}

} else {

//给其他订阅了主题的节点发消息,因为本节点没有

redisService.convertAndSend(topicName, redisWebsocketMsg);

}

}

}

可以发现上面代码利用了redis监听模型,也就是redis模型的消息队列

/**

* @author : hao

* @description : redis消息监听实现类,接收处理类

* @time : 2019/7/3 14:00

*/

@Component

@Slf4j

public class MessageReceiver {

@Autowired

private SimpMessagingTemplate messagingTemplate;

@Autowired

private SimpUserRegistry userRegistry;

/**

* 处理WebSocket消息

*/

public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {

log.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));

//1. 取出用户名并判断是否连接到当前应用节点的WebSocket

SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());

if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {

//2. 获取WebSocket客户端的订阅地址

WebSocketChannelEnum channelEnum = WebSocketChannelEnum.http://fromCode(redisWebsocketMsg.getChannelCode());

if (channelEnum != null) {

//3. 给WebSocket客户端发送消息

messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());

}

}

}

}

redis消息模型只贴部分代码就好了

/**

* 消息监听器

*/

@Bean

MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer jackson2JsonRedisSerializer){

//消息接收者以及对应的默认处理方法

MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");

//消息的反序列化方式

messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);

return messageListenerAdapter;

}

/**

* message listener container

*/

@Bean

RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory

, MessageLIwIJlaYVistenerAdapter messageListenerAdapter){

RedisMessageListenerContainer container = new RedisMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

//添加消息监听器

container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));

return container;

}

上面的思路大体如下:客户端简历链接时候,传过来userid保存起来。发消息的时候 通过userRegistry获取,能获取到就证明是跟本节点建立的链接,直接用本节点发消息就好了。

如果不是就利用redis消息队列,把消息推出去。每个节点去判断获取看下是不是本节点的userid。这样就实现了集群的部署。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:服务器接口测试工具(常见接口测试工具)
下一篇:服务器的接口测试工具(服务器端口测试工具)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~