Springboot+netty实现Web聊天室

网友投稿 295 2022-09-10


Springboot+netty实现Web聊天室

目录:一、项目的创建;二、代码编写;三、运行效果

一、项目的创建

新建Spring项目:

选择JDK版本:

选择Spring Web:

项目名称和位置的设置:

二、代码编写

导入.jar包(除下面的 gson 外,后文代码还依赖 io.netty 包,因此也需要导入 Netty 的 jar 包):

gson: https://search.maven.org/artifact/com.google.code.gson/gson/2.8.9/jar

DemoApplication:

package com.example.demo;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.context.ConfigurableApplicationContext;

import org.springframework.core.env.Environment;

import java.net.InetAddress;

import java.net.UnknownHostException;

/**
 * Application entry point: boots the Spring context, prints the URL of the
 * web application, then starts the Netty WebSocket server.
 */
@SpringBootApplication
public class DemoApplication {

    /** Port the Netty WebSocket server is started on. */
    private static final int WEBSOCKET_PORT = 53134;

    /** Fallback used in the banner when {@code server.port} is not configured. */
    private static final String DEFAULT_HTTP_PORT = "8080";

    /**
     * Starts Spring Boot and then the WebSocket server.
     *
     * <p>{@code WebSocketServer.run} is invoked last because it blocks the calling
     * thread until the server channel is closed.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws UnknownHostException if the local host address cannot be resolved
     */
    public static void main(String[] args) throws UnknownHostException {
        ConfigurableApplicationContext application = SpringApplication.run(DemoApplication.class, args);
        Environment env = application.getEnvironment();

        String host = InetAddress.getLocalHost().getHostAddress();
        // Without a default the banner would print "null" when server.port is not set explicitly.
        String port = env.getProperty("server.port", DEFAULT_HTTP_PORT);

        System.out.println("[----------------------------------------------------------]");
        System.out.println("聊天室启动成功!点击进入:\t http://" + host + ":" + port);
        System.out.println("[----------------------------------------------------------]");

        WebSocketServer.inst().run(WEBSOCKET_PORT);
    }
}

User:

package com.example.demo;

import java.util.Objects;

public class User {

public String id;

public String nickname;

public User(String id, String nickname) {

super();

this.id = id;

this.nickname = nickname;

}

public String getId() {

return id;

}

public void setId(String id) {

this.id = id;

}

public String getNickname() {

return nickname;

}

public void setNickname(String nickname) {

this.nickname = nickname;

}

@Override

public boolean equals(Object o) {

if (this == o)

return true;

if (o == null || getClass() != o.getClass())

return false;

User user = (User) o;

return id.equals(user.getId());

}

@Override

public int hashCode() {

return Objects.hash(id);

}

public String getUid() {

return id;

}

}

SessionGroup:

package com.example.demo;

import com.google.gson.Gson;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.group.ChannelGroup;

import io.netty.channel.group.ChannelGroupFuture;

import io.netty.channel.group.DefaultChannelGroup;

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import io.netty.util.concurrent.ImmediateEventExecutor;

import org.springframework.util.StringUtils;

import java.util.Iterator;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

public final class SessionGroup {

private static SessionGroup singleInstance = new SessionGroup();

// 组的映射

private ConcurrentHashMap groupMap = new ConcurrentHashMap<>();

public static SessionGroup inst() {

return singleInstance;

}

public void shutdownGracefully() {

Iterator groupIterator = groupMap.values().iterator();

while (groupIterator.hasNext()) {

ChannelGroup group = groupIterator.next();

group.close();

}

}

public void sendToOthers(Map result, SocketSession s) {

// 获取组

ChannelGroup group = groupMap.get(s.getGroup());

if (null == group) {

return;

}

Gson gson=new Gson();

String json = gson.toJson(result);

// 自己发送的消息不返回给自己

// Channel channel = s.getChannel();

// 从组中移除通道

// group.remove(channel);

ChannelGroupFuture future = group.writeAndFlush(new TextWebSocketFrame(json));

future.addListener(f -> {

System.out.println("完成发送:"+json);

// group.add(channel);//发送消息完毕重新添加。

});

}

public void addSession(SocketSession session) {

String groupName = session.getGroup();

if (StringUtils.isEmpty(groupName)) {

// 组为空,直接返回

return;

}

ChannelGroup group = groupMap.get(groupName);

if (null == group) {

group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

groupMap.put(groupName, group);

}

group.add(session.getChannel());

}

/**

* 关闭连接, 关闭前发送一条通知消息

*/

public void closeSession(SocketSession session, String echo) {

ChannelFuture sendFuture = session.getChannel().writeAndFlush(new TextWebSocketFrame(echo));

sendFuture.addListener(new ChannelFutureListener() {

public void operationComplete(ChannelFuture future) {

System.out.println("关闭连接:"+echo);

future.channel().close();

}

});

}

/**

* 关闭连接

*/

public void closeSession(SocketSession session) {

ChannelFuture sendFuture = session.getChannel().close();

sendFuture.addListener(new ChannelFutureListener() {

public void operationComplete(ChannelFuture future) {

System.out.println("发送所有完成:"+session.getUser().getNickname());

}

});

}

/**

* 发送消息

http:// * @param ctx 上下文

* @param msg 待发送的消息

*/

public void sendMsg(ChannelHandlerContext ctx, String msg) {

ChannelFuture sendFuture = ctx.writeAndFlush(new TextWebSocketFrame(msg));

sendFuture.addListener(f -> {//发送监听

System.out.println("对所有发送完成:"+msg);

});

}

}

SocketSession:

package com.example.demo;

import io.netty.channel.Channel;

import io.netty.channel.ChannelHandlerContext;

import io.netty.util.AttributeKey;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

/**
 * Server-side session bound to one client {@link Channel}.
 *
 * <p>The constructor stores the session on the channel under
 * {@link #SESSION_KEY}, so it can later be looked up from the channel or from
 * a {@link ChannelHandlerContext} via {@code getSession}.
 */
public class SocketSession {

    /** Channel attribute under which the session is stored. */
    public static final AttributeKey<SocketSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");

    // Channel this session is bound to.
    private final Channel channel;

    // User of this session; null until setUser is called.
    private User user;

    // Unique identifier of this session.
    private final String sessionId;

    // Name of the group (chat room) this session belongs to; null until setGroup is called.
    private String group;

    /** Free-form attributes stored on the session. */
    private final Map<String, Object> map = new HashMap<>();

    /**
     * Creates a session for the given channel and attaches it to the channel.
     *
     * @param channel the client's channel; each client has its own channel
     */
    public SocketSession(Channel channel) {
        this.channel = channel;
        this.sessionId = buildNewSessionId();
        channel.attr(SocketSession.SESSION_KEY).set(this);
    }

    /**
     * Looks up the session attached to the context's channel.
     *
     * @param ctx handler context of the client
     * @return the attached session, or {@code null} if none has been attached
     */
    public static SocketSession getSession(ChannelHandlerContext ctx) {
        return getSession(ctx.channel());
    }

    /**
     * Looks up the session attached to the channel.
     *
     * @param channel the client's channel
     * @return the attached session, or {@code null} if none has been attached
     */
    public static SocketSession getSession(Channel channel) {
        return channel.attr(SocketSession.SESSION_KEY).get();
    }

    /**
     * @return the session id
     */
    public String getId() {
        return sessionId;
    }

    /** Builds a random UUID string with the dashes removed. */
    private static String buildNewSessionId() {
        String uuid = UUID.randomUUID().toString();
        return uuid.replaceAll("-", "");
    }

    /**
     * Stores a session attribute.
     *
     * @param key   attribute name
     * @param value attribute value
     */
    public synchronized void set(String key, Object value) {
        map.put(key, value);
    }

    /**
     * Reads a session attribute, cast to the type expected by the caller.
     *
     * @param key attribute name
     * @param <T> type the caller expects; a mismatch surfaces as a
     *            {@link ClassCastException} at the call site
     * @return the stored value, or {@code null} if absent
     */
    @SuppressWarnings("unchecked") // the caller chooses T; the map itself only holds Object
    public synchronized <T> T get(String key) {
        return (T) map.get(key);
    }

    /**
     * @return {@code true} once a user has been set on the session
     */
    public boolean isValid() {
        return getUser() != null;
    }

    public User getUser() {
        return user;
    }

    public void setUser(User user) {
        this.user = user;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public Channel getChannel() {
        return channel;
    }
}

WebSocketServer:

package com.example.demo;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;

import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;

import io.netty.handler.stream.ChunkedWriteHandler;

import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * Netty-based WebSocket server serving the {@code /ws} endpoint.
 */
public class WebSocketServer {

    private static final WebSocketServer wbss = new WebSocketServer();

    private static final int READ_IDLE_TIME_OUT = 60; // read idle timeout, in seconds

    private static final int WRITE_IDLE_TIME_OUT = 0; // write idle timeout (0 = disabled)

    private static final int ALL_IDLE_TIME_OUT = 0; // all idle timeout (0 = disabled)

    /**
     * @return the shared server instance
     */
    public static WebSocketServer inst() {
        return wbss;
    }

    /**
     * Binds the server to {@code port} and blocks the calling thread until the
     * server channel is closed.
     *
     * @param port TCP port to listen on
     */
    public void run(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // Register the hook before blocking below; registering it after the blocking call
        // would mean it is never installed while the server is actually running.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            SessionGroup.inst().shutdownGracefully();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }));

        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();

                        // Netty's HTTP codec: decodes requests and encodes responses.
                        pipeline.addLast(new HttpServerCodec());

                        // Supports writing large data streams in chunks.
                        pipeline.addLast(new ChunkedWriteHandler());

                        // Aggregates the HttpRequest/HttpContent/LastHttpContent parts produced
                        // by the codec into a single full HTTP message (up to 64 KiB).
                        pipeline.addLast(new HttpObjectAggregator(64 * 1024));

                        // WebSocket compression extension.
                        pipeline.addLast(new WebSocketServerCompressionHandler());

                        // WebSocket path, sub-protocols, extension support and max frame size.
                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10 * 1024));

                        // Fires an IdleStateEvent when nothing has been read for READ_IDLE_TIME_OUT
                        // seconds; the event is handled in WebSocketTextHandler.userEventTriggered.
                        pipeline.addLast(new IdleStateHandler(
                                READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS));

                        // Application-level handler for text frames.
                        pipeline.addLast(new WebSocketTextHandler());
                    }
                });

        try {
            Channel ch = b.bind(port).syncUninterruptibly().channel();
            ch.closeFuture().syncUninterruptibly();
        } finally {
            // Release the event loop threads when bind fails or the server channel closes.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

WebSocketTextHandler:

package com.example.demo;

import com.google.gson.Gson;

import com.google.gson.reflect.TypeToken;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

import java.util.HashMap;

import java.util.Map;

public class WebSocketTextHandler extends SimpleChannelInboundHandler {

//@Override

protected void channelRead(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

SocketSession session = SocketSession.getSession(ctx);

TypeToken> typeToken = new TypeToken>() {

};

Gson gson=new Gson();

java.util.Map map = gson.fromJson(msg.text(), typeToken.getType());

User user = null;

switch (map.get("type")) {

case "msg":

Map result = new HashMap<>();

user = session.getUser();

result.put("type", "msg");

result.put("msg", map.get("msg"));

result.put("sendUser", user.getNickname());

SessionGroup.inst().sendToOthers(result, session);

break;

case "init":

String room = map.get("room");

session.setGroup(room);

String nick = map.get("nick");

user = new User(session.getId(), nick);

session.setUser(user);

SessionGroup.inst().addSession(session);

break;

}

}

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

// 是否握手成功,升级为 Websocket 协议

if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {

// 握手成功,移除 HttpRequestHandler,因此将不会接收到任何消息

// 并把握手成功的 Channel 加入到 ChannelGroup 中

new SocketSession(ctx.channel());

} else if (evt instanceof IdleStateEvent) {

IdleStateEvent stateEvent = (IdleStateEvent) evt;

if (stateEvent.state() == IdleState.READER_IDLE) {

System.out.println("bb22");

}

} else {

super.userEventTriggered(ctx, evt);

}

}

@Override

protected void messageReceived(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {

}

}

之后在项目外创建一个test.html:

群名:



昵称:










版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:电商达人自爆需补税34万(电商补税案例)
下一篇:一口气Ping1000个IP地址,会发生什么事情?(ping ip地址出现一般故障)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~