Gointerface接口声明实现及作用详解
481
2022-09-04
multiprocessing:这样做才能重复利用 Python 的多核(use_multiprocessing)
平时也就使用 python 的处理数据的几个包做数据处理,很少使用 python 的一些其他的功能,之前一直想搞懂 python 自带的多线程/多进程包,反复看了很多次,也不太会用。
今天在家好好研究了一遍,不负有心人,终于可以用起来了:大概知道这个包怎么用,怎么按照我的想法用。我一直看的都是 python 的 multiprocessing 包。这次也是我的个人学习积累。喜欢记得收藏、点赞、关注。
【注】完整代码、数据、技术交流,文末获取。
什么是 multiprocessing
python 的官方文档是这么说的:multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。它在 Unix 和 Windows 上均可运行。
multiprocessing 模块还引入了在 threading 模块中没有的API。一个主要的例子就是 Pool 对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。下面的例子演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了 Pool
其实在我眼里,我不在乎multiprocessing是多线程或者多进程,只要能充分利用cpu多核性能、提高我的计算效率就行。
使用 multiprocessing 目的
我们用这个包要实现什么功能
我想更快的计算;我想把计算的结果保存起来;我想把计算过程的进度可以反馈给我;如果遇到异常,能不能不要终止程序,继续跑;如果程序超时,可不可以终止;
上面就是我的目的,和我的需求;如果都达到了,那我就觉得是完成了。
创建一个场景
我们现在有一个非常耗时的函数,是计算一个值的平方:2 => 2 * 2,我们设置一个函数,每次计算都要休息1秒。那么在python中可以是这么写:
def myf(x): time.sleep(1) return x * x
如果我们不使用pandas、numpy包。我们可能需要这样写:
# 在不使用numpy的时候,进行简单的并行计算import timefrom tqdm import tqdmdef myf(x): time.sleep(1) return x * xif __name__ == '__main__': listx = range(10) listy = [] for i in tqdm(listx): temp_value = myf(x=i) listy.append(temp_value) print('final y:') print(listy)
一个简单的并行计算
我们使用multiprocessing的Pool。Pool对象可以自动的将数据分享给不同进程。啥都不用管,只要传递进入数据就行。在下面的代码中,我需要计算0到19的平方,我就设置了一个Pool对象,这个对象有4个进程,我把需要的计算的函数传递进去,需要被计算的可迭代的对象传递进去。然后数据就出来了。
⚠️:这里使用的是Pool的map函数
# 遇到一个大的数据,一个比较复杂的计算,使用并行,可以极大的提高计算效率from multiprocessing import Poolimport timefrom tqdm import tqdmdef myf(x): time.sleep(1) return x * xif __name__ == '__main__': print('-- start run ---') value_x = range(20) P = Pool(processes=4) value_y = P.map(func=myf, iterable=value_x) print(value_y)
实际上,计算还是很快的,但是我中间遇到一个情况,就是我不知道这个计算到什么程度了,是计算到50%了,还是计算到80%?这个不清楚。
我想要展示计算进度
为了展示多核计算的进度条,可以这么做:
⚠️:这里使用的是Pool的imap_unordered函数
# 相对于02,这个可以有个变动的进度条,进度条不断的衍生,但是缺点也很明显,缺少进度完成情况。from multiprocessing import Poolimport timefrom tqdm import tqdmdef myf(x): time.sleep(1) return x * xif __name__ == '__main__': # print('-- start run ---') value_x = range(20) P = Pool(processes=4) value_y = list(tqdm(P.imap_unordered(func=myf, iterable=value_x))) print(value_y)
这个虽然可以看到进度条不断的完成,但是没有办法看到完成率。
展示计算完成率
⚠️:这里使用的是Pool的apply_async函数
这里的apply_async需要的就不是一个迭代对象了,和上面的map、imap_unordered就有明显的差距,我们需要把一个迭代对象拿出来,放到apply_async里面,然后再将apply_async放到列表里面,然后在把这个东西从列表里面取出来。通过get调用他。
from multiprocessing import Poolfrom tqdm import tqdmimport timedef myf(x): time.sleep(1) return x * xif __name__ == '__main__': value_x= range(200) P = Pool(processes=4) # 这里计算很快 res = [P.apply_async(func=myf, args=(i, )) for i in value_x] # 主要是看这里 result = [i.get(timeout=2) for i in tqdm(res)] print(result)
当然,我看还有人用apply_async的callback参数,和上面的对比,感觉就有点麻烦,这里展示给各位:
from multiprocessing import Poolfrom tqdm import tqdmimport timedef myf(x): time.sleep(1) return x * xif __name__ == '__main__': value_x= range(200) P = Pool(processes=20) pbar = tqdm(total=len(value_x)) # 这里计算很快 res = [P.apply_async(func=myf, args=(i, ), callback= lambda _: pbar.update(1)) for i in value_x] # 主要是看这里 result = [i.get(timeout=2) for i in res] print(result)
处理异常
多核计算最经常遇到的就是遇到一个错误,然后就跳出来,这怎么可以忍。就拿最常见的错误来说,函数运行超时怎么解决?
⚠️:这里使用的是Pool的apply_async函数和get来解决
from multiprocessing import Pool, TimeoutErrorimport timefrom tqdm import tqdmdef myf(x): if x % 5 == 0: time.sleep(20.2) else: time.sleep(0.3) return x * xdef safely_get(value, timeout=2): try: data = value.get(timeout=timeout) except TimeoutError: data = 0 return dataif __name__ == '__main__': P = Pool(processes=10) value = range(100) pbar = tqdm(total=len(value))# way 2 res_temp = [P.apply_async(func=myf, args=(i,), callback=lambda _: pbar.update(1)) for i in value] # result = [res.get(timeout=3) for res in res_temp] result = [safely_get(res, timeout=1) for res in res_temp]# way 1# res_temp = [P.apply_async(func=myf, args=(i,)) for i in value]# result = [safely_get(res, timeout=1) for res in tqdm(res_temp)] time.sleep(1) print(result)
我把这个myf函数做了简单的处理,如果是5的倍速,就需要设置休息20.2秒,但是我的容忍度是每个函数运行时间不能超过1秒,所以我写了一饿过safely_get函数,这个函数里面有try,可以破获错误,如果超时,那么整个函数也不跳出,并且把结果返回为0。
多个进程修改同一个数据
来都来了,也把这个问题说一下:
由于python的GIL,导致同一时间,不同的进程不能同时修改同一个数据。但是使用multiprocessing包的Manager的dict,list之类的就可以。
下面这两个代码就是用来将子进程的相关信息保存到一个列表里面。然后保存到pandas里面。
使用Process做的:代码如下:
# multi sub process write data to one data (python dict)from multiprocessing import Process, Managerimport osimport timeimport pandas as pdfrom tqdm import tqdmdef worker(id, save_data): time.sleep(1) save_data[id] = { '子进程': [os.getpid()], '父进程': [os.getppid()], '进程id': [id] }if __name__ == "__main__": finaldata = Manager().dict() subprocess_list = [] for i in tqdm(range(200)): p = Process(target=worker, args=(i, finaldata)) subprocess_list.append(p) p.start() [p.join() for p in tqdm(subprocess_list)] finaldata = pd.concat([pd.DataFrame(value) for (key, value) in finaldata.items()]) print(finaldata)
使用Pool做的:
# multi sub process write data to one data (python dict)# use Poolfrom multiprocessing import Pool, Managerimport osimport timeimport pandas as pdfrom tqdm import tqdmdef worker(id, save_data): time.sleep(1) save_data[id] = { '子进程': [os.getpid()], '父进程': [os.getppid()], '进程id': [id] }if __name__ == "__main__": finaldata = Manager().dict() P = Pool(processes=20) # reslist = [] # for i in tqdm(range(200)): # res = P.apply_async(func=worker, args=(i, finaldata)) # reslist.append(res) reslist = [P.apply_async(func=worker, args=(i, finaldata)) for i in range(200)] [res.get(timeout=200) for res in tqdm(reslist)] finaldata = pd.concat([pd.DataFrame(value) for (key, value) in finaldata.items()]) print(finaldata)
技术交流
目前开通了技术交流群,群友已超过2000人,添加时最好的备注方式为:来源+兴趣方向,方便找到志同道合的朋友
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~