rvl-pipe
v2.2.1
Functions for easier async pipelining/composing of promises
Helper functions for easier async pipelining/composing of promises
Promises are a simple (and cool) way of doing async programming in JavaScript. A promise encapsulates a future result, async in nature, that has only 3 possible states: pending, resolved (fulfilled), or rejected. Once the promise is resolved we can query its result; if it was rejected, we can catch the error and process it accordingly.
We can run as many promises in parallel or chain them as we want. Like a pipeline.
This library contains helper functions for simple pipelining of promises, reusing a context object that is passed down the pipeline and can be queried or modified.
Version 2.0.0 the one with Typings (in TypeScript)
This version is the first completely ported to TypeScript so the build process exports the type definitions.
We have 3 main function types, plus the Context type they operate on:
export type Context = { [key: string]: any }
export type AsyncFunction = (ctx?: Context) => Promise<Context>
export type SyncFunction = (ctx: Context) => any
export type SyncPredicate = (ctx: Context) => boolean
Version 1.4.0 New functions
Version 1.4.0 introduces new functions: assign, composer and loop.
assign: Assigns to a prop based on the result of an async function.
composer: Performs a merge on the results of a set of async functions.
loop: Performs an async loop based on a predicate and a body.
How does it work?
rvl-pipe provides a set of functions to help you create, interact with and query the context object using this approach. There are 2 types of functions, composition and querying, plus some error handling helpers.
You can require/import the helpers you need.
const { should, each, iff, prop, props } = require('rvl-pipe')
// as ES6 Modules
import { should, each, iff, prop, props } from 'rvl-pipe'
Composition functions
Composition functions can always be described as:
const createStep = (params) => AsyncFunction
So, basically, they are functions that return a function that takes only the context and returns a Promise of the context after some async transformation, or a rejection.
These steps can then be reused in an async pipeline, where the context object gets passed down.
We already provide some helper functions for several common cases like parallel execution, conditionals, noops, etc.
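As a minimal sketch of that shape, the step below takes its parameters first and returns a function of the context. The name `setGreeting` and the `message` prop are hypothetical and exist only for this illustration; they are not part of rvl-pipe.

```javascript
// Hypothetical step factory, for illustration only (not part of rvl-pipe).
// Parameters come first; the returned function receives the context.
const setGreeting = (greeting) => (context = {}) =>
  // Resolve with the updated context so the next step can use it.
  Promise.resolve({ ...context, message: `${greeting}, ${context.name}` })

const step = setGreeting('Hello')

step({ name: 'John' }).then(context => {
  console.log(context.message) // prints "Hello, John"
})
```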
each: Composition function that runs other composition functions in sequence, passing the context to each of them and returning the resulting context.
const runAll = each(
asyncTask1(...),
asyncTask2(...)
)
return runAll({}) // {} is the starting context
.then(context => {
// context will have the resulting context after asyncTask2
})
each is very handy for building reusable compositions of common steps.
const myAsyncStep = each(
doSomeAsync1(...),
doSomeASync2(...)
)
return each(
myAsyncStep, // already an async function, so it is passed as-is
otherASyncStep(),
...
yetAnotherAsyncStep(),
myAsyncStep
)()
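To make the sequencing concrete, here is a rough sketch of how a helper like each could be written with a promise chain. `eachSketch`, `addOne` and `double` are hypothetical names; this is an assumption about the general technique, not the library's actual source.

```javascript
// Illustrative stand-in for `each`; NOT rvl-pipe's implementation.
// Every step receives the context resolved by the previous step.
const eachSketch = (...steps) => (context = {}) =>
  steps.reduce(
    (promise, step) => promise.then(step),
    Promise.resolve(context)
  )

// Hypothetical steps used only for this example.
const addOne = () => ctx => Promise.resolve({ ...ctx, count: (ctx.count || 0) + 1 })
const double = () => ctx => Promise.resolve({ ...ctx, count: ctx.count * 2 })

// A composed step is itself a function of the context, so it can be reused.
const addThenDouble = eachSketch(addOne(), double())
const pipeline = eachSketch(addThenDouble, addOne())

pipeline({ count: 1 }).then(ctx => console.log(ctx.count)) // prints 5
```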
all: Same as each, but runs all tasks in parallel and merges the resulting contexts.
return all(
parallelAsyncTask1(...),
parallelAsyncTask2(...)
)({}) // Starting context
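The parallel-and-merge behaviour can be sketched with `Promise.all`. The names `allSketch` and `later` are hypothetical, and the shallow merge order (later results win) is an assumption rather than documented library behaviour.

```javascript
// Illustrative stand-in for `all`; NOT rvl-pipe's implementation.
// Every step gets the same starting context; results are shallow-merged.
const allSketch = (...steps) => (context = {}) =>
  Promise.all(steps.map(step => step(context)))
    .then(results => Object.assign({}, context, ...results))

// Hypothetical step: resolves after `ms` milliseconds with extra data.
const later = (ms, data) => ctx =>
  new Promise(resolve => setTimeout(() => resolve({ ...ctx, ...data }), ms))

const loadBoth = allSketch(
  later(20, { user: 'John' }),
  later(10, { theme: 'dark' })
)

loadBoth({ id: 7 }).then(ctx => console.log(ctx))
// prints { id: 7, user: 'John', theme: 'dark' }
```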
iff: Performs a conditional step, given a condition (or predicate) and an async function. Also accepts an else async function.
return iff(
prop('account'),
asyncTask(...)
)({})
// Else is possible too
return iff(
prop('account'),
asyncTask(...),
elseAsyncTask(...)
)()
// Negation (using Ramda's complement)
const { complement } = require('ramda')
return iff(
complement(prop('account')),
asyncTask(...)
)()
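A conditional step of this kind can be sketched in a few lines. `iffSketch`, `hasAccount`, `markPaid` and `markGuest` are hypothetical names, and falling through with the unchanged context when no else branch is given is an assumption.

```javascript
// Illustrative stand-in for `iff`; NOT rvl-pipe's implementation.
// Without an else branch, the context is passed through unchanged (assumed).
const iffSketch = (predicate, thenStep, elseStep = ctx => Promise.resolve(ctx)) =>
  (context = {}) => (predicate(context) ? thenStep(context) : elseStep(context))

// Hypothetical predicate and steps used only for this example.
const hasAccount = ctx => Boolean(ctx.account)
const markPaid = () => ctx => Promise.resolve({ ...ctx, status: 'paid' })
const markGuest = () => ctx => Promise.resolve({ ...ctx, status: 'guest' })

const step = iffSketch(hasAccount, markPaid(), markGuest())

step({ account: 'acc-1' }).then(ctx => console.log(ctx.status)) // prints "paid"
step({}).then(ctx => console.log(ctx.status)) // prints "guest"
```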
should: Performs a validation check for a property; it fails with a ContextError if the predicate is not satisfied.
return each(
should(prop('name')), // passes
should(prop('last')), // throws ContextError
)({name: 'John'})
// prop is a query function, check down for documentation
// You can also define your custom error names
return each(
should(prop('name')), // passes
should(prop('last'), 'InvalidLastName'), // throws ContextError(message='InvalidLastName', context=context)
)({name: 'John'})
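The validation idea can be sketched as a step that either passes the context through or rejects with an error that carries the context. `ContextErrorSketch`, `shouldSketch` and the default message are assumptions made for this illustration; the real ContextError class may differ.

```javascript
// Hypothetical error type carrying the context; the library's real
// ContextError may have a different shape.
class ContextErrorSketch extends Error {
  constructor (message, context) {
    super(message)
    this.name = 'ContextError'
    this.context = context
  }
}

// Illustrative stand-in for `should`; the default message is an assumption.
const shouldSketch = (predicate, message = 'ContextError') => (context = {}) =>
  predicate(context)
    ? Promise.resolve(context)
    : Promise.reject(new ContextErrorSketch(message, context))

const hasName = ctx => Boolean(ctx.name)
const hasLast = ctx => Boolean(ctx.last)

shouldSketch(hasName)({ name: 'John' })
  .then(ctx => console.log(ctx.name)) // prints "John"

shouldSketch(hasLast, 'InvalidLastName')({ name: 'John' })
  .catch(error => console.log(error.message)) // prints "InvalidLastName"
```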
noop: This is a no-brainer: it does nothing and just returns the context.
return noop()({})
set: Adds/merges data into the context. The parameter to this function can be a static object, in which case a simple merge is performed, or a query function, in which case the value depends on the context being passed.
return each(
set(always({ name: 'John' })), // statically
set(context => ({ last: 'Doe' })), // dynamically
set(context => ({ initial: context.name[0] }))
)({ name: 'Mary' })
// returns { name: 'John', last: 'Doe', initial: 'J' }
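The merge described above can be sketched as follows. `setSketch` and `chain` are hypothetical helpers written for this illustration, under the assumption that both a plain object and a query function are accepted; this is not the library's source.

```javascript
// Illustrative stand-in for `set`; NOT rvl-pipe's implementation.
// Accepts a static object or a function of the context (assumed).
const setSketch = (source) => (context = {}) => {
  const data = typeof source === 'function' ? source(context) : source
  return Promise.resolve({ ...context, ...data })
}

// Tiny sequential runner so the example is self-contained.
const chain = (...steps) => (ctx = {}) =>
  steps.reduce((promise, step) => promise.then(step), Promise.resolve(ctx))

const run = chain(
  setSketch({ name: 'John' }), // static merge, overrides 'Mary'
  setSketch(() => ({ last: 'Doe' })), // computed, ignores the context
  setSketch(ctx => ({ initial: ctx.name[0] })) // computed from the context
)

run({ name: 'Mary' }).then(ctx => console.log(ctx))
// prints { name: 'John', last: 'Doe', initial: 'J' }
```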
assign: Assigns a prop based on the result of an async function.
return each(
assign('response', fetch('http://api.server.com/status'))
)()
// response will have the response of the async request
loop: Very simple loop execution of an async function based on an async predicate.
return each(
loop(
ctx => ctx.index < 10,
each(
doSomethingElse(),
ctx => {
ctx.index += 1
return ctx
}
)
)
)
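An async loop like this can be sketched with a recursive promise chain. `loopSketch` and `increment` are hypothetical names; wrapping the predicate in `Promise.resolve` so that it may be sync or async is an assumption of this sketch.

```javascript
// Illustrative stand-in for `loop`; NOT rvl-pipe's implementation.
// The predicate may return a boolean or a promise of one (assumed).
const loopSketch = (predicate, body) => {
  const iterate = (context = {}) =>
    Promise.resolve(predicate(context)).then(shouldContinue =>
      shouldContinue
        ? Promise.resolve(body(context)).then(iterate) // run body, then re-check
        : context // predicate failed: resolve with the current context
    )
  return iterate
}

// Hypothetical body step used only for this example.
const increment = ctx => Promise.resolve({ ...ctx, index: ctx.index + 1 })

loopSketch(ctx => ctx.index < 10, increment)({ index: 0 })
  .then(ctx => console.log(ctx.index)) // prints 10
```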
Querying functions
Querying functions can be used to pull data from the context and to perform logical operations on it.
equals: Returns whether the results of two query functions are strictly equal (===).
return each(
iff(
equals(prop('a'), always(3)), // checking a prop with a static value
doAsyncTask(...)
),
iff(
equals(prop('a'), prop('b')), // checking 2 props dynamically
doAsyncTask(...)
)
)({ a: 3, b: 3 })
always: A helper that returns a function which always returns the value passed as its first parameter.
const name = always('John')
const b = name() // John
every: Evaluates to true if all values or predicates are true.
return iff(
every(prop('a'), prop('b'), always(true), always(10)), // a and b must evaluate truthy for doAsyncTask to run
doAsyncTask(...)
)()
some: Same as every, but only one of them needs to be true.
return iff(
some(prop('a'), prop('b')), // a or b should be truthy for doAsyncTask to run
doAsyncTask(...)
)()
prop: Returns the query function for the value of a prop. It can be nested via dots. This is only a property lookup in an object.
const getUserName = prop('user.name')
const name = getUserName({ user: { name: 'John' }}) // name === 'John'
return iff(
getUserName,
doAsyncTask(...)
)()
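The dotted lookup can be sketched with a reduce over the path segments. `propSketch` is a hypothetical name, and returning `undefined` for a missing path is an assumption rather than confirmed library behaviour.

```javascript
// Illustrative stand-in for `prop`; NOT rvl-pipe's implementation.
// Walks the dotted path; a missing segment yields undefined (assumed).
const propSketch = (path) => (context = {}) =>
  path.split('.').reduce(
    (value, key) => (value == null ? undefined : value[key]),
    context
  )

const getUserName = propSketch('user.name')

console.log(getUserName({ user: { name: 'John' } })) // prints "John"
console.log(getUserName({})) // prints undefined
```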
props: Helper to construct objects where props can be static or dynamically evaluated.
const createAccountDocument = props({
user: {
name: prop('auth.username'),
token: prop('auth.token'),
newUser: true,
roles: ['admin', prop('auth.forRole')],
team: {
name: prop('auth.team') // nested props too :)
}
}
})
return each(
doAsyncAuth(), // Assuming this adds prop 'auth' to context
set(createAccountDocument),
saveToDB()
)()
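One way to picture how such a template is evaluated is a recursive walk that calls every function it finds with the context and copies everything else. `propsSketch` and the inline `authProp` query are hypothetical; this is a sketch of the idea, not the library's source.

```javascript
// Illustrative stand-in for `props`; NOT rvl-pipe's implementation.
// Functions are evaluated against the context; other values are copied.
const propsSketch = (template) => (context = {}) => {
  const evaluate = (node) => {
    if (typeof node === 'function') return node(context)
    if (Array.isArray(node)) return node.map(evaluate)
    if (node !== null && typeof node === 'object') {
      return Object.fromEntries(
        Object.entries(node).map(([key, value]) => [key, evaluate(value)])
      )
    }
    return node
  }
  return evaluate(template)
}

// Hypothetical query function standing in for prop('auth.<key>').
const authProp = key => ctx => ctx.auth[key]

const createAccountDocument = propsSketch({
  user: {
    name: authProp('username'),
    newUser: true,
    roles: ['admin', authProp('forRole')]
  }
})

console.log(createAccountDocument({ auth: { username: 'john', forRole: 'editor' } }))
// prints { user: { name: 'john', newUser: true, roles: [ 'admin', 'editor' ] } }
```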
composer: Merges the results of a set of async functions. Very useful for crafting objects where some properties are added based on predicates.
const createObject = composer(
always({ name: 'John' }),
iff(
equals(prop('user'), always(1)),
always({ last: 'Doe' }),
always({})
),
iff(
equals(prop('user'), always(2)),
always({ last: 'Perez' }),
always({})
)
)
const a = createObject({ user: 1 }) // { name: 'John', last: 'Doe' }
const b = createObject({ user: 2 }) // { name: 'John', last: 'Perez' }
const c = createObject({ user: 3 }) // { name: 'John' }
createTracer: Sometimes we need to trace some properties on the context. This function creates a no-op operation that performs that side effect for us in a pluggable way. The provided tracer should receive two params: path and value.
const logger = createTracer((path, value) => {
// perform a log of the path and the value on our logging service
})
return each(
doAsyncAuth(), // Assuming this adds prop 'auth' to context
set(createAccountDocument),
logger('user'),
saveToDB()
)()
consoleTracer is a pre-made, simple console.log tracer that is ready to use.
return each(
doAsyncAuth(), // Assuming this adds prop 'auth' to context
set(createAccountDocument),
consoleTracer('user'),
saveToDB()
)()
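The tracer pattern can be sketched as a step that looks up a path, hands it to a callback, and resolves with the untouched context. `createTracerSketch` and `recorder` are hypothetical names, and the dotted-path lookup is an assumption of this sketch.

```javascript
// Illustrative stand-in for `createTracer`; NOT rvl-pipe's implementation.
const createTracerSketch = (tracer) => (path) => (context = {}) => {
  // Dotted-path lookup (assumed to mirror how prop resolves paths).
  const value = path.split('.').reduce(
    (current, key) => (current == null ? undefined : current[key]),
    context
  )
  tracer(path, value) // side effect only
  return Promise.resolve(context) // the context passes through unchanged
}

// A tracer that records what it sees instead of logging remotely.
const seen = []
const recorder = createTracerSketch((path, value) => seen.push([path, value]))

recorder('user.name')({ user: { name: 'John' } })
  .then(() => console.log(seen)) // prints [ [ 'user.name', 'John' ] ]
```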
Error handling
If you need to signal an error in the pipeline, return a rejected promise carrying the error.
Depending on the context in which your async function is used, the error message alone will be wrapped in a ContextError. This helps error handlers recover and close the necessary resources.
const myAsyncTask = () => context => {
// doing some async stuff
// oh no, we found an error, let's reject
return Promise.reject(new Error('MyAsyncTaskError'))
}
Passing the context in the error helps the cleanup steps in the pipeline. Without any error handling, a failing step stops the pipeline:
return each(
connectToDB(),
myAsyncTask(...), // This returns an error
closeDB() // This never gets executed
)()
We might want to capture the error and recover from it to close allocated resources. For that we use the capture async helper.
return each(
connectToDB(),
capture(
myAsyncTask(...), // This returns an error
noop() // Executed only if the previous call produced an error
),
closeDB() // Now executed, because the error was captured
)()
Another way to achieve the same goal is to use the ensure function.
return ensure(
each(
connectToDB(),
myAsyncTask(...), // This returns an error
),
closeDB() // This will always be executed
)()
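The difference between the two helpers can be sketched as follows. `captureSketch`, `ensureSketch` and the demo steps are hypothetical. Two assumptions are made here that the text above does not confirm: the handler receives the context the failing step started with, and the ensure-style helper re-throws the original error after cleanup.

```javascript
// Illustrative stand-ins; NOT rvl-pipe's implementation.
// capture: recover from a rejection by running a handler step.
const captureSketch = (step, handler) => (context = {}) =>
  step(context).catch(() => handler(context))

// ensure: always run cleanup, then re-throw the original error (assumed).
const ensureSketch = (step, cleanup) => (context = {}) =>
  step(context).then(
    result => cleanup(result),
    error => Promise.resolve(cleanup(context)).then(() => { throw error })
  )

// Hypothetical steps that record what happened.
const events = []
const connect = () => ctx => { events.push('connect'); return Promise.resolve({ ...ctx, db: 'open' }) }
const failing = () => () => Promise.reject(new Error('MyAsyncTaskError'))
const close = () => ctx => { events.push('close'); return Promise.resolve({ ...ctx, db: 'closed' }) }
const chain = (...steps) => (ctx = {}) =>
  steps.reduce((promise, step) => promise.then(step), Promise.resolve(ctx))

const guarded = ensureSketch(chain(connect(), failing()), close())

guarded({}).catch(error => {
  console.log(events.join(','), error.message) // prints "connect,close MyAsyncTaskError"
})
```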
We can also define different error handlers depending on the error type (its message):
return each(
connectToDB(),
capture(
myAsyncTask(...), // This returns an error
{
'AsyncError': noop(), // Executed only if error.message === 'AsyncError'
'VeryRareAsyncError': logItRemotely() // Executed only if error.message === 'VeryRareAsyncError'
}
),
closeDB() // Executed only if one of the handlers above captured the error
)()
Notice that if we don't provide a handler for some error type the whole async function will fail with a promise rejection containing the error.
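Dispatching on the error message can be sketched with a plain object lookup. `captureByMessageSketch` and the demo steps are hypothetical, and passing the starting context to the matched handler is an assumption of this sketch.

```javascript
// Illustrative stand-in for capture with a handler map; NOT the library's source.
const captureByMessageSketch = (step, handlers) => (context = {}) =>
  step(context).catch(error => {
    const handler = handlers[error.message]
    if (!handler) throw error // no handler for this type: keep rejecting
    return handler(context)
  })

// Hypothetical steps used only for this example.
const failWith = message => () => Promise.reject(new Error(message))
const mark = label => ctx => Promise.resolve({ ...ctx, handledBy: label })

const handlers = {
  AsyncError: mark('noop'),
  VeryRareAsyncError: mark('remote-log')
}

captureByMessageSketch(failWith('VeryRareAsyncError'), handlers)({})
  .then(ctx => console.log(ctx.handledBy)) // prints "remote-log"

captureByMessageSketch(failWith('Unknown'), handlers)({})
  .catch(error => console.log(error.message)) // prints "Unknown"
```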
So, how to create async pipeline functions
If you are building an async pipeline function from scratch, your function signature should look like this:
// Arrow function
const myAsyncFunction = (params) => context => {
// Async stuff depending on params that prob mutates context
if (someErrorCondition) {
return Promise.reject(new Error('MyCustomError'))
}
return Promise.resolve(context);
}
// Function notation
function myAsyncFunction (params) {
return function(context) {
// Async stuff depending on params that prob mutates context
if (someErrorCondition) {
return Promise.reject(new Error('MyCustomError'))
}
return Promise.resolve(context);
}
}
// Usage
return each(
myAsyncFunction(...),
otherAsyncFunction(...)
)({ starting: 'context' })
This way you can write your own set of steps for mongodb, redis, request, rabbitmq, etc.
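As a concrete instance of that signature, here is a small custom step that needs no external service. `parseJson`, its parameters and the 'InvalidJson' message are hypothetical and chosen only for this illustration.

```javascript
// Hypothetical custom step: parses a JSON string stored on the context.
// It follows the (params) => context => Promise<context> shape shown above.
const parseJson = (sourceProp, targetProp) => (context = {}) => {
  try {
    const parsed = JSON.parse(context[sourceProp])
    // Resolve with a new context that includes the parsed value.
    return Promise.resolve({ ...context, [targetProp]: parsed })
  } catch (error) {
    // Reject with a named error so handlers can match on the message.
    return Promise.reject(new Error('InvalidJson'))
  }
}

const step = parseJson('raw', 'data')

step({ raw: '{"id": 1}' }).then(ctx => console.log(ctx.data.id)) // prints 1
step({ raw: 'not json' }).catch(error => console.log(error.message)) // prints "InvalidJson"
```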
Async / Await
Async/Await in JavaScript is based on Promises. Since all async pipeline functions return a Promise of a Context you can also do:
const performTask = each(
connectToDB(),
capture(
myAsyncTask(...), // This returns an error
each(
logItRemotely(),
always({ error: 'failed' })
)
),
closeDB(),
props({ result: 'ok' })
)
const result = await performTask()
console.log(result)