Gointerface接口声明实现及作用详解
349
2022-09-08
在 Python 中使用 Pool 进行多处理(在线电影)
为什么要引入线程池
如果在程序中经常要用到线程,频繁的创建和销毁线程会浪费很多硬件资源,所以需要把线程和任务分离。线程可以反复利用,省去了重复创建的麻烦。在 Process 类中,我们必须显式地创建流程。但是,Pool 类更方便,您不必手动管理它。创建池对象的语法是 multiprocessing.Pool(processes, initializer, initargs, maxtasksperchild, context) 。所有参数都是可选的。
processes 表示您要创建的工作进程的数量。默认值通过 os.cpu_count() 获取。initializer第二个初始化器参数是一个用于初始化的函数。initargs 是传递给它的参数。maxtasksperchild 表示分配给每个子进程的任务数。在完成该数量的任务之后,该进程将被一个新的工作进程替换。指定它的好处是任何未使用的资源都将被释放。如果未提供任何内容,则只要池存在,进程就会存在。
import timefrom multiprocessing import Pooldef square(x): print(f"start process:{x}") square = x * x print(f"square {x}:{square}") time.sleep(1) print(f"end process:{x}")if __name__ == "__main__": starttime = time.time() pool = Pool() pool.map(square, range(0, 5)) pool.close() endtime = time.time() print(f"Time taken {endtime-starttime} seconds")
结果为:
start process:0start process:1square 1:1square 0:0end process:1start process:2end process:0start process:3square 2:4square 3:9end process:3end process:2start process:4square 4:16end process:4Time taken 3.0474610328674316 seconds
在这里,我们从多处理模块中导入 Pool 类。在主函数中,我们创建了一个 Pool 类的对象。 pool.map() 将我们想要并行化的函数和一个可迭代的函数作为参数。它在可迭代的每个项目上运行给定的函数。它还接受一个可选的 chunksize 参数,它将可迭代对象拆分为等于给定大小的块,并将每个块作为单独的任务传递。 pool.close() 用于拒绝新任务。我们可以看到花费的时间大约是 3 秒。pool.imap() 与 pool.map() 方法几乎相同。不同的是,每个项目的结果都是在准备好后立即收到的,而不是等待所有项目都完成。此外, map() 方法将可迭代对象转换为列表(如果不是)。但是, imap() 方法没有。
来看下一个例子:
import timefrom multiprocessing import Pooldef square(x): print(f"start process {x}") square = x * x time.sleep(1) print(f"end process {x}") return squareif __name__ == "__main__": pool = Pool() a = pool.map(square, range(0, 5)) print(a)
运行结果:
start process 0start process 1end process 0start process 2end process 1start process 3end process 2start process 4end process 3end process 4[0, 1, 4, 9, 16]
from concurrent.futures import ThreadPoolExecutordef say_hello(): print("Hello") executor = ThreadPoolExecutor(50)for i in range(0, 10): executor.submit(say_hello)
练习
利用 Python 多线程模拟商品秒杀过程,不可以出现超买和超卖的情况。假设A商品有50件参与秒杀活动,10分钟秒杀自动结束。
kill_total 商品总数kill_num 成功抢购数kill_flag 有效标志位kill_user 成功抢购的用户ID
from redis_db import poolimport redisimport randomfrom concurrent.futures import ThreadPoolExecutors = set()while True: if len(s) == 1000: break num = random.randint(10000, 100000) s.add(num)print(s)con = redis.Redis( connection_pool=pool)try: con.delete("kill_total", "kill_num", "kill_flag", "kill_user") con.set("kill_total", 50) con.set("kill_num", 0) con.set("kill_flag", 1) con.expire("kill_flag", 600) except Exception as e: print(e)finally: del con executor = ThreadPoolExecutor(200)def buy(): connection = redis.Redis( connection_pool=pool ) pipline = connection.pipline() try: if connection.exists("kill_flag") == 1: pipline.watch("kill_num", "kill_user") total = pipline.get("kill_total") num = int(pipline.get("kill_num").decode("utf-8")) if num < total: pipline.multi() pipline.incr("kill_num") user_id = s.pop() pipline.rpush("kill_user", user_id) pipline.execute() except Exception as e: print(e) finally: if "pipline" in dir(): pipline.reset() del connection for i in range(0, 1000): executor.submit(buy)print("秒杀活动已经结束")
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~