npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

async-task-queue2

v1.0.2

Published

async task queue

Downloads

7

Readme

AsyncTaskQueue2

通常我们会使用 Promise.all 来执行异步任务队列(并发操作),但是它有一个致命的如果某个任务 reject 了会导致整个 Promise.all 立刻 reject。 AsyncTaskQueue2 是一个异步任务队列类,主要用于处理异步任务,它解决 Promise.all 的疼点。并且基于事件驱动,提供串行与并行两种方式。

PS:Nodejs v7.0+

安装

npm install async-task-queue2

API

  • constructor 构造函数接收一个数组用于初始化队列

  • run(options) options 配置项。

    • async 默认每个项之间不需要等待前一个完成。
    • promise 默认使用原生 Promise。
  • add(promise) 增加一个任务

  • size() 获取队列大小。

  • get(index) 根据索引获取 item。

  • isRun() 返回布尔值,指明队列是否在运行中。

  • on('start') 开始时,触发事件。

  • on('success') 每个任务项执行成功时,触发事件。

  • on('error') 某个任务项执行错误时,触发事件。

  • on('add') 添加某个任务时,触发事件。

  • on('complete') 整个任务队列执行完成时,触发事件。回调参数是 (successs,errors)

返回结果信息的结构:

{
    func: '',    // 执行的任务
    result: '',  // 结果数据
    index: '',   // 索引
}

返回错误信息的结构:

{
    func: '',    // 执行的任务
    error: '',   // 错误数据
    index: '',   // 索引
}

基本使用

const AsyncTaskQueue = require('./');
const tasks = [
    () => new Promise(rs => setTimeout(() => rs(true), 300)),
    () => new Promise(rs => setTimeout(() => rs(true), 200)),
    () => new Promise(rs => setTimeout(() => rs(true), 100)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
];

new AsyncTaskQueue(tasks)
    .on('start', _ => {
        // console.log('start');
    })
    .on('add', (...arg) => {
        // console.log(arg);
    })
    .on('success', (...arg) => {
        // console.log(arg);
    })
    .on('error', (...arg) => {
        // console.log(arg);
    })
    .on('complete', (ress, errs) => {
        console.log(JSON.stringify(ress, null, 3));
        console.log(JSON.stringify(errs, null, 3));
    })
    .run()
;

Promise

AsyncTaskQueue 是基于事件驱动的,不过你可以把它定义为 Promise。

const AsyncTaskQueue = require('./');
const tasks = [
    () => new Promise(rs => setTimeout(() => rs(true), 300)),
    () => new Promise(rs => setTimeout(() => rs(true), 200)),
    () => new Promise(rs => setTimeout(() => rs(true), 100)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
];

function All(tasks) {
    return new Promise((resolve, reject) => (
        new AsyncTaskQueue(tasks)
            .on('complete', (ress, errs) => {
                return resolve([ress, errs]);
            })
            .run()
    ));
}

(async () => {
    const [ress, errs] = await All(tasks);
    console.log(JSON.stringify(ress, null, 3));
    console.log(JSON.stringify(errs, null, 3));
})();

串行与并行

AsyncTaskQueue 提供串行与并行的执行顺序,实际上它们都是异步进行的。 串行是指每个任务项都需要等前一个完成,并行是指它们不需要等待别人。

const AsyncTaskQueue = require('./');
const tasks = [
    () => new Promise(rs => setTimeout(() => rs(true), 300)),
    () => new Promise(rs => setTimeout(() => rs(true), 200)),
    () => new Promise(rs => setTimeout(() => rs(true), 100)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
];
// 并行
new AsyncTaskQueue(tasks)
    .on('complete', (ress, errs) => {
        console.log(JSON.stringify(ress, null, 3));
        console.log(JSON.stringify(errs, null, 3));
    })
    .run({ async: true })
;
// 串行
new AsyncTaskQueue(tasks)
    .on('complete', (ress, errs) => {
        console.log(JSON.stringify(ress, null, 3));
        console.log(JSON.stringify(errs, null, 3));
    })
    .run({ async: !true })
;

动态添加任务

AsyncTaskQueue 支持在运行期间可以动态添加任务。一旦运行完成后(触发 complete 事件),就会重置整个队列为初始化状态(空)。

const AsyncTaskQueue = require('./');
const tasks = [
    () => new Promise(rs => setTimeout(() => rs(true), 100)),
];

const atq = new AsyncTaskQueue(tasks);
const start = Date.now();

atq
    .on('success', (...arg) => {
        // 完成一个任务后,立刻添加一个新的任务到异步任务队列里。
        console.log(arg[2] + ', ' + (Date.now() - start) + ' ms');
        if (atq.size() < 15) {
            atq.add(() => new Promise(rs => setTimeout(() => rs(true), 100)));
        }
    })
    .on('add', (...arg) => {
        console.log('当前有 ' + atq.size() + ' 个');
    })
    .on('complete', (ress, errs) => {
        console.log('全部完成了', atq.size());   // 15
        setTimeout(() => {
            console.log(atq.size())             // 0
        });
    })
    .run()
;

多维度嵌套

AsyncTaskQueue 支持多维度嵌套,你可以尽情的嵌套使用。

const AsyncTaskQueue = require('./');

function All(tasks, id = 0) {
    console.time('run ' + id);
    return new Promise((resolve, reject) => (
        new AsyncTaskQueue(tasks)
            .on('complete', (ress, errs) => {
                console.timeEnd('run ' + id);
                return resolve([ress, errs]);
            })
            .run()
    ));
}

const tasks = [...new Array(10)].map(i =>
    new Promise(rs => setTimeout(() => rs(true), 1000))
);

All([
    All(tasks, 1).then(_ => console.log('async task queue 1 完成了')),
    All(tasks, 2).then(_ => console.log('async task queue 2 完成了')),
    All(tasks, 3).then(_ => console.log('async task queue 3 完成了')),
    All(tasks, 4).then(_ => console.log('async task queue 4 完成了')),
    All(tasks, 5).then(_ => console.log('async task queue 5 完成了')),
]).then(res => {
    console.log('全部 async task queue 完成了');
});