17.4. concurrent.futures — 启动并行任务

3.2 新版功能.

源码: Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模块提供异步执行可调用对象高层接口。

异步执行可以由 ThreadPoolExecutor 使用线程或由 ProcessPoolExecutor 使用单独的进程来实现。 两者都是实现抽像类 Executor 定义的接口。

17.4.1. Executor 对象

class concurrent.futures.Executor

抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。

  • submit(fn, args, kwargs*)

    调度可调用对象 fn,以 fn(*args **kwargs) 方式执行并返回 Future 对像代表可调用对象的执行。:

    1. with ThreadPoolExecutor(max_workers=1) as executor:
    2. future = executor.submit(pow, 323, 1235)
    3. print(future.result())
  • map(func, *iterables, timeout=None, chunksize=1)

    类似于 map(func, *iterables) 函数,除了以下两点:

    • iterables 是立即执行而不是延迟执行的;

    • func 是异步执行的,对 func 的多个调用可以并发执行。

  1. 如果从原始调用到 [Executor.map()](#concurrent.futures.Executor.map "concurrent.futures.Executor.map") 经过 *timeout* 秒后, [\_\_next\_\_()]($3f6afdcad7d062db.md#iterator.__next__ "iterator.__next__") 已被调用且返回的结果还不可用,那么已返回的迭代器将触发 [concurrent.futures.TimeoutError](#concurrent.futures.TimeoutError "concurrent.futures.TimeoutError") 。 *timeout* 可以是整数或浮点数。如果 *timeout* 没有指定或为 `None` ,则没有超时限制。
  2. 如果 *func* 调用引发一个异常,当从迭代器中取回它的值时这个异常将被引发。
  3. 使用 [ProcessPoolExecutor](#concurrent.futures.ProcessPoolExecutor "concurrent.futures.ProcessPoolExecutor") 时,这个方法会将 *iterables* 分割任务块并作为独立的任务并提交到执行池中。这些块的大概数量可以由 *chunksize* 指定正整数设置。 对很长的迭代器来说,使用大的 *chunksize* 值比默认值 1 能显著地提高性能。 *chunksize* [ThreadPoolExecutor](#concurrent.futures.ThreadPoolExecutor "concurrent.futures.ThreadPoolExecutor") 没有效果。
  4. 3.5 版更改: 加入 *chunksize* 参数。
  • shutdown(wait=True)

    当待执行的 future 对象完成执行后向执行者发送信号,它就会释放正在使用的任何资源。 在关闭后调用 Executor.submit()Executor.map() 将会引发 RuntimeError

    如果 waitTrue 则此方法只有在所有待执行的 future 对象完成执行且释放已分配的资源后才会返回。 如果 waitFalse,方法立即返回,所有待执行的 future 对象完成执行后会释放已分配的资源。 不管 wait 的值是什么,整个 Python 程序将等到所有待执行的 future 对象完成执行后才退出。

    如果使用 with 语句,你就可以避免显式调用这个方法,它将会停止 Executor (就好像 Executor.shutdown() 调用时 wait 设为 True 一样等待):

    1. import shutil
    2. with ThreadPoolExecutor(max_workers=4) as e:
    3. e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    4. e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    5. e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    6. e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

17.4.2. ThreadPoolExecutor

ThreadPoolExecutorExecutor 的子类,它使用线程池来异步执行调用。

当回调已关联了一个 Future 然后再等待另一个 Future 的结果时就会发产死锁情况。例如:

  1. import time
  2. def wait_on_b():
  3. time.sleep(5)
  4. print(b.result()) # b will never complete because it is waiting on a.
  5. return 5
  6. def wait_on_a():
  7. time.sleep(5)
  8. print(a.result()) # a will never complete because it is waiting on b.
  9. return 6
  10. executor = ThreadPoolExecutor(max_workers=2)
  11. a = executor.submit(wait_on_b)
  12. b = executor.submit(wait_on_a)

与:

  1. def wait_on_future():
  2. f = executor.submit(pow, 5, 2)
  3. # This will never complete because there is only one worker thread and
  4. # it is executing this function.
  5. print(f.result())
  6. executor = ThreadPoolExecutor(max_workers=1)
  7. executor.submit(wait_on_future)

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=’’)

Executor 子类使用最多 max_workers 个线程的线程池来异步执行调用。

在 3.5 版更改: 如果 max_workersNone 或没有指定,将默认为机器处理器的个数,假如 ThreadPoolExecutor 侧重于 I/O 操作而不是 CPU 运算,那么可以乘以 5,同时工作线程的数量可以比 ProcessPoolExecutor 的数量高。

3.6 新版功能: 添加 thread_name_prefix 参数允许用户控制由线程池创建的 threading.Thread 工作线程名称以方便调试。

17.4.2.1. ThreadPoolExecutor 例子

  1. import concurrent.futures
  2. import urllib.request
  3. URLS = ['http://www.foxnews.com/',
  4. 'http://www.cnn.com/',
  5. 'http://europe.wsj.com/',
  6. 'http://www.bbc.co.uk/',
  7. 'http://some-made-up-domain.com/']
  8. # Retrieve a single page and report the URL and contents
  9. def load_url(url, timeout):
  10. with urllib.request.urlopen(url, timeout=timeout) as conn:
  11. return conn.read()
  12. # We can use a with statement to ensure threads are cleaned up promptly
  13. with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
  14. # Start the load operations and mark each future with its URL
  15. future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
  16. for future in concurrent.futures.as_completed(future_to_url):
  17. url = future_to_url[future]
  18. try:
  19. data = future.result()
  20. except Exception as exc:
  21. print('%r generated an exception: %s' % (url, exc))
  22. else:
  23. print('%r page is %d bytes' % (url, len(data)))

17.4.3. ProcessPoolExecutor

ProcessPoolExecutorExecutor 的子类,它使用进程池来实现异步执行调用。 ProcessPoolExecutor 使用 multiprocessing 回避 Global Interpreter Lock 但也意味着只可以处理和返回可序列化的对象。

__main__ 模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor 不可以工作在交互式解释器中。

从可调用对象中调用 ExecutorFuture 的方法提交给 ProcessPoolExecutor 会导致死锁。

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

在 3.3 版更改: 如果其中一个工作进程被突然终止,BrokenProcessPool 就会马上触发。 可预计的行为没有定义,但执行器上的操作或它的 future 对象会被冻结或死锁。

17.4.3.1. ProcessPoolExecutor 例子

  1. import concurrent.futures
  2. import math
  3. PRIMES = [
  4. 112272535095293,
  5. 112582705942171,
  6. 112272535095293,
  7. 115280095190773,
  8. 115797848077099,
  9. 1099726899285419]
  10. def is_prime(n):
  11. if n % 2 == 0:
  12. return False
  13. sqrt_n = int(math.floor(math.sqrt(n)))
  14. for i in range(3, sqrt_n + 1, 2):
  15. if n % i == 0:
  16. return False
  17. return True
  18. def main():
  19. with concurrent.futures.ProcessPoolExecutor() as executor:
  20. for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
  21. print('%d is prime: %s' % (number, prime))
  22. if __name__ == '__main__':
  23. main()

17.4.4. Future 对象

Future 类将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建。

class concurrent.futures.Future

将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建,除非测试,不应直接创建。

  • cancel()

    Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

  • cancelled()

    如果调用成功取消返回 True

  • running()

    如果调用正在执行而且不能被取消那么返回 True

  • done()

    如果调用已被取消或正常结束那么返回 True

  • result(timeout=None)

    返回调用返回的值。如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,concurrent.futures.TimeoutError 将会被触发。timeout 可以是整数或浮点数。如果 timeout 没有指定或为 None,那么等待时间就没有限制。

    如果 futrue 在完成前被取消则 CancelledError 将被触发。

    如果调用引发了一个异常,这个方法也会引发同样的异常。

  • exception(timeout=None)

    返回由调用引发的异常。如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,concurrent.futures.TimeoutError 将会被触发。timeout 可以是整数或浮点数。如果 timeout 没有指定或为 None,那么等待时间就没有限制。

    如果 futrue 在完成前被取消则 CancelledError 将被触发。

    如果调用正常完成那么返回 None

  • add_done_callback(fn)

    附加可调用 fn 到 future 对象。当 future 对象被取消或完成运行时,将会调用 fn,而这个 future 对象将作为它唯一的参数。

    加入的可调用对象总被属于添加它们的进程中的线程按加入的顺序调用。如果可调用对象引发一个 Exception 子类,它会被记录下来并被忽略掉。如果可调用对象引发一个 BaseException 子类,这个行为没有定义。

    如果 future 对象已经完成或已取消,fn 会被立即调用。

下面这些 Future 方法用于单元测试和 Executor 实现。

  • set_running_or_notify_cancel()

    这个方法只可以在执行关联 Future 工作之前由 Executor 实现调用或由单测试调用。

    如果这个方法返回 False 那么 Future 已被取消,即 Future.cancel() 已被调用并返回 True 。等待 Future 完成 (即通过 as_completed()wait()) 的线程将被唤醒。

    如果这个方法返回 True 那么 Future 不会被取消并已将它变为正在运行状态,也就是说调用 Future.running() 时将返回 True。

    这个方法只可以被调用一次并且不能在调用 Future.set_result()Future.set_exception() 之后再调用。

  • set_result(result)

    设置将 Future 关联工作的结果给 result

    这个方法只可以由 Executor 实现和单元测试使用。

  • set_exception(exception)

    设置 Future 关联工作的结果给 Exception exception

    这个方法只可以由 Executor 实现和单元测试使用。

17.4.5. 模块函数

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

Wait for the Future instances (possibly created by different Executor instances) given by fs to complete. Returns a named 2-tuple of sets. The first set, named done, contains the futures that completed (finished or were cancelled) before the wait completed. The second set, named not_done, contains uncompleted futures.

timeout 可以用来控制返回前最大的等待秒数。 timeout 可以为 int 或 float 类型。 如果 timeout 未指定或为 None ,则不限制等待时间。

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常数

描述

FIRST_COMPLETED

函数将在任意可等待对象结束或取消时返回。

FIRST_EXCEPTION

函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED

ALL_COMPLETED

函数将在所有可等待对象结束或取消时返回。

concurrent.futures.as_completed(fs, timeout=None)

Returns an iterator over the Future instances (possibly created by different Executor instances) given by fs that yields futures as they complete (finished or were cancelled). Any futures given by fs that are duplicated will be returned once. Any futures that completed before as_completed() is called will be yielded first. The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to as_completed(). timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

参见

PEP 3148 – future 对象 - 异步执行指令。

该提案描述了Python标准库中包含的这个特性。

17.4.6. Exception类

exception concurrent.futures.CancelledError

future 对象被取消时会触发。

exception concurrent.futures.TimeoutError

future 对象执行超出给定的超时数值时引发。

exception concurrent.futures.process.BrokenProcessPool

Derived from RuntimeError, this exception class is raised when one of the workers of a ProcessPoolExecutor has terminated in a non-clean fashion (for example, if it was killed from the outside).

3.3 新版功能.