SpringBoot+Netty+WebSocket实现消息发送的示例代码

网友投稿 370 2022-11-21


SpringBoot+Netty+WebSocket实现消息发送的示例代码

一.导入Netty依赖

io.netty

netty-all

4.1.25.Final

二.搭建websocket服务器

@Component

public class WebSocketServer {

/**

* 主线程池

*/

private EventLoopGroup bossGroup;

/**

* 工作线程池

*/

private EventLoopGroup workerGroup;

/**

* 服务器

*/

private ServerBootstrap server;

/**

* 回调

*/

private ChannelFuture future;

public void start() {

future = server.bind(9001);

System.out.println("netty server - 启动成功");

}

public WebSocketServer() {

bossGroup = new NioEventLoopGroup();

workerGroup = new NioEventLoopGroup();

server = new ServerBootstrap();

server.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new WebsocketInitializer());

}

}

三.初始化Websocket

public class WebsocketInitializer extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

// ------------------

// 用于支持Http协议

// ------------------

// websocket基于http协议,需要有http的编解码器

pipeline.addLast(new HttpServerCodec());

// 对写大数据流的支持

pipeline.addLast(new ChunkedWriteHandler());

// 添加对HTTP请求和响应的聚合器:只要使用Netty进行Http编程都需要使用

//设置单次请求的文件的大小

pipeline.addLast(new HttpObjectAggregator(1024 * 64));

//webSocket 服务器处理的协议,用于指定给客户端连接访问的路由 :/ws

pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

// 添加Netty空闲超时检查的支持

// 1. 读空闲超时(超过一定的时间会发送对应的事件消息)

// 2. 写空闲超时

// 3. 读写空闲超时

pipeline.addLast(new IdleStateHandler(4, 8, 12));

//添加心跳处理

pipeline.addLast(new HearBeatHandler());

// 添加自定义的handler

pipeline.addLast(new ChatHandler());

}

}

四.创建Netty监听器

@Component

public class NettyListener implements ApplicationListener {

@Resource

private WebSocketServer websocketServer;

@Override

public void onApplicationEvent(ContextRefreshedEvent event) {

if(event.getApplicationContext().getParent() == null) {

try {

websocketServer.start();

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

五.建立消息通道

public class UserChannelMap {

/**

* 用户保存用户id与通道的Map对象

*/

// private static Map userChannelMap;

/* static {

userChannelMap = new HashMap();

}*/

/**

* 定义一个channel组,管理所有的channel

* GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例

*/

private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

/**

* 存放用户与Chanel的对应信息,用于给指定用户发送消息

*/

private static ConcurrentHashMap userChannelMap = new ConcurrentHashMap<>();

private UserChannelMap(){}

/**

* 添加用户id与channel的关联

* @param userNum

* @param channel

*/

public static void put(String userNum, Channel channel) {

userChannelMap.put(userNum, channel);

}

/**

* 根据用户id移除用户id与channel的关联

* @param userNum

*/

public static void remove(String userNum) {

userChannelMap.remove(userNum);

}

/**

* 根据通道id移除用户与channel的关联

* @param channelId 通道的id

*/

public static void removeByChannelId(String channelId) {

if(!StringUtils.isNotBlank(channelId)) {

return;

}

for (String s : userChannelMap.keySet()) {

Channel channel = userChannelMap.get(s);

if(channelId.equals(channel.id().asLongText())) {

System.out.println("客户端连接断开,取消用户" + s + "与通道" + channelId + "的关联");

userChannelMap.remove(s);

UserService userService = SpringUtil.getBean(UserService.class);

userService.logout(s);

break;

}

}

}

/**

* 打印所有的用户与通道的关联数据

*/

public static void print() {

for (String s : userChannelMap.keySet()) {

System.out.println("用户id:" + s + " 通道:" + userChannelMap.get(s).id());

}

}

/**

* 根据好友id获取对应的通道

* @param receiverNum 接收人编号

* @return Netty通道

*/

public static Channel get(String receiverNum) {

return userChannelMap.get(receiverNum);

}

/**

* 获取channel组

* @return

*/

public static ChannelGroup getChannelGroup() {

return channelGroup;

}

/**

* 获取用户channel map

* @return

*/

public static ConcurrentHashMap getUserChannelMap(){

return userChannelMap;

}

}

六.自定义消息类型

public class Message {

/**

* 消息类型

*/

private Integer type;

/**

* 聊天消息

*/

private String message;

/*http://*

* 扩展消息字段

*/

private Object ext;

public Integer getType() {

return type;

}

public void setType(Integer type) {

this.type = type;

}

public MarketChatRecord getChatRecord() {

return marketChatRecord;

}

public void setChatRecord(MarketChatRecord chatRecord) {

this.marketChatRecord = chatRecord;

}

public Object getExt() {

return ext;

}

public void setExt(Object ext) {

this.ext = ext;

}

@Override

public String toString() {

return "Message{" +

"type=" + type +

", marketChatRecord=" + marketChatRecord +

", ext=" + ext +

'}';

}

}

七.创建处理消息的handler

public class ChatHandler extends SimpleChannelInboundHandler {

private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

/**

* 用来保存所有的客户端连接

*/

private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

/**

*当Channel中有新的事件消息会自动调用

*/

@Override

protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

// 当接收到数据后会自动调用

// 获取客户端发送过来的文本消息

Gson gson = new Gson();

log.info("服务器收到消息:{}",msg.text());

System.out.println("接收到消息数据为:" + msg.text());

Message message = gson.fromjson(msg.text(), Message.class);

//根据业务要求进行消息处理

switch (message.getType()) {

// 处理客户端连接的消息

case 0:

// 建立用户与通道的关联

// 处理客户端发送好友消息

break;

case 1:

// 处理客户端的签收消息

break;

case 2:

// 将消息记录设置为已读

break;

case 3:

// 接收心跳消息

break;

default:

break;

}

}

// 当有新的客户端连接服务器之后,会自动调用这个方法

@Override

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

log.info("handlerAdded 被调用"+ctx.channel().id().asLongText());

// 添加到channelGroup 通道组

UserChannelMap.getChannelGroup().add(ctx.channel());

// clients.add(ctx.channel());

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

log.info("{异常:}"+cause.getMessage());

// 删除通道

UserChannelMap.getChannelGroup().remove(ctx.channel());

UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());

ctx.channel().close();

}

@Override

public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText());

//删除通道

UserChannelMap.getChannelGroup().remove(ctx.channel());

UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());

UserChannelMap.print();

}

}

八.处理心跳

public class HearBeatHandler extends ChannelInboundHandlerAdapter {

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if(evt instanceof IdleStateEvent) {

IdleStateEvent idleStateEvent = (IdleStateEvent)evt;

if(idleStateEvent.state() == IdleState.READER_IDLE) {

System.out.println("读空闲事件触发...");

}

else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {

System.out.println("写空闲事件触发...");

}

else if(idleStateEvent.state() == IdleState.ALL_IDLE) {

System.out.println("---------------");

System.out.println("读写空闲事件触发");

System.out.println("关闭通道资源");

ctx.channel().close();

}

}

}

}

搭建完成后调用测试

1.页面访问http://localhost:9001/ws

 2.端口号9001和访问路径ws都是我们在上边配置的,然后传入我们自定义的消息message类型。

3.大概流程:消息发送 :用户1先连接通道,然后发送消息给用户2,用户2若是在线直接可以发送给用户,若没在线可以将消息暂存在redis或者通道里,用户2链接通道的话,两者可以直接通讯。

消息推送 :用户1连接通道,根据通道id查询要推送的人是否在线,或者推送给所有人,这里我只推送给指定的人。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java中的使用及连接Redis数据库(附源码)
下一篇:讲解ssm框架整合(最通俗易懂)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~