Java中多线程Reactor模式的实现

网友投稿 374 2022-09-11


Java中多线程Reactor模式的实现

目录1、 主服务器2、IO请求handler+线程池3、客户端

多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件。

让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求。这样一来连接请求与IO请求分开执行提高通道的并发量。同时多个Reactor带来的好处是多个selector可以提高通道的检索速度

1、 主服务器

package com.crazymakercircle.ReactorModel;

import com.crazymakercircle.NioDemoConfig;

import com.crazymakercircle.util.Logger;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

import java.util.Set;

import java.util.concurrent.atomic.AtomicInteger;

class MultiThreadEchoServerReactor {

ServerSocketChannel serverSocket;

AtomicInteger next = new AtomicInteger(0);

Selector bossSelector = null;

Reactor bossReactor = null;

//selectors集合,引入多个selector选择器

//多个选择器可以更好的提高通道的并发量

Selector[] workSelectors = new Selector[2];

//引入多个子反应器

//如果CPU是多核的可以开启多个子Reactor反应器,这样每一个子Reactor反应器还可以独立分配一个线程。

//每一个线程可以单独绑定一个单独的Selector选择器以提高通道并发量

Reactor[] workReactors = null;

MultiThreadEchoServerReactor() throws IOException {

bossSelector = Selector.open();

//初始化多个selector选择器

workSelectors[0] = Selector.open();

workSelectors[1] = Selector.open();

serverSocket = ServerSocketChannel.open();

InetSocketAddress address =

new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,

NioDemoConfig.SOCKET_SERVER_PORT);

serverSocket.socket().bind(address);

//非阻塞

serverSocket.configureBlocking(false);

//第一个selector,负责监控新连接事件

SelectionKey sk =

serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);

//附加新连接处理handler处理器到SelectionKey(选择键)

sk.attach(new AcceptorHandler());

//处理新连接的反应器

bossReactor = new Reactor(bossSelector);

//第一个子反应器,一子反应器负责一个选择器

Reactor subReactor1 = new Reactor(workSelectors[0]);

//第二个子反应器,一子反应器负责一个选择器

Reactor subReactor2 = new Reactor(workSelectors[1]);

workReactors = new Reactor[]{subReactor1, subReactor2};

}

private void startService() {

new Thread(bossReactor).start();

// 一子反应器对应一条线程

new Thread(workReactors[0]).start();

new Thread(workReactors[1]).start();

}

//反应器

class Reactor implements Runnable {

//每条线程负责一个选择器的查询

final Selector selector;

public Reactor(Selector selector) {

this.selector = selector;

}

public void run() {

try {

while (!Thread.interrupted()) {

//单位为毫秒

//每隔一秒列出选择器感应列表

selector.select(1000);

Set selectedKeys = selector.selectedKeys();

if (null == selectedKeys || selectedKeys.size() == 0) {

//如果列表中的通道注册事件没有发生那就继续执行

continue;

}

Iterator it = selectedKeys.iterator();

while (it.hasNext()) {

//Reactor负责dispatch收到的事件

SelectionKey sk = it.next();

dispatch(sk);

}

//清楚掉已经处理过的感应事件,防止重复处理

selectedKeys.clear();

}

} catch (IOException ex) {

ex.printStackTrace();

}

}

void dispatch(SelectionKey sk) {

Runnable handler = (Runnable) sk.attachment();

//调用之前attach绑定到选择键的handler处理器对象

if (handler != null) {

handler.run();

}

}

}

// Handler:新连接处理器

class AcceptorHandler implements Runnable {

public void run() {

try {

SocketChannel channel = serverSocket.accept();

Logger.info("接收到一个新的连接");

if (channel != null) {

int index = next.get();

Logger.info("选择器的编号:" + index);

Selector selector = workSelectors[index];

new MultiThreadEchoHandler(selector, channel);

}

} catch (IOException e) {

e.printStackTrace();

}

if (next.incrementAndGet() == workSelectors.length) {

next.set(0);

}

}

}

public static void main(String[] args) throws IOException {

MultiThreadEchoServerReactor server =

new MultiThreadEchoServerReactor();

server.startService();

}

}

按上述的设计思想,在主服务器中实际上设计了三个Reactor,一个主Reactor专门负责连接请求并配已单独的selector,但是三个Reactor的线程Run函数是做的相同的功能,都是根据每个线程内部的selector进行检索事件列表,若注册的监听事件发生了则调用dispactch分发到每个Reactor对应的Handler。

这里需要注意的一开始其实只有负责连接事件的主Reactor在注册selector的时候给相应的key配了一个AcceptorHandler()。

//第一个selector,负责监控新连接事件

SelectionKey sk =

serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);

//附加新连接处理handler处理器到SelectionKey(选择键)

sk.attach(new AcceptorHandler());

但是Reactor的run方法里若相应的selector key发生了便要dispatch到一个Handler。这里其他两个子Reactor的Handler在哪里赋值的呢?其实在处理连接请求的Reactor中便创建了各个子Handler,如下代码所示:

主Handler中先是根据服务器channel创建出客服端channel,在进行子selector与channel的绑定。

int index = next.get();

Logger.info("选择器的编号:" + index);

Selector selector = workSelectors[index];

new MultiThreadEchoHandler(selector, channel);

2、IO请求handler+线程池

package com.crazymakercircle.ReactorModel;

import com.crazymakercircle.util.Logger;

import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

class MultiThreadEchoHandler implements Runnable {

final SocketChannel channel;

final SelectionKey sk;

final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

static final int RECIEVING = 0, SENDING = 1;

int state = RECIEVING;

//引入线程池

static ExecutorService pool = Executors.newFixedThreadPool(4);

MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {

channel = c;

channel.configureBlocking(false);

//唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞boss

selector.wakeup();

//仅仅取得选择键,后设置感兴趣的IO事件

sk = channel.register(selector, 0);

//将本Handler作为sk选择键的附件,方便事件dispatch

sk.attach(this);

//向sk选择键注册Read就绪事件

sk.interestOps(SelectionKey.OP_READ);

//唤醒选择,是的OP_READ生效

selector.wakeup();

Logger.info("新的连接 注册完成");

}

public void run() {

//异步任务,在独立的线程池中执行

pool.execute(new AsyncTask());

}

//异步任务,不在Reactor线程中执行

public synchronized void asyncRun() {

try {

if (state == SENDING) {

//写入通道

channel.write(byteBuffer);

//写完后,准备开始从通道读,byteBuffer切换成写模式

byteBuffer.clear();

//写完后,注册read就绪事件

sk.interestOps(SelectionKeeCfaPtKJny.OP_READ);

//写完后,进入接收的状态

state = RECIEVING;

} else if (state == RECIEVING) {

//从通道读

int length = 0;

eCfaPtKJn while ((length = channel.read(byteBuffer)) > 0) {

Logger.info(new String(byteBuffer.array(), 0, length));

}

//读完后,准备开始写入通道,byteBuffer切换成读模式

byteBuffer.flip();

//读完后,注册write就绪事件

sk.interestOps(SelectionKey.OP_WRITE);

//读完后,进入发送的状态

state = SENDING;

}

//处理结束了, 这里不能关闭select key,需要重复使用

//sk.cancel();

} catch (IOException ex) {

ex.printStackTrace();

}

}

//异步任务的内部类

class AsyncTask implements Runnable {

public void run() {

MultiThreadEchoHandler.this.asyncRun();

}

}

}

3、客户端

在处理IO请求的Handler中采用了线程池,已达到异步处理的目的。

package com.crazymakercircle.ReactorModel;

import com.crazymakercircle.NioDemoConfig;

import com.crazymakercircle.util.Dateutil;

import com.crazymakercircle.util.Logger;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

import java.util.Scanner;

import java.util.Set;

/**

* create by 尼恩 @ 疯狂创客圈

**/

public class EchoClient {

public void start() throws IOException {

InetSocketAddress address =

new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,

NioDemoConfig.SOCKET_SERVER_PORT);

// 1、获取通道(channel)

SocketChannel socketChannel = SocketChannel.open(address);

Logger.info("客户端连接成功");

// 2、切换成非阻塞模式

socketChannel.configureBlocking(false);

//不断的自旋、等待连接完成,或者做一些其他的事情

while (!socketChannel.finishConnect()) {

}

Logger.tcfo("客户端启动成功!");

//启动接受线程

Processer processer = new Processer(socketChannel);

new Thread(processer).start();

}

static class Processer implements Runnable {

final Selector selector;

final SocketChannel channel;

Processer(SocketChannel channel) throws IOException {

//Reactor初始化

selector = Selector.open();

this.channel = channel;

channel.register(selector,

SelectionKey.OP_READ | SelectionKey.OP_WRITE);

}

public void run() {

try {

while (!Thread.interrupted()) {

selector.select();

Set selected = selector.selectedKeys();

Iterator it = selected.iterator();

while (it.hasNext()) {

SelectionKey sk = it.next();

if (sk.isWritable()) {

ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);

Scanner scanner = new Scanner(System.in);

Logger.tcfo("请输入发送内容:");

if (scanner.hasNext()) {

SocketChannel socketChannel = (SocketChannel) sk.channel();

String next = scanner.next();

buffer.put((Dateutil.getNow() + " >>" + next).getBytes());

buffer.flip();

// 操作三:发送数据

socketChannel.write(buffer);

buffer.clear();

}

}

if (sk.isReadable()) {

// 若选择键的IO事件是“可读”事件,读取数据

SocketChannel socketChannel = (SocketChannel) sk.channel();

//读取数据

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

int length = 0;

while ((length = socketChannel.read(byteBuffer)) > 0) {

byteBuffer.flip();

Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));

byteBuffer.clear();

}

}

//处理结束了, 这里不能关闭select key,需要重复使用

//selectionKey.cancel();

}

selected.clear();

}

} catch (IOException ex) {

ex.printStackTrace();

}

}

}

public static void main(String[] args) throws IOException {

new EchoClient().start();

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:netstat命令使用方法(netstat命令的功能和用法)
下一篇:计算机网络学习笔记四:网络层(第四章网络层)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~