ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
### 导航 - [索引](../genindex.xhtml "总目录") - [模块](../py-modindex.xhtml "Python 模块索引") | - [下一页](subprocess.xhtml "subprocess --- 子进程管理") | - [上一页](concurrent.xhtml "concurrent 包") | - ![](https://box.kancloud.cn/a721fc7ec672275e257bbbfde49a4d4e_16x16.png) - [Python](https://www.python.org/) » - zh\_CN 3.7.3 [文档](../index.xhtml) » - [Python 标准库](index.xhtml) » - [并发执行](concurrency.xhtml) » - $('.inline-search').show(0); | # [`concurrent.futures`](#module-concurrent.futures "concurrent.futures: Execute computations concurrently using threads or processes.") --- 启动并行任务 3\.2 新版功能. **源码:** [Lib/concurrent/futures/thread.py](https://github.com/python/cpython/tree/3.7/Lib/concurrent/futures/thread.py) \[https://github.com/python/cpython/tree/3.7/Lib/concurrent/futures/thread.py\] 和 [Lib/concurrent/futures/process.py](https://github.com/python/cpython/tree/3.7/Lib/concurrent/futures/process.py) \[https://github.com/python/cpython/tree/3.7/Lib/concurrent/futures/process.py\] - - - - - - [`concurrent.futures`](#module-concurrent.futures "concurrent.futures: Execute computations concurrently using threads or processes.") 模块提供异步执行回调高层接口。 异步执行可以由 [`ThreadPoolExecutor`](#concurrent.futures.ThreadPoolExecutor "concurrent.futures.ThreadPoolExecutor") 使用线程或由 [`ProcessPoolExecutor`](#concurrent.futures.ProcessPoolExecutor "concurrent.futures.ProcessPoolExecutor") 使用单独的进程来实现。 两者都是实现抽像类 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 定义的接口。 ## 执行器对象 *class* `concurrent.futures.``Executor`抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。 > `submit`(*fn*, *\*args*, *\*\*kwargs*)调度可调用对象 *fn*,以 `fn(*args **kwargs)` 方式执行并返回 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 对像代表可调用对象的执行。: > > > ``` > with ThreadPoolExecutor(max_workers=1) as executor: > future = executor.submit(pow, 323, 1235) > print(future.result()) > > ``` > > > > > `map`(*func*, *\*iterables*, *timeout=None*, *chunksize=1*)类似于 [`map(func, *iterables)`](functions.xhtml#map "map") 除去: > > - 应立即收集 *iterables* 不要延迟再收集; > - *func* 是异步执行的且对 *func* 的调用可以并发执行。 > > 如果 [`__next__()`](stdtypes.xhtml#iterator.__next__ "iterator.__next__") 已被调用且返回的结果在对 [`Executor.map()`](#concurrent.futures.Executor.map "concurrent.futures.Executor.map") 的原始调用经过 *timeout* 秒后还不可用,则已返回的迭代器将引发 [`concurrent.futures.TimeoutError`](#concurrent.futures.TimeoutError "concurrent.futures.TimeoutError")。 *timeout* 可以为 int 或 float 类型。 如果 *timeout* 未指定或为 `None`,则不限制等待时间。 > > 如果 *func* 调用引发一个异常,当从迭代器中取回它的值时这个异常将被引发。 > > 使用 [`ProcessPoolExecutor`](#concurrent.futures.ProcessPoolExecutor "concurrent.futures.ProcessPoolExecutor") 时,这个方法会将 *iterables* 分割任务块并作为独立的任务并提交到执行池中。这些块的大概数量可以由 *chunksize* 指定正整数设置。 对很长的迭代器来说,使用大的 *chunksize* 值比默认值 1 能显著地提高性能。 *chunksize* 对 [`ThreadPoolExecutor`](#concurrent.futures.ThreadPoolExecutor "concurrent.futures.ThreadPoolExecutor") 没有效果。 > > 在 3.5 版更改: 加入 *chunksize* 参数。 > > > > `shutdown`(*wait=True*)当待执行的期程完成执行后向执行者发送信号,它就会释放正在使用的任何资源。调用 [`Executor.submit()`](#concurrent.futures.Executor.submit "concurrent.futures.Executor.submit") 和 [`Executor.submit()`](#concurrent.futures.Executor.submit "concurrent.futures.Executor.submit") 会在关闭后触发 [`RuntimeError`](exceptions.xhtml#RuntimeError "RuntimeError")。 > > 如果 *wait* 为 `True` 则此方法只有在所有待执行的期程完成执行且释放已分配的资源后才会返回。 如果 *wait* 为 `False`,方法立即返回,所有待执行的期程完成执行后会释放已分配的资源。 不管 *wait* 的值是什么,整个 Python 程序将等到所有待执行的期程完成执行后才退出。 > > 如果使用 [`with`](../reference/compound_stmts.xhtml#with) 语句,你就可以避免显式调用这个方法,它将会停止 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") (就好像 [`Executor.shutdown()`](#concurrent.futures.Executor.shutdown "concurrent.futures.Executor.shutdown") 调用时 *wait* 设为 `True` 一样等待): > > > ``` > import shutil > with ThreadPoolExecutor(max_workers=4) as e: > e.submit(shutil.copy, 'src1.txt', 'dest1.txt') > e.submit(shutil.copy, 'src2.txt', 'dest2.txt') > e.submit(shutil.copy, 'src3.txt', 'dest3.txt') > e.submit(shutil.copy, 'src4.txt', 'dest4.txt') > > ``` ## ThreadPoolExecutor [`ThreadPoolExecutor`](#concurrent.futures.ThreadPoolExecutor "concurrent.futures.ThreadPoolExecutor") 是 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 的子类,它使用线程池来异步执行调用。 当回调已关联了一个 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 然后再等待另一个 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 的结果时就会发生死锁情况。 例如: ``` import time def wait_on_b(): time.sleep(5) print(b.result()) # b will never complete because it is waiting on a. return 5 def wait_on_a(): time.sleep(5) print(a.result()) # a will never complete because it is waiting on b. return 6 executor = ThreadPoolExecutor(max_workers=2) a = executor.submit(wait_on_b) b = executor.submit(wait_on_a) ``` And: ``` def wait_on_future(): f = executor.submit(pow, 5, 2) # This will never complete because there is only one worker thread and # it is executing this function. print(f.result()) executor = ThreadPoolExecutor(max_workers=1) executor.submit(wait_on_future) ``` *class* `concurrent.futures.``ThreadPoolExecutor`(*max\_workers=None*, *thread\_name\_prefix=''*, *initializer=None*, *initargs=()*)[`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 的一个子类,使用最多 *max\_workers* 个线程的线程池来异步执行调用。 *initializer* 是在每个工作者线程开始处调用的一个可选可调用对象。 *initargs* 是传递给初始化器的元组参数。任何向池提交更多工作的尝试, *initializer* 都将引发一个异常,当前所有等待的工作都会引发一个 [`BrokenThreadPool`](#concurrent.futures.thread.BrokenThreadPool "concurrent.futures.thread.BrokenThreadPool")。 在 3.5 版更改: 如果 *max\_workers* 为 `None` 或没有指定,将默认为机器处理器的个数,假如 [`ThreadPoolExecutor`](#concurrent.futures.ThreadPoolExecutor "concurrent.futures.ThreadPoolExecutor") 则重于 I/O 操作而不是 CPU 运算,那么可以乘以 `5`,同时工作线程的数量可以比 [`ProcessPoolExecutor`](#concurrent.futures.ProcessPoolExecutor "concurrent.futures.ProcessPoolExecutor") 的数量高。 3\.6 新版功能: 添加 *thread\_name\_prefix* 参数允许用户控制由线程池创建的 [`threading.Thread`](threading.xhtml#threading.Thread "threading.Thread") 工作线程名称以方便调试。 在 3.7 版更改: 加入 *initializer* 和\*initargs\* 参数。 ### ProcessPoolExecutor例子 ``` import concurrent.futures import urllib.request URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] # Retrieve a single page and report the URL and contents def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data))) ``` ## ProcessPoolExecutor [`ProcessPoolExecutor`](#concurrent.futures.ProcessPoolExecutor "concurrent.futures.ProcessPoolExecutor") 是 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 的子类,它使用进程池来实现异步执行调用。 [`ProcessPoolExecutor`](#concurrent.futures.ProcessPoolExecutor "concurrent.futures.ProcessPoolExecutor") 使用 [`multiprocessing`](multiprocessing.xhtml#module-multiprocessing "multiprocessing: Process-based parallelism.") 回避 [Global Interpreter Lock](../glossary.xhtml#term-global-interpreter-lock) 但也意味着只可以处理和返回可序列化的对象。 `__main__` 模块必须可以被工作者子进程导入。这意味着 [`ProcessPoolExecutor`](#concurrent.futures.ProcessPoolExecutor "concurrent.futures.ProcessPoolExecutor") 不可以工作在交互式解释器中。 从提交给 [`ProcessPoolExecutor`](#concurrent.futures.ProcessPoolExecutor "concurrent.futures.ProcessPoolExecutor") 的回调中调用 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 或 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 方法会导致死锁。 *class* `concurrent.futures.``ProcessPoolExecutor`(*max\_workers=None*, *mp\_context=None*, *initializer=None*, *initargs=()*)[`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 子类使用一个最多可有 *max\_workers* 个进程的进程池异步执行调用。如果 *max\_workers* 为 `None` 或没有指定,默认为机器的处理器个数。如果 *max\_workers* 小于或等于``0``将会引发 [`ValueError`](exceptions.xhtml#ValueError "ValueError") 。 *mp\_context* 可以是一个多进程环境或 `None` 。它将用来启动工作者进程。如果 *mp\_context* 为 `None` 或没有指定,将使用默认多进程环境。 *initializer* 是在每个工作者进程开始处调用的一个可选可调用对象。 *initargs* 是传递给初始化器的元组参数。任何向池提交更多工作的尝试, *initializer* 都将引发一个异常,当前所有等待的工作都会引发一个 `BrokenProcessPool`。 在 3.3 版更改: 如果其中一个工作进程被突然终止,`BrokenProcessPool` 就会马上触发。可预计的行为没有定义,但执行器上的操作或它的期程会被冻结或死锁。 在 3.7 版更改: 添加 *mp\_context* 参数允许用户控制由进程池创建给工作者进程的开始方法 。 加入 *initializer* 和\*initargs\* 参数。 ### ProcessPoolExecutor例子 ``` import concurrent.futures import math PRIMES = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419] def is_prime(n): if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True def main(): with concurrent.futures.ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print('%d is prime: %s' % (number, prime)) if __name__ == '__main__': main() ``` ## 期程对象 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 类将可调用对象封装为异步执行。[`Future`](#concurrent.futures.Future "concurrent.futures.Future") 实例由 [`Executor.submit()`](#concurrent.futures.Executor.submit "concurrent.futures.Executor.submit") 创建。 *class* `concurrent.futures.``Future`将可调用对象封装为异步执行。[`Future`](#concurrent.futures.Future "concurrent.futures.Future") 实例由 [`Executor.submit()`](#concurrent.futures.Executor.submit "concurrent.futures.Executor.submit") 创建,除非测试,不应直接创建。 > `cancel`()尝试取消调用。如果调用正在执行而且不能被取消那么方法返回 `False`,否则调用会被取消同时方法返回 `True`。 > > `cancelled`()如果调用成功取消返回 `True`。 > > `running`()如果调用正在执行而且不能被取消那么返回``True``。 > > `done`()如果调用已被取消或正常结束那么返回 `True`。 > > `result`(*timeout=None*)返回调用返回的值。如果调用还没完成那么这个方法将等待 *timeout* 秒。如果在 *timeout* 秒内没有执行完成,[`concurrent.futures.TimeoutError`](#concurrent.futures.TimeoutError "concurrent.futures.TimeoutError") 将会被触发。*timeout* 可以是整数或浮点数。如果 *timeout* 没有指定或为 `None`,那么等待时间就没有限制。 > > 如果 futrue 在完成前被取消则 [`CancelledError`](#concurrent.futures.CancelledError "concurrent.futures.CancelledError") 将被触发。 > > 如果调用引发了一个异常,这个方法也会引发同样的异常。 > > `exception`(*timeout=None*)返回由调用引发的异常。如果调用还没完成那么这个方法将等待 *timeout* 秒。如果在 *timeout* 秒内没有执行完成,[`concurrent.futures.TimeoutError`](#concurrent.futures.TimeoutError "concurrent.futures.TimeoutError") 将会被触发。*timeout* 可以是整数或浮点数。如果 *timeout* 没有指定或为 `None`,那么等待时间就没有限制。 > > 如果 futrue 在完成前被取消则 [`CancelledError`](#concurrent.futures.CancelledError "concurrent.futures.CancelledError") 将被触发。 > > 如果调用正常完成那么返回 `None`。 > > `add_done_callback`(*fn*)附加可调用 *fn* 到期程。当期程被取消或完成运行时,将会调用 *fn*,而这个期程将作为它唯一的参数。 > > 加入的可调用对象总被属于添加它们的进程中的线程按加入的顺序调用。如果可调用对象引发一个 [`Exception`](exceptions.xhtml#Exception "Exception") 子类,它会被记录下来并被忽略掉。如果可调用对象引发一个 [`BaseException`](exceptions.xhtml#BaseException "BaseException") 子类,这个行为没有定义。 > > 如果期程已经完成或已取消,*fn* 会被立即调用。 下面这些 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 方法用于单元测试和 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 实现。 > `set_running_or_notify_cancel`()这个方法只可以在执行关联 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 工作之前由 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 实现调用或由单测试调用。 > > 如果这个方法返回 `False` 那么 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 已被取消,即 [`Future.cancel()`](#concurrent.futures.Future.cancel "concurrent.futures.Future.cancel") 已被调用并返回 `True` 。等待 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 完成 (即通过 [`as_completed()`](#concurrent.futures.as_completed "concurrent.futures.as_completed") 或 [`wait()`](#concurrent.futures.wait "concurrent.futures.wait")) 的线程将被唤醒。 > > 如果这个方法返回 `True` 那么 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 不会被取消并已将它变为正在运行状态,也就是说调用 [`Future.running()`](#concurrent.futures.Future.running "concurrent.futures.Future.running") 时将返回 True。 > > 这个方法只可以被调用一次并且不能在调用 [`Future.set_result()`](#concurrent.futures.Future.set_result "concurrent.futures.Future.set_result") 或 [`Future.set_exception()`](#concurrent.futures.Future.set_exception "concurrent.futures.Future.set_exception") 之后再调用。 > > `set_result`(*result*)设置将 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 关联工作的结果给 *result* 。 > > 这个方法只可以由 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 实现和单元测试使用。 > > `set_exception`(*exception*)设置 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 关联工作的结果给 [`Exception`](exceptions.xhtml#Exception "Exception") *exception* 。 > > 这个方法只可以由 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 实现和单元测试使用。 ## 模块函数 `concurrent.futures.``wait`(*fs*, *timeout=None*, *return\_when=ALL\_COMPLETED*)等待 *fs* 指定的 [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 实例(可能由不同的 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 实例创建)完成。返回一个已被命名的2元元组集合。第一个集合被命名为 `done` ,包含等待完成前已完成的期程(正常结束或被取消)。第二个集合被命名为 `not_done`,包含未完成的期程。 *timeout* 可以用来控制返回前最大的等待秒数。 *timeout* 可以为 int 或 float 类型。 如果 *timeout* 未指定或为 `None` ,则不限制等待时间。 *return\_when* 指定此函数应在何时返回。它必须为以下常数之一: 常数 描述 `FIRST_COMPLETED` 函数将在任意可等待对象结束或取消时返回。 `FIRST_EXCEPTION` 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 `ALL_COMPLETED`。 `ALL_COMPLETED` 函数将在所有可等待对象结束或取消时返回。 `concurrent.futures.``as_completed`(*fs*, *timeout=None*)通过 *fs* 指定的 the [`Future`](#concurrent.futures.Future "concurrent.futures.Future") 实例(可能由不同的 [`Executor`](#concurrent.futures.Executor "concurrent.futures.Executor") 实例创建)返回一个迭代器,当它们完成时(正常结束或被取消)产生期程。任何被 *fs* 指定的重复期程都将会返回一次。首先产生调用 [`as_completed()`](#concurrent.futures.as_completed "concurrent.futures.as_completed") 前已完成的期程。 当 [`__next__()`](stdtypes.xhtml#iterator.__next__ "iterator.__next__") 调用以及从原始调用到 [`as_completed()`](#concurrent.futures.as_completed "concurrent.futures.as_completed") 的时间超过 *timeout* 秒后结果还不可用时返回的迭代器就会引发 [`concurrent.futures.TimeoutError`](#concurrent.futures.TimeoutError "concurrent.futures.TimeoutError") 。 *timeout* 可以为 int 或 float 类型。 如果 *timeout* 未指定或为 `None` ,则不限制等待时间。 参见 [**PEP 3148**](https://www.python.org/dev/peps/pep-3148) \[https://www.python.org/dev/peps/pep-3148\] -- futures - 异步执行指令。该提案描述了Python标准库中包含的这个特性。 ## Exception类 *exception* `concurrent.futures.``CancelledError`future被取消时会触发。 *exception* `concurrent.futures.``TimeoutError`future运算超出给定的超时数值时触发。 *exception* `concurrent.futures.``BrokenExecutor`当执行器被某些原因中断而且不能用来提交或执行新任务时就会被引发派生于 [`RuntimeError`](exceptions.xhtml#RuntimeError "RuntimeError") 的异常类。 3\.7 新版功能. *exception* `concurrent.futures.thread.``BrokenThreadPool`当 `ThreadPoolExecutor` 中的其中一个工作者初始化失败时会引发派生于 [`BrokenExecutor`](#concurrent.futures.BrokenExecutor "concurrent.futures.BrokenExecutor") 的异常类。 3\.7 新版功能. *exception* `concurrent.futures.process.``BrokenProcessPool`当 `ThreadPoolExecutor` 中的其中一个工作者不完整终止时(比如,被外部杀死)会引发派生于 [`BrokenExecutor`](#concurrent.futures.BrokenExecutor "concurrent.futures.BrokenExecutor") ( 原名 [`RuntimeError`](exceptions.xhtml#RuntimeError "RuntimeError") ) 的异常类。 3\.3 新版功能. ### 导航 - [索引](../genindex.xhtml "总目录") - [模块](../py-modindex.xhtml "Python 模块索引") | - [下一页](subprocess.xhtml "subprocess --- 子进程管理") | - [上一页](concurrent.xhtml "concurrent 包") | - ![](https://box.kancloud.cn/a721fc7ec672275e257bbbfde49a4d4e_16x16.png) - [Python](https://www.python.org/) » - zh\_CN 3.7.3 [文档](../index.xhtml) » - [Python 标准库](index.xhtml) » - [并发执行](concurrency.xhtml) » - $('.inline-search').show(0); | © [版权所有](../copyright.xhtml) 2001-2019, Python Software Foundation. Python 软件基金会是一个非盈利组织。 [请捐助。](https://www.python.org/psf/donations/) 最后更新于 5月 21, 2019. [发现了问题](../bugs.xhtml)? 使用[Sphinx](http://sphinx.pocoo.org/)1.8.4 创建。