@raywhite/workflow
v0.0.1
A DSL for controlling async workflows
workflow
A DSL (and compiler) for creating trees of dependencies (of async operations). Mostly for running heavy (containerized) workloads in a particular sequence.
About
What's the problem being solved here?
Through trial and error, and evaluation of the many alternatives, we've settled on a relatively simple paradigm for ETL: some (compute weak) central system should coordinate the triggering and monitoring of (compute heavy) jobs. The triggers are either time based, or a result of some dependency (another job) completing. Sometimes workloads need to run in sequence; where possible (no shared deps) they should be run in parallel as a performance optimization.
This means these workflows can be described as DAGs (Directed Acyclic Graphs): they always proceed in some direction, and they do not go backwards or around like a circle in a spiral, like a wheel within a wheel, never ending or beginning on an ever spinning reel, like a snowball down a mountain, or a carnival balloon, like a carousel that's turning - running rings around the moon, like a clock whose hands are sweeping past the minutes of its face and the world is like an apple whirling silently in space, like the circles that you find in the windmills of your mind (to reiterate; they don't do this).
How would one implement one of these DAGs?
Well... they can generally be described using (for instance in Node.js) `Promise`s. The examples in this document use a `timeout` operation as a placeholder for some work (for instance starting a VM with a network request and giving it a container to run).
const x = async function () {
  try {
    await timeout(...a_params) // a
    await Promise.all([ // b
      timeout(...b1_params), // b1
      timeout(...b2_params), // b2
    ])
    await timeout(...c_params) // c
  } catch (err) {
    // ... do something with this err?
    throw err
  }
}

// `x` is a function, so it must be called to obtain a Promise.
x().then(console.log).catch(console.error)
This looks pretty simple, but actually understanding which job(s) threw (if any), or waiting for all `Promise`s to settle before resolving or throwing, would add a layer of complexity and verbosity. This, in and of itself, is not terrible, until it is being done multiple times (for multiple workflows), in which case one will find themselves parsing different files with different hand coded sequences every time an issue arises.
The whole sequence of steps can easily be described by a DSL, and middleware can be used to `catch` errors at every node, or log the completion of stages etc.
What does that DSL look like?
It's very similar to the CircleCI DSL, or even that used by GH Actions. Some background on its design can be found in this GH PR.
The below is in YAML (the parser expects JS); it describes an identical workflow to the one above.
jobs:
  - name: a
    operation: timeout
    params:
      duration: 1
  - name: b1
    operation: timeout
    params:
      duration: 2
  - name: b2
    operation: timeout
    params:
      duration: 3
  - name: c
    operation: timeout
    params:
      duration: 4
workflows:
  - x:
      sequence:
        - a
        - b:
            parallel:
              - b1
              - b2
        - c
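Since the parser expects JS rather than YAML, the same specification might be written as a plain object. This is only a sketch: it assumes the YAML parses so that each workflow is a single-key object whose value holds a `sequence` or `parallel` list, and the exact shape the compiler accepts is not confirmed here. The `job` helper is hypothetical shorthand.

```javascript
// Hypothetical shorthand for building the four `timeout` job entries.
const job = (name, duration) => ({
  name,
  operation: 'timeout',
  params: { duration },
})

// Assumed JS equivalent of the YAML spec; the nesting is inferred.
const spec = {
  jobs: [job('a', 1), job('b1', 2), job('b2', 3), job('c', 4)],
  workflows: [
    {
      x: {
        sequence: [
          'a',
          { b: { parallel: ['b1', 'b2'] } },
          'c',
        ],
      },
    },
  ],
}
```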
JS API
Jobs / Operations
These are just `async function` instances that have been assigned a `name`. Each `operation` (or middleware) has the signature `async (context, params, next)`.
- `params` are whatever parameters the job was called with.
- `context` contains contextual values, such as a unique identifier for the execution (`id`) and `state`, which should be used to transmit state between jobs.
- `next` is intended for use by middleware (it's the next function in the middleware chain).
Consider this example:
const timeout = function (_, params) {
  const { duration } = params

  return new Promise(function (resolve, reject) {
    return setTimeout(function () {
      if (Math.random() > 0.9) {
        const err = new Error('BOOM!')
        return reject(err)
      }

      return resolve()
    }, duration)
  })
}

const logger = async function (context, _, next) {
  console.log(`log: \`${context.name}\` starting.`)

  try {
    await next()
    console.log(`log: \`${context.name}\` completed successfully.`)
  } catch (err) {
    console.log(`log: \`${context.name}\` failed.`)
    throw err
  }
}
Both `timeout` and `logger` are examples of `operation`s. `timeout` actually does some work, whereas `logger` is intended to be used as a middleware. Both of these functions actually have the same signature, but `timeout` does not use `context` and `next`, and `logger` does not use `params`.
NOTE: The `logger` middleware doesn't consume the error; it logs a line and then rethrows it. This is because the middleware above this job (its ancestors) should also receive the error and be able to deal with it.
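The propagation described in the note can be seen by wiring two middleware layers together by hand, without the compiler. This is a minimal sketch: `makeLogger`, `failing` and `run` are hypothetical names, and the chain is nested manually purely for illustration.

```javascript
// Hand-wired chain (no compiler involved): outer -> inner -> failing job.
const events = []

const makeLogger = label => async function (context, _, next) {
  try {
    await next()
  } catch (err) {
    events.push(`${label}: ${context.name} failed`)
    throw err // Rethrow so that ancestors also see the failure.
  }
}

const failing = async function () {
  throw new Error('BOOM!')
}

const context = { name: 'a' }
const inner = makeLogger('inner')
const outer = makeLogger('outer')

const run = () =>
  outer(context, {}, () => inner(context, {}, () => failing(context, {})))

// Resolves with the recorded events once the rejection has passed
// through both layers: ['inner: a failed', 'outer: a failed']
const done = run().catch(() => events)
```

Had `inner` swallowed the error instead of rethrowing it, `outer` would have seen a successful run and recorded nothing.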
Compilation
new Compiler(options) or createCompiler(options)
Instantiates a compiler, designed to be used as a singleton across an application. The only option is `createIdentifier`, which is used to create the identifier passed (via `context`) to executions.
Below are the instance methods.
compiler.createOperation(name, operation)
Registers an operation under `name`. `operation` should be a function (possibly a middleware) with the signature discussed above. The compiler will throw when `compileSpec` is called if an unrecognized `name` is encountered, so all operations should be created ahead of time.
compiler.compileSpec(spec)
Parses a specification (can be called more than once).
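const compiler = new Compiler()

// NOTE: These ops declared above.
compiler.createOperation('logger', logger)
compiler.createOperation('timeout', timeout)

// NOTE: Spec is the YAML above. `YAML` is assumed to be js-yaml 3.x,
// which provides `safeLoad`.
const config = YAML.safeLoad(spec)

// Pass the parsed object (not the raw YAML string) to the compiler.
compiler.compileSpec(config)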
Execution
compiler.execute(name, ...middleware)
Calling this will execute the job `name`, and any of its dependencies, in the specified sequence. Note that any job (regardless of whether or not it was declared as a nested job) can be triggered in this way. This method is what is used to actually schedule job executions (see below).
Note that execution will not settle until all internally triggered `Promise`s have settled, and where one or more `Promise`(s) reject, only the first error will be thrown (middleware can be used for more granular detection of errors).
Compiler.compose(...fns)
This is a static method of the `Compiler` constructor.
Mostly for internal use - composes functions with the `operation` / `middleware` signature (left to right).
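As a rough picture of what left-to-right composition of this signature can look like, here is a minimal sketch. It is an assumption about the general technique, not the package's implementation: the `compose` below treats the leftmost function as the outermost layer, and each `next` invokes the function to its right with the same `context` and `params`.

```javascript
// Illustrative compose for the (context, params, next) signature.
const compose = function (...fns) {
  return function (context, params, next) {
    const dispatch = function (i) {
      if (i === fns.length) {
        // Past the last function: defer to the caller's `next`, if any.
        return next ? next() : Promise.resolve()
      }
      return Promise.resolve(fns[i](context, params, () => dispatch(i + 1)))
    }
    return dispatch(0)
  }
}

const trace = []

// Hypothetical middleware that records when it is entered and left.
const wrap = label => async function (context, params, next) {
  trace.push(`${label} in`)
  await next()
  trace.push(`${label} out`)
}

// Hypothetical operation; it does the work and never calls `next`.
const work = async function (context, params) {
  trace.push(`work ${params.duration}`)
}

const composed = compose(wrap('first'), wrap('second'), work)

// Resolves to:
// ['first in', 'second in', 'work 1', 'second out', 'first out']
const traced = composed({ name: 'a' }, { duration: 1 }).then(() => trace)
```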
context
Each `async` call (even nodes in the tree which specify `parallel` or `sequence` groups) is called with `context` as the first param... that object has the methods `getState() => Object` and `setState(Object)`, which retrieve or store (using `Object.assign`) state that needs to be transmitted between operations. Further, that context object will also contain the following metadata properties:
- `root` - the name of the top level workflow.
- `id` - an identifier (created with `createIdentifier`) that is unique per execution.
- `name` - the name of this job / step.
- `type` - `'job' || 'sequence' || 'parallel'`.
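When unit-testing an operation outside the compiler, a stand-in context can be useful. The sketch below is hypothetical (`createContext` is not part of this package) and assumes `setState` shallow-merges via `Object.assign`, as described above; whether the real `getState` returns a copy or a reference is not specified here.

```javascript
// Hypothetical stand-in for the context object that the compiler creates.
const createContext = function (meta) {
  let state = {}

  return {
    ...meta,
    getState: () => state,
    // Shallow-merges the update into the existing state.
    setState: (update) => {
      state = Object.assign({}, state, update)
    },
  }
}

const context = createContext({ root: 'x', id: 'exec-1', name: 'a', type: 'job' })
context.setState({ rows: 10 })
context.setState({ uploaded: true })
// context.getState() is now { rows: 10, uploaded: true }
```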
Scheduling
Consult the below contrived example of scheduling over HTTP (using `koa`).
const compiler = new Compiler()

/**
 * ... add operations / middleware, then
 * a handler of some sort for HTTP requests.
 */
app.get('/cron/jobs/:name', function (context) {
  const { name } = context.params

  /**
   * NOTE: This runs in background, and so
   * returns immediately.
   */
  compiler.execute(name).catch(console.error)
  context.body = null
  return
})
License
• MIT © Ray White, 2019 •