@fangjinlyx/processpool
v0.1.9
Published
#### 介绍 基于cluster模块开发的进程池,只需要很简单的编码就可以最大程度的发挥多核的性能。
Downloads
2
Readme
processPool
介绍
基于cluster模块开发的进程池,只需要很简单的编码就可以最大程度的发挥多核的性能。
修复
- 0.1.9
进程池溢出问题(严重)
调试日志
${env:NODE_DEBUG='master,worker'};
任务格式
|属性名|类型|说明| |:--:|:--:|:--:| | id | string | 任务的id,要保证唯一性 | | args | any[] | 会被当做参数注入到工作进程中暴露出的run方法 | | executableFilePath | string | 工作进程的执行路径,应为一个.js文件,该值也作为工种的分类依据,所以传值时建议通过path.resolve()进行格式化 |
进程工种
进程工种是为了更好的细分进程池,将相似度高的任务放到同一工种进程去执行,可以减少进程环境切换带来的额外开销。举个简单的例子:
// master.js
// 创建一个8个进程的进程池
const processPool = new ProcessPool(8);
// 生成100个任务
for (let i = 0; i < 100; i++) {
processPool.addTask({
id: Math.random().toString(),
args: [],
executableFilePath: path.join(__dirname, './worker'),
});
}
// worker.js
import { Worker } from "../src/processPool";
export default class Test extends Worker {
private mongoClient;
public async before() {
// 连接mongodb
// this.mongoClient = .....
}
public async run() {
// 某些数据库操作
// this.mongoClient...update();
}
public async after() {
// 断开mongodb
}
}
如果没有进程工种概念,这100个任务每个任务都需要建立一个mongo连接,这样的开销是非常大的,理想状态下,只需要建立8个连接,后续的任务可以复用之前已有的连接。进程工种正是为了解决这个问题,进程工种把任务进行了细分,分为初始化过程
和执行过程
,相同工种的初始化过程
是可以复用的,上面问题的初始化过程
就是建立mongo连接。详细的流程如下:
- 主进程启动工作进程后会优先执行
before
方法进行初始化,初始化后的进程可以被同一工种共享。 - 初始化完成后才会执行
run
方法真正开始处理任务。 - 任务完成后,告知主进程,主进程会检索待处理任务列表中是否有同一工种的任务,如果存在同一工种的任务,则会分配到该进程,不在执行
before
,直接执行run
。 - 主进程没有检索到同一工种的其它任务,则会让该进程进入
退化期
进行工种退化。
工种退化
每一个新的进程都会被第一个任务绑定工种(比如进程被绑定为A工种),以便后续的同工种任务进行环境复用。假如待处理中没有同工种的任务了,那剩余的B工种任务也无法获取到进程资源。所以需要引入工种退化
的概念,工种退化
是指当同一工种不存在未执行的任务时,进程主动释放掉初始化资源,重新回到未绑定工种的状态,进而被其它工种重新绑定的过程。
这样仍会面临一种问题,当任务是即时交叉出现的怎么办?A1执行完后,检测没有An了,于是被下一个任务B1绑定为工种B,此时新的任务A2被创建,B1执行完以后又要被重置为A工种。这样就又回到了最初开销大的问题上。
所以只有退化还不够,还需要引入工种退化期
的概念,工种退化期可以在延迟进程的退化,尽可能的减少环境切换,可以在创建进程池指定工种退化期的大小:
// 指定退化期为3000毫秒
const processPool = new ProcessPool(8, 3000);
Agent进程
如果把主进程比喻成老板,工作进程比喻成员工,那Agent进程则为老板的秘书。Agent进程可以用来处理一些定时任务等,它与常规任务并没有本质上的区别。
进程通信
工作进程之间的通信可以在工作进程中使用this.sendToWorker()
进行广播。而工作进程与Agent进程可以调用this.sendToAgent(name)
一对一通信.
主从之间的通信,依旧使用原始的api,例如主进程给工作进程发送通知使用 worker.send(....)
,工作进程给主进程发送通知使用process.send(...)
,秉承少即是多的理念,在模块仅对收发消息的格式进行了规范:
{
"type": string,
[key: string]?: any,
}
type
为通信路由,在消息体中必填,消息体中其它参数可作为传参任意添加。模块内部定义使用了一些type类型,在使用是应特别注意:
|type|说明|
|:--:|:--:|
| task | 主进程将任务分配给工作进程 |
| end | 工作进程接受到该通知后会直接退出 |
| finish | 工作进程完成(或发生异常导致任务失败)当前任务,向主进程通知 |
| error | 工作进程发生异常,上报给主进程 |
| check | 保留选项,主进程发送通知要求工作进程上报进度 |
| status | 保留选项,所有的工作进程每隔一段时间就上报一下自己当前任务的进度 |
主进程接受工作进程消息时,可以选择cluster模块原始的通信监听,比如:
cluster.on('message', (worker, message) => {
// 解析message中的内容,由于所有的消息要在这里处理,会用到很多的判断条件,可读性低
});
模块中提供了更简单的方式,可以让代码不那么臃肿:
// 主进程监听消息
processPool.messageEvent.on('example', ({ pid, msg }, worker) => {
console.log(pid, msg);
});
// 工作进程发送消息
process.send({ type: 'example', pid: process.pid, msg: 'hello' });
任务重试机制
任务在执行过程中如果出现了未捕获的异常导致执行失败,那该任务会重新进行分配,这样的流程默认会重复三次,超过三次后任务会被放置在errorTaskList中留存,不再被处理。 如果想改变重试次数,可通过修改retryCount属性来实现:
// 重试次数改为5次
processPool.retryCount = 55;
工作进程的生命周期
如果工作进程长时间处于空闲状态,那么它会一直占用着系统资源,如果每次任务完成后就杀死进程,新任务创建进程,也会造成系统资源的浪费,为了解决这个问题,需要引入生命周期的概念。 进程空闲一段时间后,会被销毁释放资源,生命周期默认是10秒钟,你可以在创建实例时指定这个值:
// 进程池上限为8,退化期为3秒,空闲进程生命周期为1秒
const processPool = new ProcessPool(8, 3000, 1000);
特别的,如果设置生命周期为0,那么将不会进行进程的销毁。
优雅的暂停任务
调用processPool.stop()
后工作进程会在当前任务完成后进入空闲状态,不再接收新的任务,保证任务不被强制中断。通过processPool.start()
可以回复执行状态。