# Python并行计算 **Repository Path**: HaixuHe/High-performance-computing ## Basic Information - **Project Name**: Python并行计算 - **Description**: Python-并行计算 - **Primary Language**: Python - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2021-10-03 - **Last Updated**: 2023-02-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Python-并行计算 # 一、基本概念 1. 进程ID:多线程的主进程和它的子线程的进程ID,即os.getpid(),都是相同的,都是主进程的进程ID。多进程则是主进程和它的子进程都有各自的进程ID,都不相同。 1. 共享数据:多线程可以共享主进程内的数据,但是多进程用的都是各自的数据,无法共享。 1. 主线程:由Python解释器运行主py时,也就是开启了一个Python进程,而这个py是这个进程内的一个线程,不过不同于其他线程,它是主线程,同时这个进程内还有其他的比如垃圾回收等解释器级别的线程,所以进程就等于主线程这种理解是有误的。 1. CPU多核利用:Python解释器的线程只能在CPU单核上运行,开销小,但是这也是缺点,因为没有利用CPU多核的特点。Python的多进程是可以利用多个CPU核心的,但也有其他语言的多线程是可以利用多核的。 1. 单核与多核:一个CPU的主要作用是用来做计算的,多个CPU核心如果都用来做计算,那么效率肯定会提高很多,但是对于IO来说,多个CPU核心也没有太大用处,因为没有输入,后面的动作也无法执行。所以如果一个程序是计算密集型的,那么就该利用多核的优势(比如使用Python的多进程),如果是IO密集型的,那么使用单核的多线程就完全够了。Python多线程主要是为了提高程序在IO方面的优势。 1. 线程或进程间的切换:**线程间的切换是要快于进程间的切换的。 1. 死锁:指的是两个或两个以上的线程或进程在请求锁的时候形成了互相等待阻塞的情况,导致这些线程或进程无法继续执行下去,这时候称系统处于死锁状态或者系统产生了死锁,这些线程或进程就称为死锁线程或死锁进程。解决死锁的办法可以使用递归锁,即threading.RLock,然后线程或进程就可以随意请求和释放锁了,而不用担心别的线程或进程也在请求锁而产生死锁的情况。 1. 信号量与进程池:进程池Pool(n)只能是“池”中的n个进程运行,不能有新的进程,信号量只要保证最大线程数就行,而不是只有这几个线程,旧的线程运行结束,就可以继续来新的线程。 1. 进程是资源分配的最小单位,线程是CPU调度的最小单位,线程可以算作进程的子集,一个进程可能由多个线程组成。多进程的数据是分开的,共享复杂但同步简单。多线程共享进程数据,共享简单但同步复杂。
[Python内置库:threading(多线程)](https://www.cnblogs.com/guyuyun/p/11185832.html) # 二、基于进程的并行-multiprocessing - 各个进程间变量的值是**不共享**的:这是因为每创建一个进程就会copy一份原始代码(全局变量还是初值)给自己使用,所以进程间的代码是一样的,**变量和数据是独立**的。 - 各个进程间可以通过**Queue** 创建的队列来传递变量,列表,字符串值(包括全局变量的值)。 - 每个进程任务里的参数,除了**全局变量和函数局部变量**,其余的参数都需要通过外部实参,传入到内部形参。尤其是如**队列 q** 要作为**实参传给两个进程**,这样才能实现两个进程间的通信。 - **进程与线程之间的关系与区别:**(1)进程包含多个线程(2)进程间**不共用**变量与资源;线程间**共用**变量与资源(3)使用 **time.sleep()** ,可以停下当前的进程,让其他进程开始工作。 - python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到**并发执行**的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。 ## 2.1 知识点 1. multiprocessing.cpu_count() 1. 在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的**if __name__ == ‘__main__’ :语句的下面**,才能正常使用Windows下的进程模块。 ## 2.2 Process
multiprocessing模块提供了一个Process类来代表一个进程对象,multiprocessing模块像线程一样管理进程,这个是multiprocessing的核心,它与threading很相似,对多核CPU的利用率会比threading好的多。 ### 2.1.1 共享变量 因为多进程用的都是各自进程的数据,无法实现共享,为了解决这个问题,multiprocessing库提供了一些支持多进程之间共享的数据类型供我们使用,下面介绍几个常用的:

1. **multiprocessing.Value创建共享数值** ```python import multiprocessing def func(num): num.value = 20.0 #子进程改变数值的值,主进程跟着改变 if __name__ == "__main__": num = multiprocessing.Value("d", 10.0) # d表示浮点型数值,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value) print('num 改变前:', num.value) p = multiprocessing.Process(target=func, args=(num, )) p.start() p.join() print('num 改变后:', num.value) ``` 2. **multiprocessing.Arrat创建共享数组** ```python import multiprocessing def func(num): num[2] = 9999 #子进程改变数组,主进程跟着改变 if __name__ == "__main__": num = multiprocessing.Array("i", [1, 2, 3, 4, 5]) #i表示整形数值,主进程与子进程共享这个数组 print('num 数组改变前:',num[:]) p = multiprocessing.Process(target=func, args=(num, )) p.start() p.join() print('num 数组改变后:',num[:]) ``` 3. **multiprocessing.Queue创建共享队列** ```python import multiprocessing def func(i, q): ret = q.get() print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i)) q.put(i) if __name__ == "__main__": lis = multiprocessing.Queue(20) print(lis) lis.put(0) # 往lis中压入 for i in range(10): p = multiprocessing.Process(target=func, args=(i, lis,)) p.start() ``` 4. **multiprocessing.Array和np.frombuffer创建共享numpy数据** ```python import ctypes import os import multiprocessing import numpy as np NUM_PROCESS = multiprocessing.cpu_count() # 查看支持线程数 def worker(index, data): main_nparray = np.frombuffer(data, dtype=ctypes.c_double) main_nparray = main_nparray.reshape(NUM_PROCESS, 10) pid = os.getpid() main_nparray[index, :] = pid if __name__ == "__main__": shared_array_base = multiprocessing.Array(ctypes.c_double, NUM_PROCESS * 10, lock=False) # 创建一个共享Array for i in range(NUM_PROCESS): p = multiprocessing.Process(target=worker, args=(i,shared_array_base, )) p.start() p.join() main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double) # 将共享数据转为numpy格式 main_nparray = main_nparray.reshape(NUM_PROCESS, 10) print(main_nparray) ``` ### ### 2.1.2 Manager()创建共享数据 1. **multiprocessing提供了一种Manager()方式创建共享数据** ```python import multiprocessing def func(mydict, mylist): mydict["index1"] = "aaaaaa" #子进程改变dict,主进程跟着改变 mydict["index2"] = "bbbbbb" mylist.append(11) #子进程改变List,主进程跟着改变 mylist.append(22) mylist.append(33) if __name__ == "__main__": with multiprocessing.Manager() as MG: #重命名 mydict = MG.dict() #主进程与子进程共享这个字典 mylist = MG.list(range(5)) #主进程与子进程共享这个List print('mydict 改变前:', mydict) print('mylist 改变前:', mylist) p = multiprocessing.Process(target=func, args=(mydict, mylist)) p.start() p.join() print('mydict 改变后:', mydict) print('mylist 改变后:', mylist) ``` ## 2.3 Pool
Pool可以批量开启进程,用法比Process简单,并且可以直接返回进程的结果,常用的函数有map()、starmap()、close()、join() ,下面主要介绍一下map和starmap。 ### 2.3.1 Pool.map ```python import time from multiprocessing import Pool def run(fn): # fn: 函数参数是数据列表的一个元素 time.sleep(1) return fn * fn if __name__ == "__main__": testFL = [1, 2, 3, 4, 5, 6] # 写法一 # ########### # pool = Pool(3) # 创建拥有3个进程数量的进程池 # result = pool.map(run, testFL) # testFL:要处理的数据列表,run:处理testFL列表中数据的函数,result用于接受返回值 # pool.close() # 关闭进程池,不再接受新的进程 # pool.join() # 主进程阻塞等待子进程的退出 # 写法二 # ########### with Pool(3) as p: result = p.map(run, testFL) # map的第二个参数可以是列表也可以元组 p.close() # 关闭进程池,不再接受新的进程 p.join() # 主进程阻塞等待子进程的退出 print(result) ``` ### 2.3.2 Pool.starmap
Pool.starmap()也只仅接受一个迭代器参数,但在starmap()中,迭代器中的每一个元件也是一个迭代器。你可以通过这个内部迭代器向被并行化处理的函数传递参数,在执行时再顺序解开,只要传递和解开的顺序一致就行 ,Pool.starmap()就像是一个接受参数的Pool.map()版本。 ```python import time from multiprocessing import Pool def run(fn, min=0, max=5): if min < fn < max: time.sleep(1) return fn * fn if __name__ == "__main__": testFL = [1, 2, 3, 4, 5, 6] with Pool(3) as p: result = p.starmap(run, [(i, 1, 6) for i in testFL]) # starmap的第二个参数 p.close() # 关闭进程池,不再接受新的进程 p.join() # 主进程阻塞等待子进程的退出 print(result) ``` ​ **注意**: 1. multiprocessing.Pool不能直接用multiprocessing.Queue进行通信,只能通过共享内存,或者用multiprocessing.Manager()进行进程间通信。 1. 如果要使用Pool类使用**共享数据**,则需要继承共享数据,即全局数据。 1. 如果需要显式传递它们,则必须使用multiprocessing.Process。 # 三、基于线程的并行-threading ```python import threading threading.active_count() # 获取当前线程个数 threading.enumerate() # 查看线程情况 threading.current_thread() # 当前运行的线程是哪一个 ``` ## 3.1 创建线程
threading创建线程和multiprocessing的用法非常相似,需要start()、join() ```python import time import threading def test_thread(para='hi', sleep=2): """线程运行函数""" time.sleep(sleep) print(para) def main(): # 创建线程 thread_hi = threading.Thread(target=test_thread) thread_hello = threading.Thread(target=test_thread, args=('hello', 1)) # 启动线程 thread_hi.start() thread_hello.start() print('Main thread has ended!') # 该语句会直接运行,并不会等待子线程执行完毕,子线程不会阻塞主线程 if __name__ == '__main__': main() ``` ## 3.2 join和setDaemon 使用join()方法的话,主线程会等待子线程结束再继续执行(阻塞主线程)。 ```python import time import threading def test_thread(para='hi', sleep=5): """线程运行函数""" time.sleep(sleep) print(para) def main(): # 创建线程 thread_hi = threading.Thread(target=test_thread) thread_hello = threading.Thread(target=test_thread, args=('hello', 1)) # 启动线程 thread_hi.start() thread_hello.start() time.sleep(2) print('马上执行join方法了') # 执行join方法会阻塞调用线程(主线程),直到调用join方法的线程(thread_hi)结束 thread_hi.join() print('线程thread_hi已结束') # 这里不会阻塞主线程,因为运行到这里的时候,线程thread_hello已经运行结束了 thread_hello.join() print('Main thread has ended!') # 以上代码只是为了展示join方法的效果 # 如果想要等所有线程都运行完成后再做其他操作,可以使用for循环 # for thd in (thread_hi, thread_hello): # thd.join() # # print('所有线程执行结束后的其他操作') if __name__ == '__main__': main() ``` ​ setDaemon(True):将线程声明为守护线程,必须在start()方法调用之前设置。这个方法基本和join是相反的。 当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,分别运行,那么当主线程完成 想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出(使用join())。但是有时候我们需要的是只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemon()方法。 ```python import threading import time def music(): print('begin to listen music %s' % time.ctime()) time.sleep(3) print('stop to listen music %s' % time.ctime()) def game(): print('begin to play game %s' % time.ctime()) time.sleep(5) print('stop to play game %s' % time.ctime()) l = [] t1 = threading.Thread(target=music) t2 = threading.Thread(target=game) l.append(t1) l.append(t2) if __name__ == '__main__': for i in l: i.setDaemon(True) # 一定要在所有start()之前设置 for t in l: t.start() print('ending...') ``` ## 3.3 锁 锁Lock的概念在多进程和多进程中的应用广泛,同时也提空了丰富的函数可以调用,加锁的位置代表了一旦其中某一线程获取锁后,其他线程就会阻塞到此位置,当加锁的线程执行完毕释放锁后,其他线程会根据阻塞时的位置继续向向执行。 ```python import time import threading # 创建锁 lock = threading.Lock() # 全局变量 global_resource = [None] * 5 def change_resource(para, sleep): # 请求锁 lock.acquire() # 这段代码如果不加锁,第一个线程运行结束后global_resource中是乱的,输出为:修改全局变量为: ['hello', 'hi', 'hi', 'hello', 'hello'] # 第二个线程运行结束后,global_resource中还是乱的,输出为:修改全局变量为: ['hello', 'hi', 'hi', 'hi', 'hi'] global global_resource for i in range(len(global_resource)): global_resource[i] = para time.sleep(sleep) print('para',global_resource) print("修改全局变量为:", global_resource) # 释放锁 lock.release() def main(): thread_hi = threading.Thread(target=change_resource, args=('hi', 2)) thread_hello = threading.Thread(target=change_resource, args=('hello', 1)) thread_hi.start() thread_hello.start() if __name__ == '__main__': main() ``` ## 3.4 获取返回值 ### 3.4.1 使用Queue
**两种常见的Queue类型:**
​ queue.Queue # FIFO 即 First in First Out,先进先出
queue.LifoQueue # LIFO 即 Last in First Out,后进先出,与栈的类似

**关于Queue的其他函数 :**

Queue.qsize() # 返回队列的大小
Queue.empty() # 如果队列为空,返回True,反之False ,在线程间通信的过程中,可以通过此来给消费者等待信息
Queue.full() # 如果队列满了,返回True,反之False,给生产者提醒
Queue.get([block[, timeout]]) # 读队列,timeout等待时间
Queue.put(item, [block[, timeout]]) # 写队列,timeout等待时间
Queue.queue.clear() # 清空队列 ```python import threading import queue import numpy as np def demo(data): result = np.sum(data) Q.put(result) # 将数据压入 Q 中 if __name__ == '__main__': Q = queue.Queue(maxsize=5) data1 = np.array([1, 2, 3, 4, 5]) data2 = np.array([4, 5, 6, 7, 8]) t1 = threading.Thread(target=demo, args=(data1, )) t2 = threading.Thread(target=demo, args=(data2, )) t1.start() t2.start() t1.join() t2.join() print(Q.get()) # 从 Q 中取出数据 print(Q.get()) ``` ### 3.4.2 重写threading.Thread
也可以通过重写threading.Thread类来获取线程的返回值 ```python import threading # 多线程如何返回值 class MyThread(threading.Thread): def __init__(self, func, args=()): super(MyThread, self).__init__() self.func = func self.args = args self.result = self.func(*self.args) def get_result(self): try: return self.result # 如果子线程不使用join方法,此处可能会报没有self.result的错误 except Exception: return None def add(a, b): return a + b if __name__ == '__main__': threads = [] t1 = MyThread(add, args=( 1, 2, )) threads.append(t1) t2 = MyThread(add, args=( 3, 4, )) threads.append(t2) for t in threads: t.setDaemon(True) t.start() print("t1 : a+b=%d !" % (t1.get_result())) print("t2 : a+b=%d !" % (t2.get_result())) ``` # 名词解释 - 同步,调用方和被调用方结果一起返回 - 异步,调用方返回但是被调用方结果还没有返回 - 阻塞,阻塞调用是指调用结果返回之前,当前线程会被挂起 - 非阻塞,非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程 # 参考链接 1. [https://blog.csdn.net/fallfall2008/article/details/108891062](https://blog.csdn.net/fallfall2008/article/details/108891062) 1. [https://www.cnblogs.com/kaituorensheng/p/4445418.html](https://www.cnblogs.com/kaituorensheng/p/4445418.html) 1. [https://www.cnblogs.com/devilmaycry812839668/p/15074778.html](https://www.cnblogs.com/devilmaycry812839668/p/15074778.html)​ 4. [https://www.cnblogs.com/guyuyun/p/11185832.html](https://www.cnblogs.com/guyuyun/p/11185832.html) 4. [https://www.cnblogs.com/franknihao/p/6627857.html](https://www.cnblogs.com/franknihao/p/6627857.html) 4. [https://www.jianshu.com/p/e30d302ebdeb](https://www.jianshu.com/p/e30d302ebdeb) 4. [https://blog.csdn.net/xuexiaoyaani/article/details/80644678](https://blog.csdn.net/xuexiaoyaani/article/details/80644678)