# parallel_test

**Repository Path**: huangjeake/parallel_test

## Basic Information

- **Project Name**: parallel_test
- **Description**: Python 3 parallel-computing benchmarks
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 1
- **Created**: 2021-05-28
- **Last Updated**: 2021-05-28

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

### Concurrency vs. parallelism

* Concurrency: a single CPU plus multitasking (time-slicing) is enough to achieve concurrency
* Parallelism: truly simultaneous execution, which requires multiple CPUs (cores)

### When to use what

* CPU-bound workloads
  * Multi-core CPU
    * [multiprocessing](https://docs.python.org/3.8/library/multiprocessing.html)
    * [Joblib](https://joblib.readthedocs.io/en/latest/index.html)
      * sklearn.externals.joblib
  * Distributed
    * [parallel](https://www.parallelpython.com/)
    * [Celery](http://www.celeryproject.org/)
      * [redis](http://www.redis.cn/)
      * [rabbitmq](http://www.rabbitmq.com/)
      * [demo test](https://gitee.com/xiwanggit/python_distribute)
  * GPU
    * hand-written CUDA kernels
    * numba
    * cupy
    * pycuda
    * pyopencl
    * mars
* IO-bound workloads: multithreading

### Benchmarks

#### Notes

* The timings below vary from run to run, but the orders of magnitude are meaningful
* Test environment
  * OS: 64-bit Windows 10
  * Anaconda3: 1915, 64-bit
  * Python: 3.7.3
* Functions under test

```python
def read_csv_pd(file):
    df = pd.read_csv(file)
    df = df.dropna()
    df = df[df['status'] < 5]
    return df['OC NO'].tolist()

def read_csv_open(file):
    """
    Read the file and return the ids whose status field is non-empty and below 5.
    :param file:
    :return:
    """
    set_id = set()
    with open(file, encoding='utf8') as f:
        lines = f.readlines()
    for num, line in enumerate(lines):
        if num == 0:
            continue
        fields = line.split(',')
        if len(fields[1]) > 0 and int(fields[1]) < 5:
            set_id.add(fields[0])
    return set_id
```

* Test one: `pd.read_csv` vs. `open` inside a plain `for` loop

```python
for file in files:
    mid_set = func(file=file)
    set_id_2.update(mid_set)
```

* Results

| Method | Time (s) |
|--|--|
| `read_csv_pd` | 2.1082966327667236 |
| `read_csv_open` | 0.9119999408721924 |

* Test two: a `for` loop over `map` vs. a list comprehension

```python
for mid_set in map(read_csv_open, files):
    set_id_1.update(mid_set)

[set_id_2.update(mid_set) for mid_set in map(read_csv_open, files)]
```

* Results

| Method | Time (s) |
|--|--|
| `for` | 0.9460015296936035 |
| `[]` | 0.8980069160461426 |

* Test three: `concurrent.futures.ProcessPoolExecutor`

```python
with ProcessPoolExecutor(3) as pool:
    for mid_set in pool.map(read_csv_open, files):
        set_id.update(mid_set)
```

* Time: 1.4520056247711182

* Test four: `multiprocessing.Pool`

```python
with multiprocessing.Pool(cores) as pool:
    rs = pool.map(read_csv_open, files)
```

* Time: 1.6059844493865967

* Test five: `multiprocessing.Process`

```python
for file in files:
    logger.info(file)
    t = multiprocessing.Process(target=read_csv_open_test, kwargs={'file': file, 'q': q})
    process_arr.append(t)
    t.start()
```

* Time: 0.6759865283966064
* Note: the snippet starts the workers but never calls `join()`, so the clock may stop before the children have actually finished

* Test six: `pp` (Parallel Python), single machine

```python
job = job_server.submit(pp_test, (files,), (read_csv_open,), ())
```

* Time: 0.004994630813598633
* Note: `submit()` returns immediately; this figure almost certainly excludes the computation itself, which only completes once the result is fetched with `job()`

* Test seven: `joblib`

```python
rs = joblib.Parallel(4)(joblib.delayed(read_csv_open)(file) for file in files)
```

* Time: 1.2500150203704834
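The pure-Python parser that all of these benchmarks exercise can be sketched on synthetic in-memory data. The column layout here — id (`OC NO`) in field 0, `status` in field 1, one header row — is an assumption inferred from the snippets above, and `read_csv_lines`/`sample` are hypothetical names introduced for this sketch:

```python
def read_csv_lines(lines):
    """Same filtering logic as read_csv_open, but over a list of CSV lines."""
    set_id = set()
    for num, line in enumerate(lines):
        if num == 0:          # skip the header row
            continue
        fields = line.split(',')
        # keep rows whose status field is non-empty and below 5
        if len(fields[1]) > 0 and int(fields[1]) < 5:
            set_id.add(fields[0])
    return set_id

# Synthetic data standing in for one of the benchmark CSV files
sample = [
    "OC NO,status\n",
    "A001,1\n",
    "A002,7\n",
    "A003,4\n",
]

print(read_csv_lines(sample))  # {'A001', 'A003'} — A002 is dropped (status 7 >= 5)
```

Because the function takes lines rather than a filename, it is easy to unit-test before handing it to any of the pool strategies above.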
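Test five starts its worker processes but never waits for them, which is the most likely reason it appears fastest. A minimal sketch of the wait-then-read pattern follows; `count_evens` and `run_all` are hypothetical toy stand-ins for `read_csv_open_test` and the benchmark driver, used only to show where the clock should stop:

```python
import multiprocessing

def count_evens(n, q):
    # Toy CPU-bound worker: count even numbers below n, push the result
    q.put(sum(1 for i in range(n) if i % 2 == 0))

def run_all(sizes):
    q = multiprocessing.Queue()
    procs = []
    for n in sizes:
        p = multiprocessing.Process(target=count_evens, args=(n, q))
        procs.append(p)
        p.start()
    # Drain the queue first (each q.get() blocks until a child delivers),
    # then join. Stopping a timer before this point would measure little
    # more than process start-up cost.
    results = sorted(q.get() for _ in procs)
    for p in procs:
        p.join()
    return results

# On Windows (spawn start method) this call must sit under
# `if __name__ == "__main__":` to avoid re-importing the main module.
results = run_all((10, 100, 1000))
print(results)  # [5, 50, 500]
```

The same reservation applies to test six: a wall-clock measurement is only meaningful if it brackets the point at which results are actually collected.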