Performance and Asynchronous IO (high-frequency data, asynchrony)


When writing a crawler, most of the cost is IO: in single-process, single-thread mode every URL request blocks while waiting for the response, which slows down the whole run.

import requests

def fetch_async(url):
    response = requests.get(url)
    return response

url_list = ['http://www.baidu.com', 'http://cn.bing.com']  # example URLs

for url in url_list:
    fetch_async(url)

Synchronous execution

"""可以实现并发但是,请求发送出去后和返回之前,中间时期线程空闲编写方式: - 直接返回处理 - 通过回调函数处理"""########### 编写方式一 ###########"""from concurrent.futures import ThreadPoolExecutorimport requestsimport timedef task(url): response = requests.get(url) print(url,response) # 写正则表达式pool = ThreadPoolExecutor(7)url_list = [ ' ' ' ' ' ' 'url in url_list: pool.submit(task,url)pool.shutdown(wait=True)"""########### 编写方式二 ###########from concurrent.futures import ThreadPoolExecutorimport requestsimport timedef task(url): """ 下载页面 :param url: :return: """ response = requests.get(url) return responsedef done(future,*args,**kwargs): response = future.result() print(response.status_code,response.content)pool = ThreadPoolExecutor(7)url_list = [ ' ' ' ' ' ' 'url in url_list: v = pool.submit(task,url) v.add_done_callback(done)pool.shutdown(wait=True)

Multithreaded execution

"""可以实现并发但是,请求发送出去后和返回之前,中间时期进程空闲编写方式: - 直接返回处理 - 通过回调函数处理"""########### 编写方式一 ###########"""from concurrent.futures import ProcessPoolExecutorimport requestsimport timedef task(url): response = requests.get(url) print(url,response) # 写正则表达式pool = ProcessPoolExecutor(7)url_list = [ ' ' ' ' ' ' 'url in url_list: pool.submit(task,url)pool.shutdown(wait=True)"""########### 编写方式二 ###########from concurrent.futures import ProcessPoolExecutorimport requestsimport timedef task(url): response = requests.get(url) return responsedef done(future,*args,**kwargs): response = future.result() print(response.status_code,response.content)pool = ProcessPoolExecutor(7)url_list = [ ' ' ' ' ' ' 'url in url_list: v = pool.submit(task,url) v.add_done_callback(done)pool.shutdown(wait=True)

Multiprocessing

import asyncio"""@asyncio.coroutinedef task(): print('before...task......') yield from asyncio.sleep(5) # 发送Http请求,支持TCP获取结果.. print('end...task......')tasks = [task(), task()]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.gather(*tasks))loop.close()""""""import asyncio@asyncio.coroutinedef task(host, url='/'): print('start',host,url) reader, writer = yield from asyncio.open_connection(host, 80) request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host,) request_header_content = bytes(request_header_content, encoding='utf-8') writer.write(request_header_content) yield from writer.drain() text = yield from reader.read() print('end',host, url, text) writer.close()tasks = [ task('***.com', '/wupeiqi/'), task('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')]loop = asyncio.get_event_loop()results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()""""""import aioasyncio@asyncio.coroutinedef fetch_async(url): print(url) response = yield from aiourl) print(url, response) response.close()tasks = [fetch_async('fetch_async('= asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))event_loop.close()"""import asyncioimport requests@asyncio.coroutinedef task(func, *args): print(func,args) loop = asyncio.get_event_loop() future = loop.run_in_executor(None, func, *args) # requests.get(' response = yield from future print(response.url, response.content)tasks = [ task(requests.get, ' task(requests.get, '= asyncio.get_event_loop()results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()

asyncio

"""import geventimport requestsfrom gevent import monkeymonkey.patch_all()def task(method, url, req_kwargs): print(method, url, req_kwargs) response = requests.request(method=method, url=url, **req_kwargs) print(response.url, response.content)# ##### 发送请求 ###### gevent.joinall([# gevent.spawn(task, method='get', url='req_kwargs={}),# gevent.spawn(task, method='get', url='req_kwargs={}),# gevent.spawn(task, method='get', url='req_kwargs={}),# ])# ##### 发送请求(协程池控制最大协程数量) #####from gevent.pool import Poolpool = Pool(5)gevent.joinall([ pool.spawn(task, method='get', url='req_kwargs={}), pool.spawn(task, method='get', url='req_kwargs={}), pool.spawn(task, method='get', url='req_kwargs={}),])"""import grequestsimport grequestsrequest_list = [ grequests.get('timeout=0.001), grequests.get(' grequests.get('##### 执行并获取响应列表 #####response_list = grequests.map(request_list,size=5)print(response_list)

gevent

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import defer
from twisted.web.client import getPage  # note: getPage is deprecated in newer Twisted releases
from twisted.internet import reactor

def one_done(arg):
    print(arg)

def all_done(arg):
    print('done')
    reactor.stop()

@defer.inlineCallbacks
def task(url):
    res = getPage(bytes(url, encoding='utf8'))  # send the HTTP request
    res.addCallback(one_done)
    yield res

url_list = [
    'http://www.baidu.com',  # example URLs
    'http://cn.bing.com',
]

defer_list = []  # one Deferred per URL (the request has already been sent)
for url in url_list:
    v = task(url)
    defer_list.append(v)

d = defer.DeferredList(defer_list)
d.addBoth(all_done)

reactor.run()  # event loop; runs until reactor.stop() is called

twisted

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

COUNT = 0

def handle_response(response):
    global COUNT
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
    # same idea as with Twisted: stop the loop once every request has come back
    # ioloop.IOLoop.current().stop()
    if COUNT == 0:
        ioloop.IOLoop.current().stop()

def func():
    url_list = [
        'http://www.baidu.com',  # example URLs
        'http://cn.bing.com',
    ]
    global COUNT
    COUNT = len(url_list)
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)  # Tornado < 6 style: pass the callback to fetch()

ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()  # event loop; runs until stop() is called

tornado

The coolest part: a hand-rolled custom async IO framework

import socket
import select

# ########################## The essence of an HTTP request: blocking ##########################
"""
sk = socket.socket()

# 1. connect
sk.connect(('baidu.com', 80,))  # blocking IO
print('connected...')

# 2. once connected, send the request
sk.send(b'GET / HTTP/1.0\r\nHost:baidu.com\r\n\r\n')
# sk.send(b'POST / HTTP/1.0\r\nHost:baidu.com\r\n\r\nk1=v1&k2=v2')

# 3. wait for the server's response
data = sk.recv(8096)  # blocking IO
print(data)

# close the connection
sk.close()
"""

# ########################## The essence of an HTTP request: non-blocking ##########################
"""
sk = socket.socket()
sk.setblocking(False)

# 1. connect
try:
    sk.connect(('baidu.com', 80,))  # no longer blocks; raises immediately
    print('connected...')
except BlockingIOError as e:
    print(e)

# 2. once connected, send the request (may fail here, since the connection may not be complete yet)
sk.send(b'GET / HTTP/1.0\r\nHost:baidu.com\r\n\r\n')
# sk.send(b'POST / HTTP/1.0\r\nHost:baidu.com\r\n\r\nk1=v1&k2=v2')

# 3. wait for the server's response
data = sk.recv(8096)
print(data)

# close the connection
sk.close()
"""


class HttpRequest:
    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        return self.socket.fileno()


class HttpResponse:
    def __init__(self, recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None

        self.initialize()

    def initialize(self):
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        self.body = body
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h, encoding='utf-8')
            v = h_str.split(':', 1)
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]


class AsyncRequest:
    def __init__(self):
        self.conn = []
        self.connection = []  # used to check which sockets have finished connecting

    def add_request(self, host, callback):
        try:
            sk = socket.socket()
            sk.setblocking(0)
            sk.connect((host, 80,))
        except BlockingIOError as e:
            pass
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        while True:
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                print(w.host, 'connected...')
                # showing up in wlist means this socket has finished connecting to the server
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                w.socket.send(bytes(tpl, encoding='utf-8'))
                self.connection.remove(w)
            for r in rlist:
                # r is an HttpRequest object
                recv_data = bytes()
                while True:
                    try:
                        chunck = r.socket.recv(8096)
                        if not chunck:  # the server closed the connection
                            break
                        recv_data += chunck
                    except Exception as e:
                        break
                response = HttpResponse(recv_data)
                r.callback(response)
                r.socket.close()
                self.conn.remove(r)
            if len(self.conn) == 0:
                break


def f1(response):
    print('save to a file', response.header_dict)


def f2(response):
    print('save to the database', response.header_dict)


url_list = [
    {'host': 'baidu.com', 'callback': f1},
    {'host': 'cn.bing.com', 'callback': f2},
    {'host': '***.com', 'callback': f2},  # host masked in the original post
]

req = AsyncRequest()
for item in url_list:
    req.add_request(item['host'], item['callback'])

req.run()

custom async IO

- Dumb waiting: send one request and block until it returns

response = requests.get(....)

- Clever: fire off all the requests, then collect the results (see the timing sketch after these notes)

response = requests.get(....)

response = requests.get(....)

response = requests.get(....)

response = requests.get(....)
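To make the contrast in these notes concrete, here is a minimal timing sketch (the URLs are placeholders, not from the original post): the sequential loop pays for every response one after another, while the thread pool overlaps the waiting.

import time
import requests
from concurrent.futures import ThreadPoolExecutor

urls = ['http://www.baidu.com', 'http://cn.bing.com'] * 3  # placeholder URLs

# dumb waiting: each request blocks until the previous one has finished
start = time.time()
for url in urls:
    requests.get(url)
print('sequential:', time.time() - start)

# clever: hand everything to a pool and wait only once, at the end
start = time.time()
with ThreadPoolExecutor(6) as pool:
    list(pool.map(requests.get, urls))
print('thread pool:', time.time() - start)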

Role: library user

- Multithreading

- Multiprocessing

- Coroutines (micro-threads) + async IO => one thread sends N HTTP requests

- asyncio

- Example 1: asyncio.sleep(5)

- Example 2: build the HTTP packet by hand

- Example 3: asyncio + aiohttp

pip3 install aiohttp

- Example 4: asyncio + requests

requests module: it builds the HTTP packet for you; pip3 install requests

- gevent: greenlet + async IO

pip3 install greenlet

pip3 install gevent

- Example 1: gevent + requests

- Example 2: gevent (with a coroutine pool to cap how many requests go out at once) + requests

- Example 3: gevent + requests => grequests

pip3 install grequests

- Twisted

pip3 install twisted

- Tornado

pip3 install tornado

=====> gevent > Twisted > Tornado > asyncio

Role: hard-core developer (writing your own framework)

1. socket client and server

   connect() blocks by default

   setblocking(0): when nothing is ready yet (the connection has not completed, or no data has come back), raise an error instead of waiting; a small sketch follows below
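A tiny sketch of that point, assuming baidu.com (used elsewhere in this post) as the example host: with setblocking(0) the connect call returns immediately and reports "not ready yet" as an exception instead of waiting.

import socket

sk = socket.socket()
sk.setblocking(False)               # never wait for IO

try:
    sk.connect(('baidu.com', 80))   # returns before the TCP handshake has finished
except BlockingIOError as e:
    # "nothing is ready yet" is raised as an error rather than blocking the thread
    print('connection still in progress:', e)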

2. IO multiplexing

   Client side:

try:
    sock1.connect(...)
    sock2.connect(...)
    sock3.connect(...)
except Exception:
    pass

while True:
    r, w, e = select.select([sock1, sock2, sock3], [sock1, sock2, sock3], [], 0.05)

    # r = [sock1, ]  means someone has sent us data
    xx = sock1.recv()

    # w = [sock1, ]  means our connection to that peer has been established
    sock1.send(b"GET /index HTTP/1.0\r\nHost: baidu.com\r\n\r\n")

(a runnable single-socket version of this loop follows below)
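Here is a runnable single-socket version of that loop (again using baidu.com as the example host): the writable list from select signals that the connection has completed, and the readable list signals that response data has arrived. It is a sketch of the idea, not the full framework shown earlier.

import socket
import select

sk = socket.socket()
sk.setblocking(False)
try:
    sk.connect(('baidu.com', 80))   # returns immediately; the handshake continues in the background
except BlockingIOError:
    pass

sent = False
while True:
    r, w, e = select.select([sk], [] if sent else [sk], [], 0.05)
    if w and not sent:
        # writable -> the connection is established, so the request can go out
        sk.send(b'GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n')
        sent = True
    if r:
        # readable -> the server has sent (at least part of) the response
        print(sk.recv(8096))
        sk.close()
        break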

3. select works with any object that exposes a fileno() method

   class Foo:
       def fileno(self):
           obj = socket.socket()
           return obj.fileno()

   r, w, e = select.select([sock1, sock2, sock3, Foo()], [], [])

   # every object passed in must have a fileno() method that returns a file descriptor

   ========

   a. internally, select calls obj.fileno() on each object
   b. Foo() wraps a socket and hands back its file descriptor (a short demonstration follows below)
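A short demonstration of that rule, using a hypothetical Wrapper class (the HttpRequest class in the framework above plays exactly this role): select never cares what the object is, only that fileno() returns a real file descriptor, and it hands the same objects back in its result lists.

import socket
import select

class Wrapper:
    """Hypothetical wrapper: select only needs fileno() to return a file descriptor."""
    def __init__(self, host):
        self.host = host
        self.sock = socket.socket()
        self.sock.connect((host, 80))

    def fileno(self):
        return self.sock.fileno()

objs = [Wrapper('baidu.com'), Wrapper('cn.bing.com')]   # example hosts

# a freshly connected socket is writable, so both wrappers come back in the write list
r, w, e = select.select([], objs, [], 1)
for obj in w:
    print(obj.host, 'ready to send')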

IO multiplexing:

    r, w, e = select.select(...) inside a while loop watches many socket objects at once; r, w, e report which ones changed state, and that property is what these frameworks are built on

Async IO:

    non-blocking sockets + IO multiplexing


