point-free
v0.7.0
Published
Async combinators and decorators collection
Downloads
1,648
Maintainers
Readme
Point free async utilities
This library designed to encourage composition of a program from Node.js style asynchronous primitives. It provides generally useful:
- decorators - wrappers that alter async function behavior some way,
- combinators - things that combine several functions into one.
Installation
npm install point-free
API
Decorators
Decorator wrap a function and either provide additional functionality or alter its semantics for particular case. Common usage: logging, retrying or limiting operations.
retry([options | attempts = 5], func)
Makes a function retrying func
up to attempts
times, behaving the same otherwise.
If amount of attempts as exceeded then last error is returned.
Options:
attempts
- number of attempts to runfunc
, defaults to 5.timeout
ortimeout(failed)
- a number of milliseconds to wait between tries. If specified as function then it is called with a number of failed attempts passed.factor
- timeout will be multiplied by this value for each failed attempt but first. A shortcut to implement exponential backoff.
This way one can make fetchURL
use 5 attempts with timeouts 1, 2, 4, 8 and 16 seconds:
var fetchURL = pf.retry({timeout: 1000, factor: 2}, _fetchURL);
function _fetchURL(url, callback) {
// ...
}
limit([options | limit], func)
Limit number of concurrent executions of a func
. Excessing calls will be queued and executed in FIFO order.
Options:
limit
- number of concurrent executions allowed.by
- limit only those calls clashing by values ofby(args..)
.
Here is how you can limit HTTP requests to 4 by domain and 50 overall:
var fetchURL = pf.limit({by: getDomain, limit: 4},
pf.limit(50, _fetchURL));
function _fetchURL(url, callback) {
// ...
}
By specifying limit to be 1 you can force calls to be sequential. E.g. in map:
var mapSerial = function (seq, process) {
return pf.map(seq, pf.limit(1, process));
}
TODO: document introspection and .emptyQueue()
fallback(defaultValue, func)
Returns a version of func
that never fails, but returns defaultValue
instead.
E.g. this function returns 'unknown'
if any of waterfall components fail:
var detectPageLanguage = pf.fallback('unknown', pf.waterfall(
fetchPage,
getPageText,
detectTextLanguage
));
logCalls([logger = console.log], func)
On each function call pass its arguments
to logger
. Aimed to use for logging and debugging in a way like:
var fetchURL = logCalls(fetchURL);
// ... use fetchURL same as before, look at urls passed.
logExits([logger = console.log], func)
On each function callback call pass its arguments
to logger
. Useful to trace async function results.
logErrors([logger = console.error], func)
Pass all function errors to logger
. They are still passed the normal way too. Can be used with a third party logger utility like debug:
var debug = require('debug')('my-module');
var shakyFunc = logErrors(debug, shakyFunc);
// ... use shaky func as usual while seeing its errors.
Combinators
waterfall(funcs...)
Combines several functions to be executed serially with results of each function passed to next one. Arguments to resulting function before callback are passed to the first step. Results of last function will be returned as a result of combined action. Any error will be passed out immediately, stopping chain of execution.
var pf = require('point-free');
var displayFile = pf.waterfall(
fs.readFile,
console.log.bind(console),
);
displayFile('filename.txt', callback)
When it's not possible to pass everything to first function waterfall()
could be enclosed and either called immediately...:
function copyFile(from, to, callback) {
pf.waterfall(
fs.readFile.bind(null, from),
fs.writeFile.bind(null, to),
)(callback);
}
... or passed to other combinator or decorator:
pf.serial(
action1,
// Combined subtask
pf.waterfall(
fetchData,
actOnIt
),
// ...
)
serial(funcs... | funcs)
Combines several actions into one executing them serially, arguments to combined action passed to each subtask. Results of subtasks are combined into array preserving order. If an error occurs it's passed out immediately, stopping chain of execution.
// Note same arguments
var dropShard = function (jobId, callback) {...};
var deleteJob = function (jobId, callback) {...};
var cleanup = pf.serial(dropShard, deleteJob);
Most commonly used to construct an operation from several async steps:
pf.serial(
pg.query.bind(pg, 'drop database if exists pg_bricks', []),
pg.query.bind(pg, 'create database pg_bricks', []),
pg.query.bind(pg, 'create table ...', []),
...
)(function (err, res) {
done(err);
})
parallel(funcs... | funcs)
Combines several actions into one executing them in parallel. Arguments are passed to each subtask, results are collected into array preserving order. Any error is passed out immediately, all functions still running parallel continue, but their results are ignored.
var recalcAll = pf.parallel(recalcLinks, recalcDomains, recalcQueue);
Can be used to create a function as above or as a substep in a bigger combinator:
pf.waterfall(
// Fetch jobs and stats
pf.parallel(
db.select('*').from('job').rows,
db.query.bind(db, STATS_SQL)
),
// Render the page
function (results, callback) {
var jobs = results[0], stats = results[1];
// ...
}
)(next)
auto(jobs)
Automatically resolves dependencies and executes subtasks in appropriate order and in parallel if possible. Results of dependent calls are passed as parameters to dependent actions. In the end all the subtask results are combined into an object with corresponding properties.
Here jobs
and stats
are executed in parallel, their results are passed to report
function,
then its result is passed to send
function:
pf.auto({
jobs: db.select('*').from('job').rows,
stats: db.query.bind(db, STATS_SQL),
report: ['jobs', 'stats', function (jobs, stats, callback) {
// ...
return html;
}],
send: ['report', function (report, callback) {...}]
})(done)
manual(states)
A way to create asynchronous state machine. Accepts an object with steps, call next.name()
or
use it as callback to progress to next step.
start
and end
steps are special: execution always starts from start
and
calling next.end()
will stop machine and pass a result out:
function cachedGet(url) {
var filename = __dirname + '/cache/' + url.replace(/\//g, '#');
return pf.manual({
// always starts from 'start' state
start: function (next) {
fs.exists(filename, function (exists) {
// go to some new state
if (exists) next.readCache()
else next.request();
});
},
request: function (next) {
// use state transition as callback
request(url, next.writeCache);
},
readCache: function (next) {
// use next.end to leave state machine
fs.readFile(filename, 'utf-8', next.end);
},
writeCache: function (response, body, next) {
fs.writeFile(filename, body, 'utf-8', function (error) {
next.end(error, body);
});
}
});
}
cachedGet('http://...')(function (err, body) {
...
})
while(test, body)
Creates an asynchronous function repeatively calling body
while test
condition holds:
var waitForLock = pf.while(isLocked, function (callback) {
setTimeout(callback, 1000);
});
pf.serial(
waitForLock,
... // do something useful
)
doWhile(body, test)
Same as while, but test
condition is checked after body
execution:
var bytesReceived = 0;
var readChunk = pf.doWhile(function (callback) {
// ...
bytesReceived += ...
// ...
}, function () {return bytesReceived <= CHUNK_SIZE});
Primitives
noop
A nice thing when you want to do something conditionally:
pf.waterfall(
jobs[id] ? pf.noop : loadJob,
// ...
)
sleep(timeout)
Delays any subsequent actions in a pipeline. Could be used with serial and waterfall:
var delayedHandler = pf.waterfall(pf.sleep(1000), handler);
clear
Ignores it's arguments and just calls a callback. Intended to be used in waterfall pipeline to ignore all results from previous function:
var acquireTask = pf.waterfall(
db.query('select pg_advisory_xact_lock($1, $2)', [job_id, module]),
// ignore select result
pf.clear,
// mark task as worked on
db.update('queue', {active: true}).where(...).returning('*')
)
Collections
each(seq, func)
Execute func
for each item in seq
in parallel and ignore results.
If any sub-call fails then entire call fails immediately.
var pingHosts = pf.each(HOSTS, ping);
map(seq, func)
Execute func
for each item in seq
in parallel and collect results into array preserving order.
If any sub-call fails then entire call fails immediately.
pf.map(seq, function (item, callback) {
// ...
})(done)
chunk(size, seq, func)
Chunk seq
and process each chunk with func
serially.
Chunks will be sized up to size
.
Any arrays returned by processing function are combined into single resulting array,
non-array results are ignored.
// Insert links into database in chunks of 1000
pf.chunk(1000, links, function (chunk, callback) {
db.insert('link', chunk).run(callback);
})(done)