daat-coordinator

v1.0.6

A Directed Acyclic Asynchronous Task (DAAT) processor

Coordinator

daat-coordinator is a node.js module for specifying asynchronous operations in a Directed Acyclic Graph (DAG) structure: that is, sequences of tasks that must be completed, where some may depend on the results of others (without cycles).

You can review the technical documentation here

A Simple, Motivating Example: Concurrent Async Operations

The basic use case motivating this module was the following: we have a web app backend server that has to make several (async) database calls for a particular API route, but we want to reply to the calling client with "success" only if all database updates were successful, and with "failed" if any failed (also negating any partial updates). In this case it is possible to nest callbacks, but after 3 or 4 updates this starts to get confusing, especially when including logic for rolling back successful updates. Nesting callbacks also imposes a logical order on the program flow that need not formally exist for properly concurrent operations. Basically, we don't care which update happens (or finishes) first, just that they all finish (and whether they all succeeded).

Coordinator allows one to do this as follows: Say we write functions op1ex/op1rb to execute/rollback operation "1", op2ex/op2rb to execute/rollback operation "2", etc. Suppose also we have functions onFailure and onSuccess that we want to execute when any operation fails or every operation succeeds, respectively. Then the code

const Coordinator = require( 'daat-coordinator' );
var C = new Coordinator();
C.addStage( "op1" , op1ex , op1rb );
C.addStage( "op2" , op2ex , op2rb );
C.run( onFailure , onSuccess );

will execute operations "1" and "2" concurrently, calling onFailure if any operation fails or onSuccess if every operation succeeds. Moreover, if op1ex fails but op2ex succeeds, op2rb will be called and vice versa. Besides being a bit simpler than (many) nested callbacks, the explicit separation between operations "1" and "2" can also make it easier to program and debug them.

Other Features

Some other features:

  • Retries: C.addStage( "op1" , op1ex , op1rb , 2 ) will retry this operation twice before declaring "failure"
  • Prerequisites: C.addStage( "op2" , op2ex , op2rb , 0 , ["op1"] ) declares that operation "2" has to follow operation "1"'s success (and that we should not retry on failure)
  • Intermediate results: C.addStage( "op2" , op2ex , op2rb , 0 , ["op1"] , op2prep , op2rbrp ) will run op2prep on the results of operation 1 and pass the output to op2ex for execution, and will run op2rbrp on the results (including those from op2ex) on rollback. These functions (op2prep/op2rbrp) have the prototype ( data , prereqs , results ) => { ... } where data is the data passed to this stage, prereqs is the stage's list of prerequisite keys, and results is the current coordinator results object for all stages.
  • Initial data: C.addStage( "op2" , op2ex , op2rb , 0 , ["op1"] , op2prep , op2rbrp , op2data ) will pass op2data to op2ex when it executes
  • Intermediate Data Transformation: Actually, op2prep acts on both op2data and the results of prior operations to result in a single data object to pass to op2ex
  • Warnings: You can declare success in your execution routines even if you want to "throw" a warning, and the Coordinator object (C) will keep track of these warnings
  • Results: Your Coordinator object (C) will store all the results from each operation, if there are any, and you can access these after processing is complete (i.e., in the onSuccess callback you pass to C.run)
  • Different Data: If you want to re-run a given set of stages with different initial stage data, you can just add that to the C.run call: C.run( onFailure , onSuccess , newData ).
  • Checking for Cycles: You can check if your stages form a DAG (a directed graph with no cycles) using the method C.isDAG(). This currently uses the tarjan-graph module.
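
As a rough illustration of the prepare step described in the list above, the sketch below merges a stage's own initial data with the results of its prerequisites. The names op2prep and op2data come from the list; the field names (table, userId) and the exact shape of the results object (assumed here to be keyed by stage name) are assumptions made for illustration only.

```javascript
// Initial data for stage "op2" (hypothetical fields).
const op2data = { table: "orders" };

// prepare routine: ( data , prereqs , results ) => data object for op2ex.
// Copies the stage data, then attaches each prerequisite's result under its key.
const op2prep = (data, prereqs, results) => {
	const merged = Object.assign({}, data);
	prereqs.forEach((key) => { merged[key] = results[key]; });
	return merged;
};

// Simulated call with assumed shapes: one prerequisite "op1" that produced a result.
const prepared = op2prep(op2data, ["op1"], { op1: { userId: 42 } });
console.log(prepared);
```

The copy via Object.assign keeps op2data untouched, which matters if the same stages are later re-run with different data.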

If you haven't guessed by now, C.addStage has the prototype

( key , execute , rollback , retries , prereqs , prepare , repair , data ) => { ... }

You can pass these arguments independently, as shown above, or you can pass them as a single object with these names (literally) as keys. The only real condition is that execute (or key.execute if you pass an object) is a function. There is also a routine C.addStages that takes either an Object or an Array argument to add multiple stages.
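
To make the two calling styles concrete, here is a small sketch of the object form. The routines op2ex and op2rb are placeholders, the data fields are invented, and toPositional is a hypothetical helper (not part of daat-coordinator) that only shows how the keys line up with the positional prototype.

```javascript
// Placeholder routines; real ones would do asynchronous work.
const op2ex = (data, failure, warning, success) => setTimeout(() => success(data), 0);
const op2rb = (data, failure, warning, success) => setTimeout(() => success(), 0);

// Object form of a stage: the keys match the prototype's argument names.
const stage = {
	key: "op2",
	execute: op2ex,
	rollback: op2rb,
	retries: 0,
	prereqs: ["op1"],
	data: { table: "orders" }, // hypothetical initial data
};

// Hypothetical helper: map the object form onto the positional argument order.
const ARG_ORDER = ["key", "execute", "rollback", "retries", "prereqs", "prepare", "repair", "data"];
const toPositional = (s) => ARG_ORDER.map((name) => s[name]);
```

With a Coordinator instance C, C.addStage( stage ) and C.addStage( ...toPositional( stage ) ) would be expected to describe the same stage, assuming omitted keys are treated like omitted arguments.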

We haven't mentioned how tasks can "fail" or "succeed". Really, this is up to you in your execution (and rollback) routines. Coordinator learns about this through callbacks passed to the execution and rollback routines, which must both have the prototype

( data , failure , warning , success ) => { ... }

Coordinator expects these routines to call failure(err) (with an error argument) when there is an error, warning(warn,res) when there is a conditional success (with warning and result objects), and success(res) when there is success (with a result object). Otherwise they can do what you want (asynchronously).
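
One way such a routine might look is sketched below. It is only an illustration: makeExecutor is a hypothetical helper (not part of daat-coordinator), fakeUpdate stands in for a real database call, and treating "nothing modified" as a warning is an invented policy.

```javascript
// Hypothetical factory: wraps a Node-style async function,
// update( data , callback( err , res ) ), as an execution routine with
// the ( data , failure , warning , success ) prototype.
const makeExecutor = (update) => (data, failure, warning, success) => {
	update(data, (err, res) => {
		if (err) { failure(err); return; }   // hard error
		if (res.modified === 0) {             // conditional success
			warning("nothing modified", res);
			return;
		}
		success(res);                         // plain success
	});
};

// Stand-in for a database call; setTimeout keeps it asynchronous.
const fakeUpdate = (data, cb) => {
	setTimeout(() => cb(null, { modified: data.rows.length }), 10);
};

const op1ex = makeExecutor(fakeUpdate);
```

A routine built this way calls exactly one of the three callbacks per invocation, and op1ex could be passed to C.addStage( "op1" , op1ex , op1rb ) as in the first example.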

Sequential Operations with Coordinator

Let's say you have some N (async) operations to run in sequence. This is challenging only in that you want each one to start only after the previous one has completed, and you don't know exactly when that happens. Suppose their execution routines are in an N-element array opex, their rollbacks in an N-element array oprb, and operation n depends only on operation n-1 (for n > 0). Then you could set up Coordinator to do all of these with

const Coordinator = require( 'daat-coordinator' );
var C = new Coordinator() , prereq = [];
opex.forEach( (o,i) => {
	C.addStage( "op" + (i+1) , o , oprb[i] , 0 , prereq );
	prereq = [ "op" + (i+1) ];
} )

C.run( onFailure , onSuccess );

For a silly but tangible example, suppose we wanted to use Coordinator to sum up the values in a key-value store:

X = { key1 : value1 , ... , keyN : valueN }

We could then do

const Coordinator = require( 'daat-coordinator' );

const sumr  = ( d , f , w , s ) => { 
	setTimeout( () => {
		if( d === Object(d) ) { s( d.sum + d.value ); } else { s( d ); };
	} , 10 )
};

const prep = ( d , p , r ) => ( { sum : r[p[0]] , value : d } );

var C = new Coordinator() , p = [] , keys = Object.keys( X );
for( var i = 0 ; i < keys.length ; i++ ) {
	var key = keys[i];
	C.addStage( key , sumr , () => {} , 0 , p , prep , null , X[key] );
	p = [ key ];
}
C.run( onFailure , () => { console.log( "success: " + C.getResults()[keys[keys.length-1]] ) } );

The specific values { "bob" : 2 , "sue" : 3 , "joe" : 4 } ultimately yield

success: 9

Note that our sumr callback here does something weird: it sets a timeout to execute a simple function. This is, actually, required (as of now), even if we set the timeout to 0 instead of 10. Coordinator is intended for asynchronous tasks that act in some way like "forked" processes. It is not suitable for, and can (as of now) get confused by, traditional sequential programs as executors.
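
If you already have a synchronous function and want to use it as an executor, one option is to defer it yourself. The sketch below assumes a hypothetical helper named deferred (not part of daat-coordinator) that applies the same setTimeout approach as sumr above.

```javascript
// Hypothetical helper: turns a synchronous function into an execution routine
// that reports its outcome on a later tick of the event loop.
const deferred = (fn) => (data, failure, warning, success) => {
	setTimeout(() => {
		let res;
		try {
			res = fn(data);
		} catch (err) {
			failure(err); // a thrown exception becomes a stage failure
			return;
		}
		success(res);
	}, 0);
};

// Example: a synchronous doubling step wrapped as an executor.
const double = deferred((d) => 2 * d);
```

The callbacks never fire before the executor itself returns, which is the property the note above asks for.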

Testing with Coordinator

Another decent use case for Coordinator is testing: You can run a sequence of possibly interdependent (unit) tests, failing at the first failure. This can be useful for testing APIs involving network activity, like database updates/queries, such as the following test sequence (in "pseudocode"):

  • put: PUT /api/object { ... }; if this returns 200 (and an ID), succeed, passing on the new object ID; otherwise fail. Delete the object on rollback.
  • check: HEAD /api/object/:id; if this returns 200, succeed; otherwise fail.
  • get: GET /api/object/:id; if this returns 200 and the data, succeed; otherwise fail.
  • delete: DELETE /api/object/:id; if this returns 200, succeed; otherwise fail.
  • recheck: HEAD /api/object/:id; if this returns 200, fail; otherwise succeed.

This sequence has a slightly more complicated set of prerequisites, which can be read by working backwards through the following DAG:

'put' -> ['check','get'] -> 'delete' -> 'recheck'

There is an example of this in test/apiunittest.js.
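
For orientation, the prerequisites implied by that DAG can be written out as a plain object, as in the sketch below. The executionOrder function is a stand-in written for this illustration using Kahn's algorithm. It is not the module's own check, and it is not taken from test/apiunittest.js.

```javascript
// Prerequisites for each test stage, read off the DAG
// 'put' -> ['check','get'] -> 'delete' -> 'recheck'.
const prereqs = {
	put: [],
	check: ["put"],
	get: ["put"],
	delete: ["check", "get"],
	recheck: ["delete"],
};

// Kahn's algorithm: returns one valid execution order, or null if there is a cycle.
const executionOrder = (graph) => {
	const remaining = {};
	Object.keys(graph).forEach((k) => { remaining[k] = new Set(graph[k]); });
	const order = [];
	const ready = Object.keys(remaining).filter((k) => remaining[k].size === 0);
	while (ready.length > 0) {
		const next = ready.shift();
		order.push(next);
		delete remaining[next];
		// Release any stage whose last outstanding prerequisite was `next`.
		Object.keys(remaining).forEach((k) => {
			if (remaining[k].delete(next) && remaining[k].size === 0) ready.push(k);
		});
	}
	return Object.keys(remaining).length === 0 ? order : null;
};

console.log(executionOrder(prereqs));
```

Each entry could then be registered with something like C.addStage( key , execute , rollback , 0 , prereqs[key] ), with the execute and rollback routines supplied per stage.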

To Do

  • Rollback with prerequisites, using the partial execution graph in reverse.

Contact

W. Ross Morrow

Stanford GSB DARC Team