IO多路复用深入浅出(io多路复用实现)

网友投稿 264 2022-06-15


Java程序员进阶三条必经之路:数据库、虚拟机、异步通信。

前言

从零单排高性能问题,这次轮到异步通信了。这个领域入门有点难,需要了解UNIX五种IO模型和TCP协议,熟练使用三大异步通信框架:Netty、NodeJS、Tornado。目前所有标榜异步的通信框架用的都不是异步IO模型,而是IO多路复用中的epoll。因为Python提供了对Linux内核API的友好封装,所以我选择Python来学习IO多路复用。

IO多路复用

select

举一个EchoServer的例子,客户端发送任何内容,服务端会原模原样返回。

#!/usr/bin/env python # -*- coding: utf-8 -*- '''

Created on Feb 16, 2016

@author: mountain

''' import socket import select from Queue import Queue #AF_INET指定使用IPv4协议,如果要用更先进的IPv6,就指定为AF_INET6。 #SOCK_STREAM指定使用面向流的TCP协议,如果要使用面向数据包的UCP协议,就指定SOCK_DGRAM。 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server.setblocking(False) #设置监听的ip和port server_address = ('localhost', 1234)

server.bind(server_address) #设置backlog为5,client向server发起connect,server accept后建立长连接, #backlog指定排队等待server accept的连接数量,超过这个数量,server将拒绝连接。 server.listen(5) #注册在socket上的读事件 inputs = [server] #注册在socket上的写事件 outputs = [] #注册在socket上的异常事件 exceptions = [] #每个socket有一个发送消息的队列 msg_queues = {} print "server is listening on %s:%s." % server_address while inputs: #第四个参数是timeout,可选,表示n秒内没有任何事件通知,就执行下面代码 readable, writable, exceptional = select.select(inputs, outputs, exceptions) for sock in readable: #client向server发起connect也是读事件,server accept后产生socket加入读队列中 if sock is server:

conn, addr = sock.accept()

conn.setblocking(False)

inputs.append(conn)

msg_queues[conn] = Queue() print "server accepts a conn." else: #读取client发过来的数据,最多读取1k byte。 data = sock.recv(1024) #将收到的数据返回给client if data:

msg_queues[sock].put(data) if sock not in outputs: #下次select的时候会触发写事件通知,写和读事件不太一样,前者是可写就会触发事件,并不一定要真的去写 outputs.append(sock) else: #client传过来的消息为空,说明已断开连接 print "server closes a conn." if sock in outputs:

outputs.remove(sock)

inputs.remove(sock)

sock.close() del msg_queues[sock] for sock in writable: if not msg_queues[sock].empty():

sock.send(msg_queues[sock].get_nowait()) if msg_queues[sock].empty():

outputs.remove(sock) for sock in exceptional:

inputs.remove(sock) if sock in outputs:

outputs.remove(sock)

sock.close() del msg_queues[sock]

[mountain@king ~/workspace/wire]$ telnet localhost 1234

Trying 127.0.0.1...

Connected to localhost.

Escape character is '^]'. 1 1

select有3个缺点:

每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。

每次调用select后,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。

这点从python的例子里看不出来,因为python select api更加友好,直接返回就绪的socket列表。事实上linux内核select api返回的是就绪socket数目:

int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

fd数量有限,默认1024。

poll

采用poll重新实现EchoServer,只要搞懂了select,poll也不难,只是api的参数不太一样而已。

#!/usr/bin/env python

# -*- coding: utf-8 -*-

'''

Created on Feb 27, 2016

@author: mountain

'''

import select import socket import sys import Queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False)

server_address = ('localhost', 1234) server.bind(server_address) server.listen(5)

print 'server is listening on %s port %s' % server_address

msg_queues = {} timeout = 1000 * 60 #POLLIN: There is data to read #POLLPRI: There is urgent data to read #POLLOUT: Ready for output #POLLERR: Error condition of some sort #POLLHUP: Hung up

#POLLNVAL: Invalid request: descriptor not open READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR

READ_WRITE = READ_ONLY | select.POLLOUT

poller = select.poll()

#注册需要监听的事件

poller.register(server, READ_ONLY)

#文件描述符和socket映射

fd_to_socket = { server.fileno(): server} while True: events = poller.poll(timeout) for fd, flag in events:

sock = fd_to_socket[fd] if flag & (select.POLLIN | select.POLLPRI): if sock is server: conn, client_address = sock.accept() conn.setblocking(False)

fd_to_socket[conn.fileno()] = conn poller.register(conn, READ_ONLY)

msg_queues[conn] = Queue.Queue() else: data = sock.recv(1024) if data:

msg_queues[sock].put(data)

poller.modify(sock, READ_WRITE) else:

poller.unregister(sock)

sock.close()

del msg_queues[sock]

elif flag & select.POLLHUP:

poller.unregister(sock)

sock.close()

del msg_queues[sock]

elif flag & select.POLLOUT: if not msg_queues[sock].empty():

msg = msg_queues[sock].get_nowait()

sock.send(msg) else:

poller.modify(sock, READ_ONLY)

elif flag & select.POLLERR:

poller.unregister(sock)

sock.close()

del msg_queues[sock]

poll解决了select的第三个缺点,fd数量不受限制,但是失去了select的跨平台特性,它的linux内核api是这样的:

int poll (struct pollfd *fds, unsigned int nfds, int timeout); struct pollfd { int fd; /* file descriptor */ short events; /* requested events to watch */ short revents; /* returned events witnessed */ };

epoll

用法与poll几乎一样。

#!/usr/bin/env python

# -*- coding: utf-8 -*-

'''

Created on Feb 28, 2016

@author: mountain

'''

import select import socket import Queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False)

server_address = ('localhost', 1234) server.bind(server_address) server.listen(5)

print 'server is listening on %s port %s' % server_address

msg_queues = {} timeout = 60 READ_ONLY = select.EPOLLIN | select.EPOLLPRI

READ_WRITE = READ_ONLY | select.EPOLLOUT

epoll = select.epoll()

#注册需要监听的事件

epoll.register(server, READ_ONLY)

#文件描述符和socket映射

fd_to_socket = { server.fileno(): server} while True: events = epoll.poll(timeout) for fd, flag in events:

sock = fd_to_socket[fd] if flag & READ_ONLY: if sock is server: conn, client_address = sock.accept() conn.setblocking(False)

fd_to_socket[conn.fileno()] = conn epoll.register(conn, READ_ONLY)

msg_queues[conn] = Queue.Queue() else: data = sock.recv(1024) if data:

msg_queues[sock].put(data)

epoll.modify(sock, READ_WRITE) else:

epoll.unregister(sock)

sock.close()

del msg_queues[sock]

elif flag & select.EPOLLHUP:

epoll.unregister(sock)

sock.close()

del msg_queues[sock]

elif flag & select.EPOLLOUT: if not msg_queues[sock].empty():

msg = msg_queues[sock].get_nowait()

sock.send(msg) else:

epoll.modify(sock, READ_ONLY)

elif flag & select.EPOLLERR:

epoll.unregister(sock)

sock.close()

del msg_queues[sock]

epoll解决了select的三个缺点,是目前最好的IO多路复用解决方案。为了更好地理解epoll,我们来看一下linux内核api的用法。

int epoll_create(int size)//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)//注册事件,每个fd只拷贝一次。 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)/*等待IO事件,事件发生时,

内核调用回调函数,把就绪fd放入就绪链表中,并唤醒epoll_wait,epoll_wait只需要遍历就绪链表即可,

而select和poll都是遍历所有fd,这效率高下立判。*/

文/MountainKing(简书作者)

原文链接:http://jianshu.com/p/1020c11f016c


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:编码过程中,需要注意的地方(编码时尽可能使用)
下一篇:Lambda表达式的意义(Lambda 表达式)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~