npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

x-flow

v2.4.0

Published

x --> flow

Downloads

7

Readme

x-flow

node async x --> flow

安装

npm install x-flow

介绍

这是一个小型的异步流程控制工具,不同于Async的地方在于用链式调用实现,使用起来方便、简洁灵活性更高。

实现思路是基于koa的中间件思路,串行配置的步骤依次执行,并且每一个并行分支都拥有独立的运行上下文,用于步骤间数据共享,并行间运行环境相互隔离,并在执行完每次并行分支时返回该分支的运行环境

使用说明

该库有三个对象, x对象、flow与context对象。

#####x对象为导出的模块对象,对象有三个函数

  • x.begin([callback])函数用来返回一个flow对象,接受一个可省略的回调参数,效果与fork的参数一致(详情请看flow.fork([callback])),返回的flow对象默认会开启一个fork,所以可以直接配置step,不必重新再执行一次fork,当然如果你喜欢也可以fork
  • x.each(arrayOrObject, callback1, callback2)是并行循环,第一个参数为要遍历的对象可以为数组、对象、字符串、数字.第二个参数是callback1(valueOrItem, keyOrIndex)回调函数,接受遍历的对象的键值两个参数,执行每次循环的任务。 第三个参数callback2(callback)回调函数,用于循环执行完成后的结果处理,参数中的callback(err, results)接受异常与结果信息。
  • x.eachAsync(arrayOrObject, callback1, callback2)是串行循环。与each区别在于循环是在同一个fork中的context下依次执行的。而each每次循环都会开启一个fork并创建一个context

#####flow对象为核心对象,对象有三个函数

  • flow.fork([callback])用来开启一个并行执行的分支,并隐式创建了一个context对象用于该分支流步骤间的数据共享与其他操作(具体请看下面context介绍),它接受一个可省略的回调参数,效果与step一样,它会当做该fork下的第一个步骤进行配置,为了代码整洁美观,建议步骤还是使用step进行配置,此方式只适用为了像我一样懒成病得人
  • flow.step(callback)函数负责为分支配置需要执行的步骤,接受一个不可省略的回调函数参数callback(context),并且会将当前分支的执行环境注入到回调函数的参数内
  • flow.exec(callback)开始执行整个流,并获取执行结果. 接受一个不可省略的回调参数callback(err, results),当流执行完时,如果发生异常会传递给err参数,如正常返回,则将各个分支流中的context对象整合成一个数组赋值给results参数。

#####context对象为环境对象,每一个step的回调函数中都注入了此对象参数(同时回调函数的this默认也会指向它,根据喜好自行选择使用),它负责共享分支流中各个步骤间的数据,与传递异常,步骤跳转,结束流的功能。他有四个函数

  • context.next()执行下一个步骤
  • context.go(number)跳进某个步骤,number会以当前步骤为中心负数为向上跳,正数为向下跳,例如:go(0)相当于递归,go(1)相当于context.next()go(-1)执行上一个步骤
  • context.err(error)传递异常并终止流,error为异常对象
  • context.end()结束分支流。

===========================注意================================

  • 所有的forkstep函数均返回自身的flow对象用于链式调用

  • 对于结果集results中的context对象是不包含它内置的四个函数的。

  • eacheachSync是为了使用方便简单封装的两个循环结构,内部其实是使用forkstepunderscore的each函数实现的,所以用forkstep组合使用是可以处理各种情况下的需求的。并且对于结果集,each使用循环fork的方式,所以results是多个context对象的数组,而eachAsync使用循环step的方式,results是只有一个context对象的数组

==============================================================

并行与串行结合部分

var x = require("x-flow");
 
/**
 *  测试并行与串行结合部分
 */

var obj = {
    // 定义一个有上下文的异步函数
    asyncFunc: function(forkNum, funcNum, callback) {
        setTimeout(function() {
            console.log("第" + forkNum + "个分支的第" + funcNum + "异步函数开始执行");
            callback(null, "第" + forkNum + "个分支的第" + funcNum + "异步函数执行结果");
        }, 100);
    }
};


// 开始一个流,返回flow对象
x.begin()
    // 指定异步函数执行上下文
    .step(function(context) {
        obj.asyncFunc(1, 1, function(err, result) {
            if (err) {
                return context.err(err); // 传递异常
            }

            context.step1 = true; // 步骤间传递数据
            context.next(); // 进入下一步
        });
    })
    .step(function(context) {
        obj.asyncFunc(1, 2, function(err, result) {
            if (err) {
                return context.err(err); // 传递异常
            }

            console.log("step1:", context.step1); // 打印上一步的数据  step1: true
            context.step2 = true;
            context.end(); // 结束此分支流
        });
    })
    // 开启一个分支。分支将与begin主线并行执行
    .fork()
    .step(function(context) {
        obj.asyncFunc(2, 1, function(err, result) {
            if (err) {
                return context.err(err); // 传递异常
            }

            context.step1 = true;
            context.go(1); // 使用go进入下一步 
        });
    })
    .step(function(context) {
        obj.asyncFunc(2, 2, function(err, result) {
            if (err) {
                return context.err(err); // 传递异常
            }

            context.step2 = true;
            context.end();
        });
    })
    // 开始执行
    .exec(function(err, results) {
        if (err) {
            return console.log("err:", err); // 如果有错误打印错误信息
        }
        console.log("results:", results); // 正常时打印结果集 results: [ Context { step1: true, step2: true }, Context { step1: true, step2: true } ]
    });

each与eachSync部分

var x = require("x-flow");

/**
 * 测试each与eachSync部分
 */

var objEach = {
    asyncFunc: function(value, index, callback) {
        setTimeout(function() {
            console.log("第" + index + "此执行");
            // callback(new Error("test err!"), value);
            callback(null, value);
        }, 100);
    }
};

// 同步遍历
x.eachSync(["第一次返回", "第二次返回", "第三次返回"], function(item, index) {
    var context = this;  // 获取执行环境
    objEach.asyncFunc(item, index, function(err, result) {
        if (err) {
            return context.err(err);
        }

        context.results = context.results || [];   // 初始化结果集
        context.results.push(result);
        context.next();
    });
}, function(err, results) {
    if (err) {
        return console.log("err:", err.message); // err: test err!
    }

    console.log("results:", results); // results: [ Context { results: [ '第一次返回', '第二次返回', '第三次返回' ] } ]

});

// 异步遍历
x.each({
    one: "第一次返回",
    two: "第二次返回",
    three: "第三次返回"
}, function(value, key) {
    var context = this;   // 获取执行环境
    objEach.asyncFunc(value, key, function(err, result) {
        if (err) {
            return context.err(err);
        }

        context.result = result;
        context.end();
    });
}, function(err, results) {
    if (err) {
        return console.log("err:", err);    // err: test err!
    }
    console.log("results", results);    // results [ Context { result: '第一次返回' },Context { result: '第二次返回' },Context { result: '第三次返回' } ]
});