python-- 多进程队列 Queue、生成者和消费者(python编译器)

网友投稿 334 2022-08-25


python-- 多进程队列 Queue、生成者和消费者(python编译器)

多进程队列 Queue

# 栈:先进后出(First In Last Out 简称 FILO)# 队列: 先进先出(First In First Out 简称 FIFO)# # # import queue 不能进行多进程之间的数据传输# from multiprocessing import Queue 借助Queue解决生产者消费者模型# 队列是安全的。自带锁

from multiprocessing import Queueq = Queue(num) # num 队列的最大长度,为一个数字q.get() # 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待q.put() # 阻塞,如果可以直接往队列中放数据,就直接放,如果不能放,就阻塞等待q.get_nowait() # 不阻塞,如果有数据直接获取,没有数据就报错q.put_nowait() # 不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错

案例

from multiprocessing import Queueq = Queue(3) # 队列的最大长度q.put(123)q.put("abc")q.put([4, 5, 6])print(q.get())print(q.get())print(q.get())

结果:

123abc[4, 5, 6]

正常执行,队列的长度为3,添加了三个,获取了三个

from multiprocessing import Queueq = Queue(3) # 队列的最大长度q.put(123)q.put("abc")q.put([4, 5, 6])q.put(999)print(q.get())print(q.get())print(q.get())

执行到q.put(999)阻塞,程序一直在等

from multiprocessing import Queueq = Queue(3) # 队列的最大长度q.put(123)q.put("abc")q.put([4, 5, 6])q.put_nowait(999)print(q.get())print(q.get())print(q.get())

执行到q.put(999)报错

from multiprocessing import Queueq = Queue(3) # 队列的最大长度q.put(123)q.put("abc")q.put([4, 5, 6])print(q.get())print(q.get())print(q.get())print(q.get_nowait()) # queue.Empty

执行到print(q.get_nowait())报错

生产者和消费者

生产者消费者模型主要是为解耦,借助队列来实现生产者消费者模型

from multiprocessing import Queue, Processfrom time import sleepdef consumer(q, name): while 1: info = q.get() if info: print('%s 拿走了%s' % (name, info)) else: # 当消费者获得队列中数据时,如果获得的是None,就是获得到了生产者不再生产数据的标识 break # 此时消费者结束即可def producer(q, product): for i in range(5): info = '生产了' + product + '版的娃娃%s号' % str(i) q.put(info) print(info) q.put(None) # 让生产者生产完数据后,给消费者一个不再生产数据的标识if __name__ == '__main__': q = Queue(5) pro = Process(target=producer, args=(q, '波多小姐')) con = Process(target=consumer, args=(q, '苍老师')) pro.start() con.start()

结果:

生产了波多小姐版的娃娃0号生产了波多小姐版的娃娃1号生产了波多小姐版的娃娃2号生产了波多小姐版的娃娃3号生产了波多小姐版的娃娃4号苍老师 拿走了生产了波多小姐版的娃娃0号苍老师 拿走了生产了波多小姐版的娃娃1号苍老师 拿走了生产了波多小姐版的娃娃2号苍老师 拿走了生产了波多小姐版的娃娃3号苍老师 拿走了生产了波多小姐版的娃娃4号

改版上版的生产者和消费者

from multiprocessing import Queue, Processimport timedef consumer(q, name, color): while 1: info = q.get() if info: print('%s %s 拿走了%s \033[0m' % (color, name, info)) else: # 当消费者获得队列中数据时,如果获得的是None,就是获得到了生产者不再生产数据的标识 break # 此时消费者结束即可def producer(q, product): for i in range(20): info = product + '的娃娃%s号' % str(i) q.put(info)if __name__ == '__main__': q = Queue(10) p_pro1 = Process(target=producer, args=(q, '波多小姐')) p_pro2 = Process(target=producer, args=(q, '苍老师')) p_pro3 = Process(target=producer, args=(q, '小泽老师')) p_con1 = Process(target=consumer, args=(q, '麻老师', '\033[31m')) p_con2 = Process(target=consumer, args=(q, '王老师', '\033[32m')) p_l = [p_con1, p_con2, p_pro1, p_pro2, p_pro3] [i.start() for i in p_l] # 父进程如何感知到生产者子进程不再生产数据了? p_pro1.join() p_pro2.join() p_pro3.join() q.put(None) # 几个消费者就要接受几个结束标识

结果

麻老师 拿走了波多小姐的娃娃0号 麻老师 拿走了波多小姐的娃娃2号 麻老师 拿走了波多小姐的娃娃3号 麻老师 拿走了波多小姐的娃娃4号 王老师 拿走了波多小姐的娃娃1号 麻老师 拿走了波多小姐的娃娃5号 麻老师 拿走了波多小姐的娃娃6号 王老师 拿走了波多小姐的娃娃7号 麻老师 拿走了波多小姐的娃娃8号 王老师 拿走了波多小姐的娃娃9号 麻老师 拿走了波多小姐的娃娃10号 王老师 拿走了波多小姐的娃娃11号 麻老师 拿走了波多小姐的娃娃12号 王老师 拿走了波多小姐的娃娃13号 麻老师 拿走了波多小姐的娃娃14号 王老师 拿走了波多小姐的娃娃15号 麻老师 拿走了波多小姐的娃娃16号 王老师 拿走了波多小姐的娃娃17号 麻老师 拿走了波多小姐的娃娃18号 王老师 拿走了波多小姐的娃娃19号 麻老师 拿走了苍老师的娃娃0号 王老师 拿走了苍老师的娃娃1号 麻老师 拿走了苍老师的娃娃2号 王老师 拿走了苍老师的娃娃3号 麻老师 拿走了苍老师的娃娃4号 王老师 拿走了苍老师的娃娃5号 麻老师 拿走了苍老师的娃娃6号 王老师 拿走了苍老师的娃娃7号 麻老师 拿走了苍老师的娃娃8号 王老师 拿走了苍老师的娃娃9号 麻老师 拿走了苍老师的娃娃10号 王老师 拿走了苍老师的娃娃11号 麻老师 拿走了苍老师的娃娃12号 王老师 拿走了苍老师的娃娃13号 麻老师 拿走了小泽老师的娃娃0号 王老师 拿走了苍老师的娃娃14号 麻老师 拿走了小泽老师的娃娃1号 王老师 拿走了苍老师的娃娃15号 麻老师 拿走了苍老师的娃娃16号 王老师 拿走了苍老师的娃娃17号 麻老师 拿走了苍老师的娃娃18号 王老师 拿走了小泽老师的娃娃2号 麻老师 拿走了苍老师的娃娃19号 王老师 拿走了小泽老师的娃娃3号 麻老师 拿走了小泽老师的娃娃4号 王老师 拿走了小泽老师的娃娃5号 麻老师 拿走了小泽老师的娃娃6号 王老师 拿走了小泽老师的娃娃7号 麻老师 拿走了小泽老师的娃娃8号 王老师 拿走了小泽老师的娃娃9号 麻老师 拿走了小泽老师的娃娃10号 王老师 拿走了小泽老师的娃娃11号 麻老师 拿走了小泽老师的娃娃12号 王老师 拿走了小泽老师的娃娃13号 麻老师 拿走了小泽老师的娃娃14号 王老师 拿走了小泽老师的娃娃15号 王老师 拿走了小泽老师的娃娃16号 麻老师 拿走了小泽老师的娃娃17号 王老师 拿走了小泽老师的娃娃18号 麻老师 拿走了小泽老师的娃娃19号

在改版

from multiprocessing import Process, JoinableQueueq = JoinableQueue()def consumer(q, name, color): while 1: info = q.get() print('%s %s 拿走了%s \033[0m' % (color, name, info)) q.task_done()def producer(q, product): for i in range(20): info = product + '的娃娃%s号' % str(i) q.put(info) q.join() # 记录了生产了20个数据在队列中,此时会阻塞等待消费者消费完队列中所有数据if __name__ == '__main__': q = JoinableQueue(10) p_pro1 = Process(target=producer, args=(q, '波多小姐')) p_con1 = Process(target=consumer, args=(q, '苍老师', '\033[31m')) p_con1.daemon = True # 把消费者进程设为守护进程 p_con1.start() p_pro1.start() p_pro1.join() # 主进程等待生产者进程结束

结果:

苍老师 拿走了波多小姐的娃娃0号 苍老师 拿走了波多小姐的娃娃1号 苍老师 拿走了波多小姐的娃娃2号 苍老师 拿走了波多小姐的娃娃3号 苍老师 拿走了波多小姐的娃娃4号 苍老师 拿走了波多小姐的娃娃5号 苍老师 拿走了波多小姐的娃娃6号 苍老师 拿走了波多小姐的娃娃7号 苍老师 拿走了波多小姐的娃娃8号 苍老师 拿走了波多小姐的娃娃9号 苍老师 拿走了波多小姐的娃娃10号 苍老师 拿走了波多小姐的娃娃11号 苍老师 拿走了波多小姐的娃娃12号 苍老师 拿走了波多小姐的娃娃13号 苍老师 拿走了波多小姐的娃娃14号 苍老师 拿走了波多小姐的娃娃15号 苍老师 拿走了波多小姐的娃娃16号 苍老师 拿走了波多小姐的娃娃17号 苍老师 拿走了波多小姐的娃娃18号 苍老师 拿走了波多小姐的娃娃19号

程序有3个进程,主进程和生产者进程和消费者进程。当主进程执行到p_pro1.join()时,主进程会等待生产进程结束

而生产进程中(q.join())会等待消费者进程把所有数据消费完,生产者进程才结束。

现在的状态就是主进程等待生产者进程结束,生产者进程等待消费者消费完所有数据

所以,把消费者设置为守护进程,当主进程执行完,就代表生产进程已经结束,也就代表消费者进程已经把队列中数据消费完此时,主进程一旦结束,守护进程也就是消费者进程也就跟着结束。整个程序也就能正常结束了。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:python-- 进程的常用属性、守护进程(python读取excel数据)
下一篇:@Autowired 自动注入接口失败的原因及解决
相关文章

 发表评论

暂时没有评论,来抢沙发吧~