npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

apoq

v0.2.1

Published

PostgreSQL-backed Node.js task queue

Downloads

30

Readme

apoq

A PostgreSQL Queue for processing background tasks, with a focus on simplicity and reliability.

Example

const { Apoq } = require("apoq")

const MESSAGES = ["💖", "🧡", "💛", "💚", "💙"]
const randomColor = () => MESSAGES[Math.floor(Math.random() * MESSAGES.length)]

const main = async () => {
  const apoq = new Apoq(process.env.DATABASE_URL)

  // Prepare the database tables to record tasks
  await apoq.prepare()

  // Add some example "sendMessage" tasks
  for (let i = 0; i < 100; i++) {
    await apoq.add("sendMessage", { color: randomColor() })
  }

  // Give apoq a function which processes "sendMessage" tasks
  apoq.use("sendMessage", async ({ data }) => {
    console.log(`📧 sending a ${data.color}`)
  })

  // Listen for task completed events
  apoq.events.on("task.completed", (task) => {
    console.log(`✨ task ${task.id} complete`)
  })

  // Start processing tasks!
  await apoq.start()
}

main()

Usage

Configuration and setup

Initialize an instance of apoq with a connection string or an object of options that the pg library accepts, for example:

const { Apoq } = require("apoq")

const apoq = new Apoq(process.env.DATABASE_URL)

or

const { Apoq } = require("apoq")

const apoq = new Apoq({
  user: 'postgres',
  host: 'localhost',
  database: 'app',
  password: 'shhh!',
  port: 5432,
})

Migrating the database

The library manages 2 tables (apoq_tasks and apoq_migrations), and the prepare() function takes care of running any pending migrations against an apoq instance. You need to run this before using the library; for convenience, you might choose to run it automatically during deployments on when your app starts.

const { Apoq } = require("apoq")

const apoq = new Apoq(process.env.DATABASE_URL)
await apoq.prepare()

Instance functions

add

add(type: string, data: object)

Add a task to the queue. The type is used by workers to determine what to do when processing this task, so you'll probably want to use it to either label the work to be done (e.g. sendWelcomeEmail) or as an event name (e.g. user.created).

The data object can hold any additional information for the task processor, e.g. { userId: 123 }. It'll be stored in a JSONB column.

apoq.add("logMessage", { foo: "bar!" })

use

use(type: string, processor: function)

use(type: string, processor: function, options: { retryDelay: function, retryLimit: number })

Configures a processor function this worker should use for a specific type of task. The worker will call the processor function for each task we add with the same type.

const processor = (args) => {
  console.log(`Message: ${args.data.foo}`)
}

apoq.use("logMessage", processor)

You can also provide retry options to indicate how the worker should deal with tasks the fail. By default task processors have a retryLimit of 5, so they will retry a task up to 5 times (for a total of 6 attempts) if it fails. retryDelay is a function which takes the current number of failures and determines how many seconds to wait before the next attempt. The default retryDelay function exponentially backs off with jitter.

Here's an example of a simple custom retry delay which will retry 2 seconds after the first failure, 4 seconds after the second failure, 6 seconds after the third failure, then give up because of the limit:

const processor = async (args) => {
  await fetch("https://flaky.example")
}

const retryDelay = (failCount) => {
  return failCount * 2
}

apog.use("pingFlakyApi", processor, { retryLimit: 3, retryDelay })

start

await apoq.start()

Starts a worker, which will check for all task types configured with use and run their functions for you.

work

await apoq.work()

Transactionally works the next configured task in the queue. This will run a single task then resolve, marking the task as completed or failed. If there are no tasks, it'll resolve.

Most of the time you'll want to use start instead, as start continues to run (or wait for) tasks until the worker is stopped while work only runs a single task.

stop

await apoq.stop()

Stops a worker started with start. Any tasks that are already actively being processed by the worker will continue until they complete or fail, but no more tasks will be started.

Events

You can subscribe to events for an apoq instance.

task.completed

This event is emitted after a task has been completed and the task's transaction has committed. It receives the completed task as an argument.

apoq.events.on("task.completed", (task) => {
  console.log(`✨ task ${task.id} complete`)
})

task.failed

This event is emitted when a task fails, eg. the task processor function threw an error. It receives the failed task and the error as arguments.

The task will be in the failed state, and will not be retried automatically.

apoq.events.on("task.failed", (task, error) => {
  console.log(`💥 task ${task.id} failed...`)
  console.error(error)
})