foratata
v2.1.1
Concurrent TaskQueue with lifecycle notification
A concurrent, repeatable TaskQueue with lifecycle notifications and cancellation support, written in TypeScript (similar to a Promise.map with a concurrency limit that continues on failure).
Node.js only (if you are interested in browser support, please open an issue).
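To make the Promise.map comparison concrete, here is a minimal sketch of a concurrency-limited runner that keeps going when a task fails. It is not foratata's implementation; `runAll` and `Outcome` are hypothetical names used only for illustration.

```typescript
// Outcome of one task: either the value it produced or the error it failed with.
type Outcome<T> = { ok: true; value: T } | { ok: false; error: unknown };

// Hypothetical helper (not part of foratata): runs `tasks` with at most
// `concurrency` of them in flight. A failing task never stops the others.
async function runAll<T>(
  tasks: Array<() => T | Promise<T>>,
  concurrency: number,
): Promise<Outcome<T>[]> {
  const outcomes: Outcome<T>[] = new Array(tasks.length);
  let next = 0; // index of the next task to start

  // Each worker keeps pulling tasks until none are left.
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      try {
        // Awaiting inside `try` captures sync throws and async rejections alike.
        outcomes[index] = { ok: true, value: await tasks[index]() };
      } catch (error) {
        outcomes[index] = { ok: false, error };
      }
    }
  }

  const workers = Math.max(1, Math.min(concurrency, tasks.length));
  await Promise.all(Array.from({ length: workers }, worker));
  return outcomes;
}
```

Outcomes are stored by task index, so the returned array keeps the order in which the tasks were given, regardless of which one finishes first.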
Getting Started
npm i foratata
Basic Usage
import { TaskQueue } from 'foratata';
// Create the queue
const queue = new TaskQueue();
// Add tasks: functions that run workloads
// (`delay` and `whatever` stand in for your own functions)
queue.push(
  () => delay(1000), // something that returns a promise
  () => { throw new Error() }, // something that throws (sync or async)
  () => whatever(), // something sync
);
try {
  // Run the queue with concurrency:
  // 2 simultaneous tasks at a time
  const res = await queue.run({ concurrency: 2 });
} catch (err) {
  console.error(err); // an error that summarizes all the errors
}
Advanced
import { TaskQueue } from 'foratata';
// Give the queue a name and specify the concurrency and timeout
const queue = new TaskQueue('MyQueue', { concurrency: 2, timeout: 50 });
// A task optionally receives an AbortSignal (https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal),
// which signals that the queue has reached its timeout
const task = ({ signal }) => delay(1000, { signal });
// Add a title for better traceability
task.title = 'A task';
queue.push(task);
// Lifecycle events
queue.on('taskStart', ({ task }) => console.log(`START ${task.title}`));
queue.on('taskSuccess', ({ task, time, result }) => console.log(`SUCCESS ${task.title} took ${time}ms and the result was`, result));
queue.on('taskError', ({ task, time, error }) => console.log(`ERROR ${task.title} took ${time}ms and the error was`, error));
queue.on('taskComplete', ({ task, time }) => console.log(`COMPLETE ${task.title} took ${time}ms`));
queue.on('start', ({ concurrency, size }) => console.log(`QUEUE START with ${size} items and ${concurrency} tasks in parallel`));
queue.on('complete', ({ time, results, errors }) => console.log(`QUEUE ENDED in ${time}ms`, errors, results));
// Nest queues
const otherQueue = new TaskQueue();
queue.push(otherQueue.toTask());
await queue.run(); // uses the constructor concurrency, or runs all tasks in parallel if none was specified
await queue.every(30); // repeats the tasks every 30ms, using the constructor concurrency, or all in parallel if none was specified
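The `delay` function used in these examples is not provided by foratata. A minimal sketch of a cancellable version, assuming Node.js 16 or later and its built-in `timers/promises` module, could look like this. The `delay` helper and the manual `AbortController` are illustrative stand-ins; in real use the queue supplies the signal.

```typescript
import { setTimeout as sleep } from 'node:timers/promises';

// Hypothetical helper, not part of foratata: resolves after `ms` milliseconds,
// or rejects with an AbortError as soon as `signal` is aborted.
function delay(ms: number, options: { signal?: AbortSignal } = {}): Promise<void> {
  return sleep(ms, undefined, { signal: options.signal });
}

// A task shaped like the one above: it forwards the signal it receives.
const task = ({ signal }: { signal?: AbortSignal }) => delay(1000, { signal });

// Stand-in for the queue: abort the task by hand after 10ms.
const controller = new AbortController();
const pending = task({ signal: controller.signal });
setTimeout(() => controller.abort(), 10);
pending.catch((err: Error) => console.log(err.name)); // AbortError
```

Forwarding the signal to the underlying operation lets the task stop early instead of running to completion after nobody is waiting for its result.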
Notifications
The whole queue lifecycle is emitted as events, and you can subscribe to them to get feedback on how the run is going. We ship an event subscriber that outputs to the console.
import { TaskQueue, ConsoleSubscriber } from 'foratata';
// Create the queue
const queue = new TaskQueue();
const unsubscribe = ConsoleSubscriber(queue);
// calling unsubscribe() will stop listening to events
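If you want your own subscriber, the same pattern can be sketched as follows. This is an illustration only, not how ConsoleSubscriber is implemented: it assumes the queue exposes `on` and `off` like a Node.js EventEmitter (only `on` appears in this README), and `recordingSubscriber` is a hypothetical name. A plain EventEmitter stands in for the queue so the sketch runs on its own.

```typescript
import { EventEmitter } from 'node:events';

// Event names taken from the lifecycle listeners shown in the Advanced section.
const EVENTS = ['start', 'taskStart', 'taskSuccess', 'taskError', 'taskComplete', 'complete'] as const;

// Assumption: the queue offers `on` and `off` like a Node.js EventEmitter.
interface Emitter {
  on(event: string, listener: (...args: unknown[]) => void): unknown;
  off(event: string, listener: (...args: unknown[]) => void): unknown;
}

// Hypothetical subscriber: records the name of every lifecycle event it sees
// and returns a function that removes all of its listeners.
function recordingSubscriber(queue: Emitter, log: string[]): () => void {
  const listeners = EVENTS.map((name) => {
    const listener = () => {
      log.push(name);
    };
    queue.on(name, listener);
    return { name, listener };
  });
  return () => listeners.forEach(({ name, listener }) => queue.off(name, listener));
}

// Stand-in for a TaskQueue so the sketch runs without foratata installed.
const fakeQueue = new EventEmitter();
const log: string[] = [];
const unsubscribe = recordingSubscriber(fakeQueue, log);
fakeQueue.emit('taskStart', { task: { title: 'A task' } });
unsubscribe();
fakeQueue.emit('taskComplete', { task: { title: 'A task' }, time: 1 }); // no longer recorded
console.log(log); // only 'taskStart' was recorded
```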
Development setup
To clone the repository use the following commands:
git clone https://github.com/jmendiara/foratata && cd foratata
Use VSCode development containers, or docker-compose directly:
# Shell interactive session inside a container
docker-compose run app bash
Available Scripts
- clean - remove coverage data, Jest cache and transpiled files
- build - transpile TypeScript to ES6
- watch - interactive watch mode to automatically transpile source files
- lint - lint source files and tests
- test - run tests
- test:watch - interactive watch mode to automatically re-run tests
- format - format the code
License
Copyright 2020 Javier Mendiara Cañardo
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.