operations-queue
v0.2.2
Published
A task queue for sync. and async. tasks respecting resource use and precedence constraints.
Downloads
3
Maintainers
Readme
OperationsQueue
A task queue for orchestrating tasks with pre-requisites and resource requirements.
This is probably easier to explain by example. Consider a "file uploader" where each file has to be hashed (and the hash stored) and uploaded only after hashing. However to avoid locking up the UI, there can be at most one hashing task (done progressively), and at most three simultaneous uploads (for whatever reason). This can be readily achieved within the OperationsQueue
framework:
- Each hash task is a pre-requisite to the corresponding upload task (tasks may have zero or more pre-requisites)
- The queue might have a resource availability description of
{hash: 1, upload: 3}
- Each hash task holds 1
"hash"
item (and releases it on completion) - Each upload task holds 1
"upload"
item (and releases it on completion)
Table of Contents
Install
npm Package
This is available as operations-queue
on npm. (Install with npm install operations-queue
.)
Usage By Example
Constructor
const OperationsQueue = require("operations-queue");
var opsQueue = new OperationsQueue({
availableResources: {
resource_a: 5,
resource_b: 10,
resource_c: 5
},
singleTaskCompletionCallBack: function(tId, result) {
// When a single task is done
var info = opsQueue.getTaskData();
console.log("[" + tId + "] Completed. Result", result);
console.log("[" + tId + "] Task Duration: " + (info.taskCompletionTime[tId] - info.taskStartTime[tId]) + "ms.");
taskStartTime[tId]) + "ms.");
},
terminationCallBack: function(completed, incomplete) {
// When all done
console.log("All Done.");
console.log("Completed:", completed);
console.log("Incomplete:", incomplete);
},
// ... in milliseconds
sleep_period_between_task_starts: 0,
show_debug_output: false
});
Task Creation
Here is an example of a synchronous task:
var tId = opsQueue.createTask({
name: "Name of Task",
// Priority: higher => higher priority (defaults to 5)
priority: 5,
// Resource use. Example: {forklift: 1, driver: 1}
resourceUse: {forklift: 1, driver: 1},
taskCompletionCallBack: function(return_value) {
console.log("[" + this.name + "] Default callback: Task complete with output ", return_value);
},
taskDependencies: [taskId1, taskId2, taskId2],
// Note that the output of pre-requisites are passed into the task function in order
task: function(arg1, arg2, arg3) {
// ....
return result;
},
});
Here is an example of an asynchronous task:
var tId = opsQueue.createTask({
name: "Name of Task",
// Priority: higher => higher priority (defaults to 5)
priority: 5,
// Resource use. Example: {forklift: 1, driver: 1}
resourceUse: {forklift: 1, driver: 1},
taskCompletionCallBack: function(return_value) {
console.log("[" + this.name + "] Default callback: Task complete with output ", return_value);
},
taskDependencies: [taskId1, taskId2, taskId2],
// Note that the output of pre-requisites are passed into the task function in order
task: function(arg1, arg2, arg3) {
// Asynchronous tasks require a function that returns a promise that does the work asynchronously (when it is created)
return new Promise(function(resolve, reject) {
// ...
});
},
});
Note: all tasks have results wrapped as {result: ..., error: ...}
. For synchronous tasks, for completion without exceptions being thrown up to OperationsQueue
, result
is defined by the return value, and error
is left undefined; otherwise, result
is left undefined and error
is set to the uncaught exception.
For promises, on resolve, .result
is set and .error
is left undefined; on reject, .error
is set and .result
is left undefined. So a result passed through from an asynchronous task should be accessed via arg.result
(after checking for arg.error
).
Starting/Aborting
opsQueue.start()
: Starts the task
opsQueue.abort()
: Aborts the work of the OperationsQueue. Irreversible. Does not actually stop tasks, just prevents new ones from running and prevents new tasks from being created (obviously).
"Visibility"
opsQueue.getTaskData()
returns an object with keys:
taskSequences
: unique task idstaskStartTime
: a dictionary (object) of start times of taskstaskCompletionTime
: a dictionary (object) of completion times of taskstaskDuration
: a dictionary (object) of duration times of tasks
opsQueue.taskList
provides a list of unique task ids.
opsQueue.totalResources
provides a dictionary (i.e.: an object) of all resources as specified in the constructor.
opsQueue.currentAvailableResources
provides the current resource availability (total resources less resources held by tasks in process).
opsQueue.status
returns the status of the OperationsQueue
. Possible statuses:
"READY"
(code: 0)"ABORTED"
(code: 1)"RUNNING"
(code: 2)
Debug
Global debug options for OperationsQueue
:
OperationsQueue.debugModeOn()
: Turns debug mode on (verbose output)OperationsQueue.debugModeOff()
: Turns debug mode offOperationsQueue.debugModeOnForDuration(t_in_milliseconds)
: Turns debug mode on for some duration