async-task-queue2
v1.0.2
Published
async task queue
Downloads
7
Readme
AsyncTaskQueue2
通常我们会使用 Promise.all 来执行异步任务队列(并发操作),但是它有一个致命的如果某个任务 reject 了会导致整个 Promise.all 立刻 reject。
AsyncTaskQueue2
是一个异步任务队列类,主要用于处理异步任务,它解决 Promise.all 的疼点。并且基于事件驱动,提供串行与并行两种方式。
PS:Nodejs v7.0+
安装
npm install async-task-queue2
API
constructor 构造函数接收一个数组用于初始化队列
run(options) options 配置项。
- async 默认每个项之间不需要等待前一个完成。
- promise 默认使用原生 Promise。
add(promise) 增加一个任务
size() 获取队列大小。
get(index) 根据索引获取 item。
isRun() 返回布尔值,指明队列是否在运行中。
on('start') 开始时,触发事件。
on('success') 每个任务项执行成功时,触发事件。
on('error') 某个任务项执行错误时,触发事件。
on('add') 添加某个任务时,触发事件。
on('complete') 整个任务队列执行完成时,触发事件。回调参数是 (successs,errors)
返回结果信息的结构:
{
func: '', // 执行的任务
result: '', // 结果数据
index: '', // 索引
}
返回错误信息的结构:
{
func: '', // 执行的任务
error: '', // 错误数据
index: '', // 索引
}
基本使用
const AsyncTaskQueue = require('./');
const tasks = [
() => new Promise(rs => setTimeout(() => rs(true), 300)),
() => new Promise(rs => setTimeout(() => rs(true), 200)),
() => new Promise(rs => setTimeout(() => rs(true), 100)),
() => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
() => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
() => new Promise(rs => setTimeout(() => rs(true), 1000)),
() => new Promise(rs => setTimeout(() => rs(true), 1000)),
];
new AsyncTaskQueue(tasks)
.on('start', _ => {
// console.log('start');
})
.on('add', (...arg) => {
// console.log(arg);
})
.on('success', (...arg) => {
// console.log(arg);
})
.on('error', (...arg) => {
// console.log(arg);
})
.on('complete', (ress, errs) => {
console.log(JSON.stringify(ress, null, 3));
console.log(JSON.stringify(errs, null, 3));
})
.run()
;
Promise
AsyncTaskQueue
是基于事件驱动的,不过你可以把它定义为 Promise。
const AsyncTaskQueue = require('./');
const tasks = [
() => new Promise(rs => setTimeout(() => rs(true), 300)),
() => new Promise(rs => setTimeout(() => rs(true), 200)),
() => new Promise(rs => setTimeout(() => rs(true), 100)),
() => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
() => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
() => new Promise(rs => setTimeout(() => rs(true), 1000)),
() => new Promise(rs => setTimeout(() => rs(true), 1000)),
];
function All(tasks) {
return new Promise((resolve, reject) => (
new AsyncTaskQueue(tasks)
.on('complete', (ress, errs) => {
return resolve([ress, errs]);
})
.run()
));
}
(async () => {
const [ress, errs] = await All(tasks);
console.log(JSON.stringify(ress, null, 3));
console.log(JSON.stringify(errs, null, 3));
})();
串行与并行
AsyncTaskQueue
提供串行与并行的执行顺序,实际上它们都是异步进行的。
串行是指每个任务项都需要等前一个完成,并行是指它们不需要等待别人。
const AsyncTaskQueue = require('./');
const tasks = [
() => new Promise(rs => setTimeout(() => rs(true), 300)),
() => new Promise(rs => setTimeout(() => rs(true), 200)),
() => new Promise(rs => setTimeout(() => rs(true), 100)),
() => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
() => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
() => new Promise(rs => setTimeout(() => rs(true), 1000)),
() => new Promise(rs => setTimeout(() => rs(true), 1000)),
];
// 并行
new AsyncTaskQueue(tasks)
.on('complete', (ress, errs) => {
console.log(JSON.stringify(ress, null, 3));
console.log(JSON.stringify(errs, null, 3));
})
.run({ async: true })
;
// 串行
new AsyncTaskQueue(tasks)
.on('complete', (ress, errs) => {
console.log(JSON.stringify(ress, null, 3));
console.log(JSON.stringify(errs, null, 3));
})
.run({ async: !true })
;
动态添加任务
AsyncTaskQueue
支持在运行期间可以动态添加任务。一旦运行完成后(触发 complete 事件),就会重置整个队列为初始化状态(空)。
const AsyncTaskQueue = require('./');
const tasks = [
() => new Promise(rs => setTimeout(() => rs(true), 100)),
];
const atq = new AsyncTaskQueue(tasks);
const start = Date.now();
atq
.on('success', (...arg) => {
// 完成一个任务后,立刻添加一个新的任务到异步任务队列里。
console.log(arg[2] + ', ' + (Date.now() - start) + ' ms');
if (atq.size() < 15) {
atq.add(() => new Promise(rs => setTimeout(() => rs(true), 100)));
}
})
.on('add', (...arg) => {
console.log('当前有 ' + atq.size() + ' 个');
})
.on('complete', (ress, errs) => {
console.log('全部完成了', atq.size()); // 15
setTimeout(() => {
console.log(atq.size()) // 0
});
})
.run()
;
多维度嵌套
AsyncTaskQueue
支持多维度嵌套,你可以尽情的嵌套使用。
const AsyncTaskQueue = require('./');
function All(tasks, id = 0) {
console.time('run ' + id);
return new Promise((resolve, reject) => (
new AsyncTaskQueue(tasks)
.on('complete', (ress, errs) => {
console.timeEnd('run ' + id);
return resolve([ress, errs]);
})
.run()
));
}
const tasks = [...new Array(10)].map(i =>
new Promise(rs => setTimeout(() => rs(true), 1000))
);
All([
All(tasks, 1).then(_ => console.log('async task queue 1 完成了')),
All(tasks, 2).then(_ => console.log('async task queue 2 完成了')),
All(tasks, 3).then(_ => console.log('async task queue 3 完成了')),
All(tasks, 4).then(_ => console.log('async task queue 4 完成了')),
All(tasks, 5).then(_ => console.log('async task queue 5 完成了')),
]).then(res => {
console.log('全部 async task queue 完成了');
});