# processPool **Repository Path**: lyxfj/processPool ## Basic Information - **Project Name**: processPool - **Description**: 基于cluster模块开发的进程池,只需要很简单的编码就可以最大程度的发挥多核的性能。 - **Primary Language**: Unknown - **License**: LGPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 0 - **Created**: 2020-05-03 - **Last Updated**: 2022-09-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # processPool #### 介绍 基于cluster模块开发的进程池,只需要很简单的编码就可以最大程度的发挥多核的性能。 #### 修复 * 0.1.9 进程池溢出问题(严重) #### 调试日志 ${env:NODE_DEBUG='master,worker'}; ##### 任务格式 |属性名|类型|说明| |:--:|:--:|:--:| | id | string | 任务的id,要保证唯一性 | | args | any[] | 会被当做参数注入到工作进程中暴露出的run方法 | | executableFilePath | string | 工作进程的执行路径,应为一个.js文件,该值也作为工种的分类依据,所以传值时建议通过path.resolve()进行格式化 | ##### 进程工种 进程工种是为了更好的细分进程池,将相似度高的任务放到同一工种进程去执行,可以减少进程环境切换带来的额外开销。举个简单的例子: ```js // master.js // 创建一个8个进程的进程池 const processPool = new ProcessPool(8); // 生成100个任务 for (let i = 0; i < 100; i++) { processPool.addTask({ id: Math.random().toString(), args: [], executableFilePath: path.join(__dirname, './worker'), }); } // worker.js import { Worker } from "../src/processPool"; export default class Test extends Worker { private mongoClient; public async before() { // 连接mongodb // this.mongoClient = ..... } public async run() { // 某些数据库操作 // this.mongoClient...update(); } public async after() { // 断开mongodb } } ``` 如果没有进程工种概念,这100个任务每个任务都需要建立一个mongo连接,这样的开销是非常大的,理想状态下,只需要建立8个连接,后续的任务可以复用之前已有的连接。进程工种正是为了解决这个问题,进程工种把任务进行了细分,分为`初始化过程`和`执行过程`,相同工种的`初始化过程`是可以复用的,上面问题的`初始化过程`就是建立mongo连接。详细的流程如下: 1. 主进程启动工作进程后会优先执行`before`方法进行初始化,初始化后的进程可以被同一工种共享。 2. 初始化完成后才会执行`run`方法真正开始处理任务。 3. 任务完成后,告知主进程,主进程会检索待处理任务列表中是否有同一工种的任务,如果存在同一工种的任务,则会分配到该进程,不在执行`before`,直接执行`run`。 4. 主进程没有检索到同一工种的其它任务,则会让该进程进入`退化期`进行工种退化。 ##### 工种退化 每一个新的进程都会被第一个任务绑定工种(比如进程被绑定为A工种),以便后续的同工种任务进行环境复用。假如待处理中没有同工种的任务了,那剩余的B工种任务也无法获取到进程资源。所以需要引入`工种退化`的概念,`工种退化`是指当同一工种不存在未执行的任务时,进程主动释放掉初始化资源,重新回到未绑定工种的状态,进而被其它工种重新绑定的过程。 这样仍会面临一种问题,当任务是即时交叉出现的怎么办?A1执行完后,检测没有An了,于是被下一个任务B1绑定为工种B,此时新的任务A2被创建,B1执行完以后又要被重置为A工种。这样就又回到了最初开销大的问题上。 所以只有退化还不够,还需要引入`工种退化期`的概念,工种退化期可以在延迟进程的退化,尽可能的减少环境切换,可以在创建进程池指定工种退化期的大小: ```js // 指定退化期为3000毫秒 const processPool = new ProcessPool(8, 3000); ``` ##### Agent进程 如果把主进程比喻成老板,工作进程比喻成员工,那Agent进程则为老板的秘书。Agent进程可以用来处理一些定时任务等,它与常规任务并没有本质上的区别。 ##### 进程通信 工作进程之间的通信可以在工作进程中使用`this.sendToWorker()`进行广播。而工作进程与Agent进程可以调用`this.sendToAgent(name)`一对一通信. 主从之间的通信,依旧使用原始的api,例如主进程给工作进程发送通知使用 `worker.send(....)`,工作进程给主进程发送通知使用`process.send(...)`,秉承少即是多的理念,在模块仅对收发消息的格式进行了规范: ```typescript { "type": string, [key: string]?: any, } ``` `type`为通信路由,在消息体中必填,消息体中其它参数可作为传参任意添加。模块内部定义使用了一些type类型,在使用是应特别注意: |type|说明| |:--:|:--:| | task | 主进程将任务分配给工作进程 | | end | 工作进程接受到该通知后会直接退出 | | finish | 工作进程完成(或发生异常导致任务失败)当前任务,向主进程通知 | | error | 工作进程发生异常,上报给主进程 | | check | 保留选项,主进程发送通知要求工作进程上报进度 | | status | 保留选项,所有的工作进程每隔一段时间就上报一下自己当前任务的进度 | 主进程接受工作进程消息时,可以选择cluster模块原始的通信监听,比如: ```javascript cluster.on('message', (worker, message) => { // 解析message中的内容,由于所有的消息要在这里处理,会用到很多的判断条件,可读性低 }); ``` 模块中提供了更简单的方式,可以让代码不那么臃肿: ```javascript // 主进程监听消息 processPool.messageEvent.on('example', ({ pid, msg }, worker) => { console.log(pid, msg); }); // 工作进程发送消息 process.send({ type: 'example', pid: process.pid, msg: 'hello' }); ``` ##### 任务重试机制 任务在执行过程中如果出现了未捕获的异常导致执行失败,那该任务会重新进行分配,这样的流程默认会重复三次,超过三次后任务会被放置在errorTaskList中留存,不再被处理。 如果想改变重试次数,可通过修改retryCount属性来实现: ```javascript // 重试次数改为5次 processPool.retryCount = 55; ``` ##### 工作进程的生命周期 如果工作进程长时间处于空闲状态,那么它会一直占用着系统资源,如果每次任务完成后就杀死进程,新任务创建进程,也会造成系统资源的浪费,为了解决这个问题,需要引入生命周期的概念。 进程空闲一段时间后,会被销毁释放资源,生命周期默认是10秒钟,你可以在创建实例时指定这个值: ```javascript // 进程池上限为8,退化期为3秒,空闲进程生命周期为1秒 const processPool = new ProcessPool(8, 3000, 1000); ``` 特别的,如果设置生命周期为0,那么将不会进行进程的销毁。 ##### 优雅的暂停任务 调用`processPool.stop()`后工作进程会在当前任务完成后进入空闲状态,不再接收新的任务,保证任务不被强制中断。通过`processPool.start()`可以回复执行状态。