Java 高并发八:NIO和AIO详解

网友投稿 185 2023-07-06


Java 高并发八:NIO和AIO详解

IO感觉上和多线程并没有多大关系,但是NIO改变了线程在应用层面使用的方式,也解决了一些实际的困难。而AIO是异步IO和前面的系列也有点关系。在此,为了学习和记录,也写一篇文章来介绍NIO和AIO。

1. 什么是NIO

NIO是New I/O的简称,与旧式的基于流的I/O方法相对,从名字看,它表示新的一套java I/O标 准。它是在Java 1.4中被纳入到JDK中的,并具有以下特性:

NIO是基于块(Block)的,它以块为基本单位处理数据 (硬盘上存储的单位也是按Block来存储,这样性能上比基于流的方式要好一些)

为所有的原始类型提供(Buffer)缓存支持

增加通道(Channel)对象,作为新的原始 I/O 抽象

支持锁(我们在平时使用时经常能看到会出现一些.lock的文件,这说明有线程正在使用这把锁,当线程释放锁时,会把这个文件删除掉,这样其他线程才能继续拿到这把锁)和内存映射文件的文件访问接口

提供了基于Selector的异步网络I/O

所有的从通道中的读写操作,都要经过Buffer,而通道就是io的抽象,通道的另一端就是操纵的文件。

2. Buffer

Java中Buffer的实现。基本的数据类型都有它对应的Buffer

Buffer的简单使用例子:

package test;

import java.io.File;

import java.io.FileInputStream;

import java.nio.ByteBuffer;

import java.nio.channels.FileChannel;

public class Test {

public static void main(String[] args) throws Exception {

FileInputStream fin = new FileInputStream(new File(

"d:\\temp_buffer.tmp"));

FileChannel fc = fin.getChannel();

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

fc.read(byteBuffer);

fc.close();

byteBuffer.flip();//读写转换

}

}

总结下使用的步骤是:

1. 得到Channel

2. 申请Buffer

3. 建立Channel和Buffer的读/写关系

4. 关闭

下面的例子是使用NIO来复制文件:

public static void nioCopyFile(String resource, String destination)

throws IOException {

FileInputStream fis = new FileInputStream(resource);

FileOutputStream fos = new FileOutputStream(destination);

FileChannel readChannel = fis.getChannel(); // 读文件通道

FileChannel writeChannel = fos.getChannel(); // 写文件通道

ByteBuffer buffer = ByteBuffer.allocate(1024); // 读入数据缓存

while (true) {

buffer.clear();

int len = readChannel.read(buffer); // 读入数据

if (len == -1) {

break; // 读取完毕

}

buffer.flip();

writeChannel.write(buffer); // 写入文件

}

readChannel.close();

writeChannel.close();

}

Buffer中有3个重要的参数:位置(position)、容量(capactiy)和上限(limit)

这里要区别下容量和上限,比如一个Buffer有10KB,那么10KB就是容量,我将5KB的文件读到Buffer中,那么上限就是5KB。

下面举个例子来理解下这3个重要的参数:

public static void main(String[] args) throws Exception {

ByteBuffer b = ByteBuffer.allocate(15); // 15个字节大小的缓冲区

System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()

+ " position=" + b.position());

for (int i = 0; i < 10; i++) {

// 存入10个字节数据

b.put((byte) i);

}

System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()

+ " position=" + b.position());

b.flip(); // 重置position

System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()

+ " position=" + b.position());

for (int i = 0; i < 5; i++) {

System.out.print(b.get());

}

System.out.println();

System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()

+ " position=" + b.position());

b.flip();

System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()

+ " position=" + b.position());

}

整个过程如图:

此时position从0到10,capactiy和limit不变。

该操作会重置position,通常,将buffer从写模式转换为读 模式时需要执行此方法 flip()操作不仅重置了当前的position为0,还将limit设置到当前position的位置 。

limit的意义在于,来确定哪些数据是有意义的,换句话说,从position到limit之间的数据才是有意义的数据,因为是上次操作的数据。所以flip操作往往是读写转换的意思。

意义同上。

而Buffer中大多数的方法都是去改变这3个参数来达到某些功能的:

public final Buffer rewind()

将position置零,并清除标志位(mark)

public final Buffer clear()

将position置零,同时将limit设置为capacity的大小,并清除了标志mark

public final Buffer flip()

先将limit设置到position所在位置,然后将position置零,并清除标志位mark,通常在读写转换时使用

文件映射到内存

public static void main(String[] args) throws Exception {

RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw");

FileChannel fc = raf.getChannel();

// 将文件映射到内存中

MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0,

raf.length());

while (mbb.hasRemaining()) {

System.out.print((char) mbb.get());

}

mbb.put(0, (byte) 98); // 修改文件

raf.close();

}

对MappedByteBuffer的修改就相当于修改文件本身,这样操作的速度是很快的。

3. Channel

多线程网络服务器的一般结构:

简单的多线程服务器:

public static void main(String[] args) throws Exception {

ServerSocket echoServer = null;

Socket clientSocket = null;

try {

echoServer = new ServerSocket(8000);

} catch (IOException e) {

System.out.println(e);

}

while (true) {

try {

clientSocket = echoServer.accept();

System.out.println(clientSocket.getRemoteSocketAddress()

+ " connect!");

tp.execute(new HandleMsg(clientSocket));

} catch (IOException e) {

System.out.println(e);

}

}

}

功能就是服务器端读到什么数据,就向客户端回写什么数据。

这里的tp是一个线程池,HandleMsg是处理消息的类。

static class HandleMsg implements Runnable{

省略部分信息

public void run(){

try {

is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));

os = new PrintWriter(clientSocket.getOutputStream(), true);

// 从InputStream当中读取客户端所发送的数据

String inputLine = null;

long b=System. currentTimeMillis ();

while ((inputLine = is.readLine()) != null)

{

os.println(inputLine);

}

long e=System. currentTimeMillis ();

System. out.println ("spend:"+(e - b)+" ms ");

} catch (IOException e) {

e.printStackTrace();

}finally

{

关闭资源

}

}

}

客户端:

public static void main(String[] args) throws Exception {

Socket client = null;

PrintWriter writer = null;

BufferedReader reader = null;

try {

client = new Socket();

client.connect(new InetSocketAddress("localhost", 8000));

writer = new PrintWriter(client.getOutputStream(), true);

writer.println("Hello!");

writer.flush();

reader = new BufferedReader(new InputStreamReader(

client.getInputStream()));

System.out.println("from server: " + reader.readLine());

} catch (Exception e) {

} finally {

// 省略资源关闭

}

}

以上的网络编程是很基本的,使用这种方式,会有一些问题:

为每一个客户端使用一个线程,如果客户端出现延时等异常,线程可能会被占用很长时间。因为数据的准备和读取都在这个线程中。此时,如果客户端数量众多,可能会消耗大量的系统资源。

解决方案:

使用非阻塞的NIO (读取数据不等待,数据准备好了再工作)

为了体现NIO使用的高效。

这里先模拟一个低效的客户端来模拟因网络而延时的情况:

private static ExecutorService tp= Executors.newCachedThreadPool();

private static final int sleep_time=1000*1000*1000;

public static class EchoClient implements Runnable{

public void run(){

try {

client = new Socket();

client.connect(new InetSocketAddress("localhost", 8000));

writer = new PrintWriter(client.getOutputStream(), true);

writer.print("H");

LockSupport.parkNanos(sleep_time);

writer.print("e");

LockSupport.parkNanos(sleep_time);

writer.print("l");

LockSupport.parkNanos(sleep_time);

writer.print("l");

LockSupport.parkNanos(sleep_time);

writer.print("o");

LockSupport.parkNanos(sleep_time);

writer.print("!");

LockSupport.parkNanos(sleep_time);

writer.println();

writer.flush();

}catch(Exception e)

{

}

}

}

服务器端输出:

spend:6000ms

spend:6000ms

spend:6000ms

spend:6001ms

spend:6002ms

spend:6002ms

spend:6002ms

spend:6002ms

spend:6003ms

spend:6003ms

因为

while ((inputLine = is.readLine()) != null)

是阻塞的,所以时间都花在等待中。

如果用NIO来处理这个问题会怎么做呢?

NIO有一个很大的特点就是:把数据准备好了再通知我

而Channel有点类似于流,一个Channel可以和文件或者网络Socket对应 。

selector是一个选择器,它可以选择某一个Channel,然后做些事情。

一个线程可以对应一个selector,而一个selector可以轮询多个Channel,而每个Channel对应了一个Socket。

与上面一个线程对应一个Socket相比,使用NIO后,一个线程可以轮询多个Sockehttp://t。

当selector调用select()时,会查看是否有客户端准备好了数据。当没有数据被准备好时,select()会阻塞。平时都说NIO是非阻塞的,但是如果没有数据被准备好还是会有阻塞现象。

当有数据被准备好时,调用完select()后,会返回一个SelectionKey,SelectionKey表示在某个selector上的某个Channel的数据已经被准备好了。

只有在数据准备好时,这个Channel才会被选择。

这样NIO实现了一个线程来监控多个客户端。

而刚刚模拟的网络延迟的客户端将不会影响NIO下的线程,因为某个Socket网络延迟时,数据还未被准备好,selector是不会选择它的,而会选择其他准备好的客户端。

selectNow()与select()的区别在于,selectNow()是不阻塞的,当没有客户端准备好数据时,selectNow()不会阻塞,将返回0,有客户端准备好数据时,selectNow()返回准备好的客户端的个数。

主要代码:

package test;

import java.net.InetAddress;

import java.net.InetSocketAddress;

import java.net.Socket;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.nio.channels.spi.AbstractSelector;

import java.nio.channels.spi.SelectorProvider;

import java.util.HashMap;

import java.util.Iterator;

import java.util.LinkedList;

import java.util.Map;

import java.util.Set;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class MultiThreadNIOEchoServer {

public static Map geym_time_stat = new HashMap();

class EchoClient {

private LinkedList outq;

EchoClient() {

outq = new LinkedList();

}

public LinkedList getOutputQueue() {

return outq;

}

public void enqueue(ByteBuffer bb) {

outq.addFirst(bb);

}

}

class HandleMsg implements Runnable {

SelectionKey sk;

ByteBuffer bb;

public HandleMsg(SelectionKey sk, ByteBuffer bb) {

super();

this.sk = sk;

this.bb = bb;

}

@Override

public void run() {

// TODO Auto-generated method stub

EchoClient echoClient = (EchoClient) sk.attachment();

echoClient.enqueue(bb);

sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);

selector.wakeup();

}

}

private Selector selector;

private ExecutorService tp = Executors.newCachedThreadPool();

private void startServer() throws Exception {

selector = SelectorProvider.provider().openSelector();

ServerSocketChannel ssc = ServerSocketChannel.open();

ssc.configureBlocking(false);

InetSocketAddress isa = new InetSocketAddress(8000);

ssc.socket().bind(isa);

// 注册感兴趣的事件,此处对accpet事件感兴趣

SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);

for (;;) {

selector.select();

Set readyKeys = selector.selectedKeys();

Iterator i = readyKeys.iterator();

long e = 0;

while (i.hasNext()) {

SelectionKey sk = (SelectionKey) i.next();

i.remove();

if (sk.isAcceptable()) {

doAccept(sk);

} else if (sk.isValid() && sk.isReadable()) {

if (!geym_time_stat.containsKey(((SocketChannel) sk

.channel()).socket())) {

geym_time_stat.put(

((SocketChannel) sk.channel()).socket(),

System.currentTimeMillis());

}

doRead(sk);

} else if (sk.isValid() && sk.isWritable()) {

doWrite(sk);

e = System.currentTimeMillis();

long b = geym_time_stat.remove(((SocketChannel) sk

.channel()).socket());

System.out.println("spend:" + (e - b) + "ms");

}

}

}

}

private vohttp://id doWrite(SelectionKey sk) {

// TODO Auto-generated method stub

SocketChannel channel = (SocketChannel) sk.channel();

EchoClient echoClient = (EchoClient) sk.attachment();

LinkedList outq = echoClient.getOutputQueue();

ByteBuffer bb = outq.getLast();

try {

int len = channel.write(bb);

if (len == -1) {

disconnect(sk);

return;

}

if (bb.remaining() == 0) {

outq.removeLast();

}

} catch (Exception e) {

// TODO: handle exception

disconnect(sk);

}

if (outq.size() == 0) {

sk.interestOps(SelectionKey.OP_READ);

}

}

private void doRead(SelectionKey sk) {

// TODO Auto-generated method stub

SocketChannel channel = (SocketChannel) sk.channel();

ByteBuffer bb = ByteBuffer.allocate(8192);

int len;

try {

len = channel.read(bb);

if (len < 0) {

disconnect(sk);

return;

}

} catch (Exception e) {

// TODO: handle exception

disconnect(sk);

return;

}

bb.flip();

tp.execute(new HandleMsg(sk, bb));

}

private void disconnect(SelectionKey sk) {

// TODO Auto-generated method stub

//省略略干关闭操作

}

private void doAccept(SelectionKey sk) {

// TODO Auto-generated method stub

ServerSocketChannel server = (ServerSocketChannel) sk.channel();

SocketChannel clientChannel;

try {

clientChannel = server.accept();

clientChannel.configureBlocking(false);

SelectionKey clientKey = clientChannel.register(selector,

SelectionKey.OP_READ);

EchoClient echoClinet = new EchoClient();

clientKey.attach(echoClinet);

InetAddress clientAddress = clientChannel.socket().getInetAddress();

System.out.println("Accepted connection from "

+ clientAddress.getHostAddress());

} catch (Exception e) {

// TODO: handle exception

}

}

public static void main(String[] args) {

// TODO Auto-generated method stub

MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer();

try {

echoServer.startServer();

} catch (Exception e) {

// TODO: handle exception

}

}

}

代码仅作参考,主要的特点是,对不同事件的感兴趣来做不同的事。

当用之前模拟的那个延迟的客户端时,这次的时间消耗就在2ms到11ms之间了。性能提升是很明显的。

总结:

1. NIO会将数据准备好后,再交由应用进行处理,数据的读取/写入过程依然在应用线程中完成,只是将等待的时间剥离到单独的线程中去。

2. 节省数据准备时间(因为Selector可以复用)

5. AIO

AIO的特点:

1. 读完了再通知我

2. 不会加快IO,只是在读完后进行通知

3. 使用回调函数,进行业务处理

AIO的相关代码:

AsynchronousServerSocketChannel

server = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress (PORT));

使用server上的accept方法

public abstract void accept(A attachment,CompletionHandler handler);

CompletionHandler为回调接口,当有客户端accept之后,就做handler中的事情。

示例代码:

server.accept(null,

new CompletionHandler() {

final ByteBuffer buffer = ByteBuffer.allocate(1024);

public void completed(AsynchronousSocketChannel result,

Object attachment) {

System.out.println(Thread.currentThread().getName());

Future writeResult = null;

try {

buffer.clear();

result.read(buffer).get(100, TimeUnit.SECONDS);

buffer.flip();

writeResult = result.write(buffer);

} catch (InterruptedException | ExecutionException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

} finally {

try {

server.accept(null, this);

writeResult.get();

result.close();

} catch (Exception e) {

System.out.println(e.toString());

}

}

}

@Override

public void failed(Throwable exc, Object attachment) {

System.out.println("failed: " + exc);

}

});

这里使用了Future来实现即时返回,关于Future请参考上一篇

在理解了NIO的基础上,看AIO,区别在于AIO是等读写过程完成后再去调用回调函数。

NIO是同步非阻塞的

AIO是异步非阻塞的

由于NIO的读写过程依然在应用线程里完成,所以对于那些读写过程时间长的,NIO就不太适合。

而AIO的读写过程完成后才被通知,所以AIO能够胜任那些重量级,读写过程长的任务。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java enum关键字不识别的快速解决办法
下一篇:Java 高并发一:前言
相关文章

 发表评论

暂时没有评论,来抢沙发吧~