fsqu
v1.1.0
filesystem-backed job queue with delays
FSqu: job queues on top of filesystem
This is a small library to implement job queues on top of a regular filesystem, where a queue is a directory and each element is a file.
The main goal is to allow building job pipelines where the job units are arbitrarily large files, but all the processing happens on the same machine. For example, a pipeline to process audio files.
Features
- supports many simultaneous consumers and producers (as long as all of them use the same root FSqu object and therefore live in the same process)
- constant-time operations: queue operations do not degrade as the queue grows, so queues with many thousands of elements can be easily managed
- delay/schedule: entries can be pushed with a not-before timestamp, and won't be eligible for a pop() until then. This does not hamper performance, nor does it affect the rest of the elements
- supports at-most-once (pop()) and at-least-once (reserve-commit-rollback) models
- supports direct addition of files (as in mv fff.mp3 queue_dir/)
- queues survive the process: files left in a queue will be re-taken after queue re-creation
- there are no FIFO guarantees: insertion order is mostly kept, but not guaranteed
- uses debug for debugging (DEBUG=fsqu:*)
Planned features
- associated deadletter queues
Quick Start
const FSqu = require ('fsqu');
const async = require ('async');
const fs = require ('fs');

const fsqu = new FSqu.fsqu ({
  name: 'a-test-queue',
  path: '/tmp/fsqu/test-queues-with-fsqu',
  landing: '/tmp/fsqu/landing'
});

// res[i] holds the result of the i-th step, so the two pops are res[3] and res[4]
async.series ([
  cb => fsqu.init (cb),
  cb => fsqu.pushData ('contents of file #1', cb),
  cb => fsqu.pushData ('contents of file #2', cb),
  cb => fsqu.pop (cb),
  cb => fsqu.pop (cb),
], (err, res) => {
  if (err) {
    // on error the results are incomplete, so do not try to read them
    console.error (err);
  }
  else {
    console.log (fs.readFileSync (res[3].loc, { encoding: 'utf8' })); // should yield 'contents of file #1'
    console.log (fs.readFileSync (res[4].loc, { encoding: 'utf8' })); // should yield 'contents of file #2'
  }

  fsqu.end (err => console.log ('all done', err));
});
More examples can be found here
API
Queue lifecycle
- const q = new FSqu.fsqu(opts): queue creation. Creates a queue from a directory. The directory is created if nonexistent. opts can be:
  - path: directory where the queue will be created
  - name: name of the directory hosting the queue. The full queue directory will therefore be <path>/<name>
  - landing: directory where to move entries after a call to pop(). It will be created if nonexistent
- q.init(cb): initializes a queue. This must be called before it can be used
- q.end(cb): terminates a queue. It cancels any pending consumer
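The lifecycle calls pair up naturally: init() before any use, end() once finished. The following is a minimal sketch of a helper that enforces that pairing. It assumes only the callback signatures documented here, with an optional error as first callback argument as in the Quick Start; withQueue and work are hypothetical names, not part of fsqu.

```javascript
// Hypothetical helper, not part of fsqu: runs `work` between q.init() and q.end().
// Assumes q.init(cb) and q.end(cb) call back with an optional error.
function withQueue(q, work, done) {
  q.init(initErr => {
    if (initErr) return done(initErr);

    work(q, (workErr, result) => {
      // Always terminate the queue, even if the work failed
      q.end(endErr => done(workErr || endErr || null, result));
    });
  });
}
```

With a real queue this could be called as withQueue(new FSqu.fsqu(opts), (q, cb) => q.pushData('hello', cb), err => console.log('done', err)).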
Insert-into-queue operations
- q.pushData (data, opts, cb): pushes raw data into the queue.
  - data: can be a string or a Buffer, and will be written to a file that will be added to the queue
  - opts can contain:
    - ts: a Date to be used as not-before timestamp for the entry. Defaults to now. The entry won't be eligible for pop() until this time has come
    - delay: milliseconds to add to the not-before timestamp. Can be negative
- q.pushStream (stream, opts, cb): pushes data from a stream into the queue.
  - stream: a stream that will be piped into a file (and therefore closed); the file will then be added to the queue
  - opts can contain:
    - ts: a Date to be used as not-before timestamp for the entry. Defaults to now. The entry won't be eligible for pop() until this time has come
    - delay: milliseconds to add to the not-before timestamp. Can be negative
- q.pushFile (filename, opts, cb): pushes an existing file into the queue.
  - filename: an existing file. It will be moved, not copied, into the queue
  - opts can contain:
    - ts: a Date to be used as not-before timestamp for the entry. Defaults to now. The entry won't be eligible for pop() until this time has come
    - delay: milliseconds to add to the not-before timestamp. Can be negative
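How ts and delay combine can be sketched as follows. This only illustrates the semantics documented above (ts defaults to now, delay is added to it); it is not the library's actual code, and notBefore is a hypothetical helper.

```javascript
// Hypothetical helper illustrating the documented semantics of `ts` and `delay`;
// it is not part of fsqu and may differ from the library's internals.
function notBefore(opts = {}, now = new Date()) {
  const base = opts.ts instanceof Date ? opts.ts.getTime() : now.getTime();
  const delay = opts.delay || 0; // milliseconds, may be negative
  return new Date(base + delay);
}

const now = new Date('2024-01-01T00:00:00.000Z');

// No options: eligible immediately
console.log(notBefore({}, now).toISOString()); // 2024-01-01T00:00:00.000Z

// 90 seconds after an explicit timestamp
const noon = new Date('2024-01-01T12:00:00.000Z');
console.log(notBefore({ ts: noon, delay: 90 * 1000 }, now).toISOString()); // 2024-01-01T12:01:30.000Z
```

With a real queue, the second case would correspond to a call such as q.pushData(data, { ts: noon, delay: 90 * 1000 }, cb).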
Get-from-queue operations
- q.pop (cb): gets a file from the queue. It will not call back until a file is available (that is, present and with a not-before timestamp in the past). cb will then be called with the usual cb(err, res) form, where res.loc will be the file path, already extracted from the queue and left in the landing directory. After this, the file is no longer under the queue's control, and it is the caller's responsibility to remove it once it has been processed
- q.reserve (opts, cb): reserves a file from the queue. It will not call back until a file is available (that is, present and with a not-before timestamp in the past). cb will then be called with the usual cb(err, res) form, where res.loc will be the file path, still under queue control. The caller should not do anything with it besides reading. opts can contain:
  - delta: time in milliseconds the element will remain reserved. After that, it will be available for pop() or reserve() again
- q.commit (id|reserve-res, cb): commits a previously reserved file. The first parameter can be the element id (res.id in the reserve() call) or directly the whole res returned by reserve(). After this call, the file will be removed from the queue and deleted from disk
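The reserve/commit pair gives at-least-once processing: commit only after the work has succeeded, and on failure leave the reservation to expire so the element becomes available again. The following is a minimal sketch, assuming the cb(err, res) callback form used in the Quick Start. processOne and handler are hypothetical names, and the 30-second delta is an arbitrary value. The rollback call mentioned under Features is not documented in this section, so the sketch does not use it.

```javascript
// Hypothetical at-least-once worker, not part of fsqu.
// `q` is an initialized queue; `handler(path, cb)` is the caller's processing function.
function processOne(q, handler, cb) {
  // Keep the element reserved for 30 s (arbitrary value for this sketch)
  q.reserve({ delta: 30 * 1000 }, (err, res) => {
    if (err) return cb(err);

    // res.loc is still under queue control: read it, do not move or delete it
    handler(res.loc, handlerErr => {
      if (handlerErr) {
        // No commit: the reservation expires after `delta` and the element
        // becomes available to pop() or reserve() again
        return cb(handlerErr);
      }

      // Success: remove the element from the queue and delete the file
      q.commit(res, cb);
    });
  });
}
```

Because a failed or slow handler lets the element reappear, the handler may see the same file more than once and should tolerate repeated processing.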
Direct add/remove of files
Files can be added to a queue by directly moving them (as in mv) into the queue's directory. This operation is equivalent to a q.pushFile() with no options. For this to work the file name can't match the format used internally (^([0-9]+)-([0-9a-z]{16})-([0-9]+)$).
It is important to move them and not copy them: most filesystems can't reliably tell when a file that's opened for writing is closed (or worse: depending on who's producing the file you may see a series of open-append-close operations), so it is not possible to determine when a file can be acted upon.
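The two rules for direct addition (move rather than copy, and avoid the internal name format) can be sketched in Node.js as follows. moveIntoQueue is a hypothetical helper, not part of fsqu. It relies on fs.renameSync, which only works when source and queue directory are on the same filesystem.

```javascript
const fs = require('fs');
const path = require('path');

// Name format the queue uses internally, as quoted in the text above
const INTERNAL_NAME = /^([0-9]+)-([0-9a-z]{16})-([0-9]+)$/;

// Hypothetical helper, not part of fsqu: moves `src` into `queueDir` with a rename,
// so the queue never sees a partially written file.
// Across filesystems the rename fails with EXDEV instead of silently copying.
function moveIntoQueue(src, queueDir) {
  const name = path.basename(src);

  if (INTERNAL_NAME.test(name)) {
    throw new Error(`file name collides with the queue's internal format: ${name}`);
  }

  const dest = path.join(queueDir, name);
  fs.renameSync(src, dest);
  return dest;
}
```

A producer that generates files would write them somewhere else on the same filesystem first, and call moveIntoQueue only once the file is complete.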
However, files can NOT be safely removed (as in rm) or moved away (as in mv) from the queue's directory.