企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## 进程池与线程池统一的方法 我们知道multiprocessing 线程模块里面有线程池,但是由于threading模块被开发得较早,所以里面没有池的概念和方法 后来python官方推出了包含的进程池和线程池等和池有关的所有工具的全新方法**concurrent.futures模块** ### **concurrent.futures模块介绍** 官网:https://docs.python.org/dev/library/concurrent.futures.html concurrent.futures模块提供了高度封装的异步调用接口,线程池和进程池使用方法一致 * ThreadPoolExecutor:线程池,提供异步调用 * ProcessPoolExecutor: 进程池,提供异步调用 ### **基本方法** 1. submit(fn, *args, **kwargs) 异步提交任务 2. map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 3. shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 4. result(timeout=None) 取得结果 5. add_done_callback(fn) 回调函数 6. done() 判断某一个线程是否完成 7. cancle() 取消某个任务 ## 进程池/线程池用法举例 线程池和进程池的用法完全一样 ### 进程池案例 ~~~ from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(6): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result()) ~~~ 执行结果: ``` 9312 is runing 9960 is runing 2720 is runing 2720 is runing 2720 is runing 9312 is runing +++> 0 1 4 9 16 25 ``` ### 线程池案例: ~~~ from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ProcessPoolExecutor import os,time,random def noah(n): print('线程:%s'%n) time.sleep(random.randint(1,5)) if __name__ == '__main__': thread=ThreadPoolExecutor(3) for i in range(6): thread.submit(noah,i) print('主.......') ~~~ 执行结果 ``` 线程0 线程1 线程2 主....... 线程3 线程4 线程5 ``` ## map方法和回调函数 ### map方法 将前面的for循环改为用map方法来实现 ~~~ from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ProcessPoolExecutor import os,time,random def noah(n): print('线程:%s'%n) time.sleep(random.randrange(0,2)) if __name__ == '__main__': thread=ThreadPoolExecutor(3) thread.map(noah,range(6)) thread.map(noah, range(6)) thread.shutdown() print('主.......') ~~~ 说明: 之前的Process模块的map方法自带jion方法,会等待子进程结束再运行后面的主进程,但是这个新模块的map方法,不自带shutdown功能,也就是说可以多次提交map后,用一个shutdown方法一起提交等待 ### 回调函数 可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数 ~~~ from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ProcessPoolExecutor import os,time,random def noah(n): # print('线程:%s'%n) time.sleep(random.randrange(0,2)) return n*n def call(m): print(m.result()) #拿到的是一个对象obj,需要用obj.result()拿到结果 if __name__ == '__main__': thread=ThreadPoolExecutor(3) for i in range(6): thread.submit(noah,i).add_done_callback(call) print('主.......') ~~~ ## **对比总结:** * **multiprocessing.Pool模块** apply_async异步提交任务 得join,close之后才能维护主进程与池之间的同步 map自带join,close效果 获取执行结果,get 回调函数 指定callback参数,由主进程执行 * **concurrent.futures模块** submit 异步提交任务 shutdown 同步 map是不自带shutdown 获取执行结果 result 回调函数 直接调用add_done_callback方法,由子线程/子进程执行 * **concurrent.futures优势** 用concurrent.futures,可以轻松的在进程和线程之间切换 并发程序,线程池进程池都要用,只需要导入一个模块 concurrent.futures是一个新的模块,统一了线程池和进程池的使用方式,对一些操作进行了更合理的规划 * 线程池一般的线程个数 :**CPU个数*5** * 进程池一般的进程个数:**CPU个数+1**