concurrent.futures —- 启动并行任务

3.2 新版功能.

源码: Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模块提供异步执行可调用对象高层接口。

异步执行可以由 ThreadPoolExecutor 使用线程或由 ProcessPoolExecutor 使用单独的进程来实现。 两者都是实现抽像类 Executor 定义的接口。

Availability: not Emscripten, not WASI.

This module does not work or is not available on WebAssembly platforms wasm32-emscripten and wasm32-wasi. See WebAssembly platforms for more information.

Executor 对象

class concurrent.futures.Executor

抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。

  • submit(fn, /, \args, **kwargs*)

    调度可调用对象 fn,以 fn(*args, **kwargs) 方式执行并返回一个代表该可调用对象的执行的 Future 对象。

    1. with ThreadPoolExecutor(max_workers=1) as executor:
    2. future = executor.submit(pow, 323, 1235)
    3. print(future.result())
  • map(func, \iterables, timeout=None, chunksize=1*)

    类似于 map(func, *iterables) 函数,除了以下两点:

    • iterables 是立即执行而不是延迟执行的;

    • func 是异步执行的,对 func 的多个调用可以并发执行。

  1. The returned iterator raises a [TimeoutError]($e69e6dad35dc3610.md#TimeoutError "TimeoutError") if [\_\_next\_\_()]($c2b73e6dbd6fbe32.md#iterator.__next__ "iterator.__next__") is called and the result isn't available after *timeout* seconds from the original call to [Executor.map()](#concurrent.futures.Executor.map "concurrent.futures.Executor.map"). *timeout* can be an int or a float. If *timeout* is not specified or `None`, there is no limit to the wait time.
  2. 如果 *func* 调用引发一个异常,当从迭代器中取回它的值时这个异常将被引发。
  3. 使用 [ProcessPoolExecutor](#concurrent.futures.ProcessPoolExecutor "concurrent.futures.ProcessPoolExecutor") 时,这个方法会将 *iterables* 分割任务块并作为独立的任务并提交到执行池中。这些块的大概数量可以由 *chunksize* 指定正整数设置。 对很长的迭代器来说,使用大的 *chunksize* 值比默认值 1 能显著地提高性能。 *chunksize* [ThreadPoolExecutor](#concurrent.futures.ThreadPoolExecutor "concurrent.futures.ThreadPoolExecutor") 没有效果。
  4. 3.5 版更改: 加入 *chunksize* 参数。
  • shutdown(wait=True, **, cancel_futures=False*)

    当待执行的 future 对象完成执行后向执行者发送信号,它就会释放正在使用的任何资源。 在关闭后调用 Executor.submit()Executor.map() 将会引发 RuntimeError

    如果 waitTrue 则此方法只有在所有待执行的 future 对象完成执行且释放已分配的资源后才会返回。 如果 waitFalse,方法立即返回,所有待执行的 future 对象完成执行后会释放已分配的资源。 不管 wait 的值是什么,整个 Python 程序将等到所有待执行的 future 对象完成执行后才退出。

    如果 cancel_futuresTrue,此方法将取消所有执行器还未开始运行的挂起的 Future。 任何已完成或正在运行的 Future 将不会被取消,无论 cancel_futures 的值是什么?

    如果 cancel_futureswait 均为 True,则执行器已开始运行的所有 Future 将在此方法返回之前完成。 其余的 Future 会被取消。

    如果使用 with 语句,你就可以避免显式调用这个方法,它将会停止 Executor (就好像 Executor.shutdown() 调用时 wait 设为 True 一样等待):

    1. import shutil
    2. with ThreadPoolExecutor(max_workers=4) as e:
    3. e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    4. e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    5. e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    6. e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

    在 3.9 版更改: 增加了 cancel_futures

ThreadPoolExecutor

ThreadPoolExecutorExecutor 的子类,它使用线程池来异步执行调用。

当可调用对象已关联了一个 Future 然后在等待另一个 Future 的结果时就会导致死锁情况。例如:

  1. import time
  2. def wait_on_b():
  3. time.sleep(5)
  4. print(b.result()) # b will never complete because it is waiting on a.
  5. return 5
  6. def wait_on_a():
  7. time.sleep(5)
  8. print(a.result()) # a will never complete because it is waiting on b.
  9. return 6
  10. executor = ThreadPoolExecutor(max_workers=2)
  11. a = executor.submit(wait_on_b)
  12. b = executor.submit(wait_on_a)

与:

  1. def wait_on_future():
  2. f = executor.submit(pow, 5, 2)
  3. # This will never complete because there is only one worker thread and
  4. # it is executing this function.
  5. print(f.result())
  6. executor = ThreadPoolExecutor(max_workers=1)
  7. executor.submit(wait_on_future)

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=’’, initializer=None, initargs=())

Executor 子类使用最多 max_workers 个线程的线程池来异步执行调用。

All threads enqueued to ThreadPoolExecutor will be joined before the interpreter can exit. Note that the exit handler which does this is executed before any exit handlers added using atexit. This means exceptions in the main thread must be caught and handled in order to signal threads to exit gracefully. For this reason, it is recommended that ThreadPoolExecutor not be used for long-running tasks.

initializer 是在每个工作者线程开始处调用的一个可选可调用对象。 initargs 是传递给初始化器的元组参数。任何向池提交更多工作的尝试, initializer 都将引发一个异常,当前所有等待的工作都会引发一个 BrokenThreadPool

在 3.5 版更改: 如果 max_workersNone 或没有指定,将默认为机器处理器的个数,假如 ThreadPoolExecutor 则重于I/O操作而不是CPU运算,那么可以乘以 5 ,同时工作线程的数量可以比 ProcessPoolExecutor 的数量高。

3.6 新版功能: 添加 thread_name_prefix 参数允许用户控制由线程池创建的 threading.Thread 工作线程名称以方便调试。

在 3.7 版更改: 加入 initializer 和*initargs* 参数。

在 3.8 版更改: max_workers 的默认值已改为 min(32, os.cpu_count() + 4)。 这个默认值会保留至少 5 个工作线程用于 I/O 密集型任务。 对于那些释放了 GIL 的 CPU 密集型任务,它最多会使用 32 个 CPU 核心。这样能够避免在多核机器上不知不觉地使用大量资源。

现在 ThreadPoolExecutor 在启动 max_workers 个工作线程之前也会重用空闲的工作线程。

ThreadPoolExecutor 例子

  1. import concurrent.futures
  2. import urllib.request
  3. URLS = ['http://www.foxnews.com/',
  4. 'http://www.cnn.com/',
  5. 'http://europe.wsj.com/',
  6. 'http://www.bbc.co.uk/',
  7. 'http://some-made-up-domain.com/']
  8. # Retrieve a single page and report the URL and contents
  9. def load_url(url, timeout):
  10. with urllib.request.urlopen(url, timeout=timeout) as conn:
  11. return conn.read()
  12. # We can use a with statement to ensure threads are cleaned up promptly
  13. with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
  14. # Start the load operations and mark each future with its URL
  15. future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
  16. for future in concurrent.futures.as_completed(future_to_url):
  17. url = future_to_url[future]
  18. try:
  19. data = future.result()
  20. except Exception as exc:
  21. print('%r generated an exception: %s' % (url, exc))
  22. else:
  23. print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor 类是 Executor 的子类,它使用进程池来异步地执行调用。 ProcessPoolExecutor 会使用 multiprocessing 模块,这允许它绕过 全局解释器锁 但也意味着只可以处理和返回可封存的对象。

__main__ 模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor 不可以工作在交互式解释器中。

从可调用对象中调用 ExecutorFuture 的方法提交给 ProcessPoolExecutor 会导致死锁。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

异步地执行调用的 Executor 子类使用最多具有 max_workers 个进程的进程池。 如果 max_workersNone 或未给出,它将默认为机器的处理器个数。 如果 max_workers 小于等于 0,则将引发 ValueError。 在 Windows 上,max_workers 必须小于等于 61,否则将引发 ValueError。 如果 max_workersNone,则所选择的默认值最多为 61,即使存在更多的处理器。 mp_context 可以是一个多进程上下文或是 None。 它将被用来启动工作进程。 如果 mp_contextNone 或未给出,则将使用默认的多进程上下文。

initializer 是一个可选的可调用对象,它会在每个工作进程启动时被调用;initargs 是传给 initializer 的参数元组。 如果 initializer 引发了异常,则所有当前在等待的任务以及任何向进程池提交更多任务的尝试都将引发 BrokenProcessPool

max_tasks_per_child is an optional argument that specifies the maximum number of tasks a single process can execute before it will exit and be replaced with a fresh worker process. By default max_tasks_per_child is None which means worker processes will live as long as the pool. When a max is specified, the “spawn” multiprocessing start method will be used by default in absence of a mp_context parameter. This feature is incompatible with the “fork” start method.

在 3.3 版更改: 如果其中一个工作进程被突然终止,BrokenProcessPool 就会马上触发。 可预计的行为没有定义,但执行器上的操作或它的 future 对象会被冻结或死锁。

在 3.7 版更改: 添加 mp_context 参数允许用户控制由进程池创建给工作者进程的开始方法 。

加入 initializer 和*initargs* 参数。

在 3.11 版更改: The max_tasks_per_child argument was added to allow users to control the lifetime of workers in the pool.

ProcessPoolExecutor 例子

  1. import concurrent.futures
  2. import math
  3. PRIMES = [
  4. 112272535095293,
  5. 112582705942171,
  6. 112272535095293,
  7. 115280095190773,
  8. 115797848077099,
  9. 1099726899285419]
  10. def is_prime(n):
  11. if n < 2:
  12. return False
  13. if n == 2:
  14. return True
  15. if n % 2 == 0:
  16. return False
  17. sqrt_n = int(math.floor(math.sqrt(n)))
  18. for i in range(3, sqrt_n + 1, 2):
  19. if n % i == 0:
  20. return False
  21. return True
  22. def main():
  23. with concurrent.futures.ProcessPoolExecutor() as executor:
  24. for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
  25. print('%d is prime: %s' % (number, prime))
  26. if __name__ == '__main__':
  27. main()

Future 对象

Future 类将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建。

class concurrent.futures.Future

将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建,除非测试,不应直接创建。

  • cancel()

    尝试取消调用。 如果调用正在执行或已结束运行不能被取消则该方法将返回 False,否则调用会被取消并且该方法将返回 True

  • cancelled()

    如果调用成功取消返回 True

  • running()

    如果调用正在执行而且不能被取消那么返回 True

  • done()

    如果调用已被取消或正常结束那么返回 True

  • result(timeout=None)

    Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

    如果 futrue 在完成前被取消则 CancelledError 将被触发。

    如果调用引发了一个异常,这个方法也会引发同样的异常。

  • exception(timeout=None)

    Return the exception raised by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

    如果 futrue 在完成前被取消则 CancelledError 将被触发。

    如果调用正常完成那么返回 None

  • add_done_callback(fn)

    附加可调用 fn 到 future 对象。当 future 对象被取消或完成运行时,将会调用 fn,而这个 future 对象将作为它唯一的参数。

    加入的可调用对象总被属于添加它们的进程中的线程按加入的顺序调用。如果可调用对象引发一个 Exception 子类,它会被记录下来并被忽略掉。如果可调用对象引发一个 BaseException 子类,这个行为没有定义。

    如果 future 对象已经完成或已取消,fn 会被立即调用。

下面这些 Future 方法用于单元测试和 Executor 实现。

模块函数

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待由 fs 指定的 Future 实例(可能由不同的 Executor 实例创建)完成。 重复传给 fs 的 future 会被移除并将只返回一次。 返回一个由集合组成的具名 2 元组。 第一个集合的名称为 done,包含在等待完成之前已完成的 future(包括正常结束或被取消的 future)。 第二个集合的名称为 not_done,包含未完成的 future(包括挂起的或正在运行的 future)。

timeout 可以用来控制返回前最大的等待秒数。 timeout 可以为 int 或 float 类型。 如果 timeout 未指定或为 None ,则不限制等待时间。

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常量

描述

FIRST_COMPLETED

函数将在任意可等待对象结束或取消时返回。

FIRST_EXCEPTION

函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED

ALL_COMPLETED

函数将在所有可等待对象结束或取消时返回。

concurrent.futures.as_completed(fs, timeout=None)

Returns an iterator over the Future instances (possibly created by different Executor instances) given by fs that yields futures as they complete (finished or cancelled futures). Any futures given by fs that are duplicated will be returned once. Any futures that completed before as_completed() is called will be yielded first. The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to as_completed(). timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

参见

PEP 3148 — future 对象 - 异步执行指令。

该提案描述了Python标准库中包含的这个特性。

Exception 类

exception concurrent.futures.CancelledError

future 对象被取消时会触发。

exception concurrent.futures.TimeoutError

A deprecated alias of TimeoutError, raised when a future operation exceeds the given timeout.

在 3.11 版更改: This class was made an alias of TimeoutError.

exception concurrent.futures.BrokenExecutor

当执行器被某些原因中断而且不能用来提交或执行新任务时就会被引发派生于 RuntimeError 的异常类。

3.7 新版功能.

exception concurrent.futures.InvalidStateError

当某个操作在一个当前状态所不允许的 future 上执行时将被引发。

3.8 新版功能.

exception concurrent.futures.thread.BrokenThreadPool

ThreadPoolExecutor 中的其中一个工作者初始化失败时会引发派生于 BrokenExecutor 的异常类。

3.7 新版功能.

exception concurrent.futures.process.BrokenProcessPool

ThreadPoolExecutor 中的其中一个工作者不完整终止时(比如,被外部杀死)会引发派生于 BrokenExecutor ( 原名 RuntimeError ) 的异常类。

3.3 新版功能.