metatasks
v1.1.1
Published
FIFO queue for Promise-based tasks. Optionally persistent to ensure tasks survive reboots. Supports concurrency-limiting, metadata and state-tracking for statistics.
Downloads
3
Maintainers
Readme
Metatasks
Metatasks is a FIFO queue for Promise-based tasks. Optionally persistent to ensure potentially long, fault-prone running tasks survive reboots. Supports concurrency-limiting, metadata and state-tracking for statistics or debugging. Makes use of ES2015 decorators and optionally the new ES6 Proxy object.
Installation
$ npm install metatasks --save
This library supports ES7 decorators proposal which is supported by babel and typescript.
To use it with babel you should enable experimental es7.decorators
feature in babel as described here.
To use it with typescript you should enable experimentalDecorators
and emitDecoratorMetadata
in tsconfig.json
Usage
The simplest way to use metatasks queue system is to decorate an asynchronous method (one which returns a Promise).
import {useQueue} from 'metatasks';
class Example {
@useQueue(3)
async exampleMethod(retVal) {
return retVal;
}
}
Now, however many times you invoke the exampleMethod, only 3 instances will be running at the same time. You may still await the result, as the Promise will pass it eventually, when it's the task's turn to start and complete.
This is great if you want to queue all the calls to the method using the same queue. If you'd like to make queueing optional, you can use the experimental method proxying described below or generate a queue manually.
Persistence
Persistence is achieved through an interface class of your design, thus any database can be used for task storage.
class Example {
@useQueue({
concurrency: 3,
queueId: 'storedExample',
injectTask: true,
taskNameResolver: function(someParam) { return `${this.somethingFromThisObject} : ${someParam}` },
persistence: {
interface: PersistentTaskDatabase,
thisRestorer: async function (objectStoredInTheDatabase) { return new Example(objectStoredInTheDatabase) },
thisStorer: async function (target) { return { ... }}
}
})
async exampleMethod(someParam, task) {
return someParam;
}
}
Persistence interface
The interface must be a class that implements static methods as in the following example:
export default class PersistentTaskDatabaseExample {
static async taskEnqueue(queueId, taskJson:Object) {
}
static async taskSetStarted(queueId, taskId, metadata) {
}
static async taskSetCompleted(queueId, taskId, value, metadata) {
}
static async taskSetFailed(queueId, taskId, error, metadata) {
}
static async taskSetMetadata(queueId, taskId, key, value) {
}
static async taskMergeMetadata(queueId, taskId, obj) {
}
static async setAllStartedAsInterrupted(queueId) {
}
static async getTasksToQueue(queueId, includeInterrupted = true):Array<Object> {
return [];
}
}
Available configuration options for useQueue
concurrency
: how many instances of this method are allowed to run simultaneouslyqueueId
: the unique identifier for this queue, can be used to access the queue from the script and by the database to identify the stored tasks
defaults to: automatically generated name from filename, class and method nameinjectTask
: if true, will make it possible to access the task from the method as the last parameter (note: when this is enabled, be careful to always pass the right amount of parameters to the method, even when they are 'undefined')
defaults to:false
taskNameResolver
: a method returning aString
with the name of the task, which makes it possible to identify the instance of the task; invoked when the task is enqueued with the same parameters as the actual method and on the same target object (can accessthis
)
defaults to: name automatically generated from the class name, method name and a sequential number (or random when using persistence)metadataResolver
: a method returning anObject
like { name: 'taskName', ... }, similar to above, if you want to store more information about the given task to the database
when used: overridestaskNameResolver
and requires 'name' property to be presentpersistence
: an object containing:interface
: prototype of a class, implementing static async methods that store or retrieve information about tasks in the databasethisStorer
: a method that converts the target object - execution context of the task (this
) - into an object that can be stored in the databasethisRestorer
: a method that recreates the context (this
), reversing the above
queueModifier
: a function that is invoked on the immediately after the queue is created, before any tasks are added
mostly useful for adding additional callbacks for queue events
Example queueModifier
function(queue) {
// ...
queue.on('started', onStarted);
queue.on('finished', onFinished);
queue.on('drained', onDrained);
queue.on('added', onAdded);
}
Method proxying
Method proxying will only work under V8 if you run your application with the --harmony_proxies
flag (as of October 2015, subject to change).
import {enableQueuedCalls} from 'metatasks';
@enableQueuedCalls
class Example {
async exampleMethod(retVal) {
...
return retVal;
}
}
let example = new Example();
example.useQueue({concurrency: 1, queueId: 'my-queue'}).exampleMethod(123);
Adding metadata during task execution
This is useful for creating super-tasks, long-running tasks that embed other tasks and continue execution from a certain point, instead of starting over from beginning in case of a system failure.
class Example {
@useQueue({
concurrency: 3,
injectTask: true,
persistence: { ... }
})
async exampleMethod(someParam, task) {
if (task.metadata.status != 'part-1-completed') {
// doing something...
// finished part 1 of task
await task.setMetadata('status', 'part-1-completed');
}
// no need to redo what was already done...
// ...
return someParam;
}
}
Accessing queues in memory and the states of their tasks
import {Queues} from 'metatasks';
let queueId = 'storedExample';
let queue = Queues.get(queueId);
Available properties and actions on a TaskQueue:
- queue.whenDrained - a
Promise
that resolves once all tasks have finished running and their states were synced with the persistent database - queue.empty -
Boolean
, true when empty - queue.queue -
Array<TaskInstance>
waiting to be started - queue.runningTasks -
Map<processID, TaskInstance>
- queue.pause() - pause the queue (won't interrupt tasks which are already running)
- queue.start() - resume the queue
- queue.scheduleTask(metadata:Object|string, target:Object, ...params:Array)
- queue.asyncEventPromises - can be used to ensure all the up-to-date information is stored in the database, e.g. used when doing a graceful shutdown
queue.pause(); await Promise.all(queue.asyncEventPromises)
- queue.gracefulShutdown() - halts the system while making sure all tasks have their states synced
- queue.happyShutdown() - halts the system while making sure all running tasks complete and have their states synced
- queue can be iterated with
for-of
to go through all currently running and queued tasks - queue is an
EventEmitter
and emits the following events:- added => function(task)
- started => function(task)
- finished => function(task)
- drained => function(tasks[])
Available properties and actions on a TaskInstance:
- task.metadata - the object that contains tasks metadata
- task.setMetadata(key, value) - returns
Promise
- task.mergeMetadata(obj) - returns
Promise
- task.promise - promise that is fulfilled once the task completes
- task.generatedPromise - the actual promise of the method, once it is started
- task.setCancelMethod(method) - to be used from inside of the task - pass a method that will cancel (fail) the task prematurely
- task.name - returns the task's name
Creating a queue without using decorators
See this test for an example of how to do that.
Alternatives
I have not tested these, however they look like they could be used to achieve similar results.
- https://www.npmjs.com/package/qtask
- https://www.npmjs.com/package/schedule-drone