手写一个NIO群聊系统

网友投稿 234 2022-10-13


手写一个NIO群聊系统

一、浅谈NIO

1. 什么是NIO?

SelectionKey

SelectionKey,表示Selector和网络通道的注册关系,共四种: int OP_ACCEPT:有新的网络连接可以 accept,值为 16 int OP_CONNECT:代表连接已经建立,值为 8 int OP_READ:代表读操作,值为 1 int OP_WRITE​​​:代表写操作,值为 ​​​4​​ SelectionKey 相关方法

ServerSocketChannel

二、群聊系统开发

1. 原理理解模型

服务端:

package com.fyp.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; /** * @Auther: fyp * @Date: 2022/2/5 * @Description: NIO服务器 * @Package: com.fyp.nio * @Version: 1.0 */ public class NIOServer { public static void main(String[] args) throws IOException { //创建ServerSocketChannel -> ServerSocket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //得到一个Selector对象 Selector selector = Selector.open(); //绑定一个端口6666,在服务端监听 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //设置为非阻塞 serverSocketChannel.configureBlocking(false); //把 serverSocketChannel 注册到 selector, 关注事件 为 OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //循环等待客户端连接 while (true) { //这里我们等待1秒,如果没有事件发生(连接事件) if (selector.select(1000) == 0) {//没有事件发生 System.out.println("服务器等待了1秒,无连接"); continue; } //如果返回的 > 0, 就获取到相关的 selectionKey集合 //1. 如果返回的 > 0, 表示已经获取到关注的事件 //2. selector.selectedKeys() 返回关注事件的集合 //通过 selectionKeys 反向获取通道 Set selectionKeys = selector.selectedKeys(); //遍历 Set, 使用迭代器 Iterator keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { //获取到selectionKey SelectionKey key = keyIterator.next(); //根据key 对应的通道发生的事件做相应处理 if (key.isAcceptable()) { //如果是OP_ACCEPT, 表示新的客户端连接 //给该客户端生成一个 SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("客户端连接成功!生成了一个socketChannel " + socketChannel.hashCode()); //将 SocketChannel 设置为非阻塞 socketChannel.configureBlocking(false); //将socketChannel注册到 selector上, 关注事件为OP_READ, 同时给socketChannel //关联一个Buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); } if (key.isReadable()) { //发生OP_READ // 通过key 反向获取对应的channel SocketChannel channel = (SocketChannel) key.channel(); //获取该channel关联的 buffer,在与客户端连接就已经创建好了 ByteBuffer buffer = (ByteBuffer) key.attachment(); channel.read(buffer); System.out.println("from 客户端: " + new String(buffer.array())); } //手动从集合中移动当前的selectionKey, 防止重复操作 keyIterator.remove(); } } } }

客户端:

package com.fyp.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /** * @Auther: fyp * @Date: 2022/2/6 * @Description: 客户端 * @Package: com.fyp.nio * @Version: 1.0 */ public class NIOClient { public static void main(String[] args) throws IOException { //得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); //设置非阻塞 socketChannel.configureBlocking(false); //提供服务端的 ip 和 端口 InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666); /* 连接服务器 1. 为非阻塞模式时,即不会等到方法执行完毕再返回,会立即返回,如果返回前已经连接成功,则返回true 返回false 时,说明未连接成功,所以需要再通过while循环地finishConnect()完成最终的连接 2. 为阻塞模式时,直到连接建立或抛出异常 不会返回false,连接不上就抛异常,不需要借助finishConnect() */ if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作"); } } //如果连接成功,就发送数据 String str = "hello, 尚硅谷"; ByteBuffer buffer = ByteBuffer.wrap(str.getBytes()); //发送数据,将buffer 写入 channel socketChannel.write(buffer); System.in.read(); } }

2. 开发模型

实现要求:

服务器端:可以监测用户上线,离线,并实现消息转发功能 客户端:通过 Channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(由服务器转发得到)

服务端:

package com.fyp.nio.groupchat; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; /** * @Auther: fyp * @Date: 2022/2/6 * @Description: 群聊系统服务端 * @Package: com.fyp.nio.groupchat * @Version: 1.0 */ public class GroupChatServer { private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667; public GroupChatServer() { try { selector = Selector.open(); listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(PORT)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } public void listen() { try { while (true) { // count 获取的是 在阻塞过程中 同时发生的 事件 数,直到有事件 发生,才会执行,否则一直阻塞 /* select 方法在 没有 客户端发起连接时, 会一直阻塞,至少有一个客户端连接,其他 客户端再 发起连接 不再阻塞,会立即返回 */ int count = selector.select(); System.out.println(count); if(count > 0) {// 有事件 处理 // 遍历得到 SelectionKey 集合 Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { //取出SelectionKey SelectionKey key = iterator.next(); //监听到accept if (key.isAcceptable()) { SocketChannel sc = listenChannel.accept(); // 监听到 客户端 的 SocketChannel 总是默认为阻塞方式,需要重新设置 sc.configureBlocking(false); //将该 sc 注册到 selector sc.register(selector, SelectionKey.OP_READ); System.out.println(sc.getRemoteAddress() + " 上线 "); } if (key.isReadable()) { // 通道发送 read 事件, 即通道是可读状态 //处理读 readData(key); } /* 每次 监听到 客户端后, selector会将 连接上的 客户端 选中, 并添加到 selectionKeys 中 要注册到 selector 上,使用该方法,selector 将 不再选中 如果没有移除,selector 不能选中 其他的 客户端连接 iterator.remove() 移除后,将释放 selector 中的 selectionKeys */ iterator.remove(); } } else { //System.out.println("等待...."); } } } catch (IOException e) { e.printStackTrace(); } finally { // 发送异常处理 } } public void readData(SelectionKey key) { // 取到关联的 channel SocketChannel channel = null; try { // 得到 channel channel = (SocketChannel) key.channel(); // 创建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); // 根据 count 的值 做 处理 if (count > 0) { // 把 缓冲区 的 数据 转成 字符串 String msg = new String(buffer.array()); // 输出该消息 System.out.println("from 客户端: " + msg); //想其他客户端转发消息,专门写一个方法来处理 sendInfoToOtherClients(msg, channel); } } catch (IOException e) { //e.printStackTrace(); try { System.out.println(channel.getRemoteAddress() + "离线了"); // 取消 注册 key.cancel(); // 关闭通道 channel.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } // 转发消息给其他客户端(通道) private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException { System.out.println("服务器转发消息中...."); // 遍历所有 注册到 selector 上 的 SockChannel, 并排除 self for (SelectionKey key : selector.keys()) { // 通过 key 取出 对应的 SocketChannel Channel targetChannel = key.channel(); // 排除自己 if (targetChannel instanceof SocketChannel && targetChannel != self) { // 转型 SocketChannel dest = (SocketChannel) targetChannel; // 将 msg 存储到 buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); // 将 buffer 的数据 写入 通道 dest.write(buffer); } } } public static void main(String[] args) { // 创建 服务器 对象 GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } }

客户端:

package com.fyp.nio.groupchat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; /** * @Auther: fyp * @Date: 2022/2/7 * @Description: 群聊系统客户端 * @Package: com.fyp.nio.groupchat * @Version: 1.0 */ public class GroupChatClient { //定义 相关 属性 private final String HOST = "127.0.0.1"; private final int PORT = 6667; private Selector selector; private SocketChannel socketChannel; private String username; // 构造器,完成初始化工作 public GroupChatClient() throws IOException { selector = Selector.open(); // 连接 服务器 socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT)); // 设置 非阻塞 socketChannel.configureBlocking(false); // 将 channel 注册到 selector socketChannel.register(selector, SelectionKey.OP_READ); // 得到 username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok...."); } // 向 服务器 发送 消息 public void sendInfo(String info) { info = username + " 说:" + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); } catch (Exception e) { } } // 读取从 服务器 端 回复的 消息 public void readInfo() { try { int readChannels = selector.select(); if (readChannels > 0) {// 有可以用的 通道 Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isReadable()) { // 得到 相关的 通道 SocketChannel sc = (SocketChannel) key.channel(); // 得到一个 Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 读取 sc.read(buffer); // 把缓存区的数据 转成 字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } } iterator.remove();// 删除当前的selectionKey, 防止重复操作 } else { //System.out.println("没有可以用的通道...."); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { // 启动 客户端 GroupChatClient chatClient = new GroupChatClient(); // 启动一个线程,每隔3秒, 读取从 服务器 发送过来的数据 new Thread() { @Override public void run() { while (true) { chatClient.readInfo(); try { Thread.currentThread().sleep(3000); } catch (Exception e) { e.printStackTrace(); } } } }.start(); // 发送数据给 服务端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); chatClient.sendInfo(s); } } }

三、结束语

与CSDN同步,没有找到所要相关可通过在 CSDN 嗝屁小孩纸 主页搜索相关文章,或者可以转告我,不理解的可以互相探讨! 评论区可留言,可私信,可互相交流学习,共同进步,欢迎各位给出意见或评价,本人致力于做到优质文章,希望能有幸拜读各位的建议!

专注品质,热爱生活。交流技术,奢求同志。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:idea一招搞定同步所有配置(导入或导出所有配置)
下一篇:EasyDSS平台无法登录Web页面的排查与解决方法
相关文章

 发表评论

暂时没有评论,来抢沙发吧~