Java实现NIO聊天室的示例代码(群聊+私聊)

网友投稿 266 2022-10-25


Java实现NIO聊天室的示例代码(群聊+私聊)

功能介绍

功能:群聊+私发+上线提醒+下线提醒+查询在线用户

文件

Utils

需要用maven导入下面两个包

org.projectlombok

lombok

1.16.18

ch.qos.logback

logback-classic

1.2.3

package moremorechat_nio;

import lombok.extern.slf4j.Slf4j;

import java.io.*;

/**

* @author mazouri

* @create 2021-05-09 22:26

*/

@Slf4j

public class Utils {

/**

* 将二进制数据转为对象

*

* @param buf

* @return

* @throws IOException

* @throws ClassNotFoundException

*/

public static Message decode(byte[] buf) throws IOException, ClassNotFoundException {

ByteArrayInputStream bais = new ByteArrayInputStream(buf);

ObjectInputStream ois = new ObjectInputStream(bais);

return (Message) ois.readObject();

}

/**

* 将对象转为二进制数据

*

* @param message

* @return

*/

public static byte[] encode(Message message) throws IOException {

ByteArrayOutputStream baos = new ByteArrayOutputStream();

ObjectOutputStream oos = new ObjectOutputStream(baos);

oos.writeObject(message);

oos.flush();

return baos.toByteArray();

}

}

FinalValue

package moremorechat_nio;

/**

* @author mazouri

* @create 2021-05-05 21:00

*/

public final class FinalValue {

/**

* 系统消息

*/

public static final int MSG_SYSTEM = 0;

/**

* 群发消息

*/

public static final int MSG_GROUP = 1;

/**

* 私发消息

*/

public static final int MSG_PRIVATE = 2;

/**

* 客户端请求在线人员

*/

public static final int MSG_ONLINE = 3;

/**

* 客户端将用户名称发送给服务端

*/

public static final int MSG_NAME = 4;

}

Message

package moremorechat_nio;

import java.io.Serializable;

/**

* @author mazouri

* @create 2021-05-05 21:00

*/

public class Message implements Serializable {

public int type;

public String message;

public Message() {

}

public Message(String message) {

this.message = message;

}

public Message(int type, String message) {

this.type = type;

this.message = message;

}

@Override

public String toString() {

return "Message{" +

"type=" + type +

", message='" + message + '\'' +

'}';

}

}

NioServer

package moremorechat_nio;

import lombok.extern.slf4j.Slf4j;

import java.io.*;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.util.ArrayList;

import java.util.Iterator;

import java.util.Set;

import java.util.concurrent.atomic.AtomicBoolean;

import java.util.stream.Collectors;

import static moremorechat_nio.FinalValue.*;

/**

* ctrl+f12 方法

* ctrl+alt+左键

* @author mazouri

* @create 2021-05-09 19:24

*/

@Slf4j

public class NioServer {

private Selector selector;

private ServerSocketChannel ssc;

public NioServer() {

try {

// 创建 selector, 管理多个 channel

selector = Selector.open();

//打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父通道

ssc = ServerSocketChannel.open();

ssc.bind(new InetSocketAddress(8888));

//设置连接为非堵塞模式

ssc.configureBlocking(false);

// 2. 建立 selector 和 channel 的联系(注册)

// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件

//将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件

ssc.register(selector, SelectionKey.OP_ACCEPT);

} catch (IOException e) {

e.printStackTrace();

}

}

public static void main(String[] args) {

NioServer server = new NioServer();

log.debug("server启动完成,等待用户连接...");

try {

server.listen();

} catch (Exception e) {

log.debug("发生了一些问题");

}

}

/**

* 监听用户的连接

*

* @throws Exception

*/

private void listen() throws Exception {

while (true) {

// select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行, 通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)

//通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)

// select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理

selector.select();

// 处理事件, selectedKeys 内部包含了所有发生的事件

Iterator iterator = selector.selectedKeys().iterator();

while (iterator.hasNext()) {

SelectionKey key = iterator.next();

// 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题

iterator.remove();

// 区分事件类型

if (key.isAcceptable()) {

ServerSocketChannel channel = (ServerSocketChannel) key.channel();

SocketChannel sc = channel.accept();

sc.configureBlocking(false);

sc.register(selector, SelectionKey.OP_READ);

} else if (key.isReadable()) {

dealReadEvent(key);

}

}

}

}

/**

* 处理读事件

*

* @param key

*/

private void dealReadEvent(SelectionKey key) {

SocketChannel channel = null;

try {

channel = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(1024);

int read = channel.read(buffer);

// 如果是正常断开,read 的方法的返回值是 -1

if (read == -1) {

//cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

key.cancel();

} else {

buffer.flip();

Message msg = Utils.decode(buffer.array());

log.debug(msg.toString());

dealMessage(msg, key, channel);

}

} catch (IOException | ClassNotFoundException e) {

System.out.println((key.attachment() == null ? "匿名用户" : key.attachment())http:// + " 离线了..");

dealMessage(new Message(MSG_SYSTEM, key.attachment() + " 离线了.."), key, channel);

//取消注册

key.cancel();

//关闭通道

try {

channel.close();

} catch (IOException ioException) {

ioException.printStackTrace();

}

}

}

/**

* 处理各种消息,并发送给客户端

*

* @param msg

* @param key

* @param channel

*/

private void dealMessage(Message msg, SelectionKey key, SocketChannel channel) {

switch (msg.type) {

case MSG_NAME:

key.attach(msg.message);

log.debug("用户{}已上线", msg.message);

getConnectedChannel(channel).forEach(selectionKey -> {

SocketChannel sc = (SocketChannel) selectionKey.channel();

sendMsgToClient(new Message("收到一条系统消息: " + msg.message + "已上线"), sc);

});

break;

case MSG_GROUP:

getConnectedChannel(channel).forEach(selectionKey -> {

SocketChannel sc = (SocketChannel) selectionKey.channel();

sendMsgToClient(new Message(key.attachment() + "给大家发送了一条消息: " + msg.message), sc);

});

break;

case MSG_PRIVATE:

String[] s = msg.message.split("_");

AtomicBoolean flag = new AtomicBoolean(false);

getConnectedChannel(channel).stream().filter(sk -> s[0].equals(sk.attachment())).forEach(selectionKey -> {

SocketChannel sc = (SocketChannel) selectionKey.channel();

sendMsgToClient(new Message(key.attachment() + "给你发送了一条消息: " + s[1]), sc);

flag.set(true);

});

if (!flag.get()){

sendMsgToClient(new Message(s[1]+"用户不存在,请重新输入!!!"), channel);

}

break;

case MSG_ONLINE:

ArrayList onlineList = new ArrayList<>();

onlineList.add((String) key.attachment());

getConnectedChannel(channel).forEach(selectionKey -> onlineList.add((String) selectionKey.attachment()));

sendMsgToClient(new Message(onlineList.toString()), channel);

break;

case MSG_SYSTEM:

getConnectedChannel(channel).forEach(selectionKey -> {

SocketChannel sc = (SocketChannel) selectionKey.channel();

sendMsgToClient(new Message("收到一条系统消息: " + msg.message), sc);

});

break;

default:

break;

}

}

/**

* 发送消息给客户端

*

* @param msg

* @param sc

*/

private void sendMsgToClient(Message msg, SocketChannel sc) {

try {

byte[] bytes = Utils.encode(msg);

sc.write(ByteBuffer.wrap(bytes));

} catch (IOException e) {

log.debug("sendMsgToClient出现了一些问题");

}

}

/**

* 获取所有channel,除去调用者

*

* @param channel

* @return

*/

private Set getConnectedChannel(SocketChannel channel) {

return selector.keys().stream()

.filter(item -> item.channel() instanceof SocketChannel && item.channel().isOpen() && item.channel() != channel)

.collect(Collectors.toSet());

}

}

NioClient

package moremorechat_nio;

import lombok.extern.slf4j.Slf4j;

import java.io.*;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

import java.util.Scanner;

import static moremorechat_nio.FinalValue.*;

/**

* @author mazouri

* @create 2021-04-29 12:02

*/

@Slf4j

public class NioClient {

private Selector selector;

private SocketChannel socketChannel;

private String username;

private static Scanner input;

public NioClient() throws IOException {

selector = Selector.open();

socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));

socketChannel.configureBlocking(false);

socketChannel.register(selector, SelectionKey.OP_READ);

log.debug("client启动完成......");

log.debug("请输入你的名字完成注册");

input = new Scanner(System.in);

username = input.next();

log.debug("欢迎{}来到聊天系统", username);

}

public static void main(String[] args) throws IOException {

System.out.println("tips: \n1. 直接发送消息会发给当前的所有用户 \n2. @用户名:消息 会私发给你要发送的用户 \n3. 输入 查询在线用户 会显示当前的在线用户");

NioClient client = new NioClient();

//启动一个子线程接受服务器发送过来的消息

new Thread(() -> {

try {

client.acceptMessageFromServer();

} catch (Exception e) {

e.printStackTrace();

}

}, "receiveClientThread").start();

//调用sendMessageToServer,发送消息到服务端

client.sendMessageToServer();

}

/**

* 将消息发送到服务端

*

* @throws IOException

*/

private void sendMessageToServer() throws IOException {

//先把用户名发给客户端

Message message = new Message(MSG_NAME, username);

byte[] bytes = Utils.encode(message);

socketChannel.write(ByteBuffer.wrap(bytes));

while (input.hasNextLine()) {

String msgStr = input.next();

Message msg;

boolean isPrivate = msgStr.startsWith("@");

if (isPrivate) {

int idx = msgStr.indexOf(":");

String targetName = msgStr.substring(1, idx);

msgStr = msgStr.substring(idx + 1);

msg = new Message(MSG_PRIVATE, targetName + "_" + msgStr);

} else if ("查询在线用户".equals(msgStr)) {

msg = new Message(MSG_ONLINE, "请求在线人数");

} else {

msg = new Message(MSG_GROUP, msgStr);

}

byte[] bytes1 = Utils.encode(msg);

socketChannel.write(ByteBuffer.wrap(bytes1));

}

}

/**

* 接受从服务器发送过来的消息

*/

private void acceptMessageFromServer() throws Exception {

while (selector.select() > 0) {

Iterator iterator = selector.selectedKeys().iterator();

while (iterator.hasNext()) {

SelectionKey key = iterator.next();

iterator.remove();

if (key.isReadable()) {

SocketChannel sc = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(1024);

sc.read(buffer);

Message message = Utils.decode(buffer.array());

log.debug(String.valueOf(message.message));

}

}

}

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:H3C和CISCO快速端口比较
下一篇:网络编程socket模块
相关文章

 发表评论

评论列表