@raywhite/async-hofs
v2.0.0
Async / Promise related higher order functions and utils
async-hofs
JS Promise and async / await higher-order functions and utils.
About
This repo contains utilities (mostly higher-order functions) intended to help with common asynchronous tasks where async / await or promises are used.
Compatibility
At present, and for the 0.x versions, the entire module expects a standard Promise implementation to be available (Node v4+), and doesn't itself use async / await in any of its source, so as not to require transpilation when used as a dependency (it's intended to be used as a dep for Google Cloud Functions, which, at the time of writing, is pinned to Node v8 LTS).
However, from 1.x onward async / await is permitted (although not yet necessarily used), Node v8.x compatibility is targeted, and Node v6.x compatibility may break without major version changes.
Setup
To clone, run git clone https://github.com/raywhite/async-hofs.
Tests are written using ava, and can be run with npm run test:node. The full test suite includes linting with eslint . (npm run test:lint), and can be run with npm run test.
To install as a dep, run npm install @raywhite/async-hofs --save.
API
compose(...fns) => fn
- ...fns - (...Function) - any number of functions.
- fn - (Function) - the composed function.
While async functions are expected, synchronous functions will also be composed. Note that the composed function will always return a promise. compose will compose functions from right to left, so the rightmost function is called first.
sequence(...fns) => fn
- ...fns - (...Function) - any number of functions.
- fn - (Function) - the composed function.
While async functions are expected, synchronous functions will also be composed. Note that the composed function will always return a promise. sequence will compose functions from left to right, so the leftmost function is called first.
memoize(fn, [stringify], [timeout = -1]) => memoized
- fn - (Function) - the function to be memoized.
- stringify - (Function) - the function used to stringify arguments.
- timeout - (Number) - the time to live, for cached results.
- memoized - (Function) - the memoized function.
A basic memoizer implementation, except that it will also memoize the resolved or rejected values of async functions. The stringify argument will be called with the arguments passed to the memoized function and, by default, will simply call String(value) on the first argument to produce internal cache keys. This assumes the first argument is a primitive value; the user should be careful to provide a more appropriate stringify function where this is not the case.
If a Number is passed instead of a function for stringify, it will be treated as timeout, and any third argument will be ignored.
Where timeout is a positive Number (in milliseconds), it will determine the time for which the resolution (or rejection) of memoized is cached.
createAsyncFnQueue([concurrency = 1]) => enqueue
- concurrency - (Number) - the maximum number of async functions to run concurrently - defaults to 1.
- enqueue - (Function) - adds a function to the internal queue.
Provides a queue that executes given async functions with a maximum concurrency. Async functions are given by calling the returned function, which returns a promise that resolves or rejects when the function is eventually called.
Rejections do not impact the queue (other given async functions will continue to be called), but the user of the queue is responsible for handling rejections.
createAsyncFnPool(fn, [concurrency = 1]) => pool
- fn - (Function) - an async function to be invoked - where it requires parameters, use Function.prototype.bind.
- concurrency - (Number) - how many times to spawn the async function - defaults to 1.
- args - (...Mixed) - any extra arguments to pass to fn.
- pool - (Promise)
Wraps an async function, and takes an optional concurrency. fn will be used to create a "green" thread (think of it like a goroutine or something), and the concurrency with which that function is called will be limited. Consider the following example:
const { createAsyncFnPool } = require('@raywhite/async-hofs')

// Resolves with `value` after a simulated network delay.
const sleep = function (value) {
  return new Promise(function (resolve) {
    setTimeout(function () {
      resolve(value)
    }, 100) // Network delay.
  })
}

const inputs = [1, 2, 3, 4, 5, 6]
const outputs = []

// Each "thread" pulls from the shared inputs until none remain.
const thread = async function () {
  while (inputs.length) {
    const value = await sleep(inputs.shift())
    outputs.push(value)
  }
}

const fn = async function () {
  await createAsyncFnPool(thread, 2) // Spawn the thread twice.
  console.log(outputs)
}

fn().catch(console.error.bind(console))
retry(fn, [curve = 2], [limit = 2], [shouldRetry = undefined]) => retrier
Wraps an async function so that it will be attempted limit times before it actually rejects.
Where the wrapped function rejects multiple times (exceeding limit), the error that it finally rejects with will always be the value that the last attempt rejected with.
- fn - (Function) - an async function to be wrapped for retrying.
- curve - (Function|Number) - a function used to generate the delay before each subsequent attempt - defaults to 2 (a Number is treated as limit).
- limit - (Number) - the number of times to retry - defaults to 2; if falsy, will retry indefinitely.
- shouldRetry - function (Error) => boolean - an optional function to decide whether to retry; can be used to abort earlier than limit.
- retrier - (Function) - the wrapped function.
If not present, or if a Number is passed, the curve argument will be treated as the limit, and a curve will be generated internally (y = x => 0) so that subsequent attempts are always invoked immediately after a failure.
For ease of use, this module provides some built-in helpers for the generation of common curve generators (see createLinear and createExponential below).
NOTE: this method is aliased as createRetrierFn.
createLinear({ m, b }) => line
Intended for use with createRetrierFn, to create a function to be used as a linear curve generator, or simply a line.
- { m, b } - the constants m and b (the gradient and y-intercept, respectively) in the equation y = m * x + b.
- line - a function that takes an x value and returns a y value.
createExponential({ a = 2, b = 1 }, m = 1) => curve
Intended for use with createRetrierFn, to create a function to be used as an exponential curve generator.
- { a, b } - the constants a and b in the equation y = ((a * b) ** x) * m.
- m - the multiplier in the equation y = ((a * b) ** x) * m - to allow scaling between milliseconds and seconds.
- curve - a function that takes an x value and returns a y value.
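To show what the two equations produce, here are hypothetical stand-ins that follow the formulas exactly as written in this README (y = m * x + b and y = ((a * b) ** x) * m). linearSketch and exponentialSketch are illustrative names, not the package's exports.

```javascript
// Hypothetical stand-ins that follow the equations given in this README.
const linearSketch = ({ m, b }) => (x) => m * x + b
const exponentialSketch = ({ a = 2, b = 1 } = {}, m = 1) => (x) => ((a * b) ** x) * m

const line = linearSketch({ m: 100, b: 50 })
const curve = exponentialSketch({ a: 2, b: 1 }, 1000)

// y values for the first three x values.
console.log([1, 2, 3].map((x) => line(x))) // [ 150, 250, 350 ]
console.log([1, 2, 3].map((x) => curve(x))) // [ 2000, 4000, 8000 ]
```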
mutex([concurrency = 1]) => lock
Given a concurrency, this function will return a lock, which is itself a function that returns a promise resolving with a release function. The promise returned by lock will not resolve unless a mutex is available - the user must ensure that they call release whenever work is complete, otherwise any pending lock()s will not resolve.
NOTE: this method is aliased as createCLock and createConcurrencyLock - which are just more verbose names.
- concurrency - (Number) - the number of concurrent lock()s that may be resolved at any given time.
- lock - (Function) - allocates a mutex promise.
Consider the following example, where the second async function cannot proceed until release has been called inside the first:
const { mutex } = require('@raywhite/async-hofs')

// Resolves after `ms` milliseconds.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

const lock = mutex(1)
const values = []

Promise.all([
  // First promise.
  (async function () {
    const release = await lock()
    await sleep(2000)
    values.push(1)
    release()
  }()),
  // Second promise.
  (async function () {
    const release = await lock()
    values.push(2)
    release()
  }()),
]).then(function () {
  console.log(values) // => [1, 2]
})
clock(fn, [concurrency = 1]) => clocked
Given a function fn and an optional concurrency, this function will return a version of fn that will schedule invocation so as to allow a maximum of concurrency concurrent invocations of that function. This is intended for use cases where you don't want to exceed some memory or IO limit, or want to create a mutex (for instance to prevent concurrent access to files).
NOTE: this method is aliased as createCLockedFn and createConcurrencyLockedFn - which are just more verbose names.
- fn - (Function) - an async function to lock / release.
- concurrency - (Number) - the number of concurrent invocations allowed - defaults to 1.
- clocked - (Function) - the concurrency locked function.
  - clocked.pending - a getter for the number of invocations pending resolution.
  - clocked.queued - a getter for the number of calls awaiting invocation.
limit(fn, [rate = 1], [interval = 1000]) => limited
Given a function fn, a rate and an interval, the returned version of fn will be rate limited such that invocation will be limited to a maximum of rate calls per any rolling interval period.
- fn - (Function) - an async function to rate limit.
- rate - (Number) - the maximum number of invocations allowed per interval - defaults to 1.
- interval - (Number) - the interval for the rate limit - defaults to 1000.
- limited - (Function) - the rate limited function.
NOTE: this method is aliased as createRateLimitedFn - which is just a more verbose name.
benchmark(fn, [precision = 'ms'], [...args]) => res
The returned value (res) is a Promise that resolves with a tuple in the form (time, value) where value is the value resolved by calling fn, and time is the measured execution time of fn with a precision of precision. Where fn rejects, benchmark itself will reject with the same value ツ.
- fn - (Function) - the async function to be invoked.
- precision - (String) - a constant (s|ms|ns) representing the precision of the timing.
- args - (...Mixed) - extra arguments to pass to the fn invocation.
- res - (Promise) - resolves with the time and value tuple (an Array).
buffer(readable, [limit = 1000 * 1024]) => buf
Given a stdlib stream.Readable, this function will continue to read from the stream until the end event is emitted by the stream, and then resolve the returned promise. The returned promise will reject if the limit is exceeded, and will also reject with any errors emitted by the underlying stream.
NOTE: This function will actually consume the stream, meaning that the stream shouldn't also be consumed by another function, unless the event handlers are attached prior to calling buffer. Importantly, buffer itself can't actually consume a stream that is or was being consumed by buffer - so subsequent calls to buffer using the same stream will error.
- readable - (stream.Readable) - the readable stream to be buffered.
- limit - (Number) - the max number of bytes to buffer.
- buf - (Promise) - resolves with the buffer contents.
constant buffer.LIMIT_EXCEEDED
The value of the error message and type upon rejection of the promise returned by buffer, where the reason for rejection was that the limit parameter was exceeded. Should be used for asserting whether or not this was the type of error.
License
• MIT © Ray White, 2017-2018 •