
pg-swag v1.0.0

A distributed scheduler with a grin.

Scheduling with a Grin


Have you ever needed to run a task (emails, reports, cleanup) at a specific time, or on a recurring schedule? Have you ever had multiple processes across which you needed to distribute that workload?

PG Swag is a distributed scheduling library intended to simplify scheduling tasks across one or more nodes, using a Postgres database as the shared state.

During our local testing, we found that this setup easily scaled to 30K recurring schedules per second on a single node; either way, we believe it is unlikely that the scheduler will be your bottleneck.

Information

In this package, there are no "leader nodes"; each node is responsible for grabbing and executing tasks. It also does not require a separate service to manage the scheduling, since the database is used as the source of truth. Additionally, it is designed to be resilient to node failures and can be run in a distributed environment. Lastly, it is designed to be efficient when fetching tasks: it precomputes when each task should run, and only fetches tasks that are ready to run.

The querying leverages Postgres skip locks (FOR UPDATE SKIP LOCKED) to ensure that only one node is running a given task at a time; however, we believe it would be trivial to remove that dependency and apply this system to other databases.
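As a rough illustration only (this is not pg-swag's actual internal query), the skip-lock pattern for claiming a batch of due jobs looks roughly like the following, using the table and column names described in the Technical Details section below:

import { Client } from 'pg';

const client = new Client({ host: 'localhost', user: 'postgres', password: 'password' });
await client.connect();

// Claim up to 100 due jobs for this node; rows already locked by another
// node are skipped rather than waited on.
const { rows } = await client.query(`
    UPDATE jobs
    SET locked_until = now() + interval '1 minute', locked_by = $1
    WHERE (queue, id) IN (
        SELECT queue, id FROM jobs
        WHERE queue = $2 AND greatest(run_at, locked_until) <= now()
        ORDER BY run_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED
    )
    RETURNING *
`, ['node-1', 'email'])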

How to use

To use this package, you will need to install it with your preferred package manager; we enjoy bun:

bun add pg-swag

Then, you can use it in your code like so:

import { Swag } from 'pg-swag';

const swag = new Swag({
    dialect: 'postgres',
    config: {
        host: 'localhost',
        user: 'postgres',
        password: 'password',
    }
})

await swag.on('email', async job => {
    console.log('Sending email to', job.data.email, 'with message', job.data.body)
})

// Schedules an email to be sent to Bob every day from July 1st, 2024
await swag.schedule('email', { 
    email: 'bob@example.com',
    body: 'Hello, Bob!'
}, 'R/2024-07-01/P1D')

Alternatively, you can pass in a query method like so:

import { Swag } from 'pg-swag';

const swag = new Swag({
    dialect: 'postgres',
    query: db.query 
})
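For instance, with the pg package (an assumption on our part; any client exposing a compatible query method should work), that might look like:

import { Pool } from 'pg';
import { Swag } from 'pg-swag';

const db = new Pool({ host: 'localhost', user: 'postgres', password: 'password' });

const swag = new Swag({
    dialect: 'postgres',
    // bind so the pool's query keeps its `this` when invoked by the scheduler
    query: db.query.bind(db)
})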

Supported Scheduling

We support a variety of scheduling options, including ISO 8601 repeating intervals, ISO 8601 dates and times, duration objects, and cron expressions. Some examples are outlined below.

ISO 8601 Repeating Intervals

These are defined by the R prefix, followed by the start date, and then the interval. For example, R/2024-07-01/P1D would start on July 1st, 2024, and repeat every day.

P introduces the period and is followed by duration designators: Y for years, M for months, W for weeks, and D for days. Adding T lets you also specify hours (H), minutes (M), and seconds (S).

Examples:

  • R/2024-07-01/P1D - Every day starting July 1st, 2024
  • R/2024-07-01T12:00:00/PT1H - Every hour starting July 1st, 2024 at 12:00:00
  • R/2024-07-01T12:00:00/PT1H30M - Every hour and a half starting July 1st, 2024 at 12:00:00
  • R/PT1H - Every hour starting now

If you specify a start date, you can also specify the number of recurrences you'd like (R#):

  • R5/2024-07-01/P1D - Every day starting July 1st, 2024, up to 5 times
  • R5/2024-07-01T12:00:00/PT1H - Every hour starting July 1st, 2024 at 12:00:00, up to 5 times

Additionally, we support one non-standard format for this interval, R/<start>/<end>/<interval>:

  • R/2024-07-01/2024-07-05/P1D - Every day from July 1st, 2024 to July 5th, 2024
  • R/2024-07-01T12:00:00/2024-07-01T13:00:00/PT1M - Every minute from July 1st, 2024 at 12:00:00 to July 1st, 2024 at 13:00:00
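Any of these expressions are passed as the third argument to schedule, just as in the earlier example:

// Every hour from noon on July 1st, 2024, stopping after 5 runs
await swag.schedule('email', {
    email: 'bob@example.com',
    body: 'Hourly reminder'
}, 'R5/2024-07-01T12:00:00/PT1H')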

ISO 8601 Dates and Times

  • 2024-07-01T12:00:00 - July 1st, 2024 at 12:00:00
  • 2020-01-01 - January 1st, 2020
  • new Date('2020-01-01') - JavaScript Dates are also supported

These will not repeat, and will only run once.

Duration Objects

You can also pass in a duration object, which is an object with the following fields:

Field | Description
-- | --
years | # of Years
months | # of Months
weeks | # of Weeks
days | # of Days
hours | # of Hours
minutes | # of Minutes
seconds | # of Seconds
recurrences | # of Recurrences
startDate | The date to start the schedule
endDate | The date to end the schedule

Upon calling schedule, the duration object will be converted into an ISO 8601 Repeating Interval.

  • { days: 1 } - Every day starting now
  • { days: 1, recurrences: 5 } - Every day starting now, up to 5 times
  • { hours: 1, startDate: '2024-07-01T12:00:00' } - Every hour starting July 1st, 2024 at 12:00:00
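Duration objects likewise go in the third argument to schedule:

// Every day starting now, up to 5 times
await swag.schedule('email', {
    email: 'bob@example.com',
    body: 'Daily digest'
}, { days: 1, recurrences: 5 })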

Cron Expressions

We use the cron-parser package to parse cron expressions. These follow standard cron syntax (we specifically recommend the 5-field syntax). For example, 0 0 * * * would run every day at midnight.
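Cron expressions are passed in the same position as the other expression types:

// Every day at midnight
await swag.schedule('email', {
    email: 'bob@example.com',
    body: 'Nightly summary'
}, '0 0 * * *')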

Future Additions

We are planning to add support for more scheduling options, for example "every sunrise" or "every 3 sunrises". If you need this, file an issue and we will prioritize it.

Technical Details

In this package, jobs and schedules are one and the same; there is no distinction between the two, which allows us to use a single-table design.

The table structure is as follows:

Field | Description
-- | --
queue | The type of task you wish to perform
id | An ID to represent the task
run_at | Generated field to determine when the task should be run
data | The data to be passed to the task
expression | The scheduling expression
locked_until | The time the task is locked until
locked_by | The node that has locked the task
attempts | The number of attempts that have been made to run the task

The main index for fetching tasks is on queue and greatest(run_at, locked_until) to make it efficient to fetch tasks that are ready to be run.

We also have a unique index on queue and id to ensure that tasks are not duplicated, and make it efficient to update & delete tasks.

Note: In the future, it might be wise for us to automatically partition the table as different queues are introduced. It does not do this at the moment; however, we do not anticipate this being a problem for most users. It should be reasonably performant for hundreds of thousands of tasks (your bottleneck will likely not be the scheduler).

Class Configuration

When creating a new instance of the scheduler, there are two main options to pass in:

Option | Description | Default
-- | -- | --
Connection Configuration | The configuration for the Postgres connection | {}
Table Configuration | { schema?: string or null, table?: string } | { table: 'jobs', schema: null }

By default, it will create a table called jobs in the public schema. If you want to use a different schema or table name, you can pass it in as an option.
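As a sketch (the exact placement of the table configuration may differ; check the package's type definitions), pointing the scheduler at a scheduler.jobs table might look like:

import { Swag } from 'pg-swag';

// Assumption: the table configuration is accepted as a second constructor argument
const swag = new Swag({
    dialect: 'postgres',
    config: {
        host: 'localhost',
        user: 'postgres',
        password: 'password',
    }
}, { schema: 'scheduler', table: 'jobs' })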

Schedule Configuration

When setting up a reader for a queue, you can pass in a configuration object to customize the behavior of the reader. The following options are available:

Option | Description | Type | Default
-- | -- | -- | --
batchSize | The number of tasks to fetch at a time | number | 100
concurrentJobs | The number of tasks to run concurrently | number | 10
pollingPeriod | The amount of time to wait between polling for tasks | number (milliseconds) or string ('15 seconds') | 15000
flushPeriod | The amount of time before writing finished tasks to the database | number (milliseconds) or string ('15 seconds') | 1000
lockPeriod | The amount of time to lock a task for | number (milliseconds) or string ('15 seconds') | '1 minutes'
skipPast | Whether to skip occurrences that fell in the past and continue the schedule from the current time (see below) | boolean | true
maxHeartbeats | The number of heartbeats before releasing a lock on tasks not actively being worked on in a batch | number | Infinity
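As a sketch (assuming the reader configuration is passed alongside the handler when calling on; consult the package's type definitions for the exact placement), tuning a queue might look like:

// Assumption: the reader options are accepted as a third argument to on
swag.on('email', async job => {
    // ... handle the job
}, {
    batchSize: 50,
    concurrentJobs: 5,
    pollingPeriod: '30 seconds',
    lockPeriod: '2 minutes',
    skipPast: false
})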

Skip Past

By default, the scheduler will not schedule tasks in the past. For example, if you have a 5-minute periodicity and your service was down for 6 hours, then when the scheduler starts back up it will schedule the next run 5 minutes from the current time rather than working through the 6-hour backlog; this avoids running roughly 72 missed tasks.

If you want to run all the tasks that were missed, you can set skipPast to false. This will schedule all the tasks that were missed, and then continue the schedule as normal.

Error Handling

You might notice that the module does not have an option like maxAttempts. This is because we believe that the error handling should be done in the task (or its error handler) itself. If you want to retry a task, you can simply throw an error, and the task will be retried.

You can also return an object from a job to modify certain behaviors. For example, you can return { expression: 'cancel' } to cancel the task.

The handler will receive the number of attempts that have been made, and you can use this to determine if you should retry the task.

swag.on('email', async job => {
    if (job.attempts > 3) return { expression: 'cancel' }
    // ...
})

If you wanted, you could shift this behavior into the error handler for a queue:

swag.on('email', async job => {
    // ...
}).onError(async (err, job) => {
    if (job.attempts > 3) return { expression: 'cancel' }
})

Or apply it globally, for all queues:

swag.onError(async (err, job) => {
    if (job.attempts > 3) return { expression: 'cancel' }
})

We've provided a utility function to make this simpler:

import { cancelAfter, Swag } from 'pg-swag'
// ... create the swag instance as before
swag.onError(cancelAfter(3))

However, we STRONGLY advise that you go beyond using cancelAfter and write some more sophisticated error handling logic, so that you can communicate to the user (or your developers) what went wrong.

Similarly, it is possible to use { lockedUntil: Date } to lock the task until a specific time. This can be useful if you want to programmatically delay a task upon failure.

swag.onError(async (err, job) => {
    // Lock the task for 1 extra minute per attempt
    return { lockedUntil: new Date(Date.now() + 1000 * 60 * job.attempts) }
})