node-persistent-queue
v1.0.5
Simple SQLite backed Queue for running many short tasks in Node.js event thread
Overview
Simple SQLite backed Queue for running many short tasks in Node.js using setImmediate()
If you have a large batch of short-running tasks, this library lets them execute in sequence on the main event thread of Node.js without blocking or starving other Node.js events.
Description
The purpose of this library is to provide a simple means of:
- executing a queue of tasks serially, one at a time, in FIFO order
- maintaining an on-disk queue (using SQLite) that persists through crashes/restarts
- ensuring the Node.js event loop can return to the poll phase between jobs by scheduling each one with setImmediate(), so that other events are not blocked
- asynchronicity via Promises
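The setImmediate() scheduling described in the list above can be illustrated without the library. The following is a minimal sketch using only Node.js built-ins; runSerially is a hypothetical helper written for this illustration and is not how node-persistent-queue is necessarily implemented internally.

```javascript
// Illustrative only: a hypothetical helper, not node-persistent-queue's code.
// Runs handler(task) for each task in FIFO order, one task per setImmediate()
// callback, so timers and I/O callbacks get a chance to run in between.
function runSerially(tasks, handler) {
  return new Promise((resolve, reject) => {
    const pending = tasks.slice(); // work on a copy; leave the caller's array alone

    function next() {
      if (pending.length === 0) {
        resolve();
        return;
      }
      const task = pending.shift(); // oldest task first (FIFO)
      try {
        handler(task);
      } catch (err) {
        reject(err);
        return;
      }
      setImmediate(next); // yield to the event loop before the next task
    }

    setImmediate(next);
  });
}

// Usage: three tasks are processed in order
const processed = [];
runSerially(['a', 'b', 'c'], task => processed.push(task))
  .then(() => console.log(processed.join(','))); // prints "a,b,c"
```

Because each task runs in its own setImmediate() callback, nothing is processed synchronously when runSerially() is called, and pending timers or I/O callbacks can run between tasks.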
A unit of work, or task, is stored in the queue as a simple JSON object. Each task should complete quickly enough that it does not block your Node.js event loop.
If you cannot break your tasks down sufficiently, you should consider a multi-threaded Worker implementation instead.
If, however, you have a large batch of short-running tasks, this library will allow them to execute on the main event thread of Node.js without blocking or starving other Node.js events.
Refer to the Event Loop, Timers, and process.nextTick() guide at https://nodejs.org for a great explanation of the Node.js event loop.
Installation
$ npm install --save node-persistent-queue
Usage
The module is quite simple to use through its EventEmitter API.
Instantiation
The following illustrates how to specify the location of the SQLite database for your instance.
var Queue = require('node-persistent-queue') ;
/*
Provide path to your sqlite database. If file doesn't exist, it will be created
*/
var q = new Queue('./path/to/db.sqlite') ;
/*
You can use an in-memory sqlite database (although it would no longer be a persistent queue)
*/
var q = new Queue(':memory:') ;
// or
var q = new Queue('') ;
The second optional parameter specifies the number of tasks to retrieve from the DB at a time.
/*
By default, the module will retrieve up to 10 'tasks' from the sqlite database at a time.
If the data for your tasks is quite large, you can reduce this to conserve more memory
or you can increase this limit to improve throughput.
*/
var q = new Queue('./path/to/db.sqlite',1) ; // Retrieve each job from DB one at a time
var q = new Queue('./path/to/db.sqlite',1000) ; // Grab 1000 at a time
Events
node-persistent-queue emits events according to the following table:
| Event | Description | Event Handler Parameters |
|:-----:|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------|
| start | Emitted when the queue starts processing tasks (after calling .start() method) | q.on('start',function(){ }) ; |
| stop | Emitted when the queue stops processing tasks (after calling .stop() method) | q.on('stop',function(){}) ; |
| next | Emitted when the next task is to be executed. This occurs when there are items in the queue and .start() has been called, or after .add() has been called to add a task to an empty queue that is already started (isStarted()) | q.on('next',function(job) { job.id, job.job }) ; |
| empty | Emitted when the last task is completed and removed from the db | q.on('empty',function() { }) ; |
| add | Emitted when a task has been added to the queue (after calling .add() method) | q.on('add',function(job) { job.id, job.job }) ; |
| open | Emitted when the sqlite database has been opened successfully (after calling .open() method) | q.on('open',function(sqlite) { /* sqlite is an instance of sqlite3.Database */ }) ; |
| close | Emitted when the sqlite database has been closed successfully (after calling .close() method) | q.on('close',function() { }) ; |
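To see the order in which these events fire in your own script, one option is to record them as they arrive. The sketch below is an assumption-laden illustration: traceEvents and QUEUE_EVENTS are hypothetical names, and the only queue method it relies on is .on(), so a plain EventEmitter stands in for the queue here.

```javascript
const EventEmitter = require('events');

// Event names taken from the table above.
const QUEUE_EVENTS = ['open', 'start', 'add', 'next', 'empty', 'stop', 'close'];

// Hypothetical helper: appends { event, payload } to `log` for every queue event.
// It only listens; your own 'next' handler must still call q.done().
function traceEvents(q, log = []) {
  for (const name of QUEUE_EVENTS) {
    q.on(name, payload => log.push({ event: name, payload }));
  }
  return log;
}

// Usage with a stand-in emitter (a real Queue instance would be passed instead)
const standIn = new EventEmitter();
const log = traceEvents(standIn);
standIn.emit('start');
standIn.emit('add', { id: 1, job: { data: 'Data1' } });
console.log(log.map(entry => entry.event).join(',')); // prints "start,add"
```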
Contrived Example
This example illustrates the use of the events emitted from node-persistent-queue.
The empty event handler below automatically stops the queue when it becomes empty. It then closes the SQLite DB and terminates the script.
Note that the next event handler must call the .done() method once it has finished processing the task. This schedules another next event to be emitted, using setImmediate().
The .add() method returns a promise that resolves when the job has been saved in the SQLite database.
var Queue = require('node-persistent-queue') ;
var q = new Queue(':memory:') ;

var task1 = {
    data: "Data1"
} ;
var task2 = {
    data: "Data2"
} ;
var task3 = {
    data: "Data3"
} ;
var task4 = {
    data: "Data4"
} ;

q.on('open',() => {
    console.log('Opening SQLite DB') ;
    console.log('Queue contains '+q.getLength()+' job/s') ;
}) ;

q.on('add',task => {
    console.log('Adding task: '+JSON.stringify(task)) ;
    console.log('Queue contains '+q.getLength()+' job/s') ;
}) ;

q.on('start',() => {
    console.log('Starting queue') ;
}) ;

q.on('next',task => {
    console.log('Queue contains '+q.getLength()+' job/s') ;
    console.log('Process task: ') ;
    console.log(JSON.stringify(task)) ;
    // Must tell Queue that we have finished this task
    // This call will schedule the next task (if there is one)
    q.done() ;
}) ;

// Stop the queue when it gets empty
q.on('empty',() => {
    console.log('Queue contains '+q.getLength()+' job/') ;
    q.stop() ;
    q.close()
    .then(() => {
        process.exit(0) ;
    })
}) ;

q.on('stop',() => {
    console.log('Stopping queue') ;
}) ;

q.on('close',() => {
    console.log('Closing SQLite DB') ;
}) ;

q.open()
.then(() => {
    q.add(task1) ;
    q.add(task2) ;
    q.add(task3) ;
    q.add(task4) ;
    q.start() ;
})
.catch(err => {
    console.log('Error occurred:') ;
    console.log(err) ;
    process.exit(1) ;
}) ;
The above script produces the following output:
Opening SQLite DB
Queue contains 0 job/s
Starting queue
Adding task: {"id":1,"job":{"data":"Data1"}}
Queue contains 1 job/s
Adding task: {"id":2,"job":{"data":"Data2"}}
Queue contains 2 job/s
Adding task: {"id":3,"job":{"data":"Data3"}}
Queue contains 3 job/s
Adding task: {"id":4,"job":{"data":"Data4"}}
Queue contains 4 job/s
Queue contains 4 job/s
Process task:
{"id":1,"job":{"data":"Data1"}}
Queue contains 3 job/s
Process task:
{"id":2,"job":{"data":"Data2"}}
Queue contains 2 job/s
Process task:
{"id":3,"job":{"data":"Data3"}}
Queue contains 1 job/s
Process task:
{"id":4,"job":{"data":"Data4"}}
Queue contains 0 job/
Stopping queue
Closing SQLite DB
Take a look at the test script for further examples of the API.
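Because the queue only schedules the following task once .done() has been called, a next handler that throws before reaching that call would presumably leave the queue waiting. The sketch below shows one way to guard against that. processWith, handle and onError are hypothetical names that are not part of the library, and the sketch assumes .done() takes no arguments, as in the example above.

```javascript
// Hypothetical wrapper, not part of node-persistent-queue.
// `q` is any emitter that fires 'next' with { id, job } and exposes done().
function processWith(q, handle, onError) {
  q.on('next', task => {
    Promise.resolve()
      .then(() => handle(task.job, task.id)) // handle may be sync or return a promise
      .catch(err => onError(err, task))      // report the failure instead of stalling
      .finally(() => q.done());              // always let the queue move on
  });
}

// Usage with the library (assumes `q` is an opened Queue instance; not run here):
// processWith(q, job => console.log(job.data), err => console.error(err));
```

Note that a failed task is still marked as done in this sketch. That is a design choice of the example; if failed tasks need to be retried, the onError callback would have to re-add them or record them elsewhere.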
Notes
You may be wondering why, in the output above, the start event is emitted before the add events.
Each .add() call performs an asynchronous write to the SQLite database. The callback that emits the add event for this I/O does not run until after the current block of code completes. Thus, .start() is called first and the start event is emitted before any add event.
Whether the tasks are added before or after starting the queue doesn't matter, because the next event cannot occur until both the add and start events have fired.
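If you would rather have every task saved before the queue starts, the promise returned by .add() can be awaited first. This is a minimal sketch; addAllThenStart is a hypothetical helper, and it assumes the add event has been emitted by the time the .add() promise resolves, which this README does not state explicitly.

```javascript
// Hypothetical helper, not part of node-persistent-queue.
// `q` is expected to be an opened queue exposing add() (returning a promise)
// and start().
async function addAllThenStart(q, tasks) {
  for (const task of tasks) {
    await q.add(task); // wait until this task is saved before adding the next
  }
  q.start(); // every add() promise has resolved by this point
}

// Usage with the library (assumes `q` is a Queue instance; not run here):
// q.open().then(() => addAllThenStart(q, [task1, task2, task3, task4]));
```

Awaiting each .add() in turn also keeps the insertion order explicit, at the cost of one database round trip per task before processing begins.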
The queue can be left in the started state (when the queue becomes empty, you don't have to .stop() it). As tasks are added with .add(), the next event will be emitted after the current code block finishes.
TODO
A TODO List of possible future features is included. Contributions welcome.
Licence
Copyright (C) 2019 Damien Clark, Damo's World
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Acknowledgements / Attribution
Thanks to the SQLite team for an awesome "in-process library that implements a self-contained, serverless, zero-configuration, transactional SQL database engine."