# python multiprocessing **Repository Path**: damon_SJTU/python-multiprocessing ## Basic Information - **Project Name**: python multiprocessing - **Description**: A introduction to multiprocessing package in python. - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-02-28 - **Last Updated**: 2024-03-01 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## python multiprocessing 1. [创建多进程](#一-创建多进程) 2. [进程间通信](#二-进程间通信) 3. [进程间数据共享](#三-进程间数据共享) 4. [进程间同步](#四-进程间同步) 5. [](#) ## 一. 创建多进程 python multiprocessing 和 threading 的 API 类似,但是 multiprocessing 可以绕过 python global interpreter lock 的限制,真正使用多个 cpu 核心。 ### 1. Process 类 #### 1.1 直接使用 Process 类 在 multiprocessing 中,通过创建一个 Process 对象然后调用它的 start() 方法来生成进程。 一个简单的多进程程序示例是: ```python from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main line') # ========== create method 1, directly using Process ========== # p = Process(target=f, args=('bob',)) # ========== create method 2, using get_context ========== ctx = mp.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,)) p.start() p.join() ``` 可以使用 get_context() 来获取上下文对象。上下文对象与 multiprocessing 模块具有相同的API,并允许在同一程序中使用多种启动方法. 创建一个新进程的启动方法主要有三种: - spawn: 父进程会启动一个新的 Python 解释器进程。 子进程将只继承那些运行进程对象的 run() 方法所必须的资源。 特别地,来自父进程的非必需文件描述符和句柄将不会被继承。 使用此方法启动进程相比使用 fork 或 forkserver 要慢上许多。 - fork: 父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。fork 对于子进程和父进程的安全保证是比较困难的。 - forkserver: #### 1.2 继承 Process 类 可以通过类的继承来实现 multiprocessing 多进程 ```python from multiprocessing import Process class BaseProcess(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None): self._target = target ... def run(self): ''' Method to be run in sub-process; can be overridden in sub-class ''' if self._target: self._target(*self._args, **self._kwargs) # Process 类通过target 参数传入要执行的函数,在 run() 里面调用 target 对应的函数。所以继承 Process 类也可以通过覆写 run() 函数来达到控制多进程行为的方式。 class MyProcess(Process): def __init__(self, name): super(MyProcess, self).__init__() self.name = name def run(self): # 重写父类的run()方法,其中应包含进程将要执行的代码 print('hi', self.name + '!') if __name__ == '__main__': p = MyProcess('tony') p.start() p.join() ``` #### 1.3 Process 对象常用的方法和属性: - name:进程的名称; - daemon:布尔值,是否是守护进程; - pid:进程ID; - exitcode:进程退出码; - run():表示进程所要做的事情,通过向参数 taget 指定一个函数的方式指定 run 的行为,或者在子类中重载该方法; - start():启动进程; - join():阻塞当前进程,直至调用该方法的进程结束; - is_alive():判断进程是否还活着; - terminate():终止进程; - close():关闭Process对象,释放与之关联的资源; ```python from multiprocessing import Process import time import signal p = Process(target=time.sleep, args=(10, )) print(p, p.is_alive()) # False p.start() print(p, p.is_alive()) # True p.terminate() time.sleep(1) print(p, p.is_alive()) # False print(p.exitcode == -signal.SIGTERM) # True ``` ### 1.4 进程池 Pool 批量创建进程: Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。Pool 是一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,以及一个并行的 map 实现。 Warning: 需要注意的是:multiprocessing.pool 对象具有需要正确管理的内部资源 (像任何其他资源一样),具体方式是**将进程池用作上下文管理器,或者手动调用 close() 和 terminate()**。 未做此类操作将导致进程在终结阶段挂起。请注意依赖垃圾回收器来销毁进程池是不正确的做法,因为 CPython 并不保证进程池终结器会被调用。 ```python class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) ``` 参数包括: - processes(可选):指定要创建的工作进程数。默认值为 None,这将使用计算机上的 CPU 核心数作为工作进程数。 - initializer(可选):指定一个可调用对象,如果 initializer 不为 None,则每个工作进程将会在启动时调用 initializer(*initargs)。 - initargs(可选):传递给 initializer 的参数元组。 - maxtasksperchild(可选):是一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,以为了能够释放未使用的资源。默认的 maxtasksperchild 是 None,意味着worker processes 的生命周期和 Pool 一致。 - context 可用于指定用于启动工作进程的上下文。通常,池是使用函数 multiprocessing.Pool() 或上下文对象的 Pool() 方法创建的。在这两种情况下,上下文都被适当设置。 multiprocessing.pool 包含的主要方法包括: - **apply(func[, args[, kwds]])**: 使用 args 参数以及 kwds 命名参数调用 func , 它会返回结果前**阻塞**。func 只会在一个进程池中的一个工作进程中执行。 - **apply_async(func[, args[, kwds[, callback[, error_callback]]]])**: apply() 方法的一个变种,返回一个 **AsyncResult** 对象。回调函数应该立即执行完成,否则会阻塞负责处理结果的线程。 - **map(func, iterable[, chunksize])**: 内置 map() 函数的并行版本,它会保持阻塞直到获得结果。这个方法会将可迭代对象分割为许多块,然后提交给进程池。可以将 chunksize 设置为一个正整数从而(近似)指定每个块的大小可以。 - **map_async(func, iterable[, chunksize[, callback[, error_callback]]])**: map() 方法的一个变种,返回一个 **AsyncResult** 对象。 回调函数应该立即执行完成,否则会阻塞负责处理结果的线程。 - **imap(func, iterable[, chunksize])**: map() 的延迟执行版本。 - **imap_unordered(func, iterable[, chunksize])**: 和 imap() 相同,只不过通过迭代器返回的结果是任意的 - **close()**: 阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。 - **terminate()**: 不必等待未完成的任务,立即停止工作进程。当进程池对象被垃圾回收时,会立即调用 terminate()。 - **join()**: 等待工作进程结束。调用 join() 前必须先调用 close() 或者 terminate() 。 **class multiprocessing.pool.AsyncResult** 是 Pool.apply_async() 和 Pool.map_async() 返回对象所属的类。其方法包括: - **get([timeout])**: 用于获取执行结果。如果 timeout 不是 None 并且在 timeout 秒内仍然没有执行完得到结果,则抛出 multiprocessing.TimeoutError 异常。如果远程调用发生异常,这个异常会通过 get() 重新抛出。 - **wait([timeout])**: 阻塞,直到返回结果,或者 timeout 秒后超时。 - **ready()**: 返回执行状态,是否已经完成。 - **successful()**: 判断调用是否已经完成并且未引发异常。 如果还未获得结果则将引发 ValueError。 使用 multiprocessing.Pool 的例子是: ```python from multiprocessing import Pool import time import random def f(x): time.sleep(random.random()/10) return x*x if __name__ == '__main__': with Pool(processes=4) as pool: # start 4 worker processes # ========== 1. apply 阻塞版本 ========== result = pool.apply(f, (10,)) # ========== 2. apply_async 非阻塞版本 ========== result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow # ========== 3. map 并行阻塞版本 ========== print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]" # ========== 4. imap 并行,延迟执行版本 ========== it = pool.imap(f, range(10)) print(next(it)) # prints "0" print(next(it)) # prints "1" print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow # ========== 5. map_async 并行,非阻塞版本 ========== result_async = pool.map_async(f, range(10)) print(result_async.get()) # 获取结果列表 # ========== 6. imap_unordered 并行,乱序延迟版本 ========== it = pool.imap_unordered(f, range(10)) print(next(it)) # may be any in "[0, 1, 4,..., 81]" print(next(it)) # may be any in "[0, 1, 4,..., 81]" print(it.next(timeout=1)) # may be any in "[0, 1, 4,..., 81]" result = pool.apply_async(time.sleep, (10,)) print(result.get(timeout=1)) # raises multiprocessing.TimeoutError ``` ## 二. 进程间通信 multiprocessing 支持进程之间的两种通信通道:[Queue](#Queue) 和 [Pipe](#Pipe)

2.1 Queue 队列

multiprocessing.Queue 类是一个近似 queue.Queue 的克隆,Queue 是线程和进程安全的。放出和从队列中取出的函数分别是 **put()** 和 **get()** 函数。 例如: ```python from multiprocessing import Process, Queue import time, os def prodcut(q): print("开始生产.") for i in range(5): time.sleep(1) q.put('产品'+str(i)) print("产品"+str(i)+"生产完成") def consume(q): while True: prod = q.get() print("消费者:{},消费产品:{}".format(os.getpid(), prod)) time.sleep(1) if __name__ == '__main__': q = Queue() p = Process(target=prodcut, args=(q, )) # 生产者 c1 = Process(target=consume, args=(q, )) # 消费者1 c2 = Process(target=consume, args=(q, )) # 消费者2 p.start() c1.start() c2.start() p.join() # 当生产者结束后,将两个消费则也结束 c1.terminate() c2.terminate() ``` 输出如下。这里消费者 28772,28773 会交替的获得数据,是因为其 sleep(1) 和 生产者的 sleep(1) 比较吻合。实际上,一般说来,multiprocessing.Queue 是基于管道和锁实现的,它可以安全地在多个进程之间传递数据。当多个消费者同时等待一个产品时,操作系统会根据调度算法在可用的消费者进程之间进行切换,并按照一定的顺序将产品传递给它们。 由于操作系统的调度算法和进程执行速度的差异,无法保证两个消费者进程完全交替获取产品。 ``` 开始生产. 产品0生产完成 消费者:28772,消费产品:产品0 产品1生产完成 消费者:28773,消费产品:产品1 产品2生产完成 消费者:28772,消费产品:产品2 产品3生产完成 消费者:28773,消费产品:产品3 产品4生产完成 消费者:28772,消费产品:产品4 ```

2.2 Pipe 管道

Pipe() 函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 **send()** 和 **recv()** 方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。例子: ```python from multiprocessing import Pipe, Pool import os,time def product(send_pipe): print("开始生产.") for i in range(5): time.sleep(1) send_pipe.send("产品"+str(i)) print("产品" + str(i) + "生产完成") def consume(recv_pipe): while True: print("消费者:{},消费产品:{}".format(os.getpid(), recv_pipe.recv())) time.sleep(1) if __name__ == '__main__': # 使用进程池来创建进程 send_pipe, recv_pipe = Pipe() pool = Pool(2) pool.apply_async(product, args=(send_pipe,)) pool.apply_async(consume, args=(recv_pipe,)) pool.close() pool.join() ``` 输出如下: ``` 开始生产. 产品0生产完成 消费者:25036,消费产品:产品0 产品1生产完成 消费者:25036,消费产品:产品1 产品2生产完成 消费者:25036,消费产品:产品2 产品3生产完成 消费者:25036,消费产品:产品3 产品4生产完成 消费者:25036,消费产品:产品4 ``` ## 三. 进程间数据共享 Python进程间数据共享主要有两种方式,一种是**共享内存**(#shm),另一种是通过数据管理其 **(Manager)**(#manager) 来实现。

3.1 共享内存

共享内存允许多个进程共享一个存储区域,一个进程写入共享内存中的信息,其他进程可以方便的读取。 在Python中可以使用 **Value**、**Array** 将数据存储在共享内存中,也可以使用模块 multiprocessing.sharedctypes 自定义共享内存的 ctypes 对象。 例子如: ```python from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) ``` 将打印出: ``` 3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9] ``` 创建 num 和 arr 时使用的 'd' 和 'i' 参数是 array 模块使用的类型的 typecode : 'd' 表示双精度浮点数, 'i' 表示有符号整数。这些共享对象将是进程和线程安全的。 使用自定义的共享内存如下: ```python from multiprocessing import Process, Lock from multiprocessing.sharedctypes import Value, Array from ctypes import Structure, c_double # 自定义数据结构 class Point(Structure): _fields_ = [('x', c_double), ('y', c_double)] def modify(n, x, s, A): n.value **= 2 x.value **= 2 s.value = s.value.upper() for a in A: a.x **= 2 a.y **= 2 if __name__ == '__main__': lock = Lock() n = Value('i', 7) # 定义int型 x = Value(c_double, 1.0/3.0, lock=False) # 定义ctypes类型 s = Array('c', b'hello world', lock=lock) # 定义字符串 A = Array(Point, [(1.875, -6.25), (-5.75, 2.0), (2.375, 9.5)], lock=lock) # 定义Point类型的数组 p = Process(target=modify, args=(n, x, s, A)) p.start() p.join() print(n.value) print(x.value) print(s.value) print([(a.x, a.y) for a in A]) ``` 将输出: ``` 49 0.1111111111111111 b'HELLO WORLD' [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)] ```

3.2 Manager(数据管理器)

由 Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用**代理(Proxy)**操作它们。使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。并且每个manager() 都会耗费一定的资源(比如memory和cpu资源)。 Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array. 例子如下: ```python from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None d['a'].append(7) temp_b = d['b'] temp_b.append(7) d['b'] = temp_b d['c'].append(7) l.reverse() l.append(100) if __name__ == '__main__': with Manager() as manager: d = manager.dict() d['a'] = [1] d['b'] = [1] d['c'] = manager.list([1]) l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(d['c'], len(d['c'])) print(l) ``` 将打印: ``` {'a': [1], 'b': [1, 7], 'c': , 1: '1', '2': 2, 0.25: None} [1, 7] 2 [9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 100] ``` 虽然 manager.list() 和 manager.dict() 实现了在多进程间共享 list 和 dict 变量,但是有时候使用 manager 仍会发现变量没有被其他进程改变,比如这里的 d['a']. 同样的,manager.list()创建多层列表时里面列表中的元素修改无效。这是因为它们是**可变对象**,修改时内存地址不变,于是主进程还是读取原来的地址,读到的还是原来的值。 解决方法之一是,如 d['c'] = manager.list([1]) 一样,内层也使用共享的 list 等对象。另外一个方法是如同对 d['b'] 的处理一样,对 d['b'] 修改后再赋值。这一次赋值会直接改变对象的内存地址,于是修改是有效的。第一种方法时,如果要在子进程内部新定义一个内层的 共享list(), 不能在每个子进程都创建一个 manager() 对象,而可能需要把 manager() 对象传给子进程。 Manager() 管理的对象称为代理对象,代理是一个 指向 其他共享对象的对象,这个对象(很可能)在另外一个进程中。共享对象也可以说是代理 指涉 的对象。多个代理对象可能指向同一个指涉对象。 代理对象代理了指涉对象的一系列方法调用(虽然并不是指涉对象的每个方法都有必要被代理)。通过这种方式,代理的使用方法可以和它的指涉对象一样。 被代理的对象很重要的一点是必须可以被序列化,这样才能允许他们在进程间传递。因此,指涉对象可以包含 代理对象 。这允许管理器中列表、字典或者其他 代理对象 对象之间的嵌套。 如果指涉对象包含了普通 list 或 dict 对象,对这些内部可变对象的修改不会通过管理器传播,因为代理无法得知被包含的值什么时候被修改了。但是把存放在容器代理中的值本身是会通过管理器传播的(会触发代理对象中的 __setitem__ )从而有效修改这些对象,所以可以把修改过的值重新赋值给容器代理。 **Note:python 和 C++ 内存分配的区别** C++ 是编译性语言,也就是说变量的地址在编译期间都是确定的。比如先定义了一个 int a=1; 再赋值 a=2; 此时底层做的事情应该类似于 Mov 0x.... 2; 也就是往变量 a 对应的内存地址的位置写入 2. python 是动态的非编译性语言,一个对象 a 在 python 里可能实际上是一个 C 对象。比如 List,就是通过 cpython 源码里的 PyListObject 类定义的一个 C 对象,python 端获得的是一个 C 对象的指针,该指针指向的就是 PyListObject 对象。 所以在 C 里,一个 int 型变量可能占据的是 4 字节,但是在 python 里定义一个 int 变量,除了变量自身占有的 4 字节外(python int 实际上并不一定是 4 字节,随着其大小变量,可能需要更多的 bit 来保存这个数值),还至少需要一个指针。这个指针指向的是 int(上文的 1 或者 2)保存的地址。所以你可以认为,有一个 python 的栈,里面保存着你定义的每个变量(类似 C 函数,栈用来控制变量的生命周期?)。这些栈里变量保存的都是地址,对应的地址里才存着真实的数据。 Manager() 实际上是新起了一个进程,这个进程可以被称作一个 proxy (即一个服务进程 server)。然后 Manager().dict(), Manager().list() 都是利用 shared_memory 来保存一个 dict,list 等对象,这些 shm 里的对象只能通过定义其的 proxy 进程来进行更改。 在某个工作子进程里,对 dict 发起修改的实质是,将修改这个需求发给 proxy 进程,请求 proxy 进程来对 shm 中对应的数据进行修改。因为这个请求是可以通过网络来进行的,所以这种方法甚至能达到多台机器间的数据共享。比如 dict['a'] = 2,或者添加/删除 key 都没问题,因为这个 dict 对象都是在 shm 里被 proxy 控制的。但是如果 dict 的某个 value 是一个可变对象,情况就发生变化了。 比如 dict['b'] = [1]; dict['b'].append(2) 或者 dict['b'][0] = [-1] 的修改都是无效的。 这是为什么呢?因为被 shared 的只是 dict 对象, dict['b'] 这个 list 对象并不是共享的。那么 dict['b'] 实际上会给使用它的进程返回一个副本,所以 append 的实际上发生在副本上,对 shm 里的数据没产生影响。 这里是一段 stackoverflow 的解释: When you create a managed dictionary, the actual implementation of that dictionary "lives" in the address space of a new process that is created by the Multiprocessing.SyncManager instance that is created and started when you call multiprocessing.Manager(). The actual value that is returned by multiprocessor.Manager().dict() is a reference to a special proxy in shared memory that is created, which can be passed to other processes. But when you invoked a method on this proxy, what is actually happening is that the proxy is sending a message to the manager server process via a socket to perform the method on the actual dictionary implementation in the server's address space. So you can add a key/value pair to the managed dictionary where that value is a dictionary. But if you start modifying that dictionary by adding/deleting/modifying keys, since that dictionary is not a managed dictionary, you are only modifying the copy in the local address space. 所以上述子进程获得的 dict对象的值的 copy 并不是平时简简单单的 b = a 这样,而是在两个不同的地址空间里。 ```python from multiprocessing import Process, Manager def print_dict_address(shared_dict): # 输出共享字典对象的地址和键 'a' 的地址 print(f"Shared Dict Address: {id(shared_dict)} {id(shared_dict)}") value_in_cur_proc = shared_dict['a'] # print(shared_dict) print(f"Key 'a' Address: {id(shared_dict['a'])} {id(value_in_cur_proc)}") # shared_dict['a'].append(-100) print(f"Key 'a' Address: {id(shared_dict['a'])}") # print(shared_dict) if __name__ == '__main__': # 创建共享字典 manager = Manager() shared_dict = manager.dict() # 添加元素到共享字典 shared_dict['a'] = [1, 2, 3] # 输出主进程中的字典地址 print(f"Main Process - Shared Dict Address: {id(shared_dict)}") print(f"Main Process - Key 'a' Address: {id(shared_dict['a'])}") # 创建两个子进程,分别输出子进程中的字典地址 p1 = Process(target=print_dict_address, args=(shared_dict,)) p2 = Process(target=print_dict_address, args=(shared_dict,)) p1.start() p2.start() print(f"Main Process - Shared Dict Address: {id(shared_dict)}") print(f"Main Process - Key 'a' Address: {id(shared_dict['a'])}") p1.join() p2.join() ``` 上述代码的输出如下: ``` Main Process - Shared Dict Address: 139997044400208 Main Process - Key 'a' Address: 139997036802624 Main Process - Shared Dict Address: 139997044400208 Main Process - Key 'a' Address: 139997036896384 Shared Dict Address: 139997044400208 139997044400208 Shared Dict Address: 139997044400208 139997044400208 Key 'a' Address: 139997036896384 139997036897856 Key 'a' Address: 139997036896320 139997036897728 Key 'a' Address: 139997036896384 Key 'a' Address: 139997036896320 ``` 从输出中可以看到,不同进程的 id(shared_dict) 是一致的,也就是都是 shm 的那个对象? 但是 dict['a'] 在不同进程里看到的 id 是不一致的,并且在当前进程内进行赋值 value_in_cur_proc = shared_dict['a']。 id(value_in_cur_proc ) 和 id(shared_dict['a']) 也是不一致的。但是同一进程里多次获得 id(shared_dict['a']) 也是一致的。 这里的原因是:获取 shared_dict['a'] 实际上当前进程获得的是共享内存的一份 copy,在当前进程的地址空间里,所以不同进程获得的 id(shared_dict) 不一致很合理。同一进程多次获得 id(value_in_cur_proc),可能获得相同的结果,这是因为前一次得到的(value_in_cur_proc ) 是一个临时对象,print(id(value_in_cur_proc )) 这句代码执行完毕后就被 python 解释器 garbage collection了。所以再次 print(id(value_in_cur_proc )),python 解释器又将其安排给了原来的这个 id。但是为什么赋值后 id 会不一致呢?因为被赋值的变量在当前进程还并未被 gabbage collection,所以从 shm 获得一份拷贝,这个拷贝自然就只能占据其余的 id 了。 **NOTE:这个Manager() 也是有生命周期的,最好在 with context 环境下使用,或者要主动调用 terminate() 方法** ## 四. 进程间同步 当多个进程对一个共享的变量进行读写操作时,为了保证运行结果的正确性,通常需要对进程之间进行同步。当然,同步会降低并发的程度。常见的同步方式有:[Lock(锁)](#lock)、[Semaphore(信号量)(#semaphore)]、[Event(事件)](#event)和[Condition(条件变量)](#condition)。

4.1 锁(Lock)

通过使用Lock来控制一段代码在同一时间只能被一个进程执行。Lock对象的两个方法,acquire()用来获取锁,release()用来释放锁。当一个进程调用acquire()时,如果锁的状态为unlocked,那么会立即修改为locked并返回,这时该进程即获得了锁。如果锁的状态为locked,那么调用acquire()的进程则阻塞。 注意 Lock 实际上是一个工厂函数。它返回由默认上下文初始化的 multiprocessing.synchronize.Lock 对象。 - acquire(block=True, timeout=None)¶ 可以阻塞或非阻塞地获得锁。 如果 block 参数被设为 True ( 默认值 ) , 对该方法的调用在锁处于释放状态之前都会阻塞,然后将锁设置为锁住状态并返回 True 。需要注意的是第一个参数名与 threading.Lock.acquire() 的不同。 如果 block 参数被设置成 False ,方法的调用将不会阻塞。 如果锁当前处于锁住状态,将返回 False ; 否则将锁设置成锁住状态,并返回 True 。 当 timeout 是一个正浮点数时,会在等待锁的过程中最多阻塞等待 timeout 秒,当 timeout 是负数时,效果和 timeout 为0时一样,当 timeout 是 None (默认值)时,等待时间是无限长。需要注意的是,对于 timeout 参数是负数和 None 的情况, 其行为与 threading.Lock.acquire() 是不一样的。当 block 参数 为 False 时, timeout 并没有实际用处,会直接忽略。否则,函数会在拿到锁后返回 True 或者 超时没拿到锁后返回 False 。 - release() 释放锁,可以在任何进程、线程使用,并不限于锁的拥有者。 当尝试释放一个没有被持有的锁时,会抛出 ValueError 异常,除此之外其行为与 threading.Lock.release() 一样。 RLock 是一个递归锁,递归锁必须由持有线程、进程亲自释放。如果某个进程或者线程拿到了递归锁,这个进程或者线程可以再次拿到这个锁而不需要等待。但是这个进程或者线程的拿锁操作和释放锁操作的次数必须相同。 - acquire(block=True, timeout=None) 可以阻塞或非阻塞地获得锁。 当 block 参数设置为 True 时,会一直阻塞直到锁处于空闲状态(没有被任何进程、线程拥有),除非当前进程或线程已经拥有了这把锁。然后当前进程/线程会持有这把锁(在锁没有其他持有者的情况下),锁内的递归等级加一,并返回 True . 注意, 这个函数第一个参数的行为和 threading.RLock.acquire() 的实现有几个不同点,包括参数名本身。 当 block 参数是 False , 将不会阻塞,如果此时锁被其他进程或者线程持有,当前进程、线程获取锁操作失败,锁的递归等级也不会改变,函数返回 False , 如果当前锁已经处于释放状态,则当前进程、线程则会拿到锁,并且锁内的递归等级加一,函数返回 True 。 timeout 参数的使用方法及行为与 Lock.acquire() 一样。但是要注意 timeout 的其中一些行为和 threading.RLock.acquire() 中实现的行为是不同的。 - release() 释放锁,使锁内的递归等级减一。如果释放后锁内的递归等级降低为0,则会重置锁的状态为释放状态(即没有被任何进程、线程持有),重置后如果有有其他进程和线程在等待这把锁,他们中的一个会获得这个锁而继续运行。如果释放后锁内的递归等级还没到达0,则这个锁仍将保持未释放状态且当前进程和线程仍然是持有者。 只有当前进程或线程是锁的持有者时,才允许调用这个方法。如果当前进程或线程不是这个锁的拥有者,或者这个锁处于已释放的状态(即没有任何拥有者),调用这个方法会抛出 AssertionError 异常。注意这里抛出的异常类型和 threading.RLock.release() 中实现的行为不一样。 **NOTE: Lock 和 RLoCK 支持 context manager 协议,因此可在 with 语句内使用。** 使用例子如下: ```python import os import time import random from multiprocessing import Process, Lock def work(lock, n): # lock.acquire() # 获得锁,只有一个进程可以获得 with lock: # 在 with context 中使用 lock print('{}: {} is running'.format(n, os.getpid())) time.sleep(random.random()) print('{}: {} is done'.format(n, os.getpid())) # lock.release() # 释放锁 if __name__ == '__main__': lock = Lock() for i in range(3): p = Process(target=work, args=(lock,i)) p.start() ``` 输出如下,可以确保不同的进程间的操作是有序的。 ``` 0: 16356 is running 0: 16356 is done 2: 23500 is running 2: 23500 is done 1: 18336 is running 1: 18336 is done ```

4.2 信号量(Semaphore)

Lock 只允许同一时刻有一个进程访问锁住的代码段,而 Semaphore 则是允许**一定数量**的进程访问。Semaphore 在实现时会维护一个计数器,每调用一个 acquire(),计数器减 1,调用一次 release() 则计数器加 1。当计数器为 0 时,调用 acquire() 则会阻塞. 示例代码如下: ```python import multiprocessing import time def worker(smp, i): # smp.acquire() with smp: print(multiprocessing.current_process().name + " acquire") time.sleep(i) print(multiprocessing.current_process().name + " release") # smp.release() if __name__ == "__main__": smp = multiprocessing.Semaphore(2) # 最多允许2个进程进入,否则阻塞 for i in range(5): p = multiprocessing.Process(target=worker, args=(smp, i * 2)) p.start() ``` 一个可能的输出如下: ``` Process-1 acquire Process-1 release Process-3 acquire Process-2 acquire Process-2 release Process-4 acquire Process-3 release Process-5 acquire Process-4 release Process-5 release ```

4.3 事件(Event)

vent是主线程控制其他线程的方式。在Event机制中,会设置一个"Flag" (内部标识),如果"Flag"为False时,那么调用event.wait()的进程就会阻塞。当"Flag"为True时,那些阻塞了的进程就不再阻塞。其中, set()方法用于设置"Flag"为True,clear()则是设置"Flag"为False。 - is_set() 当且仅当内部标识为 true 时返回 True 。 isSet 方法是此方法的已弃用别名。 - set() 将内部标识设置为 true 。所有正在等待这个事件的线程将被唤醒。当标识为 true 时,调用 wait() 方法的线程不会被被阻塞。 - clear() 将内部标识设置为 false 。之后调用 wait() 方法的线程将会被阻塞,直到调用 set() 方法将内部标识再次设置为 true 。 - wait(timeout=None) Block as long as the internal flag is false and the timeout, if given, has not expired. The return value represents the reason that this blocking method returned; True if returning because the internal flag is set to true, or False if a timeout is given and the the internal flag did not become true within the given wait time. When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds, or fractions thereof. 在 3.1 版本发生变更: 很明显,方法总是返回 None。 一个使用的例子如下: ```python import multiprocessing import time def wait_for_event(e): """等待event对象被设置为True""" print('wait_for_event: starting') res = e.wait() # 如果主线程不设置event对象,那么该进程会一直阻塞 print('wait_for_event: e.is_set()->' + str(e.is_set()) + f'. res is {res}') def wait_for_event_timeout(e, t): """Wait t seconds and then timeout""" print('wait_for_event_timeout: starting') res = e.wait(t) # 如果主线程不设置event对象,那么该进程会一直阻塞直至超市 print('wait_for_event_timeout: e.is_set()->' + str(e.is_set()) + f'. res is {res}') if __name__ == '__main__': e = multiprocessing.Event() print(e.is_set()) # Event的初始状态为False。此时,任何调用该Event对象的wait()方法都会阻塞 w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,)) w1.start() w2 = multiprocessing.Process(name='non-block', target=wait_for_event_timeout, args=(e, 2)) w2.start() time.sleep(10) e.set() print('main: event is set') ``` 得到的输出如下: ``` False wait_for_event: starting wait_for_event_timeout: starting wait_for_event_timeout: e.is_set()->False. res is False main: event is set wait_for_event: e.is_set()->True. res is True ```

4.4 条件变量(Condition)

class threading.Condition(lock=None):实现条件变量对象的类。一个条件变量对象允许一个或多个线程在被其它线程所通知之前进行等待。如果给出了非 None 的 lock 参数,则它必须为 Lock 或者 RLock 对象,并且它将被用作底层锁。否则,将会创建新的 RLock 对象,并将其用作底层锁。 - acquire(*args) 请求底层锁。此方法调用底层锁的相应方法,返回值是底层锁相应方法的返回值。 - release() 释放底层锁。此方法调用底层锁的相应方法。没有返回值。 - wait(timeout=None) 等待直到被通知或发生超时。如果线程在调用此方法时没有获得锁,将会引发 RuntimeError 异常。 这个方法释放底层锁,然后阻塞,直到在另外一个线程中调用同一个条件变量的 notify() 或 notify_all() 唤醒它,或者直到可选的超时发生。一旦被唤醒或者超时,它重新获得锁并返回。 当提供了 timeout 参数且不是 None 时,它应该是一个浮点数,代表操作的超时时间,以秒为单位(可以为小数)。 当底层锁是个 RLock ,不会使用它的 release() 方法释放锁,因为当它被递归多次获取时,实际上可能无法解锁。相反,使用了 RLock 类的内部接口,即使多次递归获取它也能解锁它。 然后,在重新获取锁时,使用另一个内部接口来恢复递归级别。 返回 True ,除非提供的 timeout 过期,这种情况下返回 False。 在 3.2 版本发生变更: 很明显,方法总是返回 None。 - wait_for(predicate, timeout=None) 等待,直到条件计算为真。 predicate 应该是一个可调用对象而且它的返回值可被解释为一个布尔值。可以提供 timeout 参数给出最大等待时间。 这个实用方法会重复地调用 wait() 直到满足判断式或者发生超时。返回值是判断式最后一个返回值,而且如果方法发生超时会返回 False 。 忽略超时功能,调用此方法大致相当于编写: ```python while not predicate(): cv.wait() ``` 因此,规则同样适用于 wait() :锁必须在被调用时保持获取,并在返回时重新获取。 随着锁定执行判断式。 在 3.2 版本加入. - notify(n=1) 默认唤醒一个等待这个条件的线程。如果调用线程在没有获得锁的情况下调用这个方法,会引发 RuntimeError 异常。 这个方法唤醒最多 n 个正在等待这个条件变量的线程;如果没有线程在等待,这是一个空操作。 当前实现中,如果至少有 n 个线程正在等待,准确唤醒 n 个线程。但是依赖这个行为并不安全。未来,优化的实现有时会唤醒超过 n 个线程。 注意:被唤醒的线程并没有真正恢复到它调用的 wait() ,直到它可以重新获得锁。 因为 notify() 不释放锁,其调用者才应该这样做。 - notify_all() 唤醒所有正在等待这个条件的线程。这个方法行为与 notify() 相似,但并不只唤醒单一线程,而是唤醒所有等待线程。如果调用线程在调用这个方法时没有获得锁,会引发 RuntimeError 异常。 notifyAll 方法是此方法的已弃用别名。 下面是一个进程间的生产者-消费者的例子,当生产达到5个产品时开始消费,并在消费完所有产品后再进行生产。 ```python from multiprocessing import Process, Condition, Value import time def product(num, con): con.acquire() while True: print("开始生产.") time.sleep(1) if num.value >= 5: print("产生数量已达到5个,无法继续生产") con.notify() # 唤醒消费者 con.wait() # 阻塞,等待唤醒 num.value += 1 print("产品数量:{}".format(str(num.value))) con.release() def consume(num, con): con.acquire() while True: print("开始消费.") time.sleep(1) if num.value <= 0: print("产品已被消费完.") con.notify() # 唤醒生产者 con.wait() # 阻塞,等待唤醒 num.value -= 1 print("产品剩余数量:{}".format(num.value)) con.release() if __name__ == '__main__': num = Value('i', 0) # 进程间共享内存 con = Condition() producer = Process(target=product, args=(num, con)) consumer = Process(target=consume, args=(num, con)) producer.start() consumer.start() ``` 获得的部分输出如下: ``` 开始生产. 产品数量:1 开始生产. 产品数量:2 开始生产. 产品数量:3 开始生产. 产品数量:4 开始生产. 产品数量:5 开始生产. 产生数量已达到5个,无法继续生产 开始消费. 产品剩余数量:4 开始消费. 产品剩余数量:3 开始消费. 产品剩余数量:2 开始消费. 产品剩余数量:1 开始消费. 产品剩余数量:0 开始消费. 产品已被消费完. 产品数量:1 开始生产. 产品数量:2 开始生产. ```