ponos

v5.8.3

An opinionated queue based worker server for node.

Ponos

Documentation is available at runnable.github.io/ponos

A migration guide for v3.0.0 is available!

An opinionated queue based worker server for node.

For ease of use, Ponos provides options to set the hostname, port, username, and password for the RabbitMQ server, along with a few related settings. If an option is not present, the server falls back to the corresponding environment variable and then to the final default:

options | environment | default
------------------------------------|-----------------------------|--------------
opts.rabbitmq.hostname | RABBITMQ_HOSTNAME | 'localhost'
opts.rabbitmq.port | RABBITMQ_PORT | '5672'
opts.rabbitmq.username | RABBITMQ_USERNAME | none
opts.rabbitmq.password | RABBITMQ_PASSWORD | none
opts.redisRateLimiting.host | REDIS_HOST | 'localhost'
opts.redisRateLimiting.port | REDIS_PORT | '6379'
opts.redisRateLimiting.durationMs | RATE_LIMIT_DURATION | 1000
opts.log | N/A | Basic bunyan instance with stdout stream (for logging)
opts.errorCat | N/A | Basic error-cat instance (for rollbar error reporting)
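The lookup order in the table (option, then environment variable, then default) can be sketched as follows. This is only an illustration of the precedence: `resolveRabbitConfig` and `pick` are hypothetical names, not part of Ponos.

```javascript
// Hypothetical helper, not Ponos's actual code: resolve each RabbitMQ setting
// from opts first, then the environment, then the documented default.
function resolveRabbitConfig (opts, env) {
  const rabbitmq = (opts && opts.rabbitmq) || {}
  const pick = (optionValue, envValue, fallback) => {
    if (optionValue !== undefined) { return optionValue }
    if (envValue !== undefined) { return envValue }
    return fallback
  }
  return {
    hostname: pick(rabbitmq.hostname, env.RABBITMQ_HOSTNAME, 'localhost'),
    port: pick(rabbitmq.port, env.RABBITMQ_PORT, '5672'),
    // username and password have no default
    username: pick(rabbitmq.username, env.RABBITMQ_USERNAME, undefined),
    password: pick(rabbitmq.password, env.RABBITMQ_PASSWORD, undefined)
  }
}

// The explicit option wins over the environment; the environment wins over
// the default.
const config = resolveRabbitConfig(
  { rabbitmq: { port: '5673' } },
  { RABBITMQ_HOSTNAME: 'rabbit.internal', RABBITMQ_PORT: '9999' }
)
console.log(config.hostname, config.port) // rabbit.internal 5673
```

In a real process one would pass `process.env` as the second argument.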

Other options for Ponos are as follows:

environment variable | default | description
-------------------------|---------|------------
WORKER_MAX_RETRY_DELAY | 0 | The maximum time, in milliseconds, that the worker will wait before retrying a task. The timeout will exponentially increase from WORKER_MIN_RETRY_DELAY to WORKER_MAX_RETRY_DELAY if the latter is set higher than the former. If this value is not set, the worker will not exponentially back off.
WORKER_MIN_RETRY_DELAY | 1 | The minimum time, in milliseconds, that the worker will wait before retrying a task.
WORKER_TIMEOUT | 0 | Timeout, in milliseconds, at which the worker task will be retried.
WORKER_MAX_NUM_RETRIES | 0 | The maximum number of times a job will retry due to failures. 0 means the job retries indefinitely.
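To make the back-off description concrete, here is a minimal sketch of how a retry delay could grow between the minimum and the maximum. It assumes the delay doubles on each attempt; the text above only says "exponentially", so the growth factor and the `retryDelay` function are assumptions, not Ponos's implementation.

```javascript
// Hypothetical sketch: delay before the nth retry (attempt starts at 1).
// Assumption: the delay doubles on each attempt.
function retryDelay (attempt, minDelay, maxDelay) {
  // When max is not higher than min (e.g. the default max of 0), there is no
  // back-off and every retry waits the minimum delay.
  if (maxDelay <= minDelay) { return minDelay }
  return Math.min(minDelay * Math.pow(2, attempt - 1), maxDelay)
}

const delays = [1, 2, 3, 4, 5, 6].map((n) => retryDelay(n, 100, 2000))
console.log(delays) // [ 100, 200, 400, 800, 1600, 2000 ]
```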

Usage

From a high level, Ponos is used to create a worker server that responds to jobs provided from RabbitMQ. The user defines handlers for each queue's jobs that are invoked by Ponos.

Ponos has built in support for retrying and catching specific errors, which are described below.

Workers

Workers need to be defined as a function that takes an Object job and returns a promise. For example:

function myWorker (job) {
  return Promise.resolve()
    .then(() => {
      return doSomeWork(job)
    })
}

This worker takes the job, does work with it, and returns the result. Since (in theory) this does not throw any errors, the worker will see this resolution and acknowledge the job.

Tasks vs. Events

Ponos currently provides two paradigms for doing work. The first is subscribing directly to queues in RabbitMQ using the tasks parameter in the constructor. The second is subscribing to a fanout exchange using the events parameter, which lets several subscribers receive the same message and makes fuller use of RabbitMQ's structure.

const ponos = require('ponos')
const server = new ponos.Server({
  tasks: {
    'a-queue': (job) => { return Promise.resolve(job) }
  },
  events: {
    'an-exchange': (job) => { return Promise.resolve(job) }
  }
})

Worker Errors

Ponos's worker is designed to retry any error that is not specifically a fatal error. Ponos has been designed to work well with our error library error-cat.

A fatal error is defined with the WorkerStopError class from error-cat. If a worker rejects with a WorkerStopError, the worker will automatically assume the job can never be completed and will acknowledge the job.

As an example, a WorkerStopError can be used to fail a task given an invalid job:

const WorkerStopError = require('error-cat/errors/worker-stop-error')
function myWorker (job) {
  return Promise.resolve()
    .then(() => {
      if (!job.host) {
        throw new WorkerStopError('host is required', {}, 'my.queue', job)
      }
    })
    .then(() => {
      return doSomethingWithHost(job)
    })
}

If job.host is missing, this worker rejects the promise with a WorkerStopError. Ponos will log the error itself, acknowledge the job to remove it from the queue, and continue with other jobs. You may catch and re-throw the error if you wish to do additional logging or reporting.

Finally, as was mentioned before, Ponos will retry any other errors. error-cat provides a WorkerError class you may use, or you may throw normal Errors. If you do, the worker will catch these and retry according to the server's configuration (retry delay, back-off, max delay, etc.).

const WorkerError = require('error-cat/errors/worker-error')
const WorkerStopError = require('error-cat/errors/worker-stop-error')
function myWorker (job) {
  return Promise.resolve()
    .then(() => {
      return externalService.something(job)
    })
    // Note: w/o this catch, the error would simply propagate to the worker and
    // be handled.
    .catch((err) => {
      logErrorToService(err)
      // If the error is 'recoverable' (e.g., network fluke, temporary outage),
      // we want to be able to retry.
      if (err.isRecoverable) {
        throw new Error('hit a momentary issue. try again.')
      } else {
        // maybe we know we can't recover from this
        throw new WorkerStopError(
          'cannot recover. acknowledge and remove job',
          {},
          'this.queue',
          job
        )
      }
    })
}
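The rules described in this section (a resolved promise is acknowledged, a WorkerStopError is acknowledged without retrying, anything else is retried) can be summarized in a small sketch. The `WorkerStopError` class below is a local stand-in so the example runs without error-cat, and `decideOutcome` and `runOnce` are hypothetical names; none of this is Ponos's internal code.

```javascript
// Stand-in for error-cat's WorkerStopError so this sketch runs on its own.
class WorkerStopError extends Error {}

// Hypothetical helper: decide what happens to a job given the worker's result.
function decideOutcome (err) {
  if (!err) { return 'ack' } // worker resolved
  if (err instanceof WorkerStopError) { return 'ack' } // fatal: drop the job
  return 'retry' // any other error is retried
}

// Hypothetical runner: invoke a worker once and map its promise to an outcome.
function runOnce (worker, job) {
  return Promise.resolve()
    .then(() => worker(job))
    .then(() => decideOutcome(null), (err) => decideOutcome(err))
}

const sampleWorker = (job) => {
  if (!job.host) {
    return Promise.reject(new WorkerStopError('host is required'))
  }
  if (job.flaky) {
    return Promise.reject(new Error('hit a momentary issue. try again.'))
  }
  return Promise.resolve(job)
}

const outcomes = Promise.all([
  runOnce(sampleWorker, { host: 'a' }),
  runOnce(sampleWorker, {}),
  runOnce(sampleWorker, { host: 'a', flaky: true })
])
outcomes.then((results) => console.log(results)) // [ 'ack', 'ack', 'retry' ]
```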

Worker Options

Currently workers can be defined with a msTimeout option. This value defaults to process.env.WORKER_TIMEOUT || 0. One can set a specific millisecond timeout for a worker like so:

server.setTask('my-queue', workerFunction, { msTimeout: 1234 })

Or one can set this option via setAllTasks:

server.setAllTasks({
  // This will use the default timeout, maxNumRetries, ...
  'queue-1': queueOneTaskFn,
  // This will use the specified timeout, maxNumRetries, ...
  'queue-2': {
    // worker function to run
    task: queueTwoTaskFn,

    // schema to validate job against
    jobSchema: Joi.object({ tid: Joi.string() }),

    // time before job will throw timeout error
    msTimeout: 1337,

    // number of times before job will stop retrying on failure
    maxNumRetries: 7,

    // function to run when we hit max retries
    finalRetryFn: () => { return Promise.try(...)},

    // number of jobs that we can perform in given duration
    maxOperations: 5,

    // duration under which rate limit is accounted for
    durationMs: 60000
  }
})

These options are also available for setEvent and setAllEvents.
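As a rough illustration of what a per-worker msTimeout means, the sketch below wraps a worker so that its promise rejects if it has not settled in time. `withTimeout` is a hypothetical helper written for this example and is not how Ponos implements the option; treating 0 as "no timeout" is also an assumption based on the default shown earlier.

```javascript
// Hypothetical helper, not Ponos's implementation: reject if the worker's
// promise has not settled within msTimeout. A timeout of 0 disables the timer.
function withTimeout (workerFn, msTimeout) {
  return (job) => {
    const work = Promise.resolve().then(() => workerFn(job))
    if (!msTimeout) { return work }
    let timer
    const timeout = new Promise((resolve, reject) => {
      timer = setTimeout(() => reject(new Error('worker timed out')), msTimeout)
    })
    // Clear the timer once the work settles, whichever way it settles.
    const clear = () => clearTimeout(timer)
    work.then(clear, clear)
    return Promise.race([work, timeout])
  }
}

const slowWorker = () => new Promise((resolve) => {
  setTimeout(() => resolve('done'), 50)
})
const fastWorker = () => Promise.resolve('done')

const timedOut = withTimeout(slowWorker, 10)({}).catch((err) => err.message)
const finished = withTimeout(fastWorker, 10)({})
const timeoutResults = Promise.all([timedOut, finished])
timeoutResults.then((r) => console.log(r)) // [ 'worker timed out', 'done' ]
```

Because the timeout surfaces as an ordinary rejection, it would fall under the normal retry rules rather than being treated as fatal.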

Worker Namespaces

Each worker is wrapped in a continuation-local-storage namespace called ponos.

Ponos adds a tid to the ponos namespace. This tid is unique per job. To access this tid:

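const getNamespace = require('continuation-local-storage').getNamespace
// Assumption: Promise.try comes from bluebird
const Promise = require('bluebird')

// A worker must be a function that takes the job and returns a promise
module.exports.worker = (job) => {
  return Promise.try(() => {
    const tid = getNamespace('ponos').get('tid')
    console.log(`hello world: tid: ${tid}`)
  })
}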

NOTES:

  • Promise.resolve().then(() => {...}) breaks out of the ponos namespace, so the tid will not be available inside it
  • getNamespace must be called in the worker itself

Full Example

const ponos = require('ponos')

const tasks = {
  'queue-1': (job) => { return Promise.resolve(job) },
  'queue-2': (job) => { return Promise.resolve(job) }
}

const events = {
  'exchange-1': (job) => { return Promise.resolve(job) }
}

// Create the server
const server = new ponos.Server({
  events: events,
  tasks: tasks
})

// If tasks were not provided in the constructor, set tasks for workers handling
// jobs on each queue
server.setAllTasks(tasks)
// Similarly, you can set events.
server.setAllEvents(events)

// Start the server!
server.start()
  .then(() => { console.log('Server started!') })
  .catch((err) => { console.error('Server failed', err) })

License

MIT