@ekifvk/workflow

v2.2.0

Simple Workflow controller

Workflow manager

Workflow manager is a suite of tools that provides workflow control in JavaScript. It has:

  • Manager to manage the workflow
  • Producer (abstract class) to produce data
  • Relation to describe the relationship between producers

with features:

  • Multiple input / output for all producers
  • Asynchronous data stream
  • Relations can use JavaScript code to allow full or partial data transfer from one producer to another

For example, in this case, the run sequence should be ROOT -> 1, 2-1, 3 -> 2-2 -> 4.

How to use

npm install @ekifvk/workflow
  1. Create a WorkflowManager.
import { WorkflowManager, Relation } from '@ekifvk/workflow';

const manager = new WorkflowManager();
  2. Create some producers.
const a = new SomeProducer();
const b = new SomeProducer2();
  3. Add relations.
// The last parameter is the condition. Use 'input' to access the input data, e.g. 'return input.a === "a"'.
Relation.create(a, b, 'return input.a === "a"');
Relation.create(a, b, input => input.a === 'a'); // or use a function
  4. Register the entrance.
manager.entrance = a;
manager.output = b; // Optional. When output is set, only the output producer's result is returned, which saves memory.
  5. Run the workflow.
if (manager.unreachableNodes.length > 0) { // Check whether the workflow's DAG has unreachable nodes.
    throw new TypeError(`Has unreachable node!`);
} else {
    // Optional: receive every producer's data as it is produced.
    manager.resultObserver = data => console.log(data);

    // Signature: manager.run(input array, optional environment, optional options). It returns a Promise.
    manager.run([0])
        .then(result => console.log(result))
        .catch(error => console.error(error));

    // With environment parameters
    manager.run([0], { skip: 0 });

    // With environment parameters and options
    manager.run(0, { skip: 0 }, { singleInput: true, returnLast: true });
}
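
The unreachableNodes check above can be pictured as a reachability walk over the workflow graph. The following is a minimal, self-contained sketch of that idea and not the library's implementation: findUnreachable, the Edge type and the node IDs are hypothetical names introduced only for illustration.

```typescript
// Hypothetical stand-alone sketch; it does not use @ekifvk/workflow.
type Edge = { from: string; to: string };

function findUnreachable(nodes: string[], edges: Edge[], entrance: string): string[] {
    // Build an adjacency list: node ID -> IDs of its children.
    const children = new Map<string, string[]>();
    for (const { from, to } of edges) {
        const list = children.get(from) ?? [];
        list.push(to);
        children.set(from, list);
    }

    // Breadth-first walk that starts at the entrance.
    const visited = new Set<string>([entrance]);
    const queue: string[] = [entrance];
    while (queue.length > 0) {
        const current = queue.shift() as string;
        for (const child of children.get(current) ?? []) {
            if (!visited.has(child)) {
                visited.add(child);
                queue.push(child);
            }
        }
    }

    // Every node that was never visited cannot be reached from the entrance.
    return nodes.filter(id => !visited.has(id));
}

const unreachable = findUnreachable(
    ['a', 'b', 'c', 'orphan'],
    [{ from: 'a', to: 'b' }, { from: 'b', to: 'c' }, { from: 'orphan', to: 'c' }],
    'a'
);
console.log(unreachable); // the array contains only 'orphan'
```

In this sketch 'orphan' has an outgoing relation but nothing leads to it from the entrance, so it would never run.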

Stop, pause and resume

The workflow cannot pause or stop a producer that is currently running, but it can pause or stop before the next producer is processed.

manager.pause().then(() => {
    console.log('Paused!');
});

setTimeout(() => {
    manager.resume();
}, 3000);

manager.stop().then(() => {
    console.log('Stopped!');
});
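
The "pause only between producers" behaviour described above can be sketched without the library. This is a simplified, synchronous illustration under my own assumptions: SteppedRunner, requestPause and the step names are hypothetical and are not part of @ekifvk/workflow, whose pause() and stop() return promises.

```typescript
// Hypothetical sketch of cooperative pausing; all names are illustrative.
type Step = { name: string; run: () => void };

class SteppedRunner {
    private index = 0;
    private pauseRequested = false;
    private readonly steps: Step[];
    readonly log: string[] = [];

    constructor(steps: Step[]) {
        this.steps = steps;
    }

    requestPause(): void {
        this.pauseRequested = true;
    }

    // Runs steps until all are done or a pause was requested.
    // Returns true when every step has completed.
    resume(): boolean {
        this.pauseRequested = false;
        while (this.index < this.steps.length) {
            const step = this.steps[this.index++];
            step.run();                 // a running step is never interrupted
            this.log.push(step.name);
            if (this.pauseRequested) {
                return false;           // the pause takes effect only between steps
            }
        }
        return true;
    }
}

const runner: SteppedRunner = new SteppedRunner([
    { name: 'first', run: () => {} },
    { name: 'second', run: () => runner.requestPause() }, // asks for a pause while running
    { name: 'third', run: () => {} },
]);

const finishedBeforePause = runner.resume(); // stops after 'second'
const logAtPause = runner.log.slice();
const finishedAfterResume = runner.resume(); // continues with 'third'
console.log(finishedBeforePause, logAtPause, finishedAfterResume);
```

The step that requests the pause still runs to completion; only the following step is held back until resume is called.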

How to write producer

All producers must extend Producer, which is an abstract class. They can implement these functions:

public abstract introduce(): string;
public abstract parameterStructure(): IParameterDescriptor;
public abstract produce(input: any[], params: ParameterTable, context: WorkflowContext): any[] | Promise<any[]>;
protected checkParameters(params: { [key: string]: any }): { [key: string]: any };

Remember that all producers work in multi-input, multi-output mode: the input is an array, and the output should be an array or a promise that resolves to an array.

The checkParameters function should check the parameters' types and change them if needed. The introduce function should return the producer's description. The parameterStructure function should return the descriptors of the parameters that initialize accepts (it is not used for anything yet). The produce function should process the input data and return the new data.

One more thing: producers can access their parameters through params. They can of course handle parameters on their own, but params automatically caches values and replaces them when injection by relation(s) is active during a run.

A producer should use params.get(/* name */) to read a parameter so that parameter injection is supported. Alternatively, it can use this.parameters.get(/* name */) to access parameters without injection.

return input + params.get<number>('number1');
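
To make the difference between injected and plain parameters more concrete, here is a small stand-alone sketch. It assumes that an injected value temporarily shadows the value given at initialization; ParamTableSketch and its inject and clearInjected methods are hypothetical names and do not describe the library's ParameterTable.

```typescript
// Hypothetical sketch: a parameter table whose values can be shadowed by injected ones.
class ParamTableSketch {
    private readonly base = new Map<string, unknown>();
    private readonly injected = new Map<string, unknown>();

    constructor(initial: Record<string, unknown>) {
        for (const key of Object.keys(initial)) {
            this.base.set(key, initial[key]);
        }
    }

    // Called (in this sketch) when a relation delivers data for a named parameter.
    inject(name: string, value: unknown): void {
        this.injected.set(name, value);
    }

    // Called (in this sketch) when the current round of produce is over.
    clearInjected(): void {
        this.injected.clear();
    }

    // An injected value wins over the initialized one while it is present.
    get<T>(name: string): T | undefined {
        const source = this.injected.has(name) ? this.injected : this.base;
        return source.get(name) as T | undefined;
    }
}

const table = new ParamTableSketch({ number1: 10 });
const initialValue = table.get<number>('number1');  // 10
table.inject('number1', 5);
const injectedValue = table.get<number>('number1'); // 5
table.clearInjected();
const restoredValue = table.get<number>('number1'); // 10
console.log(initialValue, injectedValue, restoredValue);
```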

A producer can also access the workflow's current state through the third parameter, context. For example, to cancel the current workflow:

context.cancelled = true

Or check another producer's state:

const isOtherFinished = context.finished.includes('Other ID')

Or access environment parameters (if any):

const skip = context.environment.skip;

For example, a producer that returns { key, value } pairs for any object can look like this:

export class KeyValuePairProducer extends Producer {
    public introduce(): string {
        return 'Read input object\'s key and value and return { key: key, value: value } array';
    }

    public parameterStructure(): IParameterDescriptor {
        return {}; // No parameter
    }

    public produce(input: any[], params: ParameterTable, context: WorkflowContext): any[] | Promise<any[]> {
        const result: any[] = [];
        input.forEach(data => { // Collect the pairs of every input object
            Object.keys(data).forEach(key => {
                result.push({ key: key, value: data[key] });
            });
        });
        return result;
    }
}
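
To see what such a producer would return, the mapping can be tried on its own. The sketch below repeats the same logic in a plain function so that it runs without the library; toKeyValuePairs and the sample input are hypothetical.

```typescript
// Stand-alone version of the key/value mapping, for illustration only.
type KeyValuePair = { key: string; value: unknown };

function toKeyValuePairs(input: Record<string, unknown>[]): KeyValuePair[] {
    const result: KeyValuePair[] = [];
    for (const data of input) {
        for (const key of Object.keys(data)) {
            result.push({ key, value: data[key] });
        }
    }
    return result;
}

const pairs = toKeyValuePairs([{ a: 1, b: 2 }, { c: 3 }]);
console.log(JSON.stringify(pairs));
// [{"key":"a","value":1},{"key":"b","value":2},{"key":"c","value":3}]
```

Note that two input objects yield three output items: the output array does not have to be the same length as the input array.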

Pre-defined producers

They are located under '@ekifvk/workflow/dist/producers'.

Empty producer

Producer that does nothing.

Wrap producer

This producer creates a temporary producer from the given code.

const wrap = new WrapProducer();
wrap.initialize({
    handler: (input, params) => input // just like writing a producer
});
// or
wrap.initialize({
    handler: async (input, params) => input // Promise is also supported
});

Data pick producer / Structured data pick producer

Pick data from a JSON object or array. See JPQuery's documentation for more information.

const picker = new DataPickProducer();
picker.initialize({
    query: '/times/success[-1 -> -2]'
});

const picker2 = new StructuredDataPickProducer();
picker2.initialize({
    query: {
        lastTime: '/times/success[-1]',
        rawTime: '/times',
        lastTwoTimes: [ '/times/success[-2]', '/times/success[-1]' ]
    }
});

Value convert producer

Uses the given structure to locate specific places in the input data, then applies the rules to convert the values found there. See this producer's parameterStructure() for more information.

const converter = new ValueConvertProducer();
converter.initialize({
    rules: [
        {
            default: true,
            value: i => i * 2
        }
    ],
    structure: {
        data1: {
            data2: [ { data3: true } ]
        }
    }
});

Subworkflow producer / Circulate subworkflow producer

Contains a fully functional workflow and uses that workflow's result as the producer's return data. The first one runs the workflow only once, while the other one runs it multiple times.

const circulate = new CirculateSubWorkflowProducer();
const subWorkflow = new WorkflowManager(); // Another workflow
subWorkflow.entrance = new LogProducer('sub1'); // Entrance point
circulate.initialize({
    definition: subWorkflow, // Use that workflow
    env: { skip: 0 }, // Environment
    onResult: (input: IWorkflowResult[]) => { // Function that generates the result
        return input.map(v => v.data[0].data[0]);
    },
    onLoop: (env: { [key: string]: any }, context: WorkflowContext, output: IWorkflowResult) => {
        // Change environment
        env.skip++;
        // Loop condition (like a while condition). The loop will run two times.
        return env.skip < 3;
    }
});

Static workflow definition

A JSON object can be created to define a workflow or part of workflow (see IWorkflow):

{
    "producers": [], // Optional in each file
    "relations": [], // Optional in each file
    "entrance": "", // The entrance producer's ID. Optional in each file, but exactly one file must define it.
    "output": "" // Optional. The ID of the workflow's output producer.
}

Elements in producers should follow this structure (see IProducer):

{
    "id": "String. ID of this producer.",
    "type": "String. The type of this producer. Normally, if the producer class is named <name>Producer, then <name> is the type of that producer.",
    "parameters": "IParameterDescriptor. This producer's parameters.",
    "description": "String. Optional. Description of this producer.",
    "runningDelay": "Number. Optional. Delay in milliseconds before the producer runs.",
    "replyDelay": "Number. Optional. Delay in milliseconds before the producer's result is returned.",
    "proceed": "Function, or a function body as a string. Optional. Runs after the data has been produced (and after the delay has been applied). It takes one parameter (input: any[]) and returns an array.",
    "errorHandler": "Function, or a function body as a string. Optional. Handles errors and must return an array. If this function also results in an error, the workflow is terminated."
}

Elements in relations should follow this structure (see IRelation):

{
    "from": "String. Parent producer's ID.",
    "to": "String. Child producer's ID.",
    "inject": "String. Optional. Name of the parameter to inject. Injection means the data transferred by this relation is injected into the producer as a temporary \"initialize\" parameter, only for this round of produce.",
    "condition": "Function, or a function body as a string. Optional. It takes one parameter (input: any) and returns true/false. The condition (in JavaScript) decides whether data passes through this relation."
}
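
A condition can be given either as a function or as a function body in a string. One plausible way to support both is the standard JavaScript Function constructor. The sketch below only illustrates that technique; toCondition is a hypothetical helper, and the library may handle string conditions differently.

```typescript
// Hypothetical helper: normalise a condition given as a function or as a function body.
type Condition = (input: unknown) => boolean;

function toCondition(source: string | Condition): Condition {
    if (typeof source === 'function') {
        return source;
    }
    // The string is treated as the body of a function whose only parameter is named `input`.
    const compiled = new Function('input', source) as (input: unknown) => unknown;
    return input => Boolean(compiled(input));
}

const fromString = toCondition('return input.a === "a"');
const fromFunction = toCondition(input => (input as { a?: string }).a === 'a');

console.log(fromString({ a: 'a' }));   // true
console.log(fromString({ a: 'b' }));   // false
console.log(fromFunction({ a: 'a' })); // true
```

Because a string condition is executed as code, definitions that contain such strings should only be loaded from trusted sources.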

Call WorkflowManager.fromDefinitions() and provide all definition objects to get a workflow. The first parameter should be a function that takes the producer type as a string and returns the matching Producer constructor. The remaining parameters are combined into one definition; note that exactly one of them must have an entrance property.

WorkflowManager.fromDefinitions(type => {
    switch (type) {
        case 'empty':
            return EmptyProducer;
        case 'datapick':
            return DataPickProducer;
        default:
            throw new TypeError(`Unknown type ${type}`);
    }
}, ...someDefinitions);
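
The rule that several definition objects are combined and exactly one of them carries the entrance can be sketched as follows. This is not the code behind fromDefinitions: mergeDefinitions and the interfaces are hypothetical, only the field names come from the structures described above, and the output field is left out for brevity.

```typescript
// Hypothetical sketch of combining partial workflow definitions.
interface ProducerEntry { id: string; type: string }
interface RelationEntry { from: string; to: string }

interface DefinitionPart {
    producers?: ProducerEntry[];
    relations?: RelationEntry[];
    entrance?: string;
}

interface MergedDefinition {
    producers: ProducerEntry[];
    relations: RelationEntry[];
    entrance: string;
}

function mergeDefinitions(...parts: DefinitionPart[]): MergedDefinition {
    // Collect every non-empty entrance; exactly one is allowed.
    const entrances = parts
        .map(part => part.entrance)
        .filter((id): id is string => typeof id === 'string' && id !== '');
    if (entrances.length !== 1) {
        throw new TypeError(`Expected exactly one entrance, found ${entrances.length}`);
    }

    // Concatenate producers and relations from all parts, in order.
    const producers: ProducerEntry[] = [];
    const relations: RelationEntry[] = [];
    for (const part of parts) {
        producers.push(...(part.producers ?? []));
        relations.push(...(part.relations ?? []));
    }
    return { producers, relations, entrance: entrances[0] as string };
}

const merged = mergeDefinitions(
    { producers: [{ id: 'start', type: 'empty' }], entrance: 'start' },
    { producers: [{ id: 'pick', type: 'datapick' }], relations: [{ from: 'start', to: 'pick' }] }
);
console.log(merged.entrance, merged.producers.length, merged.relations.length);
```

Splitting a workflow this way lets producers and relations live in separate files, as long as the entrance is stated once.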

Example

var workflow = require('@ekifvk/workflow');

var manager = new workflow.WorkflowManager();

class LogProducer extends workflow.Producer {
    introduce() { return ''; }

    parameterStructure() {
        return {
            log: {
                type: workflow.ParameterType.String,
                optional: true,
                default: '',
                description: 'Log content'
            }
        };
    }

    checkParameters(params) {
        params.log = params.log || '';
        return params;
    }

    produce(input, activeParams) {
        const content = activeParams.get('log');
        console.log(content);
        return input;
    }
}

var entrance = new LogProducer('entrance');

var test1 = new LogProducer('test1');
var test2 = new LogProducer('test2');
var test3 = new LogProducer('test3');
var test4 = new LogProducer('test4');
var test5 = new LogProducer('test5');

entrance.initialize({ log: 'entrance' });
test1.initialize({ log: '1' });
test2.initialize({ log: '2' });
test3.initialize({ log: '3' });
test4.initialize({ log: '4' });

entrance.relation(new workflow.Relation(entrance, test1));
test1.relation(new workflow.Relation(test1, test2));
test2.relation(new workflow.Relation(test2, test3));
test3.relation(new workflow.Relation(test3, test4));

manager.entrance = entrance;
manager.output = test3;

// Check error
if (manager.unreachableNodes.length > 0) { // unreachableNodes is a computed field, so access it as rarely as possible
    throw new TypeError(`Has unreachable node!`);
}

// Run workflow
manager.run([0])
    .then(v => console.log(v))
    .catch(e => console.log('error: ' + e));

// Pause and resume
manager.pause().then(() => {
    setTimeout(() => {
        manager.resume();
    }, 3000);
});