fsqu

v1.1.0

filesystem-backed job queue with delays

FSqu: job queues on top of filesystem

This is a small library that implements job queues on top of a regular filesystem, where a queue is a directory and each element is a file.

The main goal is to allow building job pipelines where the job units are arbitrarily large files, but all the processing happens on the same machine. For example, a pipeline to process audio files.

Features

  • supports many simultaneous consumers and producers (as long as all of them use the same root FSqu object and therefore live in the same process)
  • constant-time operations: queue operations do not degrade as the queue grows, so queues with many thousands of elements can be easily managed
  • delay/schedule: entries can be pushed with a not-before timestamp, and won't be eligible for a pop() until then. This neither hampers performance nor affects the rest of the elements
  • supports at-most-once (pop()) and at-least-once (reserve-commit-rollback) models
  • supports direct addition of files (as in mv fff.mp3 queue_dir/)
  • queues survive the process: files left in a queue are picked up again when the queue is re-created
  • There are no FIFO guarantees: insertion order is mostly kept, but not guaranteed
  • uses debug for debugging (DEBUG=fsqu:* )

Planned features

  • associated deadletter queues

Quick Start

const FSqu =  require ('fsqu');
const async = require ('async');
const fs =    require ('fs');

const fsqu = new FSqu.fsqu ({
  name:    'a-test-queue',
  path:    '/tmp/fsqu/test-queues-with-fsqu',
  landing: '/tmp/fsqu/landing'
});

async.series ([
  cb => fsqu.init (cb),
  cb => fsqu.pushData ('contents of file #1', cb),
  cb => fsqu.pushData ('contents of file #2', cb),
  cb => fsqu.pop(cb),
  cb => fsqu.pop(cb),
], (err, res) => {
  if (err) return console.error (err); // stop here: res is incomplete on error

  console.log(fs.readFileSync (res[3].loc, { encoding: 'utf8' })); // should yield 'contents of file #1'
  console.log(fs.readFileSync (res[4].loc, { encoding: 'utf8' })); // should yield 'contents of file #2'

  fsqu.end(err => console.log ('all done', err));
});

More examples can be found here

API

Queue lifecycle

  • const q = new FSqu.fsqu(opts): queue creation. Creates a queue from a directory. The directory is created if nonexistent

    opts can be:

    • path: directory where the queue will be created
    • name: name of the directory hosting the queue. The full queue directory will therefore be <path>/<name>
    • landing: directory where to move entries after a call to pop(). It will be created if nonexistent
  • q.init(cb): Initializes a queue. This must be called before it can be used

  • q.end(cb): terminates a queue. It cancels any pending consumer

Insert-into-queue operations

  • q.pushData (data, opts, cb): pushes raw data into the queue.

    • data: can be a string or a Buffer and will be written in a file that will be added to the queue
    • opts can contain:
      • ts: a Date to be used as the not-before timestamp for the entry. Defaults to now. The entry won't be eligible for pop() until this time has come
      • delay: milliseconds to add to the not-before timestamp. Could be negative
  • q.pushStream (stream, opts, cb): pushes data from a stream into the queue.

    • stream: a readable stream. It will be piped into a file (and therefore closed), and that file will be added to the queue
    • opts can contain:
      • ts: a Date to be used as the not-before timestamp for the entry. Defaults to now. The entry won't be eligible for pop() until this time has come
      • delay: milliseconds to add to the not-before timestamp. Could be negative
  • q.pushFile (filename, opts, cb): pushes an existing file into the queue.

    • filename: an existing file. It will be moved, not copied, into the queue
    • opts can contain:
      • ts: a Date to be used as the not-before timestamp for the entry. Defaults to now. The entry won't be eligible for pop() until this time has come
      • delay: milliseconds to add to the not-before timestamp. Could be negative
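The interaction between ts and delay can be illustrated with a small standalone sketch. It assumes the effective not-before time is simply ts (or the current time if ts is absent) plus delay, which is one plausible reading of the options above and not FSqu's actual source. The names effectiveNotBefore and isEligible are hypothetical helpers, not part of the library.

```javascript
// Sketch only: one plausible reading of how `ts` and `delay` combine.
// These helpers are hypothetical; they are not part of FSqu.
function effectiveNotBefore(opts = {}, now = new Date()) {
  // `ts` defaults to "now" when it is not provided.
  const base = opts.ts instanceof Date ? opts.ts.getTime() : now.getTime();
  // `delay` is in milliseconds and may be negative.
  const delay = Number.isFinite(opts.delay) ? opts.delay : 0;
  return new Date(base + delay);
}

// An entry is assumed eligible once its not-before time is not in the future.
function isEligible(notBefore, now = new Date()) {
  return notBefore.getTime() <= now.getTime();
}

const now = new Date('2024-01-01T00:00:00.000Z');
const inFiveSeconds = effectiveNotBefore({ delay: 5000 }, now);
console.log(inFiveSeconds.toISOString());    // 2024-01-01T00:00:05.000Z
console.log(isEligible(inFiveSeconds, now)); // false
```

Under this reading, a negative delay combined with a future ts pulls the entry's eligibility earlier, and a negative delay alone makes the entry eligible immediately.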

Get-from-queue operations

  • q.pop (cb): get a file from the queue. It will not return until a file is available (that is, present and with a not-before in the past)

    cb will then be called with the usual cb(err, res) form, where res.loc will be the file path, already extracted from the queue and left in the landing directory. After this, the file is no longer under the queue's control, and it is the caller's responsibility to remove it once it has been processed and is no longer needed

  • q.reserve (opts, cb): reserves a file from the queue. It will not return until a file is available (that is, present and with a not-before in the past)

    cb will then be called with the usual cb(err, res) form, where res.loc will be the file path, still under queue control. The caller should not do anything with it besides reading

    opts can contain:

    • delta: time in milliseconds the element will remain reserved. After that, it will be available for pop() or reserve() again
  • q.commit (id|reserve-res, cb): commits a previously-reserved file. First parameter can be the element id (res.id in the reserve() call) or directly the whole res returned by reserve(). After this call, the file will be removed from the queue and deleted from disk
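The at-least-once model described above could be used roughly as follows. This is a minimal sketch, assuming q is an already initialized FSqu queue and that callbacks follow the Node-style (err, res) order that the Quick Start's use of async.series implies. The function name processOneAtLeastOnce, the handler parameter and the 30000 ms delta are illustrative choices, not part of the library.

```javascript
// Sketch: process one entry with reserve() + commit().
// Assumptions: `q` is an initialized FSqu queue; callbacks are (err, res).
// `handler(loc, done)` is a hypothetical caller-supplied function that
// only reads the file at `loc` and then calls done(err).
function processOneAtLeastOnce(q, handler, cb) {
  // delta: how long (in ms) the entry stays reserved before it becomes
  // available to pop() or reserve() again.
  q.reserve({ delta: 30000 }, (err, res) => {
    if (err) return cb(err);

    handler(res.loc, (handlerErr) => {
      // On failure, skip commit(): once `delta` elapses the entry is
      // available again, so it can be retried.
      if (handlerErr) return cb(handlerErr);

      // On success, commit() removes the entry from the queue and
      // deletes the file from disk. The whole `res` can be passed.
      q.commit(res, cb);
    });
  });
}
```

Because an uncommitted entry reappears after delta, a handler may see the same file more than once, so it is safest to make the handler idempotent.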

Direct add/remove of files

  • Files can be added to a queue by directly moving them (as in mv) into the queue's directory. This operation is equivalent to a q.pushFile() with no options. For this to work the file name can't match the format used internally (^([0-9]+)-([0-9a-z]{16})-([0-9]+)$)

    It is important to move the files rather than copy them: most filesystems can't reliably tell when a file that's opened for writing is closed (or worse: depending on who's producing the file, you may see a series of open-append-close operations), so it is not possible to determine when a copied file can be acted upon

  • However, files can NOT be safely removed (as in rm) or moved away (as in mv) from the queue's directory
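The direct-add rules above can be sketched as a small helper that writes a file completely in a staging directory and then moves it into the queue directory with a rename. The helper name dropIntoQueue and the temporary directories are hypothetical and stand in for a real queue directory. The only detail taken from the library is the internal name pattern quoted above.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Pattern quoted above for the names FSqu uses internally.
const INTERNAL_NAME = /^([0-9]+)-([0-9a-z]{16})-([0-9]+)$/;

// Hypothetical helper: moves `src` into `queueDir` with a rename,
// refusing names that would collide with the internal format.
function dropIntoQueue(src, queueDir) {
  const name = path.basename(src);
  if (INTERNAL_NAME.test(name)) {
    throw new Error(`"${name}" matches the internal entry name format`);
  }
  const dest = path.join(queueDir, name);
  // rename() fails with EXDEV across filesystems, so the staging
  // directory should live on the same filesystem as the queue.
  fs.renameSync(src, dest);
  return dest;
}

// Demo with throwaway directories (stand-ins for a real queue directory).
const root = fs.mkdtempSync(path.join(os.tmpdir(), 'fsqu-demo-'));
const staging = path.join(root, 'staging');
const queueDir = path.join(root, 'queue');
fs.mkdirSync(staging);
fs.mkdirSync(queueDir);

const src = path.join(staging, 'fff.mp3');
fs.writeFileSync(src, 'fully written before the move');
const dest = dropIntoQueue(src, queueDir);
console.log(fs.existsSync(dest), fs.existsSync(src)); // true false
```

Writing to a staging directory first means the queue only ever sees complete files, which is the reason the text gives for moving instead of copying.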