npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@raywhite/workflow

v0.0.1

Published

A DSL for controlling async workflows

Downloads

5

Readme

workflow

CircleCI

A DSL (and compiler) for creating trees of dependancies (of async operations). Mostly for running heavy (containerized) workloads in a particular sequence.

About

What's the problem being solved here?

Through trail and error, and evaluation of the many many alternatives, we've settled on a relatively simple paradigm for ETL - some (compute weak) central system should coordinate the triggering and monitoring of (compute heavy) jobs. The triggers are either time based, or a result of some dependency (another job) completing. Sometimes workloads need to run in sequence, where possible (no shared deps) they should be run in parallel as a performance optimization.

This means these workflows can be described as DAGs (Directed Acyclic Graphs), they always proceed in some direction, they do no go backwards or around like a circle in a spiral, like a wheel within a wheel, never ending or beginning on an ever spinning reel, like a snowball down a mountain, or a carnival balloon, like a carousel that's turning - running rings around the moon, like a clock whose hands are sweeping past the minutes of its face and the world is like an apple whirling silently in space, like the circles that you find in the windmills of your mind (to reiterate; they don't do this).

How would one implement on of these DAGs?

Well... they can generally be described using (for instance in Node.JS) Promises. The examples in this document use a timeout operation as a placeholder for some work (for instance starting a VM with a network request and giving it a container to run).

const x = async function () {
  try {
    await timeout(...a_params) // a.

    await Promise.all([ // b
      timeout(...b1_params), // b1
      timeout(...b2_params), // b2
    ])

    await timeout(...c_params) // c
  } catch (err) {
    // ... do something with this err?
    throw err
  }

}

x.then(console.log).catch(console.error)

This looks pretty simple, but actually understanding which job(s) threw (if any), or waiting for all Promises to settle before resolving or throwing would add a layer of complexity, and verbosity... this, in and of itself is not terrible - until this is being done multiple times (for multiple workflows), in which case one will find themselves parsing different files will different hand coded sequences every time an issue arrises.

The whole sequence of steps can easily be described by a DSL, and middleware can be used to catch errors at every node, or log the completion of stages etc.

What does that DSL look like?

It's very similar to the Circle CI DSL, or even that used by GH actions. Some background on it's design could be found in this GH PR

The below is in YAML (the parser expects JS), it describes an identical workflow to the one above.

jobs:
  - name: a
    operation: timeout
    params:
      duration: 1

  - name: b1
    operation: timeout
    params:
      duration: 2

  - name: b2
    operation: timeout
    params:
      duration: 3

  - name: c
    operation: timeout
    params:
      duration: 4

workflows:
  - x:
    sequence:
      - a
      - b:
        parallel:
          - b1
          - b2
      - c

JS API

Jobs / Operations

These are just async function instances that have been assigned a name. Each operation (or middleware) has the signature async (context, params, next).

  • params are whatever parameters the job was called with.
  • context contains contextual values, such as a unique identifier for the execution (id) and state, which should be used to transmit state between jobs.
  • next is intended for use by middleware (it's the next function in the middleware chain).

Consider this example:

const timeout = function (_, params) {
  const { duration } = params
  return new Promise(function (resolve, reject) {
      return setTimeout(function () {
        if (Math.random() > 0.9) {
          const err = new Error('BOOM!')
          return reject(err)
        }

        return resolve()
      }, duration)
  })
}


const logger = async function (context, _, next) {
  console.log(`log: \`${context.name}\` starting.`)

  try {
    await next()
    console.log(`log: \`${context.name}\` completed successfully.`)
  } catch (err) {
    console.log(`log: \`${context.name}\` failed.`)
    throw err
  }
}

Both timeout and logger are examples of operations. timeout actually does some work, whereas logger is intended to be used as a middleware. Both of these functions actually have the same signature, but timeout does not use context and next, and logger does not use params.

NOTE: That the logger middleware doesn't consume the error, but logs a line and then passes the error on. This is because the middleware above (the ancestors) of this job should also receive the error and be able to deal with it.

Compilation

  • new Compiler(options) or createCompiler(options)

Instantiates a compiler, designed to be used as a singleton across an application. The only option is createIdentifier which is used to create the identifier passed (via context) to executions.

Below are the instance methods.

  • compiler.createOperation(name, operation)

Registers an operation under name. operation should be a function (possibly a middleware) with the signature discussed above. The compiler will throw when compiledSpec is called if an unrecognized name is encountered - so all operations should be created ahead of time.

  • compiler.compileSpec(spec)

Parses a specification (can be called more than once).

const compiler = new Compiler()

// NOTE: These ops declared above.
compiler.createOperation('logger', logger)
compiler.createOperation('timeout', timeout)

// NOTE: Spec is the YAML above.
const config = YAML.safeLoad(spec)

compiler.compileSpec(spec)

Execution

  • compiler.execute(name, ...middleware)

Calling this will execute the job name, and any of it's dependencies in the specified sequence. Note that any job (regardless of whether or not it was declared as a nested job) can be triggered in this way. This method is what is used to actually schedule job executions (see below).

Note that execution will not settle until all internally triggered Promises have settled, and where one or more Promise(s) throw, only the first error will be thrown (middleware can be used for more granular detection of errors).

  • Compiler.compose(...fns)

This is a static method of the Compiler constructor.

Mostly for internal use - composes functions with the operation / middleware signature (left to right).

  • context

Each async call (even nodes in the tree which specify parallel or sequence groups) are called with context as the first param... that object has the methods getState() => Object and setState(Object), which retrieve or store (using Object.assign) state that needs to be transmitted between operations. Further that context object will also contain the following metadata properties:

- `root` - the name of the top level workflow.
- `id` - an identifier (created with `createIdentifier`) that is unique per execution.
- `name` - the name of this job / step.
- `type` - `'job' || 'sequence' || 'parallel'`.

Scheduling

Consult the below contrived example of scheduling over HTTP (using koa).

const compiler = new Compiler()

/**
 * ... add operations / middleware, then
 * a handler of some sort for HTTP requests.
 */

app.get('/cron/jobs/:name', function (context) {
  const { name } = context.params

  /**
   * NOTE: This runs in background, and so
   * returns immediately.
   */
  compiler.execute(name).catch(console.error)
  context.body = null
  return
})

License

MIT © Ray White, 2019 •