multiprocessing —- 基于进程的并行

源代码Lib/multiprocessing/


概述

multiprocessing 是一个用于产生进程的包,具有与 threading 模块相似API。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此, multiprocessing 模块允许程序员充分利用机器上的多核。可运行于 Unix 和 Windows 。

multiprocessing 模块还引入了在 threading 模块中没有的API。一个主要的例子就是 Pool 对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。下面的例子演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了 Pool

  1. from multiprocessing import Pool
  2.  
  3. def f(x):
  4. return x*x
  5.  
  6. if __name__ == '__main__':
  7. with Pool(5) as p:
  8. print(p.map(f, [1, 2, 3]))

将在标准输出中打印

  1. [1, 4, 9]

Process 类

multiprocessing 中,通过创建一个 Process 对象然后调用它的 start() 方法来生成进程。 Processthreading.Thread API 相同。 一个简单的多进程程序示例是:

  1. from multiprocessing import Process
  2.  
  3. def f(name):
  4. print('hello', name)
  5.  
  6. if __name__ == '__main__':
  7. p = Process(target=f, args=('bob',))
  8. p.start()
  9. p.join()

要显示所涉及的各个进程ID,这是一个扩展示例:

  1. from multiprocessing import Process
  2. import os
  3.  
  4. def info(title):
  5. print(title)
  6. print('module name:', __name__)
  7. print('parent process:', os.getppid())
  8. print('process id:', os.getpid())
  9.  
  10. def f(name):
  11. info('function f')
  12. print('hello', name)
  13.  
  14. if __name__ == '__main__':
  15. info('main line')
  16. p = Process(target=f, args=('bob',))
  17. p.start()
  18. p.join()

关于为什么 if name == 'main' 部分是必需的解释,请参见 编程指导

上下文和启动方法

根据不同的平台, multiprocessing 支持三种启动进程的方法。这些 启动方法

spawn

父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的 run() 方法所需的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。相对于使用 fork 或者 forkserver,使用这个方法启动进程相当慢。

可在Unix和Windows上使用。 Windows上的默认设置。

fork

父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。

只存在于Unix。Unix中的默认值。

forkserver

程序启动并选择 forkserver 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用 os.fork() 是安全的。没有不必要的资源被继承。

可在Unix平台上使用,支持通过Unix管道传递文件描述符。

在 3.8 版更改: 对于 macOS,spawn 启动方式是默认方式。 因为 fork 可能导致subprocess崩溃,被认为是不安全的,查看 bpo-33725

在 3.4 版更改: 在所有unix平台上添加支持了 spawn ,并且为一些unix平台添加了 forkserver 。在Windows上子进程不再继承所有可继承的父进程句柄。

在 Unix 上通过 spawnforkserver 方式启动多进程会同时启动一个 资源追踪 进程,负责追踪当前程序的进程产生的、并且不再被使用的命名系统资源(如命名信号量以及 SharedMemory 对象)。当所有进程退出后,资源追踪会负责释放这些仍被追踪的的对象。通常情况下是不会有这种对象的,但是假如一个子进程被某个信号杀死,就可能存在这一类资源的“泄露”情况。(泄露的信号量以及共享内存不会被释放,直到下一次系统重启,对于这两类资源来说,这是一个比较大的问题,因为操作系统允许的命名信号量的数量是有限的,而共享内存也会占据主内存的一片空间)

要选择一个启动方法,你应该在主模块的 if name == 'main' 子句中调用 set_start_method() 。例如:

  1. import multiprocessing as mp
  2.  
  3. def foo(q):
  4. q.put('hello')
  5.  
  6. if __name__ == '__main__':
  7. mp.set_start_method('spawn')
  8. q = mp.Queue()
  9. p = mp.Process(target=foo, args=(q,))
  10. p.start()
  11. print(q.get())
  12. p.join()

在程序中 set_start_method() 不应该被多次调用。

或者,你可以使用 get_context() 来获取上下文对象。上下文对象与 multiprocessing 模块具有相同的API,并允许在同一程序中使用多种启动方法。:

  1. import multiprocessing as mp
  2.  
  3. def foo(q):
  4. q.put('hello')
  5.  
  6. if __name__ == '__main__':
  7. ctx = mp.get_context('spawn')
  8. q = ctx.Queue()
  9. p = ctx.Process(target=foo, args=(q,))
  10. p.start()
  11. print(q.get())
  12. p.join()

请注意,关联到不同上下文的对象和进程之前可能不兼容。特别是,使用 fork 上下文创建的锁不能传递给使用 spawnforkserver 启动方法启动的进程。

想要使用特定启动方法的库应该使用 get_context() 以避免干扰库用户的选择。

警告

'spawn''forkserver' 启动方法当前不能在Unix上和“冻结的”可执行内容一同使用(例如,有类似 PyInstallercx_Freeze 的包产生的二进制文件)。 'fork' 启动方法可以使用。

在进程之间交换对象

multiprocessing 支持进程之间的两种通信通道:

队列

Queue 类是一个近似 queue.Queue 的克隆。 例如:

  1. from multiprocessing import Process, Queuedef f(q): q.put([42, None, 'hello'])if name == 'main': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()

队列是线程和进程安全的。

管道

Pipe() 函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:

  1. from multiprocessing import Process, Pipedef f(conn): conn.send([42, None, 'hello']) conn.close()if name == 'main': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()

返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send()recv() 方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。

进程间同步

对于所有在 threading 存在的同步原语,multiprocessing 中都有类似的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

  1. from multiprocessing import Process, Lock
  2.  
  3. def f(l, i):
  4. l.acquire()
  5. try:
  6. print('hello world', i)
  7. finally:
  8. l.release()
  9.  
  10. if __name__ == '__main__':
  11. lock = Lock()
  12.  
  13. for num in range(10):
  14. Process(target=f, args=(lock, num)).start()

不使用锁的情况下,来自于多进程的输出很容易产生混淆。

进程间共享状态

如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。

但是,如果你真的需要使用一些共享数据,那么 multiprocessing 提供了两种方法。

共享内存

可以使用 ValueArray 将数据存储在共享内存映射中。例如,以下代码:

  1. from multiprocessing import Process, Value, Arraydef f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i]if name == 'main': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])

将打印

  1. 3.1415927[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建 numarr 时使用的 'd''i' 参数是 array 模块使用的类型的 typecode : 'd' 表示双精度浮点数, 'i' 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意ctypes对象。

服务进程

Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。

Manager() 返回的管理器支持类型: listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray 。例如

  1. from multiprocessing import Process, Managerdef f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse()if name == 'main': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)

将打印

  1. {0.25: None, 1: '1', '2': 2}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

使用工作进程

Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。

例如:

  1. from multiprocessing import Pool, TimeoutError
  2. import time
  3. import os
  4.  
  5. def f(x):
  6. return x*x
  7.  
  8. if __name__ == '__main__':
  9. # start 4 worker processes
  10. with Pool(processes=4) as pool:
  11.  
  12. # print "[0, 1, 4,..., 81]"
  13. print(pool.map(f, range(10)))
  14.  
  15. # print same numbers in arbitrary order
  16. for i in pool.imap_unordered(f, range(10)):
  17. print(i)
  18.  
  19. # evaluate "f(20)" asynchronously
  20. res = pool.apply_async(f, (20,)) # runs in *only* one process
  21. print(res.get(timeout=1)) # prints "400"
  22.  
  23. # evaluate "os.getpid()" asynchronously
  24. res = pool.apply_async(os.getpid, ()) # runs in *only* one process
  25. print(res.get(timeout=1)) # prints the PID of that process
  26.  
  27. # launching multiple evaluations asynchronously *may* use more processes
  28. multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
  29. print([res.get(timeout=1) for res in multiple_results])
  30.  
  31. # make a single worker sleep for 10 secs
  32. res = pool.apply_async(time.sleep, (10,))
  33. try:
  34. print(res.get(timeout=1))
  35. except TimeoutError:
  36. print("We lacked patience and got a multiprocessing.TimeoutError")
  37.  
  38. print("For the moment, the pool remains available for more work")
  39.  
  40. # exiting the 'with'-block has stopped the pool
  41. print("Now the pool is closed and no longer available")

请注意,进程池的方法只能由创建它的进程使用。

注解

这个包中的功能要求子进程可以导入 main 模块。虽然这在 编程指导 中有描述,但还是需要提前说明一下。这意味着一些示例在交互式解释器中不起作用,比如 multiprocessing.pool.Pool 示例。例如:

  1. >>> from multiprocessing import Pool
  2. >>> p = Pool(5)
  3. >>> def f(x):
  4. ... return x*x
  5. ...
  6. >>> p.map(f, [1,2,3])
  7. Process PoolWorker-1:
  8. Process PoolWorker-2:
  9. Process PoolWorker-3:
  10. Traceback (most recent call last):
  11. AttributeError: 'module' object has no attribute 'f'
  12. AttributeError: 'module' object has no attribute 'f'
  13. AttributeError: 'module' object has no attribute 'f'

(如果尝试执行上面的代码,它会以一种半随机的方式将三个完整的堆栈内容交替输出,然后你只能以某种方式停止父进程。)

参考

multiprocessing 包主要复制了 threading 模块的API。

Process 和异常

  • class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • 进程对象表示在单独进程中运行的活动。 Process 类拥有和 threading.Thread 等价的大部分方法。

应始终使用关键字参数调用构造函数。 group 应该始终是 None ;它仅用于兼容 threading.Threadtarget 是由 run() 方法调用的可调用对象。它默认为 None ,意味着什么都没有被调用。 name 是进程名称(有关详细信息,请参阅 name )。 args 是目标调用的参数元组。 kwargs 是目标调用的关键字参数字典。如果提供,则键参数 daemon 将进程 daemon 标志设置为 TrueFalse 。如果是 None (默认值),则该标志将从创建的进程继承。

默认情况下,不会将任何参数传递给 target

如果子类重写构造函数,它必须确保它在对进程执行任何其他操作之前调用基类构造函数( Process.init() )。

在 3.3 版更改: 加入 daemon 参数。

  • run()
  • 表示进程活动的方法。

你可以在子类中重载此方法。标准 run() 方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别从 argskwargs 参数中获取顺序和关键字参数。

  • start()
  • 启动进程活动。

这个方法每个进程对象最多只能调用一次。它会将对象的 run() 方法安排在一个单独的进程中调用。

  • join([timeout])
  • 如果可选参数 timeoutNone (默认值),则该方法将阻塞,直到调用 join() 方法的进程终止。如果 timeout 是一个正数,它最多会阻塞 timeout 秒。请注意,如果进程终止或方法超时,则该方法返回 None 。检查进程的 exitcode 以确定它是否终止。

一个进程可以被 join 多次。

进程无法join自身,因为这会导致死锁。尝试在启动进程之前join进程是错误的。

  • name
  • 进程的名称。该名称是一个字符串,仅用于识别目的。它没有语义。可以为多个进程指定相同的名称。

初始名称由构造器设定。 如果没有为构造器提供显式名称,则会构造一个形式为 'Process-N1:N2:…:Nk' 的名称,其中每个 Nk 是其父亲的第 N 个孩子。

  • is_alive()
  • 返回进程是否还活着。

粗略地说,从 start() 方法返回到子进程终止之前,进程对象仍处于活动状态。

  • daemon
  • 进程的守护标志,一个布尔值。这必须在 start() 被调用之前设置。

初始值继承自创建进程。

当进程退出时,它会尝试终止其所有守护进程子进程。

请注意,不允许守护进程创建子进程。否则,守护进程会在子进程退出时终止其子进程。 另外,这些 不是 Unix守护进程或服务,它们是正常进程,如果非守护进程已经退出,它们将被终止(并且不被合并)。

除了 threading.Thread API ,Process 对象还支持以下属性和方法:

  • pid
  • 返回进程ID。在生成该进程之前,这将是 None

  • exitcode

  • 子进程的退出代码。如果进程尚未终止,这将是 None 。负值 -N 表示子进程被信号 N 终止。

  • authkey

  • 进程的身份验证密钥(字节字符串)。

multiprocessing 初始化时,主进程使用 os.urandom() 分配一个随机字符串。

当创建 Process 对象时,它将继承其父进程的身份验证密钥,尽管可以通过将 authkey 设置为另一个字节字符串来更改。

参见 认证密码

  • sentinel
  • 系统对象的数字句柄,当进程结束时将变为 "ready" 。

如果要使用 multiprocessing.connection.wait() 一次等待多个事件,可以使用此值。否则调用 join() 更简单。

在Windows上,这是一个操作系统句柄,可以与 WaitForSingleObjectWaitForMultipleObjects 系列API调用一起使用。在Unix上,这是一个文件描述符,可以使用来自 select 模块的原语。

3.3 新版功能.

  • terminate()
  • 终止进程。 在Unix上,这是使用 SIGTERM 信号完成的;在Windows上使用 TerminateProcess() 。 请注意,不会执行退出处理程序和finally子句等。

请注意,进程的后代进程将不会被终止 —— 它们将简单地变成孤立的。

警告

如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能无法被其他进程使用。类似地,如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁。

  • kill()
  • terminate() 相同,但在Unix上使用 SIGKILL 信号。

3.7 新版功能.

  • close()
  • 关闭 Process 对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发 ValueError 。一旦 close() 成功返回, Process 对象的大多数其他方法和属性将引发 ValueError

3.7 新版功能.

注意 start()join()is_alive()terminate()exitcode 方法只能由创建进程对象的进程调用。

Process 一些方法的示例用法:

  1. >>> import multiprocessing, time, signal
  2. >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
  3. >>> print(p, p.is_alive())
  4. <Process ... initial> False
  5. >>> p.start()
  6. >>> print(p, p.is_alive())
  7. <Process ... started> True
  8. >>> p.terminate()
  9. >>> time.sleep(0.1)
  10. >>> print(p, p.is_alive())
  11. <Process ... stopped exitcode=-SIGTERM> False
  12. >>> p.exitcode == -signal.SIGTERM
  13. True
  • exception multiprocessing.ProcessError
  • 所有 multiprocessing 异常的基类。

  • exception multiprocessing.BufferTooShort

  • 当提供的缓冲区对象太小而无法读取消息时, Connection.recv_bytes_into() 引发的异常。

如果 e 是一个 BufferTooShort 实例,那么 e.args[0] 将把消息作为字节字符串给出。

  • exception multiprocessing.AuthenticationError
  • 出现身份验证错误时引发。

  • exception multiprocessing.TimeoutError

  • 有超时的方法超时时引发。

管道和队列

使用多进程时,一般使用消息机制实现进程间通信,尽可能避免使用同步原语,例如锁。

消息机制包含: Pipe() (可以用于在两个进程间传递消息),以及队列(能够在多个生产者和消费者之间通信)。

The Queue, SimpleQueue and JoinableQueue typesare multi-producer, multi-consumer FIFOqueues modelled on the queue.Queue class in thestandard library. They differ in that Queue lacks thetask_done() and join() methods introducedinto Python 2.5's queue.Queue class.

如果你使用了 JoinableQueue ,那么你必须对每个已经移出队列的任务调用 JoinableQueue.task_done() 。不然的话用于统计未完成任务的信号量最终会溢出并抛出异常。

另外还可以通过使用一个管理器对象创建一个共享队列,详见 管理器

注解

multiprocessing 使用了普通的 queue.Emptyqueue.Full 异常去表示超时。 你需要从 queue 中导入它们,因为它们并不在 multiprocessing 的命名空间中。

注解

当一个对象被放入一个队列中时,这个对象首先会被一个后台线程用pickle序列化,并将序列化后的数据通过一个底层管道的管道传递到队列中。这种做法会有点让人惊讶,但一般不会出现什么问题。如果它们确实妨碍了你,你可以使用一个由管理器 manager 创建的队列替换它。

  • 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty() 才会返回 False 。而 get_nowait() 可以不抛出 queue.Empty 直接返回。

  • 如果有多个进程同时将对象放入队列,那么在队列的另一端接受到的对象可能是无序的。但是由同一个进程放入的多个对象的顺序在另一端输出时总是一样的。

警告

如果一个进程在尝试使用 Queue 期间被 Process.terminate()os.kill() 调用终止了,那么队列中的数据很可能被破坏。 这可能导致其他进程在尝试使用该队列时发生异常。

警告

正如刚才提到的,如果一个子进程将一些对象放进队列中 (并且它没有用 JoinableQueue.cancel_join_thread 方法),那么这个进程在所有缓冲区的对象被刷新进管道之前,是不会终止的。

这意味着,除非你确定所有放入队列中的对象都已经被消费了,否则如果你试图等待这个进程,你可能会陷入死锁中。相似地,如果该子进程不是后台进程,那么父进程可能在试图等待所有非后台进程退出时挂起。

注意用管理器创建的队列不存在这个问题,详见 编程指导

示例 展示了如何使用队列实现进程间通信。

  • multiprocessing.Pipe([duplex])
  • 返回一对 Connection对象 ``(conn1, conn2) , 分别表示管道的两端。

如果 duplex 被置为 True (默认值),那么该管道是双向的。如果 duplex 被置为 False ,那么该管道是单向的,即 conn1 只能用于接收消息,而 conn2 仅能用于发送消息。

  • class multiprocessing.Queue([maxsize])
  • 返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放进队列中时,一个写入线程会启动并将对象从缓冲区写入管道中。

一旦超时,将抛出标准库 queue 模块中常见的异常 queue.Emptyqueue.Full

除了 task_done()join() 之外,Queue 实现了标准库类 queue.Queue 中所有的方法。

  • qsize()
  • 返回队列的大致长度。由于多线程或者多进程的上下文,这个数字是不可靠的。

注意,在 Unix 平台上,例如 Mac OS X ,这个方法可能会抛出 NotImplementedError 异常,因为该平台没有实现 sem_getvalue()

  • empty()
  • 如果队列是空的,返回 True ,反之返回 False 。 由于多线程或多进程的环境,该状态是不可靠的。

  • full()

  • 如果队列是满的,返回 True ,反之返回 False 。 由于多线程或多进程的环境,该状态是不可靠的。

  • put(obj[, block[, timeout]])

  • 将 obj 放入队列。如果可选参数 blockTrue (默认值) 而且 timeoutNone (默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full 异常。反之 (blockFalse 时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full 异常 (在这种情形下 timeout 参数会被忽略)。

在 3.8 版更改: 如果队列已经关闭,会抛出 ValueError 而不是 AssertionError

  • putnowait(_obj)
  • 相当于 put(obj, False)

  • get([block[, timeout]])

  • 从队列中取出并返回对象。如果可选参数 blockTrue (默认值) 而且 timeoutNone (默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常。反之 (blockFalse 时),仅当有可用对象能够取出时返回,否则抛出 queue.Empty 异常 (在这种情形下 timeout 参数会被忽略)。

在 3.8 版更改: 如果队列已经关闭,会抛出 ValueError 而不是 OSError

  • get_nowait()
  • 相当于 get(False)

multiprocessing.Queue 类有一些在 queue.Queue 类中没有出现的方法。这些方法在大多数情形下并不是必须的。

  • close()
  • 指示当前进程将不会再往队列中放入对象。一旦所有缓冲区中的数据被写入管道之后,后台的线程会退出。这个方法在队列被gc回收时会自动调用。

  • join_thread()

  • 等待后台线程。这个方法仅在调用了 close() 方法之后可用。这会阻塞当前进程,直到后台线程退出,确保所有缓冲区中的数据都被写入管道中。

默认情况下,如果一个不是队列创建者的进程试图退出,它会尝试等待这个队列的后台线程。这个进程可以使用 cancel_join_thread()join_thread() 方法什么都不做直接跳过。

  • cancel_join_thread()
  • 防止 join_thread() 方法阻塞当前进程。具体而言,这防止进程退出时自动等待后台线程退出。详见 join_thread()

可能这个方法称为”allow_exit_without_flush()“ 会更好。这有可能会导致正在排队进入队列的数据丢失,大多数情况下你不需要用到这个方法,仅当你不关心底层管道中可能丢失的数据,只是希望进程能够马上退出时使用。

注解

该类的功能依赖于宿主操作系统具有可用的共享信号量实现。否则该类将被禁用,任何试图实例化一个 Queue 对象的操作都会抛出 ImportError 异常,更多信息详见 bpo-3770 。后续说明的任何专用队列对象亦如此。

  • class multiprocessing.SimpleQueue
  • 这是一个简化的 Queue 类的实现,很像带锁的 Pipe

    • empty()
    • 如果队列为空返回 True ,否则返回 False

    • get()

    • 从队列中移出并返回一个对象。

    • put(item)

    • item 放入队列。
  • class multiprocessing.JoinableQueue([maxsize])

  • JoinableQueue 类是 Queue 的子类,额外添加了 task_done()join() 方法。

    • task_done()
    • 指出之前进入队列的任务已经完成。由队列的消费者进程使用。对于每次调用 get() 获取的任务,执行完成后调用 task_done() 告诉队列该任务已经处理完成。

如果 join() 方法正在阻塞之中,该方法会在所有对象都被处理完的时候返回 (即对之前使用 put() 放进队列中的所有对象都已经返回了对应的 task_done() ) 。

如果被调用的次数多于放入队列中的项目数量,将引发 ValueError 异常 。

  • join()
  • 阻塞至队列中所有的元素都被接收和处理完毕。

当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。

杂项

  • multiprocessing.active_children()
  • 返回当前进程存活的子进程的列表。

调用该方法有“等待”已经结束的进程的副作用。

  • multiprocessing.cpu_count()
  • 返回系统的CPU数量。

该数量不同于当前进程可以使用的CPU数量。可用的CPU数量可以由 len(os.sched_getaffinity(0)) 方法获得。

可能引发 NotImplementedError

参见

os.cpu_count()

  • multiprocessing.current_process()
  • 返回与当前进程相对应的 Process 对象。

threading.current_thread() 相同。

  • multiprocessing.parent_process()
  • 返回父进程 Process 对象,和父进程调用 current_process() 返回的对象一样。如果一个进程已经是主进程, parent_process 会返回 None.

3.8 新版功能.

  • multiprocessing.freeze_support()
  • 为使用了 multiprocessing 的程序,提供冻结以产生 Windows 可执行文件的支持。(在 py2exe, PyInstallercx_Freeze 上测试通过)

需要在 main 模块的 if name == 'main' 该行之后马上调用该函数。例如:

  1. from multiprocessing import Process, freeze_support
  2.  
  3. def f():
  4. print('hello world!')
  5.  
  6. if __name__ == '__main__':
  7. freeze_support()
  8. Process(target=f).start()

如果没有调用 freeze_support() 在尝试运行被冻结的可执行文件时会抛出 RuntimeError 异常。

freeze_support() 的调用在非 Windows 平台上是无效的。如果该模块在 Windows 平台的 Python 解释器中正常运行 (该程序没有被冻结), 调用freeze_support() 也是无效的。

  • multiprocessing.get_all_start_methods()
  • 返回支持的启动方法的列表,该列表的首项即为默认选项。可能的启动方法有 'fork', 'spawn'&#39;forkserver&#39;<code>。在 Windows 中,只有'spawn' 是可用的。Unix平台总是支持&#39;fork&#39;&#39;spawn&#39;<code>,且'fork' 是默认值。

3.4 新版功能.

  • multiprocessing.getcontext(_method=None)
  • 返回一个 Context 对象。该对象具有和 multiprocessing 模块相同的API。

如果 method 设置成 None 那么将返回默认上下文对象。否则 method 应该是 'fork', 'spawn', 'forkserver' 。 如果指定的启动方法不存在,将抛出 ValueError 异常。

3.4 新版功能.

  • multiprocessing.getstart_method(_allow_none=False)
  • 返回启动进程时使用的启动方法名。

如果启动方法已经固定,并且 allow_none 被设置成 False ,那么启动方法将被固定为默认的启动方法,并且返回其方法名。如果启动方法没有设定,并且 allow_none 被设置成 True ,那么将返回 None

返回值将为 'fork' , 'spawn' , 'forkserver' 或者 None'fork'是 Unix 的默认值,'spawn' 是 Windows 的默认值。

3.4 新版功能.

  • multiprocessing.set_executable()
  • 设置在启动子进程时使用的 Python 解释器路径。 ( 默认使用 sys.executable ) 嵌入式编程人员可能需要这样做:
  1. set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

以使他们可以创建子进程。

在 3.4 版更改: 现在在 Unix 平台上使用 'spawn' 启动方法时支持调用该方法。

  • multiprocessing.setstart_method(_method)
  • 设置启动子进程的方法。 method 可以是 'fork' , 'spawn' 或者 'forkserver'

注意这最多只能调用一次,并且需要藏在 main 模块中,由 if name == 'main' 进行保护。

3.4 新版功能.

注解

multiprocessing 并没有包含类似 threading.active_count() , threading.enumerate() , threading.settrace() , threading.setprofile(), threading.Timer , 或者 threading.local 的方法和类。

连接(Connection)对象

Connection 对象允许收发可以序列化的对象或字符串。它们可以看作面向消息的连接套接字。

通常使用 Pipe 创建 Connection 对象。详见 : 监听者及客户端.

  • class multiprocessing.connection.Connection
    • send(obj)
    • 将一个对象发送到连接的另一端,可以用 recv() 读取。

发送的对象必须是可以序列化的,过大的对象 ( 接近 32MiB+ ,这个值取决于操作系统 ) 有可能引发 ValueError 异常。

  • recv()
  • 返回一个由另一端使用 send() 发送的对象。该方法会一直阻塞直到接收到对象。 如果对端关闭了连接或者没有东西可接收,将抛出 EOFError 异常。

  • fileno()

  • 返回由连接对象使用的描述符或者句柄。

  • close()

  • 关闭连接对象。

当连接对象被垃圾回收时会自动调用。

  • poll([timeout])
  • 返回连接对象中是否有可以读取的数据。

如果未指定 timeout ,此方法会马上返回。如果 timeout 是一个数字,则指定了最大阻塞的秒数。如果 timeoutNone ,那么将一直等待,不会超时。

注意通过使用 multiprocessing.connection.wait() 可以一次轮询多个连接对象。

  • sendbytes(_buffer[, offset[, size]])
  • 从一个 bytes-like object (字节类对象)对象中取出字节数组并作为一条完整消息发送。

如果由 offset 给定了在 buffer 中读取数据的位置。 如果给定了 size ,那么将会从缓冲区中读取多个字节。 过大的缓冲区 ( 接近 32MiB+ ,此值依赖于操作系统 ) 有可能引发 ValueError 异常。

  • recvbytes([_maxlength])
  • 以字符串形式返回一条从连接对象另一端发送过来的字节数据。此方法在接收到数据前将一直阻塞。 如果连接对象被对端关闭或者没有数据可读取,将抛出 EOFError 异常。

如果给定了 maxlength 并且消息短于 maxlength 那么将抛出 OSError 并且该连接对象将不再可读。

在 3.3 版更改: 曾经该函数抛出 IOError ,现在这是 OSError 的别名。

  • recvbytes_into(_buffer[, offset])
  • 将一条完整的字节数据消息读入 buffer 中并返回消息的字节数。 此方法在接收到数据前将一直阻塞。 如果连接对象被对端关闭或者没有数据可读取,将抛出 EOFError 异常。

buffer must be a writable bytes-like object. Ifoffset is given then the message will be written into the buffer fromthat position. Offset must be a non-negative integer less than thelength of buffer (in bytes).

如果缓冲区太小,则将引发 BufferTooShort 异常,并且完整的消息将会存放在异常实例 ee.args[0] 中。

在 3.3 版更改: 现在连接对象自身可以通过 Connection.send()Connection.recv() 在进程之间传递。

3.3 新版功能: 连接对象现已支持上下文管理协议 — 参见 see 上下文管理器类型enter() 返回连接对象, exit() 会调用 close()

例如:

  1. >>> from multiprocessing import Pipe
  2. >>> a, b = Pipe()
  3. >>> a.send([1, 'hello', None])
  4. >>> b.recv()
  5. [1, 'hello', None]
  6. >>> b.send_bytes(b'thank you')
  7. >>> a.recv_bytes()
  8. b'thank you'
  9. >>> import array
  10. >>> arr1 = array.array('i', range(5))
  11. >>> arr2 = array.array('i', [0] * 10)
  12. >>> a.send_bytes(arr1)
  13. >>> count = b.recv_bytes_into(arr2)
  14. >>> assert count == len(arr1) * arr1.itemsize
  15. >>> arr2
  16. array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

The Connection.recv() method automatically unpickles the data itreceives, which can be a security risk unless you can trust the processwhich sent the message.

因此, 除非连接对象是由 Pipe() 产生的,在通过一些认证手段之前你应该只使用 recv()send() 方法。参考 认证密码

警告

如果一个进程在试图读写管道时被终止了,那么管道中的数据很可能是不完整的,因为此时可能无法确定消息的边界。

同步原语

通常来说同步愿意在多进程环境中并不像它们在多线程环境中那么必要。参考 threading 模块的文档。

注意可以使用管理器对象创建同步原语,参考 管理器

  • class multiprocessing.Barrier(parties[, action[, timeout]])
  • 类似 threading.Barrier 的栅栏对象。

3.3 新版功能.

一个小小的不同在于,它的 acquire 方法的第一个参数名是和 Lock.acquire() 一样的 block

注解

在 Mac OS X 平台上, 该对象于 Semaphore 不同在于 sem_getvalue() 方法并没有在该平台上实现。

指定的 lock 参数应该是 multiprocessing 模块中的 Lock 或者 RLock 对象。

在 3.3 版更改: 新增了 wait_for() 方法。

  • class multiprocessing.Event
  • A clone of threading.Event.

  • class multiprocessing.Lock

  • 原始锁(非递归锁)对象,类似于 threading.Lock 。一旦一个进程或者线程拿到了锁,后续的任何其他进程或线程的其他请求都会被阻塞直到锁被释放。任何进程或线程都可以释放锁。除非另有说明,否则 multiprocessing.Lock 用于进程或者线程的概念和行为都和 threading.Lock 一致。

注意 Lock 实际上是一个工厂函数。它返回由默认上下文初始化的 multiprocessing.synchronize.Lock 对象。

Lock supports the context manager protocol and thus may beused in with statements.

  • acquire(block=True, timeout=None)
  • 获得锁,阻塞或非阻塞的。

如果 block 参数被设为 True ( 默认值 ) , 对该方法的调用在锁处于释放状态之前都会阻塞,然后将锁设置为锁住状态并返回 True 。需要注意的是第一个参数名与 threading.Lock.acquire() 的不同。

如果 block 参数被设置成 False ,方法的调用将不会阻塞。 如果锁当前处于锁住状态,将返回 False ; 否则将锁设置成锁住状态,并返回 True

timeout 是一个正浮点数时,会在等待锁的过程中最多阻塞等待 timeout 秒,当 timeout 是负数时,效果和 timeout 为0时一样,当 timeoutNone (默认值)时,等待时间是无限长。需要注意的是,对于 timeout 参数是负数和 None 的情况, 其行为与 threading.Lock.acquire() 是不一样的。当 block 参数 为 False 时, timeout 并没有实际用处,会直接忽略。否则,函数会在拿到锁后返回 True 或者 超时没拿到锁后返回 False

  • release()
  • 释放锁,可以在任何进程、线程使用,并不限于锁的拥有者。

当尝试释放一个没有被持有的锁时,会抛出 ValueError 异常,除此之外其行为与 threading.Lock.release() 一样。

  • class multiprocessing.RLock
  • 递归锁对象: 类似于 threading.RLock 。递归锁必须由持有线程、进程亲自释放。如果某个进程或者线程拿到了递归锁,这个进程或者线程可以再次拿到这个锁而不需要等待。但是这个进程或者线程的拿锁操作和释放锁操作的次数必须相同。

注意 RLock 是一个工厂函数,调用后返回一个使用默认 context 初始化的 multiprocessing.synchronize.RLock 实例。

RLock 支持 context manager,所以可在 with 语句内使用。

  • acquire(block=True, timeout=None)
  • 获得锁,阻塞或非阻塞的。

block 参数设置为 True 时,会一直阻塞直到锁处于空闲状态(没有被任何进程、线程拥有),除非当前进程或线程已经拥有了这把锁。然后当前进程/线程会持有这把锁(在锁没有其他持有者的情况下),锁内的递归等级加一,并返回 True . 注意, 这个函数第一个参数的行为和 threading.RLock.acquire() 的实现有几个不同点,包括参数名本身。

block 参数是 False , 将不会阻塞,如果此时锁被其他进程或者线程持有,当前进程、线程获取锁操作失败,锁的递归等级也不会改变,函数返回 False , 如果当前锁已经处于释放状态,则当前进程、线程则会拿到锁,并且锁内的递归等级加一,函数返回 True

timeout 参数的使用方法及行为与 Lock.acquire() 一样。但是要注意 timeout 的其中一些行为和 threading.RLock.acquire() 中实现的行为是不同的。

  • release()
  • 释放锁,使锁内的递归等级减一。如果释放后锁内的递归等级降低为0,则会重置锁的状态为释放状态(即没有被任何进程、线程持有),重置后如果有有其他进程和线程在等待这把锁,他们中的一个会获得这个锁而继续运行。如果释放后锁内的递归等级还没到达0,则这个锁仍将保持未释放状态且当前进程和线程仍然是持有者。

只有当前进程或线程是锁的持有者时,才允许调用这个方法。如果当前进程或线程不是这个锁的拥有者,或者这个锁处于已释放的状态(即没有任何拥有者),调用这个方法会抛出 AssertionError 异常。注意这里抛出的异常类型和 threading.RLock.release() 中实现的行为不一样。

一个小小的不同在于,它的 acquire 方法的第一个参数名是和 Lock.acquire() 一样的 block

注解

在 Mac OS X 上,不支持 sem_timedwait ,所以,调用 acquire() 时如果使用 timeout 参数,会通过循环sleep来模拟这个函数的行为。

注解

假如信号 SIGINT 是来自于 Ctrl-C ,并且主线程被 BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire()Condition.wait() 阻塞,则调用会立即中断同时抛出 KeyboardInterrupt 异常。

这和 threading 的行为不同,此模块中当执行对应的阻塞调用时,SIGINT 会被忽略。

注解

这个包的某些功能依赖于宿主机系统的共享信号量的实现,如果系统没有这个特性, multiprocessing.synchronize 会被禁用,尝试导入这个模块会引发 ImportError 异常,详细信息请查看 bpo-3770

共享 ctypes 对象

在共享内存上创建可被子进程继承的共享对象时是可行的。

  • multiprocessing.Value(typecode_or_type, *args, lock=True)
  • 返回一个从共享内存上创建的 ctypes 对象。默认情况下返回的对象实际上是经过了同步器包装过的。可以通过 Valuevalue 属性访问这个对象本身。

typecode_or_type 指明了返回的对象类型: 它可能是一个 ctypes 类型或者 array 模块中每个类型对应的单字符长度的字符串。 *args 会透传给这个类的构造函数。

如果 lock 参数是 True (默认值), 将会新建一个递归锁用于同步对于此值的访问操作。 如果 lockLock 或者 RLock 对象,那么这个传入的锁将会用于同步对这个值的访问操作,如果 lockFalse , 那么对这个对象的访问将没有锁保护,也就是说这个变量不是进程安全的。

诸如 += 这类的操作会引发独立的读操作和写操作,也就是说这类操作符并不具有原子性。所以,如果你想让递增共享变量的操作具有原子性,仅仅以这样的方式并不能达到要求:

  1. counter.value += 1

共享对象内部关联的锁是递归锁(默认情况下就是)的情况下, 你可以采用这种方式

  1. with counter.get_lock():
  2. counter.value += 1

注意 lock 只能是命名参数。

  • multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
  • 从共享内存中申请并返回一个具有ctypes类型的数组对象。默认情况下返回值实际上是被同步器包装过的数组对象。

typecode_or_type 指明了返回的数组中的元素类型: 它可能是一个 ctypes 类型或者 array 模块中每个类型对应的单字符长度的字符串。 如果 size_or_initializer 是一个整数,那就会当做数组的长度,并且整个数组的内存会初始化为0。否则,如果 size_or_initializer 会被当成一个序列用于初始化数组中的每一个元素,并且会根据元素个数自动判断数组的长度。

如果 lockTrue (默认值) 则将创建一个新的锁对象用于同步对值的访问。 如果 lock 为一个 LockRLock 对象则该对象将被用于同步对值的访问。 如果 lockFalse 则对返回对象的访问将不会自动得到锁的保护,也就是说它不是“进程安全的”。

请注意 lock 是一个仅限关键字参数。

请注意 ctypes.c_char 的数组具有 valueraw 属性,允许被用来保存和提取字符串。

multiprocessing.sharedctypes 模块

multiprocessing.sharedctypes 模块提供了一些函数,用于分配来自共享内存的、可被子进程继承的 ctypes 对象。

注解

虽然可以将指针存储在共享内存中,但请记住它所引用的是特定进程地址空间中的位置。 而且,指针很可能在第二个进程的上下文中无效,尝试从第二个进程对指针进行解引用可能会导致崩溃。

  • multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)
  • 从共享内存中申请并返回一个 ctypes 数组。

typecode_or_type 指明了返回的数组中的元素类型: 它可能是一个 ctypes 类型或者 array 模块中使用的类型字符。 如果 size_or_initializer 是一个整数,那就会当做数组的长度,并且整个数组的内存会初始化为0。否则,如果 size_or_initializer 会被当成一个序列用于初始化数组中的每一个元素,并且会根据元素个数自动判断数组的长度。

注意对元素的访问、赋值操作可能是非原子操作 - 使用 Array() , 从而借助其中的锁保证操作的原子性。

  • multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)
  • 从共享内存中申请并返回一个 ctypes 对象。

typecode_or_type 指明了返回的对象类型: 它可能是一个 ctypes 类型或者 array 模块中每个类型对应的单字符长度的字符串。 *args 会透传给这个类的构造函数。

注意对 value 的访问、赋值操作可能是非原子操作 - 使用 Value() ,从而借助其中的锁保证操作的原子性。

请注意 ctypes.c_char 的数组具有 valueraw 属性,允许被用来保存和提取字符串 - 请查看 ctypes 文档。

  • multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)
  • 返回一个纯 ctypes 数组, 或者在此之上经过同步器包装过的进程安全的对象,这取决于 lock 参数的值,除此之外,和 RawArray() 一样。

如果 lockTrue (默认值) 则将创建一个新的锁对象用于同步对值的访问。 如果 lock 为一个 LockRLock 对象则该对象将被用于同步对值的访问。 如果 lockFalse 则对返回对象的访问将不会自动得到锁的保护,也就是说它不是“进程安全的”。

注意 lock 只能是命名参数。

  • multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)
  • 返回一个纯 ctypes 数组, 或者在此之上经过同步器包装过的进程安全的对象,这取决于 lock 参数的值,除此之外,和 RawArray() 一样。

如果 lockTrue (默认值) 则将创建一个新的锁对象用于同步对值的访问。 如果 lock 为一个 LockRLock 对象则该对象将被用于同步对值的访问。 如果 lockFalse 则对返回对象的访问将不会自动得到锁的保护,也就是说它不是“进程安全的”。

注意 lock 只能是命名参数。

  • multiprocessing.sharedctypes.copy(obj)
  • 从共享内存中申请一片空间将 ctypes 对象 obj 过来,然后返回一个新的 ctypes 对象。

  • multiprocessing.sharedctypes.synchronized(obj[, lock])

  • 将一个 ctypes 对象包装为进程安全的对象并返回,使用 lock 同步对于它的操作。如果 lockNone (默认值) ,则会自动创建一个 multiprocessing.RLock 对象。

同步器包装后的对象会在原有对象基础上额外增加两个方法: get_obj() 返回被包装的对象, get_lock() 返回内部用于同步的锁。

需要注意的是,访问包装后的ctypes对象会比直接访问原来的纯 ctypes 对象慢得多。

在 3.5 版更改: 同步器包装后的对象支持 context manager 协议。

下面的表格对比了创建普通ctypes对象和基于共享内存上创建共享ctypes对象的语法。(表格中的 MyStructctypes.Structure 的子类)

ctypes使用类型的共享ctypes使用 typecode 的共享 ctypes
c_double(2.4)RawValue(c_double, 2.4)RawValue('d', 2.4)
MyStruct(4, 6)RawValue(MyStruct, 4, 6)
(c_short 7)()RawArray(c_short, 7)RawArray('h', 7)
(c_int 3)(9, 2, 8)RawArray(c_int, (9, 2, 8))RawArray('i', (9, 2, 8))

下面是一个在子进程中修改多个ctypes对象的例子。

  1. from multiprocessing import Process, Lock
  2. from multiprocessing.sharedctypes import Value, Array
  3. from ctypes import Structure, c_double
  4.  
  5. class Point(Structure):
  6. _fields_ = [('x', c_double), ('y', c_double)]
  7.  
  8. def modify(n, x, s, A):
  9. n.value **= 2
  10. x.value **= 2
  11. s.value = s.value.upper()
  12. for a in A:
  13. a.x **= 2
  14. a.y **= 2
  15.  
  16. if __name__ == '__main__':
  17. lock = Lock()
  18.  
  19. n = Value('i', 7)
  20. x = Value(c_double, 1.0/3.0, lock=False)
  21. s = Array('c', b'hello world', lock=lock)
  22. A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
  23.  
  24. p = Process(target=modify, args=(n, x, s, A))
  25. p.start()
  26. p.join()
  27.  
  28. print(n.value)
  29. print(x.value)
  30. print(s.value)
  31. print([(a.x, a.y) for a in A])

输出如下

  1. 49
  2. 0.1111111111111111
  3. HELLO WORLD
  4. [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

管理器

管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理 共享对象 的服务。其他进程可以通过代理访问这些共享对象。

  • multiprocessing.Manager()
  • 返回一个已启动的 SyncManager 管理器对象,这个对象可以用于在不同进程中共享数据。返回的管理器对象对应了一个已经启动的子进程,并且拥有一系列方法可以用于创建共享对象、返回对应的代理。

当管理器被垃圾回收或者父进程退出时,管理器进程会立即退出。管理器类定义在 multiprocessing.managers 模块:

  • class multiprocessing.managers.BaseManager([address[, authkey]])
  • 创建一个 BaseManager 对象。

一旦创建,应该及时调用 start() 或者 get_server().serve_forever() 以确保管理器对象对应的管理进程已经启动。

address 是管理器服务进程监听的地址。如果 addressNone ,则允许和任意主机的请求建立连接。

authkey 是认证标识,用于检查连接服务进程的请求合法性。如果 authkeyNone, 则会使用 currentprocess().authkey , 否则,就使用 _authkey , 需要保证它必须是 byte 类型的字符串。

  • start([initializer[, initargs]])
  • 为管理器开启一个子进程,如果 initializer 不是 None , 子进程在启动时将会调用 initializer(*initargs)

  • get_server()

  • 返回一个 Server 对象,它是管理器在后台控制的真实的服务。 Server 对象拥有 serve_forever() 方法。
  1. >>> from multiprocessing.managers import BaseManager
  2. >>> manager = BaseManager(address=('', 50000), authkey=b'abc')
  3. >>> server = manager.get_server()
  4. >>> server.serve_forever()

Server 额外拥有一个 address 属性。

  • connect()
  • 将本地管理器对象连接到一个远程管理器进程:
  1. >>> from multiprocessing.managers import BaseManager
  2. >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
  3. >>> m.connect()
  • shutdown()
  • 停止管理器的进程。这个方法只能用于已经使用 start() 启动的服务进程。

它可以被多次调用。

  • register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
  • 一个 classmethod,可以将一个类型或者可调用对象注册到管理器类。

typeid 是一种 "类型标识符",用于唯一表示某种共享对象类型,必须是一个字符串。

callable 是一个用来为此类型标识符创建对象的可调用对象。如果一个管理器实例将使用 connect() 方法连接到服务器,或者 create_method 参数为 False,那么这里可留下 None

proxytypeBaseProxy 的子类,可以根据 typeid 为共享对象创建一个代理,如果是 None , 则会自动创建一个代理类。

exposed 是一个函数名组成的序列,用来指明只有这些方法可以使用 BaseProxy._callmethod() 代理。(如果 exposedNone, 则会在 proxytype.exposed 存在的情况下转而使用它) 当暴露的方法列表没有指定的时候,共享对象的所有 “公共方法” 都会被代理。(这里的“公共方法”是指所有拥有 call() 方法并且不是以 '_' 开头的属性)

method_to_typeid 是一个映射,用来指定那些应该返回代理对象的暴露方法所返回的类型。(如果 method_to_typeidNone, 则 proxytype.method_to_typeid 会在存在的情况下被使用)如果方法名称不在这个映射中或者映射是 None ,则方法返回的对象会是一个值拷贝。

create_method 指明,是否要创建一个以 typeid 命名并返回一个代理对象的方法,这个函数会被服务进程用于创建共享对象,默认为 True

BaseManager 实例也有一个只读属性。

  • address
  • 管理器所用的地址。

在 3.3 版更改: 管理器对象支持上下文管理协议 - 查看 上下文管理器类型enter() 启动服务进程(如果它还没有启动)并且返回管理器对象, exit() 会调用 shutdown()

在之前的版本中,如果管理器服务进程没有启动, enter() 不会负责启动它。

  • class multiprocessing.managers.SyncManager
  • BaseManager 的子类,可用于进程的同步。这个类型的对象使用 multiprocessing.Manager() 创建。

它拥有一系列方法,可以为大部分常用数据类型创建并返回 代理对象 代理,用于进程间同步。甚至包括共享列表和字典。

  • Barrier(parties[, action[, timeout]])
  • 创建一个共享的 threading.Barrier 对象并返回它的代理。

3.3 新版功能.

如果提供了 lock 参数,那它必须是 threading.Lockthreading.RLock 的代理对象。

在 3.3 版更改: 新增了 wait_for() 方法。

  • Event()
  • 创建一个共享的 threading.Event 对象并返回它的代理。

  • Lock()

  • 创建一个共享的 threading.Lock 对象并返回它的代理。

  • Namespace()

  • 创建一个共享的 Namespace 对象并返回它的代理。

  • Queue([maxsize])

  • 创建一个共享的 queue.Queue 对象并返回它的代理。

  • RLock()

  • 创建一个共享的 threading.RLock 对象并返回它的代理。

  • Semaphore([value])

  • 创建一个共享的 threading.Semaphore 对象并返回它的代理。

  • Array(typecode, sequence)

  • 创建一个数组并返回它的代理。

  • Value(typecode, value)

  • 创建一个具有可写 value 属性的对象并返回它的代理。

  • dict()

  • dict(mapping)
  • dict(sequence)
  • 创建一个共享的 dict 对象并返回它的代理。

  • list()

  • list(sequence)
  • 创建一个共享的 list 对象并返回它的代理。

在 3.6 版更改: 共享对象能够嵌套。例如, 共享的容器对象如共享列表,可以包含另一个共享对象,他们全都会在 SyncManager 中进行管理和同步。

  • class multiprocessing.managers.Namespace
  • 一个可以注册到 SyncManager 的类型。

命名空间对象没有公共方法,但是拥有可写的属性。直接print会显示所有属性的值。

值得一提的是,当对命名空间对象使用代理的时候,访问所有名称以 '_' 开头的属性都只是代理器上的属性,而不是命名空间对象的属性。

  1. >>> manager = multiprocessing.Manager()
  2. >>> Global = manager.Namespace()
  3. >>> Global.x = 10
  4. >>> Global.y = 'hello'
  5. >>> Global._z = 12.3 # this is an attribute of the proxy
  6. >>> print(Global)
  7. Namespace(x=10, y='hello')

自定义管理器

要创建一个自定义的管理器,需要新建一个 BaseManager 的子类,然后使用这个管理器类上的 register() 类方法将新类型或者可调用方法注册上去。例如:

  1. from multiprocessing.managers import BaseManager
  2.  
  3. class MathsClass:
  4. def add(self, x, y):
  5. return x + y
  6. def mul(self, x, y):
  7. return x * y
  8.  
  9. class MyManager(BaseManager):
  10. pass
  11.  
  12. MyManager.register('Maths', MathsClass)
  13.  
  14. if __name__ == '__main__':
  15. with MyManager() as manager:
  16. maths = manager.Maths()
  17. print(maths.add(4, 3)) # prints 7
  18. print(maths.mul(7, 8)) # prints 56

使用远程管理器

可以将管理器服务运行在一台机器上,然后使用客户端从其他机器上访问。(假设它们的防火墙允许)

运行下面的代码可以启动一个服务,此付包含了一个共享队列,允许远程客户端访问:

  1. >>> from multiprocessing.managers import BaseManager
  2. >>> from queue import Queue
  3. >>> queue = Queue()
  4. >>> class QueueManager(BaseManager): pass
  5. >>> QueueManager.register('get_queue', callable=lambda:queue)
  6. >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
  7. >>> s = m.get_server()
  8. >>> s.serve_forever()

远程客户端可以通过下面的方式访问服务:

  1. >>> from multiprocessing.managers import BaseManager
  2. >>> class QueueManager(BaseManager): pass
  3. >>> QueueManager.register('get_queue')
  4. >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
  5. >>> m.connect()
  6. >>> queue = m.get_queue()
  7. >>> queue.put('hello')

也可以通过下面的方式:

  1. >>> from multiprocessing.managers import BaseManager
  2. >>> class QueueManager(BaseManager): pass
  3. >>> QueueManager.register('get_queue')
  4. >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
  5. >>> m.connect()
  6. >>> queue = m.get_queue()
  7. >>> queue.get()
  8. 'hello'

本地进程也可以访问这个队列,利用上面的客户端代码通过远程方式访问:

  1. >>> from multiprocessing import Process, Queue
  2. >>> from multiprocessing.managers import BaseManager
  3. >>> class Worker(Process):
  4. ... def __init__(self, q):
  5. ... self.q = q
  6. ... super(Worker, self).__init__()
  7. ... def run(self):
  8. ... self.q.put('local hello')
  9. ...
  10. >>> queue = Queue()
  11. >>> w = Worker(queue)
  12. >>> w.start()
  13. >>> class QueueManager(BaseManager): pass
  14. ...
  15. >>> QueueManager.register('get_queue', callable=lambda: queue)
  16. >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
  17. >>> s = m.get_server()
  18. >>> s.serve_forever()

代理对象

代理是一个 指向 其他共享对象的对象,这个对象(很可能)在另外一个进程中。共享对象也可以说是代理 指涉 的对象。多个代理对象可能指向同一个指涉对象。

代理对象代理了指涉对象的一系列方法调用(虽然并不是指涉对象的每个方法都有必要被代理)。通过这种方式,代理的使用方法可以和它的指涉对象一样:

  1. >>> from multiprocessing import Manager
  2. >>> manager = Manager()
  3. >>> l = manager.list([i*i for i in range(10)])
  4. >>> print(l)
  5. [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  6. >>> print(repr(l))
  7. <ListProxy object, typeid 'list' at 0x...>
  8. >>> l[4]
  9. 16
  10. >>> l[2:5]
  11. [4, 9, 16]

注意,对代理使用 str() 函数会返回指涉对象的字符串表示,但是 repr() 却会返回代理本身的内部字符串表示。

被代理的对象很重要的一点是必须可以被序列化,这样才能允许他们在进程间传递。因此,指涉对象可以包含 代理对象 。这允许管理器中列表、字典或者其他 代理对象 对象之间的嵌套。

  1. >>> a = manager.list()
  2. >>> b = manager.list()
  3. >>> a.append(b) # referent of a now contains referent of b
  4. >>> print(a, b)
  5. [<ListProxy object, typeid 'list' at ...>] []
  6. >>> b.append('hello')
  7. >>> print(a[0], b)
  8. ['hello'] ['hello']

类似地,字典和列表代理也可以相互嵌套:

  1. >>> l_outer = manager.list([ manager.dict() for i in range(2) ])
  2. >>> d_first_inner = l_outer[0]
  3. >>> d_first_inner['a'] = 1
  4. >>> d_first_inner['b'] = 2
  5. >>> l_outer[1]['c'] = 3
  6. >>> l_outer[1]['z'] = 26
  7. >>> print(l_outer[0])
  8. {'a': 1, 'b': 2}
  9. >>> print(l_outer[1])
  10. {'c': 3, 'z': 26}

如果指涉对象包含了普通 listdict 对象,对这些内部可变对象的修改不会通过管理器传播,因为代理无法得知被包含的值什么时候被修改了。但是把存放在容器代理中的值本身是会通过管理器传播的(会触发代理对象中的 setitem )从而有效修改这些对象,所以可以把修改过的值重新赋值给容器代理:

  1. # create a list proxy and append a mutable object (a dictionary)
  2. lproxy = manager.list()
  3. lproxy.append({})
  4. # now mutate the dictionary
  5. d = lproxy[0]
  6. d['a'] = 1
  7. d['b'] = 2
  8. # at this point, the changes to d are not yet synced, but by
  9. # updating the dictionary, the proxy is notified of the change
  10. lproxy[0] = d

在大多是使用情形下,这种实现方式并不比嵌套 代理对象 方便,但是依然演示了对于同步的一种控制级别。

注解

multiprocessing 中的代理类并没有提供任何对于代理值比较的支持。所以,我们会得到如下结果:

  1. >>> manager.list([1,2,3]) == [1,2,3]
  2. False

当需要比较值的时候,应该替换为使用指涉对象的拷贝。

  • class multiprocessing.managers.BaseProxy
  • 代理对象是 BaseProxy 派生类的实例。

    • callmethod(_methodname[, args[, kwds]])
    • 调用指涉对象的方法并返回结果。

如果 proxy 是一个代理且其指涉的是 obj , 那么下面的表达式:

  1. proxy._callmethod(methodname, args, kwds)

相当于求取以下表达式的值:

  1. getattr(obj, methodname)(*args, **kwds)

于管理器进程。

返回结果会是一个值拷贝或者一个新的共享对象的代理 - 见函数 BaseManager.register() 中关于参数 method_to_typeid 的文档。

如果这个调用熬出了异常,则这个异常会被 _callmethod() 透传出来。如果是管理器进程本身抛出的一些其他异常,则会被 _callmethod() 转换为 RemoteError 异常重新抛出。

特别注意,如果 methodname 没有 暴露 出来,将会引发一个异常。

_callmethod() 的一个使用示例:

  1. >>> l = manager.list(range(10))
  2. >>> l._callmethod('__len__')
  3. 10
  4. >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
  5. [2, 3, 4, 5, 6]
  6. >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20]
  7. Traceback (most recent call last):
  8. ...
  9. IndexError: list index out of range
  • _getvalue()
  • 返回指涉对象的一份拷贝。

如果指涉对象无法序列化,则会抛出一个异常。

  • repr()
  • 返回代理对象的内部字符串表示。

  • str()

  • 返回指涉对象的内部字符串表示。

清理

代理对象使用了一个弱引用回调函数,当它被垃圾回收时,会将自己从拥有此指涉对象的管理器上反注册,

当共享对象没有被任何代理器引用时,会被管理器进程删除。

进程池

可以创建一个进程池,它将使用 Pool 类执行提交给它的任务。

  • class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
  • 一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,以及一个并行的 map 实现。

processes 是要使用的工作进程数目。如果 processesNone,则使用 os.cpu_count() 返回的值。

如果 initializer 不为 None,则每个工作进程将会在启动时调用 initializer(*initargs)

maxtasksperchild 是一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,为了释放未使用的资源。默认的 maxtasksperchildNone,意味着工作进程寿与池齐。

context 可被用于指定启动的工作进程的上下文。通常一个进程池是使用函数 multiprocessing.Pool() 或者一个上下文对象的 Pool() 方法创建的。在这两种情况下, context 都是适当设置的。

注意,进程池对象的方法只有创建它的进程能够调用。

3.2 新版功能: maxtasksperchild

3.4 新版功能: context

注解

通常来说,Pool 中的 Worker 进程的生命周期和进程池的工作队列一样长。一些其他系统中(如 Apache, modwsgi 等)也可以发现另一种模式,他们会让工作进程在完成一些任务后退出,清理、释放资源,然后启动一个新的进程代替旧的工作进程。 Pool 的 _maxtasksperchild 参数给用户提供了这种能力。

  • apply(func[, args[, kwds]])
  • 使用 args 参数以及 kwds 命名参数调用 func , 它会返回结果前阻塞。这种情况下,apply_async() 更适合并行化工作。另外 func 只会在一个进程池中的一个工作进程中执行。

  • applyasync(_func[, args[, kwds[, callback[, error_callback]]]])

  • apply() 方法的一个变种,返回一个结果对象。

如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback

如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时, 会将抛出的异常对象作为参数传递给 error_callback 执行。

回调函数应该立即执行完成,否则会阻塞负责处理结果的线程。

  • map(func, iterable[, chunksize])
  • 内置 map() 函数(它只支持一个 可迭代 参数)的并行版本,它会阻塞直到返回结果。

这个方法会将可迭代对象分割为许多块,然后提交给进程池。可以将 chunksize 设置为一个正整数从而(近似)指定每个块的大小可以。

注意对于很长的迭代对象,可能消耗很多内存。可以考虑使用 imap()imap_unordered() 并且显示指定 chunksize 以提升效率。

  • mapasync(_func, iterable[, chunksize[, callback[, error_callback]]])
  • map() 方法类似,但是返回一个可用于获取结果的对象。

如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback

如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时, 会将抛出的异常对象作为参数传递给 error_callback 执行。

回调函数应该立即执行完成,否则会阻塞负责处理结果的线程。

  • imap(func, iterable[, chunksize])
  • map() 的延迟执行版本。

chunksize 参数的作用和 map() 方法的一样。对于很长的迭代器,给 chunksize 设置一个很大的值会比默认值 1 极大 地加快执行速度。

同样,如果 chunksize1 , 那么 imap() 方法所返回的迭代器的 next() 方法拥有一个可选的 timeout 参数: 如果无法在 timeout 秒内执行得到结果,则next(timeout) 会抛出 multiprocessing.TimeoutError 异常。

  • imapunordered(_func, iterable[, chunksize])
  • imap() 相同,只不过通过迭代器返回的结果是任意的。(当进程池中只有一个工作进程的时候,返回结果的顺序才能认为是"有序"的)

  • starmap(func, iterable[, chunksize])

  • map() 类似,不过 iterable 中的每一项会被解包再作为函数参数。

比如可迭代对象 [(1,2), (3, 4)] 会转化为等价于 [func(1,2), func(3,4)] 的调用。

3.3 新版功能.

  • starmapasync(_func, iterable[, chunksize[, callback[, error_callback]]])
  • 相当于 starmap()map_async() 的结合,迭代 iterable 的每一项,解包作为 func 的参数并执行,返回用于获取结果的对象。

3.3 新版功能.

  • close()
  • 阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。

  • terminate()

  • 不必等待未完成的任务,立即停止工作进程。当进程池对象呗垃圾回收时, 会立即调用 terminate()

  • join()

  • 等待工作进程结束。调用 join() 前必须先调用 close() 或者 terminate()

3.3 新版功能: 进程池对象现在支持上下文管理器协议 - 参见 上下文管理器类型enter() 返回进程池对象, exit() 会调用 terminate()

  • class multiprocessing.pool.AsyncResult
  • Pool.apply_async()Pool.map_async() 返回对象所属的类。

    • get([timeout])
    • 用于获取执行结果。如果 timeout 不是 None 并且在 timeout 秒内仍然没有执行完得到结果,则抛出 multiprocessing.TimeoutError 异常。如果远程调用发生异常,这个异常会通过 get() 重新抛出。

    • wait([timeout])

    • 阻塞,直到返回结果,或者 timeout 秒后超时。

    • ready()

    • 返回执行状态,是否已经完成。

    • successful()

    • 判断调用是否已经完成并且无异常,如果没有执行完成会抛出 AssertionError 异常。

在 3.7 版更改: 如果没有执行完,会抛出 ValueError 异常而不是 AssertionError

下面的例子演示了进程池的用法:

  1. from multiprocessing import Pool
  2. import time
  3.  
  4. def f(x):
  5. return x*x
  6.  
  7. if __name__ == '__main__':
  8. with Pool(processes=4) as pool: # start 4 worker processes
  9. result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
  10. print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
  11.  
  12. print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
  13.  
  14. it = pool.imap(f, range(10))
  15. print(next(it)) # prints "0"
  16. print(next(it)) # prints "1"
  17. print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
  18.  
  19. result = pool.apply_async(time.sleep, (10,))
  20. print(result.get(timeout=1)) # raises multiprocessing.TimeoutError

监听者及客户端

通常情况下,进程间通过队列或者 Pipe() 返回的 Connection 传递消息。

不过,multiprocessing.connection 模块其实提供了一些更灵活的特性。最基础的用法是通过它抽象出来的高级API来操作socket或者Windows命名管道。也提供一些高级用法,如通过 hmac 模块来支持 摘要认证,以及同时监听多个管道连接。

  • multiprocessing.connection.deliverchallenge(_connection, authkey)
  • 发送一个随机生成的消息到另一端,并等待回复。

如果收到的回复与使用 authkey 作为键生成的信息摘要匹配成功,就会发送一个欢迎信息给管道另一端。否则抛出 AuthenticationError 异常。

  • multiprocessing.connection.answerchallenge(_connection, authkey)
  • 接收一条信息,使用 authkey 作为键计算信息摘要,然后将摘要发送回去。

如果没有收到欢迎消息,就抛出 AuthenticationError 异常。

  • multiprocessing.connection.Client(address[, family[, authkey]])
  • 尝试使用 address 地址上的监听者建立一个连接,返回 Connection

连接的类型取决于 family 参数,但是通常可以省略,因为可以通过 address 的格式推导出来。(查看 地址格式 )

如果提供了 authkey 参数并且不是 None,那它必须是一个字符串并且会被当做基于 HMAC 认证的密钥。如果 authkey 是None 则不会有认证行为。认证失败抛出 AuthenticationError 异常,请查看 See 认证密码

  • class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])
  • 可以监听连接请求,是对于绑定套接字或者 Windows 命名管道的封装。

address 是监听器对象中的绑定套接字或命名管道使用的地址。

注解

如果使用 '0.0.0.0' 作为监听地址,那么在Windows上这个地址无法建立连接。想要建立一个可连接的端点,应该使用 '127.0.0.1' 。

family 是套接字(或者命名管道)使用的类型。它可以是以下一种: 'AFINET' ( TCP 套接字类型), 'AF_UNIX' ( Unix 域套接字) 或者 'AF_PIPE' ( Windows 命名管道)。其中只有第一个保证各平台可用。如果 _familyNone ,那么 family 会根据 address 的格式自动推导出来。如果 address 也是 None , 则取默认值。默认值为可用类型中速度最快的。见 地址格式 。注意,如果 family'AF_UNIX' 而address是None ,套接字会在一个 tempfile.mkstemp() 创建的私有临时目录中创建。

如果监听器对象使用了套接字,backlog (默认值为1) 会在套接字绑定后传递给它的 listen() 方法。

如果提供了 authkey 参数并且不是 None,那它必须是一个字符串并且会被当做基于 HMAC 认证的密钥。如果 authkey 是None 则不会有认证行为。认证失败抛出 AuthenticationError 异常,请查看 See 认证密码

  • accept()
  • 接受一个连接并返回一个 Connection 对象,其连接到的监听器对象已绑定套接字或者命名管道。如果已经尝试过认证并且失败了,则会抛出 AuthenticationError 异常。

  • close()

  • 关闭监听器对象上的绑定套接字或者命名管道。此函数会在监听器被垃圾回收后自动调用。不过仍然建议显式调用函数关闭。

监听器对象拥有下列只读属性:

  • address
  • 监听器对象使用的地址。

  • last_accepted

  • 最后一个连接所使用的地址。如果没有的话就是 None

3.3 新版功能: 监听器对象现在支持了上下文管理协议 - 见 上下文管理器类型enter() 返回一个监听器对象, exit() 会调用 close()

  • multiprocessing.connection.wait(object_list, timeout=None)
  • 一直等待直到 object_list 中某个对象处于就绪状态。返回 object_list 中处于就绪状态的对象。如果 timeout 是一个浮点型,该方法会最多阻塞这么多秒。如果 timeoutNone ,则会允许阻塞的事件没有限制。timeout为负数的情况下和为0的情况相同。

对于 Unix 和 Windows ,满足下列条件的对象可以出现在 object_list

当一个连接或者套接字对象拥有有效的数据可被读取的时候,或者另一端关闭后,这个对象就处于就绪状态。

Unix: wait(object_list, timeout)select.select(object_list, [], [], timeout) 几乎相同。差别在于,如果 select.select() 被信号中断,它会抛出一个附带错误号为 EINTROSError 异常,而 wait() 不会。

Windows: object_list 中的元素必须是一个表示为整数的可等待的句柄(按照 Win32 函数 WaitForMultipleObjects() 的文档中所定义) 或者一个拥有 fileno() 方法的对象,这个对象返回一个套接字句柄或者管道句柄。(注意管道和套接字两种句柄 不是 可等待的句柄)

3.3 新版功能.

示例

下面的服务代码创建了一个使用 'secret password' 作为认证密码的监听器。它会等待连接然后发送一些数据给客户端:

  1. from multiprocessing.connection import Listener
  2. from array import array
  3.  
  4. address = ('localhost', 6000) # family is deduced to be 'AF_INET'
  5.  
  6. with Listener(address, authkey=b'secret password') as listener:
  7. with listener.accept() as conn:
  8. print('connection accepted from', listener.last_accepted)
  9.  
  10. conn.send([2.25, None, 'junk', float])
  11.  
  12. conn.send_bytes(b'hello')
  13.  
  14. conn.send_bytes(array('i', [42, 1729]))

下面的代码连接到服务然后从服务器上j接收一些数据:

  1. from multiprocessing.connection import Client
  2. from array import array
  3.  
  4. address = ('localhost', 6000)
  5.  
  6. with Client(address, authkey=b'secret password') as conn:
  7. print(conn.recv()) # => [2.25, None, 'junk', float]
  8.  
  9. print(conn.recv_bytes()) # => 'hello'
  10.  
  11. arr = array('i', [0, 0, 0, 0, 0])
  12. print(conn.recv_bytes_into(arr)) # => 8
  13. print(arr) # => array('i', [42, 1729, 0, 0, 0])

下面的代码使用了 wait() ,以便在同时等待多个进程发来消息。

  1. import time, random
  2. from multiprocessing import Process, Pipe, current_process
  3. from multiprocessing.connection import wait
  4.  
  5. def foo(w):
  6. for i in range(10):
  7. w.send((i, current_process().name))
  8. w.close()
  9.  
  10. if __name__ == '__main__':
  11. readers = []
  12.  
  13. for i in range(4):
  14. r, w = Pipe(duplex=False)
  15. readers.append(r)
  16. p = Process(target=foo, args=(w,))
  17. p.start()
  18. # We close the writable end of the pipe now to be sure that
  19. # p is the only process which owns a handle for it. This
  20. # ensures that when p closes its handle for the writable end,
  21. # wait() will promptly report the readable end as being ready.
  22. w.close()
  23.  
  24. while readers:
  25. for r in wait(readers):
  26. try:
  27. msg = r.recv()
  28. except EOFError:
  29. readers.remove(r)
  30. else:
  31. print(msg)

地址格式

  • 'AFINET' 地址是 (hostname, port) 形式的元组类型,其中 _hostname 是一个字符串,port 是整数。

  • 'AF_UNIX' 地址是文件系统上文件名的字符串。

    • 'AF_PIPE' 是这种格式的字符串
    • r'.\pipe{PipeName}' 。如果要用 Client() 连接到一个名为 ServerName 的远程命名管道,应该替换为使用 r'\ServerName\pipe{PipeName}' 这种格式。

注意,使用两个反斜线开头的字符串默认被当做 'AF_PIPE' 地址而不是 'AF_UNIX'

认证密码

当使用 Connection.recv 接收数据时,数据会自动被反序列化。不幸的是,对于一个不可信的数据源发来的数据,反序列化是存在安全风险的。所以 ListenerClient() 之间使用 hmac 模块进行摘要认证。

认证密钥是一个 byte 类型的字符串,可以认为是和密码一样的东西,连接建立好后,双方都会要求另一方证明知道认证密钥。(这个证明过程不会通过连接发送密钥)

如果要求认证但是没有指定认证密钥,则会使用 current_process().authkey 的返回值 (参见 Process )。 这个值会被当前进程所创建的任何 Process 对象自动继承。 这意味着(默认情况下)当一个多进程程序的所有进程在彼此之间建立连接的时候,会共享同一个认证密钥。

os.urandom() 也可以用来生成合适的认证密钥。

日志

当前模块也提供了一些对 logging 的支持。注意, logging 模块本身并没有使用进程间共享的锁,所以来自于多个进程的日志可能(具体取决于使用的日志 handler 类型)相互覆盖或者混杂。

  • multiprocessing.get_logger()
  • 返回 multiprocessing 使用的 logger,必要的话会创建一个新的。

如果创建的首个 logger 日志级别为 logging.NOTSET 并且没有默认 handler。通过这个 logger 打印的消息不会传递到根 logger。

注意在 Windows 上,子进程只会继承父进程 logger 的日志级别 - 对于logger的其他自定义项不会继承。

  • multiprocessing.log_to_stderr()
  • 此函数会调用 get_logger() 但是会在返回的 logger 上增加一个 handler,将所有输出都使用 '[%(levelname)s/%(processName)s] %(message)s' 的格式发送到 sys.stderr

下面是一个在交互式解释器中打开日志功能的例子:

  1. >>> import multiprocessing, logging
  2. >>> logger = multiprocessing.log_to_stderr()
  3. >>> logger.setLevel(logging.INFO)
  4. >>> logger.warning('doomed')
  5. [WARNING/MainProcess] doomed
  6. >>> m = multiprocessing.Manager()
  7. [INFO/SyncManager-...] child process calling self.run()
  8. [INFO/SyncManager-...] created temp directory /.../pymp-...
  9. [INFO/SyncManager-...] manager serving at '/.../listener-...'
  10. >>> del m
  11. [INFO/MainProcess] sending shutdown message to manager
  12. [INFO/SyncManager-...] manager exiting with exitcode 0

要查看日志等级的完整列表,见 logging 模块。

multiprocessing.dummy 模块

multiprocessing.dummy 复制了 multiprocessing 的 API,不过是在 threading 模块之上包装了一层。

编程指导

使用 multiprocessing 时,应遵循一些指导原则和习惯用法。

所有start方法

下面这些适用于所有start方法。

避免共享状态

应该尽可能避免在进程间传递大量数据,越少越好。

最好坚持使用队列或者管道进行进程间通信,而不是底层的同步原语。

可序列化

保证所代理的方法的参数是可以序列化的。

代理的线程安全性

不要在多线程中同时使用一个代理对象,除非你用锁保护它。

(而在不同进程中使用 相同 的代理对象却没有问题。)

使用 Join 避免僵尸进程

在 Unix 上,如果一个进程执行完成但是没有被 join,就会变成僵尸进程。一般来说,僵尸进程不会很多,因为每次新启动进程(或者 active_children() 被调用)时,所有已执行完成且没有被 join 的进程都会自动被 join,而且对一个执行完的进程调用 Process.is_alive 也会 join 这个进程。尽管如此,对自己启动的进程显式调用 join 依然是最佳实践。

继承优于序列化、反序列化

当使用 spawn 或者 forkserver 的启动方式时,multiprocessing 中的许多类型都必须是可序列化的,这样子进程才能使用它们。但是通常我们都应该避免使用管道和队列发送共享对象到另外一个进程,而是重新组织代码,对于其他进程创建出来的共享对象,让那些需要访问这些对象的子进程可以直接将这些对象从父进程继承过来。

避免杀死进程

听过 Process.terminate 停止一个进程很容易导致这个进程正在使用的共享资源(如锁、信号量、管道和队列)损坏或者变得不可用,无法在其他进程中继续使用。

所以,最好只对那些从来不使用共享资源的进程调用 Process.terminate

Join 使用队列的进程

记住,往队列放入数据的进程会一直等待直到队列中所有项被"feeder" 线程传给底层管道。(子进程可以调用队列的 Queue.canceljointhread 方法禁止这种行为)

这意味着,任何使用队列的时候,你都要确保在进程join之前,所有存放到队列中的项将会被其他进程、线程完全消费。否则不能保证这个写过队列的进程可以正常终止。记住非精灵进程会自动 join 。

下面是一个会导致死锁的例子:

  1. from multiprocessing import Process, Queuedef f(q): q.put('X' * 1000000)if name == '__main': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()

交换最后两行可以修复这个问题(或者直接删掉 p.join())。

显示传递资源给子进程

在Unix上,使用 fork 方式启动的子进程可以使用父进程中全局创建的共享资源。不过,最好是显式将资源对象通过参数的形式传递给子进程。

除了(部分原因)让代码兼容 Windows 以及其他的进程启动方式外,这种形式还保证了在子进程生命期这个对象是不会被父进程垃圾回收的。如果父进程中的某些对象被垃圾回收会导致资源释放,这就变得很重要。

所以对于实例:

  1. from multiprocessing import Process, Lockdef f(): do something using "lock" if name == 'main': lock = Lock() for i in range(10): Process(target=f).start()

应当重写成这样:

  1. from multiprocessing import Process, Lockdef f(l): do something using "l" if name == 'main': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()

谨防将 sys.stdin 数据替换为 “类似文件的对象”

multiprocessing 原本会无条件地这样调用:

  1. os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() 方法中 —— 这会导致与"进程中的进程"相关的一些问题。这已经被修改成了:

  1. sys.stdin.close()sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

它解决了进程相互冲突导致文件描述符错误的根本问题,但是对使用带缓冲的“文件类对象”替换 sys.stdin() 作为输出的应用程序造成了潜在的危险。如果多个进程调用了此文件类对象的 close() 方法,会导致相同的数据多次刷写到此对象,损坏数据。

如果你写入文件类对象并实现了自己的缓存,可以在每次追加缓存数据时记录当前进程id,从而将其变成 fork 安全的,当发现进程id变化后舍弃之前的缓存,例如:

  1. @propertydef cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache

需要更多信息,请查看 bpo-5155, bpo-5313 以及 bpo-5331

spawn 和 forkserver 启动方式

相对于 fork 启动方式,有一些额外的限制。

更依赖序列化

Process.init() 的所有参数都必须可序列化。同样的,当你继承 Process 时,需要保证当调用 Process.start 方法时,实例可以被序列化。

全局变量

记住,如果子进程中的代码尝试访问一个全局变量,它所看到的值(如果有)可能和父进程中执行 Process.start 那一刻的值不一样。

当全局变量知识模块级别的常量时,是不会有问题的。

安全导入主模块

确保主模块可以被新启动的Python解释器安全导入而不会引发什么副作用(比如又启动了一个子进程)

例如,使用 spawnforkserver 启动方式执行下面的模块,会引发 RuntimeError 异常而失败。

  1. from multiprocessing import Processdef foo(): print('hello')p = Process(target=foo)p.start()

应该通过下面的方法使用 if name == 'main': ,从而保护程序"入口点":

  1. from multiprocessing import Process, freezesupport, setstartmethoddef foo(): print('hello')if _name == '__main': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()

(如果程序将正常运行而不是冻结,则可以省略 freeze_support() 行)

这允许新启动的 Python 解释器安全导入模块然后运行模块中的 foo() 函数。

如果主模块中创建了进程池或者管理器,这个规则也适用。

示例

创建和使用自定义管理器、代理的示例:

  1. from multiprocessing import freeze_support
  2. from multiprocessing.managers import BaseManager, BaseProxy
  3. import operator
  4.  
  5. ##
  6.  
  7. class Foo:
  8. def f(self):
  9. print('you called Foo.f()')
  10. def g(self):
  11. print('you called Foo.g()')
  12. def _h(self):
  13. print('you called Foo._h()')
  14.  
  15. # A simple generator function
  16. def baz():
  17. for i in range(10):
  18. yield i*i
  19.  
  20. # Proxy type for generator objects
  21. class GeneratorProxy(BaseProxy):
  22. _exposed_ = ['__next__']
  23. def __iter__(self):
  24. return self
  25. def __next__(self):
  26. return self._callmethod('__next__')
  27.  
  28. # Function to return the operator module
  29. def get_operator_module():
  30. return operator
  31.  
  32. ##
  33.  
  34. class MyManager(BaseManager):
  35. pass
  36.  
  37. # register the Foo class; make `f()` and `g()` accessible via proxy
  38. MyManager.register('Foo1', Foo)
  39.  
  40. # register the Foo class; make `g()` and `_h()` accessible via proxy
  41. MyManager.register('Foo2', Foo, exposed=('g', '_h'))
  42.  
  43. # register the generator function baz; use `GeneratorProxy` to make proxies
  44. MyManager.register('baz', baz, proxytype=GeneratorProxy)
  45.  
  46. # register get_operator_module(); make public functions accessible via proxy
  47. MyManager.register('operator', get_operator_module)
  48.  
  49. ##
  50.  
  51. def test():
  52. manager = MyManager()
  53. manager.start()
  54.  
  55. print('-' * 20)
  56.  
  57. f1 = manager.Foo1()
  58. f1.f()
  59. f1.g()
  60. assert not hasattr(f1, '_h')
  61. assert sorted(f1._exposed_) == sorted(['f', 'g'])
  62.  
  63. print('-' * 20)
  64.  
  65. f2 = manager.Foo2()
  66. f2.g()
  67. f2._h()
  68. assert not hasattr(f2, 'f')
  69. assert sorted(f2._exposed_) == sorted(['g', '_h'])
  70.  
  71. print('-' * 20)
  72.  
  73. it = manager.baz()
  74. for i in it:
  75. print('<%d>' % i, end=' ')
  76. print()
  77.  
  78. print('-' * 20)
  79.  
  80. op = manager.operator()
  81. print('op.add(23, 45) =', op.add(23, 45))
  82. print('op.pow(2, 94) =', op.pow(2, 94))
  83. print('op._exposed_ =', op._exposed_)
  84.  
  85. ##
  86.  
  87. if __name__ == '__main__':
  88. freeze_support()
  89. test()

使用 Pool:

  1. import multiprocessing
  2. import time
  3. import random
  4. import sys
  5.  
  6. #
  7. # Functions used by test code
  8. #
  9.  
  10. def calculate(func, args):
  11. result = func(*args)
  12. return '%s says that %s%s = %s' % (
  13. multiprocessing.current_process().name,
  14. func.__name__, args, result
  15. )
  16.  
  17. def calculatestar(args):
  18. return calculate(*args)
  19.  
  20. def mul(a, b):
  21. time.sleep(0.5 * random.random())
  22. return a * b
  23.  
  24. def plus(a, b):
  25. time.sleep(0.5 * random.random())
  26. return a + b
  27.  
  28. def f(x):
  29. return 1.0 / (x - 5.0)
  30.  
  31. def pow3(x):
  32. return x ** 3
  33.  
  34. def noop(x):
  35. pass
  36.  
  37. #
  38. # Test code
  39. #
  40.  
  41. def test():
  42. PROCESSES = 4
  43. print('Creating pool with %d processes\n' % PROCESSES)
  44.  
  45. with multiprocessing.Pool(PROCESSES) as pool:
  46. #
  47. # Tests
  48. #
  49.  
  50. TASKS = [(mul, (i, 7)) for i in range(10)] + \
  51. [(plus, (i, 8)) for i in range(10)]
  52.  
  53. results = [pool.apply_async(calculate, t) for t in TASKS]
  54. imap_it = pool.imap(calculatestar, TASKS)
  55. imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
  56.  
  57. print('Ordered results using pool.apply_async():')
  58. for r in results:
  59. print('\t', r.get())
  60. print()
  61.  
  62. print('Ordered results using pool.imap():')
  63. for x in imap_it:
  64. print('\t', x)
  65. print()
  66.  
  67. print('Unordered results using pool.imap_unordered():')
  68. for x in imap_unordered_it:
  69. print('\t', x)
  70. print()
  71.  
  72. print('Ordered results using pool.map() --- will block till complete:')
  73. for x in pool.map(calculatestar, TASKS):
  74. print('\t', x)
  75. print()
  76.  
  77. #
  78. # Test error handling
  79. #
  80.  
  81. print('Testing error handling:')
  82.  
  83. try:
  84. print(pool.apply(f, (5,)))
  85. except ZeroDivisionError:
  86. print('\tGot ZeroDivisionError as expected from pool.apply()')
  87. else:
  88. raise AssertionError('expected ZeroDivisionError')
  89.  
  90. try:
  91. print(pool.map(f, list(range(10))))
  92. except ZeroDivisionError:
  93. print('\tGot ZeroDivisionError as expected from pool.map()')
  94. else:
  95. raise AssertionError('expected ZeroDivisionError')
  96.  
  97. try:
  98. print(list(pool.imap(f, list(range(10)))))
  99. except ZeroDivisionError:
  100. print('\tGot ZeroDivisionError as expected from list(pool.imap())')
  101. else:
  102. raise AssertionError('expected ZeroDivisionError')
  103.  
  104. it = pool.imap(f, list(range(10)))
  105. for i in range(10):
  106. try:
  107. x = next(it)
  108. except ZeroDivisionError:
  109. if i == 5:
  110. pass
  111. except StopIteration:
  112. break
  113. else:
  114. if i == 5:
  115. raise AssertionError('expected ZeroDivisionError')
  116.  
  117. assert i == 9
  118. print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
  119. print()
  120.  
  121. #
  122. # Testing timeouts
  123. #
  124.  
  125. print('Testing ApplyResult.get() with timeout:', end=' ')
  126. res = pool.apply_async(calculate, TASKS[0])
  127. while 1:
  128. sys.stdout.flush()
  129. try:
  130. sys.stdout.write('\n\t%s' % res.get(0.02))
  131. break
  132. except multiprocessing.TimeoutError:
  133. sys.stdout.write('.')
  134. print()
  135. print()
  136.  
  137. print('Testing IMapIterator.next() with timeout:', end=' ')
  138. it = pool.imap(calculatestar, TASKS)
  139. while 1:
  140. sys.stdout.flush()
  141. try:
  142. sys.stdout.write('\n\t%s' % it.next(0.02))
  143. except StopIteration:
  144. break
  145. except multiprocessing.TimeoutError:
  146. sys.stdout.write('.')
  147. print()
  148. print()
  149.  
  150.  
  151. if __name__ == '__main__':
  152. multiprocessing.freeze_support()
  153. test()

一个演示如何使用队列来向一组工作进程提供任务并收集结果的例子:

  1. import time
  2. import random
  3.  
  4. from multiprocessing import Process, Queue, current_process, freeze_support
  5.  
  6. #
  7. # Function run by worker processes
  8. #
  9.  
  10. def worker(input, output):
  11. for func, args in iter(input.get, 'STOP'):
  12. result = calculate(func, args)
  13. output.put(result)
  14.  
  15. #
  16. # Function used to calculate result
  17. #
  18.  
  19. def calculate(func, args):
  20. result = func(*args)
  21. return '%s says that %s%s = %s' % \
  22. (current_process().name, func.__name__, args, result)
  23.  
  24. #
  25. # Functions referenced by tasks
  26. #
  27.  
  28. def mul(a, b):
  29. time.sleep(0.5*random.random())
  30. return a * b
  31.  
  32. def plus(a, b):
  33. time.sleep(0.5*random.random())
  34. return a + b
  35.  
  36. #
  37. #
  38. #
  39.  
  40. def test():
  41. NUMBER_OF_PROCESSES = 4
  42. TASKS1 = [(mul, (i, 7)) for i in range(20)]
  43. TASKS2 = [(plus, (i, 8)) for i in range(10)]
  44.  
  45. # Create queues
  46. task_queue = Queue()
  47. done_queue = Queue()
  48.  
  49. # Submit tasks
  50. for task in TASKS1:
  51. task_queue.put(task)
  52.  
  53. # Start worker processes
  54. for i in range(NUMBER_OF_PROCESSES):
  55. Process(target=worker, args=(task_queue, done_queue)).start()
  56.  
  57. # Get and print results
  58. print('Unordered results:')
  59. for i in range(len(TASKS1)):
  60. print('\t', done_queue.get())
  61.  
  62. # Add more tasks using `put()`
  63. for task in TASKS2:
  64. task_queue.put(task)
  65.  
  66. # Get and print some more results
  67. for i in range(len(TASKS2)):
  68. print('\t', done_queue.get())
  69.  
  70. # Tell child processes to stop
  71. for i in range(NUMBER_OF_PROCESSES):
  72. task_queue.put('STOP')
  73.  
  74.  
  75. if __name__ == '__main__':
  76. freeze_support()
  77. test()