parallel-universe
v6.1.1
A set of async flow control structures and promise utilities.
npm install --save-prod parallel-universe
🚀 API documentation is available here.
AbortablePromise
A promise that can be aborted.
const promise = new AbortablePromise((resolve, reject, signal) => {
signal.addEventListener('abort', () => {
// Listen to the signal being aborted
});
// Resolve or reject the promise
});
promise.abort();
When abort is called, the promise is instantly rejected with an AbortError if it isn't settled yet.
Provide a custom abort reason:
promise.abort(new Error('Operation aborted'));
Abort the promise if an external signal is aborted:
promise.withSignal(signal);
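The abort behavior described above can be approximated with the standard AbortController. The sketch below is only an illustration and is not the library's implementation: makeAbortable and SketchAbortable are hypothetical names, and the sketch returns a plain object instead of a Promise subclass.

```typescript
// Hypothetical sketch, not part of parallel-universe.
interface SketchAbortable<T> {
  promise: Promise<T>;
  abort(reason?: unknown): void;
}

function makeAbortable<T>(
  executor: (
    resolve: (value: T) => void,
    reject: (reason: unknown) => void,
    signal: AbortSignal
  ) => void
): SketchAbortable<T> {
  const controller = new AbortController();
  let rejectOuter: (reason: unknown) => void = () => {};

  const promise = new Promise<T>((resolve, reject) => {
    rejectOuter = reject;
    executor(resolve, reject, controller.signal);
  });

  return {
    promise,
    abort(reason: unknown = new Error('Aborted')) {
      // Rejecting an already settled promise has no effect
      rejectOuter(reason);
      // Notify 'abort' listeners that the executor registered on the signal
      controller.abort();
    },
  };
}
```

Rejecting before notifying the signal keeps the rejection instant even if an abort listener does slow work.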
Deferred
A promise that can be resolved externally.
const promise = new Deferred<string>();
promise.then(value => {
doSomething(value);
});
promise.resolve('Mars');
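To show the idea behind external resolution, here is a minimal sketch that captures the resolve and reject functions of a standard Promise. SketchDeferred is a hypothetical name. Unlike the Deferred shown above, which is itself used as a promise, this sketch exposes the promise as a property.

```typescript
// Hypothetical sketch, not the library's source.
class SketchDeferred<T> {
  readonly promise: Promise<T>;
  resolve: (value: T) => void = () => {};
  reject: (reason: unknown) => void = () => {};

  constructor() {
    // The executor runs synchronously, so both callbacks are captured
    // before the constructor returns
    this.promise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}
```

A caller would attach handlers to the promise property and call resolve('Mars') from anywhere that holds a reference to the instance.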
AsyncQueue
An asynchronous queue that decouples value providers from value consumers.
const queue = new AsyncQueue();
// Provider adds a value
queue.append('Mars');
// Consumer takes a value
queue.take();
// ⮕ AbortablePromise { 'Mars' }
append adds a value to the queue, while take removes a value from the queue as soon as one is available.
If the queue is empty when take is called, the returned promise is resolved after the next append call.
const queue = new AsyncQueue();
// The returned promise would be resolved after the append call
queue.take();
// ⮕ AbortablePromise { 'Mars' }
queue.append('Mars');
Consumers receive values from the queue in the same order they were added by providers:
const queue = new AsyncQueue();
queue.append('Mars');
queue.append('Venus');
queue.take();
// ⮕ AbortablePromise { 'Mars' }
queue.take();
// ⮕ AbortablePromise { 'Venus' }
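The append and take semantics described above can be modeled with two arrays: one for buffered values and one for waiting consumers. This is a simplified sketch under that assumption. SketchQueue is a hypothetical name, and the sketch returns plain promises and has no acknowledgement or abort support.

```typescript
// Hypothetical sketch, not the library's source.
class SketchQueue<T> {
  private values: T[] = [];
  private waiters: Array<(value: T) => void> = [];

  append(value: T): void {
    const waiter = this.waiters.shift();
    if (waiter !== undefined) {
      // A consumer is already waiting: hand the value to the oldest one
      waiter(value);
    } else {
      // Nobody is waiting: buffer the value in arrival order
      this.values.push(value);
    }
  }

  take(): Promise<T> {
    if (this.values.length > 0) {
      // A value is buffered: resolve with the oldest one
      return Promise.resolve(this.values.shift() as T);
    }
    // The queue is empty: wait for the next append call
    return new Promise<T>(resolve => {
      this.waiters.push(resolve);
    });
  }
}
```

Because both arrays are consumed from the front, values reach consumers in the order they were appended.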
Acknowledgements
In some cases removing the value from the queue isn't the desirable behavior, since the consumer may not be able to process the taken value. Use takeAck to examine the available value and acknowledge that it can be processed. takeAck returns a tuple of the available value and the acknowledgement callback. The consumer should call ack to notify the queue whether to remove the value from the queue or to retain it.
queue.takeAck().then(([value, ack]) => {
try {
if (doSomeChecks()) {
ack(true);
doSomething(value);
}
} finally {
ack(false);
}
});
To guarantee that consumers receive values in the same order as they were provided, acknowledgements prevent subsequent consumers from being fulfilled until ack is called. Be sure to call ack to prevent the queue from being stuck indefinitely.
Calling ack multiple times is safe, since only the first call has an effect.
To acknowledge that the consumer can process the value and that the value must be removed from the queue, use:
ack(true);
To acknowledge that the value should be retained by the queue, use:
ack(false);
The value that was retained in the queue becomes available for the subsequent consumer.
const queue = new AsyncQueue();
queue.append('Pluto');
queue.takeAck().then(([value, ack]) => {
ack(false); // Tells queue to retain the value
});
queue.take();
// ⮕ AbortablePromise { 'Pluto' }
WorkPool
A callback execution pool that runs a limited number of callbacks in parallel while other submitted callbacks wait in the queue.
// The pool that processes 5 callbacks in parallel at maximum
const pool = new WorkPool(5);
pool.submit(signal => {
return Promise.resolve('Mars');
});
// ⮕ AbortablePromise<string>
You can change how many callbacks the pool can process in parallel:
pool.setSize(2);
// ⮕ Promise<void>
setSize returns a promise that is resolved when no excess callbacks are being processed in parallel.
If you resize the pool down, pending callbacks that exceed the new size limit are notified via signal that they must be aborted.
To abort all callbacks that are being processed by the pool and wait for their completion, use:
// Resolved when all pending callbacks are fulfilled
pool.setSize(0);
// ⮕ Promise<void>
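The core idea of a size-limited pool can be sketched as a counter of active callbacks plus a list of waiting ones. The sketch below covers only that part: SketchPool is a hypothetical name, the size is fixed at construction, and callbacks receive no abort signal, so resizing and aborting are out of scope.

```typescript
// Hypothetical sketch, not the library's source.
class SketchPool {
  private size: number;
  private active = 0;
  private pending: Array<() => void> = [];

  constructor(size: number) {
    this.size = size;
  }

  submit<T>(callback: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const run = () => {
        this.active++;
        // Wrapping the call turns a synchronous throw into a rejection
        new Promise<T>(settle => settle(callback()))
          .then(resolve, reject)
          .then(() => {
            // A slot is free again: start the next waiting callback
            this.active--;
            this.next();
          });
      };
      this.pending.push(run);
      this.next();
    });
  }

  private next(): void {
    // Start waiting callbacks while there are free slots
    while (this.active < this.size && this.pending.length > 0) {
      const run = this.pending.shift();
      if (run !== undefined) run();
    }
  }
}
```

With new SketchPool(2) and four submitted callbacks, at most two run at the same time and the others start as earlier ones settle.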
Lock
Promise-based lock implementation.
When someone tries to acquire a Lock they receive a promise for a release callback that is resolved as soon as the previous lock owner invokes their release callback.
const lock = new Lock();
lock.acquire();
// ⮕ Promise<() => void>
You can check that the lock is locked before acquiring a lock.
For example, if you want to force async callback executions to be sequential, you can use an external lock:
const lock = new Lock();
async function doSomething() {
const release = await lock.acquire();
try {
// Long process is handled here
} finally {
release();
}
}
// Long process would be executed three times sequentially
doSomething();
doSomething();
doSomething();
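One way to build such a lock is to chain each acquirer onto a promise that the previous owner resolves on release. The following is a minimal sketch of that technique, not the library's implementation. SketchLock is a hypothetical name.

```typescript
// Hypothetical sketch, not the library's source.
class SketchLock {
  // Resolved when the most recent owner releases the lock
  private tail: Promise<void> = Promise.resolve();

  acquire(): Promise<() => void> {
    const previous = this.tail;
    let release: () => void = () => {};

    // The next acquirer waits for this promise
    this.tail = new Promise<void>(resolve => {
      release = () => resolve();
    });

    // The caller receives `release` once the previous owner has released
    return previous.then(() => release);
  }
}
```

Each owner can obtain its release callback only after the previous owner has released, so waiting on the immediate predecessor is enough to keep owners in acquisition order.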
Blocker
Provides a mechanism for blocking an async process and unblocking it from the outside.
const blocker = new Blocker<string>();
blocker.block();
// ⮕ Promise<string>
You can later unblock it by passing a value that would fulfill the promise returned from the block call:
blocker.unblock('Mars');
PubSub
Publish–subscribe pattern implementation:
const pubSub = new PubSub<string>();
pubSub.subscribe(message => {
// Process the message
});
pubSub.publish('Pluto');
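For readers unfamiliar with the pattern, a minimal publish-subscribe channel fits in a few lines. This sketch is illustrative only: SketchPubSub is a hypothetical name, and the unsubscribe callback returned from subscribe is an assumption of the sketch rather than something stated above.

```typescript
// Hypothetical sketch, not the library's source.
class SketchPubSub<T> {
  private listeners = new Set<(message: T) => void>();

  // Registers a listener and returns a callback that removes it
  // (the return value is an assumption of this sketch)
  subscribe(listener: (message: T) => void): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  // Synchronously delivers the message to every current listener
  publish(message: T): void {
    this.listeners.forEach(listener => listener(message));
  }
}
```

Publishers and subscribers only share the channel instance, so neither side needs a reference to the other.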
repeat
Invokes a callback periodically with the given delay between settlements of returned promises until the condition is met. By default, the callback is invoked indefinitely with no delay between settlements:
repeat(async () => {
await doSomething();
});
// ⮕ AbortablePromise<void>
Specify a delay between invocations:
repeat(doSomething, 3000);
// ⮕ AbortablePromise<void>
Abort the loop:
const promise = repeat(doSomething, 3000);
promise.abort();
Specify the condition when the loop must be stopped. The example below randomly picks a planet name once every 3 seconds and fulfills the returned promise when 'Mars' is picked:
repeat(
() => ['Mars', 'Pluto', 'Venus'][Math.floor(Math.random() * 3)],
3000,
value => value === 'Mars'
);
// ⮕ AbortablePromise<string>
You can combine repeat with timeout to limit the repeat duration:
timeout(
repeat(async () => {
await doSomething();
}),
5000
);
retry
Invokes a callback periodically until it successfully returns the result. If a callback throws an error or returns a promise that is rejected then it is invoked again after a delay.
retry(async () => {
await doSomethingOrThrow();
});
// ⮕ AbortablePromise<void>
Specify a delay between tries:
retry(doSomethingOrThrow, 3000);
// ⮕ AbortablePromise<void>
Specify the maximum number of tries:
retry(doSomethingOrThrow, 3000, 5);
// ⮕ AbortablePromise<void>
Abort the retry prematurely:
const promise = retry(doSomethingOrThrow, 3000);
promise.abort();
You can combine retry with timeout to limit the retry duration:
timeout(
retry(async () => {
await doSomethingOrThrow();
}),
5000
);
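The retry behavior described in this section (invoke, wait after a failure, give up after a maximum number of tries) can be sketched as a plain loop. retrySketch is a hypothetical helper and not the library's retry: it returns a regular Promise and cannot be aborted.

```typescript
// Hypothetical sketch, not the library's source.
async function retrySketch<T>(
  callback: () => Promise<T>,
  delayMs = 0,
  maxTries = Infinity
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      // The first successful result ends the loop
      return await callback();
    } catch (error) {
      // Give up and rethrow the last error once the limit is reached
      if (attempt >= maxTries) throw error;
      // Wait before the next try
      await new Promise<void>(resolve => setTimeout(resolve, delayMs));
    }
  }
}
```

With the default maxTries the loop only ends on success, which is why pairing a retry with a timeout, as shown above, is useful.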
waitFor
Returns a promise that is fulfilled when a callback returns a truthy value:
waitFor(async () => doSomething());
// ⮕ AbortablePromise<ReturnType<typeof doSomething>>
If you don't want waitFor to invoke the callback too frequently, provide a delay in milliseconds:
waitFor(doSomething, 1_000);
delay
Returns a promise that resolves after a timeout. If signal is aborted then the returned promise is rejected with an error.
delay(100);
// ⮕ AbortablePromise<void>
Delay can be resolved with a value:
delay(100, 'Pluto');
// ⮕ AbortablePromise<string>
timeout
Rejects with an error if the execution time exceeds the timeout.
timeout(async signal => doSomething(), 100);
// ⮕ Promise<ReturnType<typeof doSomething>>
timeout(
new AbortablePromise(resolve => {
// Resolve the promise value
}),
100
);
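The rejection on timeout can be illustrated by racing a promise against a timer. This is a rough sketch of that general technique, with the hypothetical name timeoutSketch. Unlike the calls above, it accepts only a promise, and it does not abort the underlying work when the timer wins.

```typescript
// Hypothetical sketch, not the library's source.
function timeoutSketch<T>(promise: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  // Rejects once the time budget is exhausted
  const expired = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error('Timed out after ' + ms + ' ms')), ms);
  });

  // Whichever promise settles first decides the outcome
  return Promise.race([promise, expired]).then(
    value => {
      clearTimeout(timer);
      return value;
    },
    error => {
      clearTimeout(timer);
      throw error;
    }
  );
}
```

Clearing the timer on both paths prevents a finished operation from keeping a pending timer alive.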