Java实战之用springboot+netty实现简单的一对一聊天

网友投稿 361 2022-10-27


Java实战之用springboot+netty实现简单的一对一聊天

一、引入pom

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.chat.info

chat-server

1.0-SNAPSHOT

org.springframework.boot

spring-boot-starter-parent

2.1.4.RELEASE

UTF-8

1.8

org.springframework.boot

spring-boot-starter-web

io.netty

netty-all

4.1.33.Final

org.projectlombok

lombok

com.alibaba

fastjson

1.2.56

org.springframework.boot

spring-boot-starter-thymeleaf

org.springframework.boot

spring-boot-maven-plugin

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.chat.info

chat-server

1.0-SNAPSHOT

org.springframework.boot

spring-boot-starter-parent

2.1.4.RELEASE

UTF-8

1.8

org.springframework.boot

spring-boot-starter-web

io.netty

netty-all

4.1.33.Final

org.projectlombok

lombok

com.alibaba

fastjson

1.2.56

org.springframework.boot

spring-boot-starter-thymeleaf

org.springframework.boot

spring-boot-maven-plugin

二、创建netty 服务端

package com.chat.server;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import javax.annotation.PreDestroy;

@Component

@Slf4j

public class ChatServer {

private EventLoopGroup bossGroup;

private EventLoopGroup workGroup;

private void run() throws Exception {

log.info("开始启动聊天服务器");

bossGroup = new NioEventLoopGroup(1);

workGroup = new NioEventLoopGroup();

try {

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup, workGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChatServerInitializer());

//启动服务器

ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();

log.info("开始启动聊天服务器结束");

channelFuture.channel().closeFuture().sync();

} finally {

bossGroup.shutdownGracefully();

workGroup.shutdownGracefully();

}

}

/**

* 初始化服务器

*/

@PostConstruct()

public void init() {

new Thread(() -> {

try {

run();

} catch (Exception e) {

e.printStackTrace();

}

}).start();

}

@PreDestroy

public void destroy() throws InterruptedException {

if (bossGroup != null) {

bossGroup.shutdownGracefully().sync();

}

if (workGroup != null) {

workGroup.shutdownGracefully().sync();

}

}

}

package com.chat.server;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;

import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

import io.netty.handler.stream.ChunkedWriteHandler;

public class ChatServerInitializer extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline = socketChannel.pipeline();

//使用http的编码器和解码器

pipeline.addLast(new HttpServerCodec());

//添加块处理器

pipeline.addLast(new ChunkedWriteHandler());

pipeline.addLast(new HttpObjectAggregator(8192));

pipeline.addLast(new WebSocketServerProtocolHandler("/chat"));

//自定义handler,处理业务逻辑

pipeline.addLast(new ChatServerHandler());

}

}

package com.chat.server;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import com.chat.config.ChatConfig;

import io.netty.channel.Channel;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import io.netty.util.AttributeKey;

import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;

@Slf4j

public class ChatServerHandler extends SimpleChannelInboundHandler {

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {

//传过来的是json字符串

String text = textWebSocketFrame.text();

JSONObject jsonObject = JSON.parseObject(text);

//获取到发送人的用户id

Object msg = jsonObject.get("msg");

String userId = (String) jsonObject.get("userId");

Channel channel = channelHandlerContext.channel();

if (msg == null) {

//说明是第一次登录上来连接,还没有开始进行聊天,将uid加到map里面

register(userId, channel);

} else {

//有消息了,开始聊天了

sendMsg(msg, userId);

}

}

/**

* 第一次登录进来

*

* @param userId

* @param channel

*/

private void register(String userId, Channel channel) {

if (!ChatConfig.concurrentHashMap.containsKey(userId)) { //没有指定的userId

ChatConfig.concurrentHashMap.put(userId, channel);

// 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID

AttributeKey key = AttributeKey.valueOf("userId");

channel.attr(key).setIfAbsent(userId);

}

}

/**

* 开发发送消息,进行聊天

*

* @param msg

* @param userId

*/

private void sendMsg(Object msg, String userId) {

Channel channel1 = ChatConfig.concurrentHashMap.get(userId);

if (channel1 != null) {

channel1.writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + msg));

}

}

/**

* 一旦客户端连接上来,该方法被执行

*

* @param ctx

* @throws Exception

*/

@Override

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

log.info("handlerAdded 被调用" + ctx.channel().id().asLongText());

}

/**

* 断开连接,需要移除用户

*

* @param ctx

* @throws Exception

*/

@Override

public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

removeUserId(ctx);

}

/**

* 移除用户

*

* @param ctx

*/

private void removeUserId(ChannelHandlerContext ctx) {

Channel channel = ctx.channel();

AttributeKey key = AttributeKey.valueOf("userId");

String userId = channel.attr(key).get();

ChatConfig.concurrentHashMap.remove(userId);

log.info("用户下线,userId:{}", userId);

}

/**

* 处理移除,关闭通道

*

* @param ctx

* @param cause

* @throws Exception

*/

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

三、存储用户channel 的map

package com.chat.config;

import io.netty.channel.Channel;

import java.util.concurrent.ConcurrentHashMap;

public class ChatConfig {

public static ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();

}

四、客户端html

五、controller 模拟用户登录以及要发送信息给谁

package com.chat.controller;

import com.chat.config.ChatConfig;

import io.netty.channel.Channel;

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import org.springframework.stereotype.Controller;

import org.springframework.ui.Model;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestParam;

@Controller

public class ChatController {

@GetMapping("login")

public String login(Model model, @RequestParam("userId") String userId, @RequestParam("sendId") String sendId) {

model.addAttribute("userId", userId);

model.addAttribute("sendId", sendId);

return "chat";

}

@GetMapping("sendMsg")

public String login(@RequestParam("sendId") String sendId) throws InterruptedException {

while (true) {

Channel channel = ChatConfig.concurrentHashMap.get(sendId);

if (channel != null) {

channel.writeAndFlush(new TextWebSocketFrame("test"));

Thread.sleep(1000);

}

}

}

}

六、测试

登录成功要发消息给bbb

登录成功要发消息给aaa


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:OSPF相关知识梳理
下一篇:tcp-ip第一章
相关文章

 发表评论

暂时没有评论,来抢沙发吧~