Python 的异步 IO:Asyncio(python是什么意思)

网友投稿 364 2022-09-04


Python 的异步 IO:Asyncio(python是什么意思)

一直对asyncio这个库比较感兴趣,毕竟这是官网也非常推荐的一个实现高并发的一个模块,python也是在python 3.4中引入了协程的概念。也通过这次整理更加深刻理解这个模块的使用

asyncio 是干什么的?

异步网络操作并发协程

python3.0时代,标准库里的异步网络模块:select(非常底层) python3.0时代,第三方异步网络库:Tornado python3.4时代,asyncio:支持TCP,子进程

现在的asyncio,有了很多的模块已经在支持:aio事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态future: 代表将来执行或没有执行的任务的结果。它和task上没有本质上的区别async/await 关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

看了上面这些关键字,你可能扭头就走了,其实一开始了解和研究asyncio这个模块有种抵触,自己也不知道为啥,这也导致很长一段时间,这个模块自己也基本就没有关注和使用,但是随着工作上用python遇到各种性能问题的时候,自己告诉自己还是要好好学习学习这个模块。

所谓「异步 IO」,就是你发起一个 IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

Asyncio 是并发(concurrency)的一种方式。对 Python 来说,并发还可以通过线程(threading)和多进程(multiprocessing)来实现。

Asyncio 并不能带来真正的并行(parallelism)。当然,因为 GIL(全局解释器锁)的存在,Python 的多线程也不能带来真正的并行。

可交给 asyncio 执行的任务,称为协程(coroutine)。一个协程可以放弃执行,把机会让给其它协程(即 ​​yield from​​ 或 ​​await​​)

定义协程

协程的定义,需要使用 ​​async def​​ 语句。

import asyncioimport timefrom threading import Threadasync def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x)

​​do_some_work​​ 便是一个协程。准确来说,​​do_some_work​​ 是一个协程函数,可以通过 ​​asyncio.iscoroutinefunction​​ 来验证:

print(asyncio.iscoroutinefunction(do_some_work))

这个协程什么都没做,我们让它睡眠几秒,以模拟实际的工作量 :

async def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x)

在解释 ​​await​​ 之前,有必要说明一下协程可以做哪些事。协程可以:

* 等待一个 future 结束

* 等待另一个协程(产生一个结果,或引发一个异常)

* 产生一个结果给正在等它的协程

* 引发一个异常给正在等它的协程

​​asyncio.sleep​​​ 也是一个协程,所以 ​​await asyncio.sleep(x)​​ 就是等待另一个协程。

运行协程

调用协程函数,协程并不会开始运行,只是返回一个协程对象,可以通过 ​​asyncio.iscoroutine​​ 来验证:

print(asyncio.iscoroutine(do_some_work(3))) # True

此处还会引发一条警告:

async1.py:16: RuntimeWarning: coroutine 'do_some_work' was never awaited print(asyncio.iscoroutine(do_some_work(3)))

要让这个协程对象运行的话,有两种方式:

* 在另一个已经运行的协程中用 `await` 等待它

* 通过 `ensure_future` 函数计划它的执行

简单来说,只有 loop 运行了,协程才可能运行。

下面先拿到当前线程缺省的 loop ,然后把协程对象交给 ​​loop.run_until_complete​​,协程对象随后会在 loop 里得到运行。

通过async关键字定义一个协程(coroutine),当然协程不能直接运行,需要将协程加入到事件循环loop中

asyncio.get_event_loop:创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环

loop = asyncio.get_event_loop()#获取一个event_loop

loop.run_until_complete(do_some_work(3))# 阻塞直到所有的tasks完成

​​run_until_complete​​ 是一个阻塞(blocking)调用,直到协程运行结束,它才返回。这一点从函数名不难看出。

​​run_until_complete​​ 的参数是一个 future,但是我们这里传给它的却是协程对象,之所以能这样,是因为它在内部做了检查,

通过 ​​ensure_future​​ 函数把协程对象包装(wrap)成了 future。所以,我们可以写得更明显一些:

loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

完整代码:

import asyncioimport timeasync def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x)loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

运行结果:

Waiting 3<三秒钟后程序结束>

create_task 和  ensure_future

协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象. task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果,来返回future的执行结果

import asyncioimport timenow = lambda: time.time()async def do_some_work(x): print("waiting:", x)start = now()coroutine = do_some_work(2)loop = asyncio.get_event_loop()task = loop.create_task(coroutine)print(task)loop.run_until_complete(task)print(task)

print(task.result())

print("Time:",now()-start)

结果为:

>waiting: 2 result=None>Time: 0.0003514289855957031

创建task后,在task加入事件循环之前为pending状态,当完成后,状态为finished

关于上面通过loop.create_task(coroutine)创建task,同样的可以通过 asyncio.ensure_future(coroutine)创建task

task = asyncio.ensure_future(coroutine)

ensure_future方法内部其实也是调用的task的create_task方法,然后返回一个task对象放入loop队列中

协程嵌套(wait,gather)以及as_completed:

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

import asyncioimport timenow = lambda: time.time()async def do_some_work(x): print("waiting:",x) await asyncio.sleep(x) return "Done after {}s".format(x)async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] dones, pendings = await asyncio.wait(tasks) for task in dones: print("Task ret:", task.result()) # results = await asyncio.gather(*tasks) # for result in results: # print("Task ret:",result)start = now()loop = asyncio.get_event_loop()loop.run_until_complete(main())print("Time:", now()-start)

View Code

如果我们把上面代码中的:

dones, pendings = await asyncio.wait(tasks) for task in dones: print("Task ret:", task.result())

替换为:

results = await asyncio.gather(*tasks) for result in results: print("Task ret:",result)

这样得到的就是一个结果的列表

不在main协程函数里处理结果,直接返回await的内容,那么最外层的run_until_complete将会返回main协程的结果。 将上述的代码更改为:

import asyncioimport timenow = lambda: time.time()async def do_some_work(x): print("waiting:",x) await asyncio.sleep(x) return "Done after {}s".format(x)async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] return await asyncio.gather(*tasks)start = now()loop = asyncio.get_event_loop()results = loop.run_until_complete(main())for result in results: print("Task ret:",result)print("Time:", now()-start)

View Code

或者返回使用asyncio.wait方式挂起协程。

将代码更改为:

import asyncioimport timenow = lambda: time.time()async def do_some_work(x): print("waiting:",x) await asyncio.sleep(x) return "Done after {}s".format(x)async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] return await asyncio.wait(tasks)start = now()loop = asyncio.get_event_loop()done,pending = loop.run_until_complete(main())for task in done: print("Task ret:",task.result())print("Time:", now()-start)

View Code

也可以使用asyncio的as_completed方法

import asyncioimport timenow = lambda: time.time()async def do_some_work(x): print("waiting:",x) await asyncio.sleep(x) return "Done after {}s".format(x)async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] for task in asyncio.as_completed(tasks): result = await task print("Task ret: {}".format(result))start = now()loop = asyncio.get_event_loop()loop.run_until_complete(main())print("Time:", now()-start)

View Code

gather另外一个作用:

#gather和wait的区别 #gather更加high-level group1 = [get_html("for i in range(2)] group2 = [get_html("for i in range(2)] group1 = asyncio.gather(*group1) group2 = asyncio.gather(*group2) group2.cancel() loop.run_until_complete(asyncio.gather(group1, group2))

View Code

回调

假如协程是一个 IO 的读操作,等它读完数据后,我们希望得到通知,以便下一步数据的处理。这一需求可以通过往 future 添加回调来实现。

def done_callback(futu): print('Done')futu = asyncio.ensure_future(do_some_work(3))futu.add_done_callback(done_callback)loop.run_until_complete(futu)

多个协程

实际项目中,往往有多个协程,同时在一个 loop 里运行。为了把多个协程交给 loop,需要借助 ​​asyncio.gather​​ 函数。

loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

或者先把协程存在列表里:

coros = [do_some_work(1), do_some_work(3)]loop.run_until_complete(asyncio.gather(*coros))

运行结果:

Waiting 3Waiting 1<等待三秒钟>Done

这两个协程是并发运行的,所以等待的时间不是 1 + 3 = 4 秒,而是以耗时较长的那个协程为准。

参考函数 ​​gather​​ 的文档:

gather(*coros_or_futures, loop=None, return_exceptions=False)Return a future aggregating results from the given coroutines or futures.

发现也可以传 futures 给它:

futus = [asyncio.ensure_future(do_some_work(1)), asyncio.ensure_future(do_some_work(3))]loop.run_until_complete(asyncio.gather(*futus))

​​gather​​​ 起聚合的作用,把多个 futures 包装成单个 future,因为 ​​loop.run_until_complete​​ 只接受单个 future。

run_until_complete 和 run_forever

开启事件循环有两种方法,一种方法就是通过调用run_until_complete,另外一种就是调用run_forever。run_until_complete内置add_done_callback,使用run_forever的好处是可以通过自己自定义add_done_callback;

我们一直通过 ​​run_until_complete​​​ 来运行 loop ,等到 future 完成,​​run_until_complete​​ 也就返回了。

async def do_some_work(x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done')loop = asyncio.get_event_loop()coro = do_some_work(3)loop.run_until_complete(coro)

输出:

Waiting 3<等待三秒钟>Done<程序退出>

现在改用 ​​run_forever​​:

async def do_some_work(x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done')loop = asyncio.get_event_loop()coro = do_some_work(3)asyncio.ensure_future(coro)loop.run_forever()

Waiting 3<等待三秒钟>Done<程序没有退出>

三秒钟过后,future 结束,但是程序并不会退出。​​run_forever​​​ 会一直运行,直到 ​​stop​​​ 被调用,但是你不能像下面这样调 ​​stop​​:

loop.run_forever()loop.stop()

​​run_forever​​​ 不返回,​​stop​​​ 永远也不会被调用。所以,只能在协程中调 ​​stop​​:

async def do_some_work(loop, x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done') loop.stop()

这样并非没有问题,假如有多个协程在 loop 里运行:

asyncio.ensure_future(do_some_work(loop, 1))asyncio.ensure_future(do_some_work(loop, 3))loop.run_forever()

第二个协程没结束,loop 就停止了——被先结束的那个协程给停掉的。

要解决这个问题,可以用 ​​gather​​ 把多个协程合并成一个 future,并添加回调,然后在回调里再去停止 loop。

async def do_some_work(loop, x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done')def done_callback(loop, futu): loop.stop()loop = asyncio.get_event_loop()futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))futus.add_done_callback(functools.partial(done_callback, loop))loop.run_forever()

其实这基本上就是 ​​run_until_complete​​​ 的实现了,​​run_until_complete​​​ 在内部也是调用 ​​run_forever​​。

协程的停止:

uture对象有几个状态:

PendingRunningDoneCacelled

创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task

import asyncioimport timenow = lambda :time.time()async def do_some_work(x): print("Waiting:",x) await asyncio.sleep(x) return "Done after {}s".format(x)coroutine1 =do_some_work(1)coroutine2 =do_some_work(2)coroutine3 =do_some_work(2)tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3),]start = now()loop = asyncio.get_event_loop()try: loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt as e: print(asyncio.Task.all_tasks()) for task in asyncio.Task.all_tasks(): print(task.cancel()) loop.stop() loop.run_forever()finally: loop.close()print("Time:",now()-start)

View Code

启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。可以看到输出如下:

Waiting: 1Waiting: 2Waiting: 2^C{ result='Done after 1s'>, wait_for= cb=[_wait.._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, wait_for= cb=[_wait.._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, wait_for=>}FalseTrueTrueTrueTime: 1.0707225799560547

True表示cannel成功,loop stop之后还需要再次开启事件循环,最后在close,不然还会抛出异常

循环task,逐个cancel是一种方案,可是正如上面我们把task的列表封装在main函数中,main函数外进行事件循环的调用。这个时候,main相当于最外出的一个task,那么处理包装的main函数即可。

call(延时函数):

call_soon() 即刻执行(在队列中等待到下一个)

call_later()指定时间运行(多长时间后执行)

call_at指定某个时间点运行

call_soon_threadsafe线程安全方法

import asyncioimport timeimport asynciodef callback(sleep_times, loop): print("success time {}".format(loop.time())) loop.stop()# def stoploop(loop):# loop.stop()#call_later, call_atif __name__ == "__main__": loop = asyncio.get_event_loop() now = loop.time() # loop.call_at(now+2, callback, 2, loop) # loop.call_at(now+1, callback, 1, loop) loop.call_at(now+3, callback, 3, loop) #提前注册我们的task,并且也可以根据返回的Handle进行cancel # loop.call_soon(callback,4,loop) # loop.call_later(3,callback,5,loop) # loop.call_soon(stoploop, loop) # loop.call_soon(callback, 4, loop) loop.run_forever() loop.close()

View Code

Close Loop?

以上示例都没有调用 ​​loop.close​​​,好像也没有什么问题。所以到底要不要调 ​​loop.close​​呢?

简单来说,loop 只要不关闭,就还可以再运行。:

loop.run_until_complete(do_some_work(loop, 1))loop.run_until_complete(do_some_work(loop, 3))loop.close()

但是如果关闭了,就不能再运行了:

loop.run_until_complete(do_some_work(loop, 1))loop.close()loop.run_until_complete(do_some_work(loop, 3)) # 此处异常

建议调用 ​​loop.close​​,以彻底清理 loop 对象防止误用。

阻塞和await:

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行

耗时的操作一般是一些IO操作,例如网络请求,文件读取等。我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。

import asyncioimport timenow = lambda :time.time()async def do_some_work(x): print("waiting:",x) # await 后面就是调用耗时的操作 await asyncio.sleep(x) return "Done after {}s".format(x)start = now()coroutine = do_some_work(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)loop.run_until_complete(task)print("Task ret:", task.result())print("Time:", now() - start)

在await asyncio.sleep(x),因为这里sleep了,模拟了阻塞或者耗时操作,这个时候就会让出控制权。 即当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。

不同线程的事件循环:

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block。

import asynciofrom threading import Threadimport timenow = lambda :time.time()def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever()def more_work(x): print('More work {}'.format(x)) time.sleep(x) print('Finished more work {}'.format(x))start = now()new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,))t.start()print('TIME: {}'.format(time.time() - start))new_loop.call_soon_threadsafe(more_work, 6)new_loop.call_soon_threadsafe(more_work, 3)

View Code

启动上述代码之后,当前线程不会被block,新线程中会按照顺序执行call_soon_threadsafe方法注册的more_work方法, 后者因为time.sleep操作是同步阻塞的,因此运行完毕more_work需要大致6 + 3

新线程协程

import asyncioimport timefrom threading import Threadnow = lambda :time.time()def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever()async def do_some_work(x): print('Waiting {}'.format(x)) await asyncio.sleep(x) print('Done after {}s'.format(x))def more_work(x): print('More work {}'.format(x)) time.sleep(x) print('Finished more work {}'.format(x))start = now()new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,))t.start()print('TIME: {}'.format(time.time() - start))asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

View Code

述的例子,主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环。 主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block。一共执行的时间大概在6s左右。

动态添加协程:

在实战之前,我们要先了解下在asyncio中如何将协程态添加到事件循环中的。这是前提。

如何实现呢,有两种方法:

主线程是​​同步​​的

import asyncioimport timefrom threading import Threadnow = lambda :time.time()def start_loop(loop): # 一个在后台永远运行的事件循环 asyncio.set_event_loop(loop) loop.run_forever()def do_some_work(x): print('Waiting {}'.format(x)) # await asyncio.sleep(x) time.sleep(x) print('Done after {}s'.format(x))def stoploop(loop): loop.stop()start = now()new_loop = asyncio.new_event_loop()# 定义一个线程,并传入一个事件循环对象t = Thread(target=start_loop, args=(new_loop,))t.start()print('TIME: {}'.format(time.time() - start))# 动态添加两个协程# 这种方法,在主线程是同步的new_loop.call_soon_threadsafe(do_some_work,6)new_loop.call_soon_threadsafe(do_some_work,4)new_loop.call_soon(stoploop, new_loop)

View Code

TIME: 0.002000093460083008Waiting 6Done after 6sWaiting 4Done after 4s进程已结束,退出代码0

主线程是​​异步​​的,这是重点,一定要掌握。。

import asyncioimport timefrom threading import Threadnow = lambda :time.time()def start_loop(loop): # 一个在后台永远运行的事件循环 asyncio.set_event_loop(loop) loop.run_forever()async def do_some_work(x): print('Waiting {}'.format(x)) await asyncio.sleep(x) print('Done after {}s'.format(x))def stoploop(loop): loop.stop()start = now()new_loop = asyncio.new_event_loop()# 定义一个线程,并传入一个事件循环对象t = Thread(target=start_loop, args=(new_loop,))t.start()# 动态添加两个协程# 这种方法,在主线程是同步的asyncio.run_coroutine_threadsafe(do_some_work(6),new_loop)asyncio.run_coroutine_threadsafe(do_some_work(4),new_loop)print('TIME: {}'.format(time.time() - start))

TIME: 0.002000093460083008Waiting 6Waiting 4Done after 4sDone after 6s

实战:利用redis实现动态添加任务;

对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。

为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。

import timeimport redisimport asynciofrom queue import Queuefrom threading import Threaddef start_loop(loop): # 一个在后台永远运行的事件循环 asyncio.set_event_loop(loop) loop.run_forever()async def do_sleep(x, queue): await asyncio.sleep(x) queue.put("ok")def get_redis(): connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0) return redis.Redis(connection_pool=connection_pool)def consumer(): while True: task = rcon.rpop("queue") if not task: time.sleep(1) continue asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)if __name__ == '__main__': print(time.ctime()) new_loop = asyncio.new_event_loop() # 定义一个线程,运行一个事件循环对象,用于实时接收新任务 loop_thread = Thread(target=start_loop, args=(new_loop,)) loop_thread.setDaemon(True) loop_thread.start() # 创建redis连接 rcon = get_redis() queue = Queue() # 子线程:用于消费队列消息,并实时往事件对象容器中添加新任务 consumer_thread = Thread(target=consumer) consumer_thread.setDaemon(True) consumer_thread.start() while True: msg = queue.get() print("协程运行完..") print("当前时间:", time.ctime())

View Code

稍微讲下代码

​​loop_thread​​:单独的线程,运行着一个事件对象容器,用于实时接收新任务。

​​consumer_thread​​:单独的线程,实时接收来自Redis的消息队列,并实时往事件对象容器中添加新任务。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java面试常问的Runnable和Callable的区别
下一篇:Python魔法方法之描述符(python描述符详解)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~