tasks-runner
v1.4.0
Published
Scheduler and runner for nodejs scripts
Downloads
8
Maintainers
Readme
Tasks Runner
Could resolve following issues:
- Schedule task to be executed at specified time
- Schedule task to be executed with specified period
- Schedule tasks within specified group to be executed one by one in order like it was scheduled
- Graceful shutdown. All started runners will have 30 seconds to finish its current tasks in case of shutdown.
Installation
npm install --save tasks-runner
How it works
Based on periodical scanning iterations. Every iteration it executes a tasksPerScanning
count of tasks. New iteration will not
be scheduled until old one finished. When worker pick task - task will be marked with current date and locked.
In another words - this task will be excluded from queue, so nobody will touch it. It could be picked for processing again
with following requirements: task is not still processed and previous task lock was more than lockInterval
seconds before.
So you need to be sure that every of scheduled tasks will be finished in lockInterval
seconds.
Task will be marked as failed if it thrown any error and as processed in others cases.
Graceful shutdown
Runner supports graceful shutdown. All started runners will have 30 seconds to finish current tasks in case of shutdown.
Task processor
Task processor could be a function or an object with implemented .run()
method.
Please create it as function-generator or regular function which returns Promise.
Every task processor receives three arguments:
- Data that it needs to process.
- Result of previous task in case if both tasks are members of same group. In other cases it passes null.
- Extended information with recent error (in case of failed previous processing) and creation date of task.
Please see code sample below for details (it is how task processor will be executed by task runner):
extendedInfo = {
failedAt: task.failedAt, // date of previous error
errorMsg: task.errorMsg, // message of previous error
retries: task.retries, // count of failed executions
createdAt: task.createdAt // creation date of task
};
taskResult = yield taskProcessor(task.data, previousTaskResult, extendedInfo);
How to use group of tasks
It is possible to assign a couple of tasks into same group. In this case these tasks will be executed in order like it was scheduled. If some task in group will be failed by some reason - others tasks will be postponed. The result of previous task will be passed to current one as an second argument. Also every task processor will receive extended information about task as an third argument.
Task model
By default new task will be stored in "tasks" collection (it is configurable).
Field | Type | Description
:------------ | :------: | :----------
_id | ObjectId | Mongo id
taskId | string | Specified unique id or auto generated uuid.
name | string | Specified name that will be passed to taskProcessorFactory. See .run() for details.
data | * | Specified data that will be passed to task processor.
group | string | Specified group that will be used to execute tasks one by one in queue. If some task will be failed - others tasks with the same group will be postponed.
startAt | Date | Specified date when task should be executed.
repeatEvery | number | Specified period in seconds which should be used as basis for repeatable task.
retryStrategy | string | Specified strategy to apply for failed task. See .schedule() for details.
lockedAt | Date | Date when some worker picked task to process.
processedAt | Date | Date when task was finished successfully. For repeatable tasks it will be always null
.
failedAt | Date | Date of recent error.
errorMsg | string | Message of recent error.
retries | number | Counter of how many times this task was executed. It will be increased per every failed processing.
createdAt | Date | Date when task was created.
API
.connect(url, options)
Returns Promise. Promise will be resolved as soon as connection will be created. We suggest you to not wait its resolving because it will do it as soon as any query will need it.
Parameter | Type | Required | Description
--------------------- | ------ | -------- | -----------
url | string | required | Connection url for mongodb
options | Object | optional |
options.collection | string | optional | Collection name that should be used by default
options.logger | Object | optional | Logger that should be used. It should implement .debug()
and .error()
methods
.schedule(name, data, options)
Returns Promise. Passes scheduled task to resolver. Schedules task but does not execute it. For execution you will need worker
Parameter | Type | Required | Description --------------------- | ------ | -------- | ----------- name | string | required | Task name, will be passed as an argument into taskProcessorFactory data | mixed | required | Task data, will be passed as an rgument into taskProcessor options | Object | optional | options.taskId | string | optional | Unique identifier, by default uuid will be generated options.startAt | Date | optional | Defining when task should be executed. By default - current date. options.repeatEvery | number | optional | Period of repeatable task in seconds. By default - 0 (disabled) options.group | string | optional | Group of task. By default - null (disabled) options.retryStrategy | string | optional | In what time task should be rescheduled in case of any error. By default it uses "pow1". Value should be matched with following patterns (N - any integer):none - don't reschedule it, task will be run in every scanning iterationpowN - retries count with pow NNm - in N minutesNh - in N hoursNd - in N days options.collection | string | optional | Collection where task should be stored. By default - "tasks".
.run(options)
Returns Promise which will be resolved after first scanning iteration. Executes scheduled tasks.
Parameter | Type | Required | Description ---------------------------- | -------- | -------- | ----------- options | Object | required | options.scanInterval | number | optional | Period of scanning for ready to execute tasks, in seconds. By default - 60 seconds. options.lockInterval | number | optional | Max time for task to be finished or failed, in seconds. By default - 60 seconds. options.groupInterval | number | optional | Time that should be used for rescheduling tasks in scope of one group in case if previous task is not still processed, in seconds. By default - 5 seconds. options.taskProcessorFactory | function | optional | Should return task processor for provided task name (via first argument). Task processor could be a function or an object with .run() method. As an argument task processor receives task data. Also it can receive result of previous task execution in case if both tasks are members of the same group. options.tasksPerScanning | number | optional | Count of tasks that should be executed per scanning iteration. By default - 1000. Iteration should be finished in scanInterval seconds. options.collection | string | optional | Collection that should be managed by this worker. By default - "tasks".
.stop()
Returns Promise. Graceful shutdown. All started runners will have 30 seconds to finish its current tasks in case of executing this function.
.findTask(query, collection)
Returns Promise or null. Finds one task by some mongo query.
Parameter | Type | Required | Description ---------- | ------ | -------- | ----------- query | Object | required | Mongo query for find operation. collection | string | optional | Collection that should be used. By default - "tasks".
.remove(query, collection)
Returns Promise. Removes tasks by some mongo query.
Parameter | Type | Required | Description ---------- | ------ | -------- | ----------- query | Object | required | Mongo query for remove operation. collection | string | optional | Collection that should be used. By default - "tasks".
.close()
Returns Promise or undefined (in case if connection is not exists). Force closing connection to mongo. Usually you don't need to do it manually, but probably you will need it for some tests.