Gointerface接口声明实现及作用详解
471
2022-08-28
python异步asyncio(python异步处理)
文章目录
异步概念事件循环进程、线程、协程协程函数与协程对象awaitTask对象asyncio.Future对象异步迭代器异步上下文管理器
异步概念
相对于异步的概念,同步指的是一个程序执行完才会执行另一个程序。 举例来说,对于单进程,普通的函数调用就是同步执行。只有当被调用的函数执行完并且return的时候, 才会继续执行主调函数中剩下的代码。
虽然在异步中通过await等待异步函数执行也是这个流程,但是如果一个进程有多于一个任务时, 当在被调的异步函数中阻塞时,就会去执行其他任务了。
最开始有一个错误的理解:假设有伪代码await func_a,在执行到伪代码await func_a时,产生一个类似线程的东西去执行func_a函数 而不影响当前await func_a代码之后的代码的运行,即立马执行下面的语句。 但是不是这样子,因为有了await,所以需要等待func_a执行完才会继续执行await func_a后面的代码。
还有一个错误的想法:都异步了,为什么还需要等待结果?answer:假设func_a函数处理完会返回一个结果,如果这时候不阻塞在await func_a这一行,而是继续向下执行 那么如果下一行代码需要用到func_a函数返回的结果,那么这是时候用到的这个结果就是一个None
比如一下代码
import asyncioasync def func_a(): await asyncio.sleep(2) print(1)async def func_b(): await func_a() print(2)asyncio.run(func_b())
当前只有一个任务,执行了func_b(),而func_b中等待func_a,所以在func_a中sleep了两秒后 打印1,再返回到func_b中打印2。 这就是因为当前只有一个任务,并且使用await来等待异步函数执行完返回后才会继续执行下面代码。 如果还有其他任务的话,执行到func_a的sleep时才会执行其他任务
事件循环
可以理解成是一个死循环,循环检测并执行某些代码。
伪代码任务列表 = [任务1,任务2,任务3。。。] while True: 可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将可执行和已完成的任务返回。 # 注意任务列表中还可能有正在被阻塞的任务,本次查询不会被返回 for 就绪任务 in 可执行的任务列表: 执行已就绪的任务 for 已完成任务 in 已完成的任务列表: 在任务列表中移除 已完成的任务 如果任务列表中的任务都已经完成,任务列表为空,则终止循环。 代码: import asyncio# 生成一个事件循环loop = asyncio.get_event_loop()# 将任务放到任务列表loop.run_until_complete(任务)
进程、线程、协程
进程是指程序的一次运行过程,包括了程序、代码和进程控制块(PCB),是系统进行资源分配的独立单位,也是处理机分派和调度的基本单位。 后来为了让操作系统更好的处理并发,引入了线程。此时把线程作为处理及分派和调度的基本单位。一个进程中的多个线程共享该进程的资源,因此切换上下文会变快。进程和线程都是操作系统级别的概念。 而协程是为了进一步提高上下文切换的效率,由程序员自己通过代码进行调度。
一个进程可以对应多个线程,一个线程可以对应多个协程但是一个进程中的多个线程在某一时刻只能有一个线程在运行,同理一个线程对应的多个协程在某一时刻只能有一个协程在运行。
协程的概念是在一个线程中如果遇到IO或其他等待操作,这个线程会利用空闲时间处理其他任务。但前提是这个线程有多个协程任务。
import asyncioasync def func1(): print(1) await asyncio.sleep(2) # 遇到IO自动切换 print(2)async def func2(): print(3) await asyncio.sleep(2) print(4)def main(): tasks = [ func1(), func2() ] # 下面这两行可以理解为创建了两个任务 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))if __name__ == '__main__': main()
运行结果打印顺序为1324理解首先主线程运行一个协程func1,打印1,然后遇到sleep了,这时挂起func1去执行另一个任务func2打印3 当func1的sleep完成后,不管执行func2的协程执行到哪里都会返回到func1去向下执行,打印2后协程结束 再去执行func2的打印4
协程函数与协程对象
协程函数:定义函数时用async def协程对象:执行协程函数的到协程对象
async def func(): pass result = func() result成为一个协程对象,此时func内部代码不会执行
如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理
import asyncioasync def func(): print(1)result = func()# 生成一个事件循环loop = asyncio.get_event_loop()# 将任务放到任务列表loop.run_until_complete(result)
python3.7后使用asynic.run(任务)import asyncioasync def func(): print(1)result = func()asyncio.run(任务) run包括了生成事件循环列表
await
await + 可等待的对象(协程对象、Future对象、Task对象)
import asyncioasync def others(): print("start") response = await asyncio.sleep(2) print("end") return 'return-value'async def func(): print("执行协程函数内部代码") # 遇到IO操作挂起当前协程(任务),等IO操作完成后再继续往下执行, # 当前协程挂起时,事件循环可以去执行其他协程(任务) response = await others() print("IO请求结束,结果为:", response)asyncio.run(func())运行结果为执行协程函数内部代码startendIO请求结束,结果为: return-value
await就是等待对象的值得到结果之后再继续向下走
Task对象
Task对象的作用是在事件循环中添加多个任务
Task用于并发调度协程,通过asyncio.create_task(协程对象)的 方式创建Task对象,这样可以让协程加入事件循环中等待被调用执行。 除了使用asyncio.create_task()外,还可以用低级loop.create_task() 或ensure_future()函数。
一种不普遍的写法import asyncioasync def func(): print(1) await asyncio.sleep(2) print(2) return 'return-value'async def main(): print('main start') # 创建Task对象,将当前执行func函数的任务添加到事件循环 task1 = asyncio.create_task(func()) task2 = asyncio.create_task(func()) ret1 = await task1 ret2 = await task2 print('main end') print(ret1, ret2)asyncio.run(main())运行结果main start1122main endreturn-value return-value
普遍的写法import asyncioasync def func(): print(1) await asyncio.sleep(2) print(2) return 'return-value'async def main(): print('main start') # 创建Task对象,将当前执行func函数的任务添加到事件循环 task_list = [ asyncio.create_task(func(), name='t1'), asyncio.create_task(func(), name='t2') ] # 等待列表中的任务完成 done, pending = await asyncio.wait(task_list) # asyncio.wait返回一个二元组 print('main end') print(done)asyncio.run(main()) # 这一步就创建了事件循环运行结果main start1122main end{
也可以这么写import asyncioasync def func(): print(1) await asyncio.sleep(2) print(2) return 'return-value'task_list = [ func(), func()]done, pending = asyncio.run(asyncio.wait(task_list))print(done)
asyncio.Future对象
Task继承自Future,Task对象内部await结果的处理基于Future对象
示例1async def main(): # 获取创建的事件循环 loop = asyncio.get_running_loop() # 创建一个任务(Future对象),什么都不干 fut = loop.create_future() # 等待Future对象,没有结果会一直等下去 await futasyncio.run(main())
示例2import asyncioasync def set_after(fut): await asyncio.sleep(2) fut.set_result('555') async def main(): # 获取创建的事件循环 loop = asyncio.get_running_loop() # 创建一个任务(Future对象),什么都不干 fut = loop.create_future() # 创建一个任务(Task对象),绑定set_after函数,函数内部2s后,会给fut赋值 # 即手动设置future任务最终结果 await loop.create_task(set_after(fut)) # 等待Future对象 data = await fut print(data) asyncio.run(main())
concurrent.futures.Future对象(扩展)
使用线程池、进程池实现异步操作时用到的对象
import timefrom concurrent.futures import Futurefrom concurrent.futures import ThreadPoolExecutorfrom concurrent.futures import ProcessPoolExecutordef func(value): time.sleep(1) print(value) return 123# 创建线程池pool = ThreadPoolExecutor(max_workers=5)# 创建进程池# pool = ProcessPoolExecutor(max_workers=5)for i in range(10): fut = pool.submit(func, i) # print(fut)从输出结果可以看到执行结果无序
案例:asyncio+不支持异步模块import asyncioimport requestsasync def download_image(url): print('start download:', url) loop = asyncio.get_event_loop() # requests模块不支持异步操作,使用线程池配合实现 future = loop.run_in_executor(None, requests.get, url) response = await future print('download finish') file_name = url.rsplit('-')[-1] with open(file_name, mode='wb') as fp: fp.write(response.content) if __name__ == "__main__": url_list = [ 'url1', 'url2', 'url3' ] tasks = [download_image(url) for url in url_list] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
异步迭代器
异步迭代器:实现了__aiter__()和__anext__()方法的对象(这两个方法自动执行),必须返回一个awaitable对象。async_for支持处理异步迭代器的__anext__()方法返回的可等待对象,直到引发一个stopAsyncIteration异常
异步可迭代对象:可在async_for语句中被使用的对象,必须通过它的__aiter__()方法返回一个asynchronous_iterator(异步迭代器)
import asyncioclass Reader(object): # 自定义异步迭代器,同时也是异步可迭代对象 def __init__(self): self.count = 0 async def readline(self): self.count += 1 if self.count == 20: return None return self.count def __aiter__(self): return self async def __anext__(self): val = await self.readline() if val == None: raise StopAsyncIteration return valasync def func(): obj = Reader() async for item in obj: print(item)asyncio.run(func())
异步上下文管理器
通过定义__aenter__()和__aexit__()方法来对async_with语句中的环境进行控制,这两个方法自动执行。
import asyncioclass AsyncContextManager: def __init__(self): self.conn = None async def do_something(self): # 异步操作数据库 return 555 async def __aenter__(self): # 异步连接数据库 self.conn = await asyncio.sleep(1) return self async def __aexit__(self, exc_type, exc_val, exc_tb): # 异步关闭数据库 await asyncio.sleep(1)async def func(): async with AsyncContextManager() as f: result = await f.do_something() print(result)asyncio.run(func())
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~