npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

node-persistent-queue

v1.0.5

Published

Simple SQLite backed Queue for running many short tasks in Node.js event thread

Downloads

834

Readme

node-persistent-queue

NPM

Overview

Simple SQLite backed Queue for running many short tasks in Node.js using setImmediate()

If you have a large batch of small running tasks, this library will allow them to execute in sequence via the main event thread of node.js without blocking/starving other node.js events.

Description

The purpose of this library is to provide a simple means of:

  • executing, serially in FIFO order a queue of tasks one at a time
  • maintaining an on-disk queue (using SQLite) that persists through crashes/restarts
  • ensuring the node.js event loop can return to the poll phase between invocations of each job using setImmediate(), thus preventing blocking.
  • asynchronicity via Promises

A unit of work, or task is stored in the queue as a simple json object. Each task should complete with sufficient speed so as not to block your node.js event loop.

If you cannot break your tasks down sufficiently, you should consider a multi-threaded Worker implementation instead.

If however, you have a large batch of small running tasks, this library will allow them to execute via the main event thread of node.js without blocking/starving other node.js events.

Refer to the Event Loop Timers and Nexttick Guide from https://nodejs.org for a great explanation of the node.js events thread.

Installation

$ npm install --save node-persistent-queue

Usage

The module is quite simple to use through its EventEmitter API.

Instantiation

The following illustrates how to specify the location of the SQLite database for your instance.

var Queue = require('node-persistent-queue') ;

/*
Provide path to your sqlite database.  If file doesn't exist, it will be created
 */
var q = new Queue('./path/to/db.sqlite') ;

/*
You can use an in-memory sqlite database (although it would no longer be a persistent queue)
 */
var q = new Queue(':memory:') ;
// or
var q = new Queue('') ;

The second optional parameter specifies the number of tasks to retrieve from the DB at a time.

/*
By default, the module will retrieve up to 10 'tasks' from the sqlite database at a time. 
If the data for your tasks is quite large, you can reduce this to conserve more memory
or you can increase this limit to improve throughput.
 */
var q = new Queue('./path/to/db.sqlite',1) ; // Retrieve each job from DB one at a time

var q = new Queue('./path/to/db.sqlite',1000) ; // Grab 1000 at a time

Events

node-persistent-queue emits events according to the following table:

| Event | Description | Event Handler Parameters | |:-----:|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------| | start | Emitted when the queue starts processing tasks (after calling .start() method) | q.on('start',function(){ }) ; | | stop | Emitted when the queue stops processing tasks (after calling .stop() method) | q.on('stop',function(){}) ; | | next | Emitted when the next task is to be executed. This occurs: * when there are items in the queue and .start() has been called; or * after .add() has been called to add a task to an empty queue and queue isStarted() already | q.on('next',function(job) {  job.id,  job.job }) ; | | empty | Emitted when the last task is completed and removed from the db | q.on('empty',function() { }) ; | | add | Emitted when a task has been added to the queue (after calling .add() method) | q.on('add',function(job) {  job.id,  job.job }) ; | | open | Emitted when the sqlite database has been opened successfully (after calling .open() method) | q.on('open',function(sqlite) {  sqlite //instance of sqlite3.Database }) ; | | close | Emitted when the sqlite database has been closed successfully (after calling .close() method) | q.on('close',function() { }) ; |

Contrived Example

This example illustrates the use of the events emitted from node-persistent-queue.

The empty event handler below automatically stops the queue when it becomes empty. It then closes the SQLite DB and terminates the script.

Note, that the next event handler, on completion of processing the task, must call the .done() callback method. This will then schedule another next event to be emitted, using setImmediate().

The .add() method returns a promise that resolves when the job has been saved in the sqlite database.

var Queue = require('node-persistent-queue') ;
var q = new Queue(':memory:') ;

var task1 = {
	data: "Data1"
} ;
var task2 = {
	data: "Data2"
} ;
var task3 = {
	data: "Data3"
} ;
var task4 = {
	data: "Data4"
} ;

q.on('open',() => {
	console.log('Opening SQLite DB') ;
	console.log('Queue contains '+q.getLength()+' job/s') ;
}) ;

q.on('add',task => {
	console.log('Adding task: '+JSON.stringify(task)) ;
	console.log('Queue contains '+q.getLength()+' job/s') ;
}) ;

q.on('start',() => {
	console.log('Starting queue') ;
}) ;

q.on('next',task => {
	console.log('Queue contains '+q.getLength()+' job/s') ;
	console.log('Process task: ') ;
	console.log(JSON.stringify(task)) ;

	// Must tell Queue that we have finished this task
	// This call will schedule the next task (if there is one)
	q.done() ;
}) ;

// Stop the queue when it gets empty
q.on('empty',() => {
	console.log('Queue contains '+q.getLength()+' job/') ;
	q.stop() ;
	q.close()
	.then(() => {
		process.exit(0) ;
	})
}) ;

q.on('stop',() => {
	console.log('Stopping queue') ;
}) ;

q.on('close',() => {
	console.log('Closing SQLite DB') ;
}) ;

q.open()
.then(() => {
	q.add(task1) ;
	q.add(task2) ;
	q.add(task3) ;
	q.add(task4) ;
	q.start() ;
})
.catch(err => {
	console.log('Error occurred:') ;
	console.log(err) ;
	process.exit(1) ;
}) ;

The above script produces the following output:

Opening SQLite DB
Queue contains 0 job/s
Starting queue
Adding task: {"id":1,"job":{"data":"Data1"}}
Queue contains 1 job/s
Adding task: {"id":2,"job":{"data":"Data2"}}
Queue contains 2 job/s
Adding task: {"id":3,"job":{"data":"Data3"}}
Queue contains 3 job/s
Adding task: {"id":4,"job":{"data":"Data4"}}
Queue contains 4 job/s
Queue contains 4 job/s
Process task: 
{"id":1,"job":{"data":"Data1"}}
Queue contains 3 job/s
Process task: 
{"id":2,"job":{"data":"Data2"}}
Queue contains 2 job/s
Process task: 
{"id":3,"job":{"data":"Data3"}}
Queue contains 1 job/s
Process task: 
{"id":4,"job":{"data":"Data4"}}
Queue contains 0 job/
Stopping queue
Closing SQLite DB

Take a look at the test script for further examples of the API

Notes

You may be wondering why the start event is emitted before the add events, based on the output above.

The .add() method calls perform an asynchronous write to the SQLite database. The callback that emits the add event from this I/O doesn't happen until after the current block of code completes. Thus, the .start() method is called first along with the start event.

Whether the tasks are added before starting the queue or after doesn't matter because the next event cannot occur until both the add and start events have fired.

The queue can be left in the start state (when the queue becomes empty, you don't have to .stop() it). As things are .add()ed, the next event will be emitted after the current code block finishes.

TODO

A TODO List of possible future features is included. Contributions welcome.

Licence

Copyright (C) 2019 Damien Clark, Damo's World

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Acknowledgements / Attribution

Thanks to the SQLite team for an awesome "in-process library that implements a self-contained, serverless, zero-configuration, transactional SQL database engine."