@racunis/postgresql

v1.0.0

@racunis/postgresql is a TypeScript job queueing system for PostgreSQL. It manages background tasks in Node.js applications and hides the underlying database operations and job processing behind a small API for adding and processing jobs.

Features

  • PostgreSQL-specific implementations of core abstractions.
  • Type definitions for jobs and events.
  • Easy to extend and integrate into existing applications.
  • Configurable options for queues and workers.
  • Support for handling multiple job types within a worker.
  • Event handling for job states and errors.

Installation

To install @racunis/postgresql, use one of the following commands:

# npm
npm install @racunis/postgresql

# yarn
yarn add @racunis/postgresql

# pnpm
pnpm add @racunis/postgresql

# bun
bun add @racunis/postgresql

Usage

Example

Here is a complete example of how to use @racunis/postgresql to manage a job queue and worker with PostgreSQL.

import { PostgreSqlQueue, PostgreSqlWorker } from '@racunis/postgresql'

interface JobPayload {
  task: string
}

const main = async () => {
  // Create the queue
  const queue = await PostgreSqlQueue.create<JobPayload>('Queue', {
    connectionString: 'postgresql://user:password@localhost:port/db',
  })

  // Add a job to the queue
  await queue.add({ task: 'processData' }, 10)

  // Create a worker to process jobs from the queue
  const worker = PostgreSqlWorker.create(queue, (job) => {
    console.log('Processing job:', job.payload.task)
  })
}

try {
  await main()
}
catch (error) {
  console.error(error)
}
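
The connection strings in these examples use placeholders (user, password, port, db). The following is a minimal sketch of assembling a real value, assuming the connectionString option accepts a standard PostgreSQL connection URI. The buildConnectionString helper and the ConnectionParts interface are hypothetical and not part of @racunis/postgresql.

```typescript
// Hypothetical helper, not part of @racunis/postgresql
interface ConnectionParts {
  user: string
  password: string
  host: string
  port: number
  database: string
}

// Percent-encode user, password and database so that characters such as
// '@', ':' or '/' cannot be mistaken for URI delimiters
const buildConnectionString = ({ user, password, host, port, database }: ConnectionParts): string =>
  `postgresql://${encodeURIComponent(user)}:${encodeURIComponent(password)}@${host}:${port}/${encodeURIComponent(database)}`

const connectionString = buildConnectionString({
  user: 'app',
  password: 'p@ss/word',
  host: 'localhost',
  port: 5432,
  database: 'jobs',
})

console.log(connectionString) // postgresql://app:p%40ss%2Fword@localhost:5432/jobs
```

The resulting string can then be passed as connectionString when creating the queue.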

Handling Multiple Job Types

To handle different job types within a worker, you can use a switch statement to process each job type accordingly.

import { PostgreSqlQueue, PostgreSqlWorker } from '@racunis/postgresql'
import type { Job } from '@racunis/core'

// Define job payload interfaces for different job types
export interface EmailProcessingJobPayload {
  type: 'EmailProcessing'
  details: {
    recipient: string
    content: string
  }
}

export interface DataMigrationJobPayload {
  type: 'DataMigration'
  details: {
    source: string
    destination: string
  }
}

export interface ReportGenerationJobPayload {
  type: 'ReportGeneration'
  details: {
    reportId: string
    requestedBy: string
  }
}

// Union type for job payloads
export type JobPayload = EmailProcessingJobPayload | DataMigrationJobPayload | ReportGenerationJobPayload

const main = async () => {
  // Create the queue
  const queue = await PostgreSqlQueue.create<JobPayload>('Queue', {
    connectionString: 'postgresql://user:password@localhost:port/db',
  })

  // Add jobs to the queue
  await queue.add({ type: 'EmailProcessing', details: { recipient: '[email protected]', content: 'Hello, World!' } }, 5)
  await queue.add({ type: 'DataMigration', details: { source: 'System A', destination: 'System B' } }, 3)
  await queue.add({ type: 'ReportGeneration', details: { reportId: 'R123', requestedBy: 'admin' } }, 4)

  // Create a worker to process jobs from the queue
  const worker = PostgreSqlWorker.create(queue, (job) => {
    const { type, details } = job.payload
    switch (type) {
      case 'EmailProcessing':
        console.log(`Processing Email Job: Recipient - ${details.recipient}, Content - ${details.content}`)
        break
      case 'DataMigration':
        console.log(`Processing Data Migration Job: Source - ${details.source}, Destination - ${details.destination}`)
        break
      case 'ReportGeneration':
        console.log(`Processing Report Generation Job: Report ID - ${details.reportId}, Requested By - ${details.requestedBy}`)
        break
    }
  })
}

try {
  await main()
}
catch (error) {
  console.error(error)
}
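
Because the payloads form a discriminated union on type, the switch can also be made exhaustive, so that adding a new job type without handling it becomes a compile-time error. The sketch below repeats the payload types so it stands alone. The assertNever and describeJob helpers are illustrative names, not part of the library.

```typescript
interface EmailProcessingJobPayload {
  type: 'EmailProcessing'
  details: { recipient: string, content: string }
}

interface DataMigrationJobPayload {
  type: 'DataMigration'
  details: { source: string, destination: string }
}

interface ReportGenerationJobPayload {
  type: 'ReportGeneration'
  details: { reportId: string, requestedBy: string }
}

type JobPayload = EmailProcessingJobPayload | DataMigrationJobPayload | ReportGenerationJobPayload

// Illustrative helper: only reachable if a case is missing from the switch
const assertNever = (value: never): never => {
  throw new Error(`Unhandled job payload: ${JSON.stringify(value)}`)
}

const describeJob = (payload: JobPayload): string => {
  switch (payload.type) {
    case 'EmailProcessing':
      return `Email to ${payload.details.recipient}`
    case 'DataMigration':
      return `Migration from ${payload.details.source} to ${payload.details.destination}`
    case 'ReportGeneration':
      return `Report ${payload.details.reportId} for ${payload.details.requestedBy}`
    default:
      // payload has type never here once every case is handled
      return assertNever(payload)
  }
}

console.log(describeJob({ type: 'DataMigration', details: { source: 'System A', destination: 'System B' } }))
// Migration from System A to System B
```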

Queue and Worker Options

@racunis/postgresql provides configurable options for queues and workers to customize their behavior.

Queue Options

  • autostart: Automatically start processing jobs when the queue is created. Default is true.

    const queue = await PostgreSqlQueue.create<JobPayload>('Queue', {
      connectionString: 'postgresql://user:password@localhost:port/db',
    }, {
      autostart: false, // Do not start automatically
    })

Worker Options

  • autostart: Automatically start the worker when it is created. Default is true.

  • processingInterval: Interval in milliseconds between job processing attempts. Default is 0.

  • waitingInterval: Interval in milliseconds for checking new jobs when the worker is waiting. Default is 1000.

  • maxRetries: Maximum number of retries for processing a job. Default is 3.

  • retryInterval: Interval in milliseconds between retry attempts. Default is 500.

    const worker = PostgreSqlWorker.create<JobPayload>(queue, (job) => {
      console.log('Processing job:', job.payload.task)
    }, {
      autostart: false, // Do not start automatically
      processingInterval: 1000, // Process jobs every 1 second
      waitingInterval: 2000, // Check for new jobs every 2 seconds
      maxRetries: 5, // Retry failed jobs up to 5 times
      retryInterval: 1000, // Wait 1 second between retries
    })

These options allow you to tailor the behavior of queues and workers to fit your specific requirements.
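
To make the effect of partial overrides concrete, here is a small sketch of how the documented defaults might combine with the options you pass. The option names and default values come from the list above. The WorkerOptions interface and the resolveWorkerOptions helper are hypothetical and only illustrate the idea; the library's own merging logic may differ.

```typescript
// Names and defaults taken from the Worker Options list;
// the interface and helper are hypothetical
interface WorkerOptions {
  autostart: boolean
  processingInterval: number
  waitingInterval: number
  maxRetries: number
  retryInterval: number
}

const defaultWorkerOptions: WorkerOptions = {
  autostart: true,
  processingInterval: 0,
  waitingInterval: 1000,
  maxRetries: 3,
  retryInterval: 500,
}

// Later spreads win, so any option that is not overridden keeps its default
const resolveWorkerOptions = (overrides: Partial<WorkerOptions> = {}): WorkerOptions => ({
  ...defaultWorkerOptions,
  ...overrides,
})

const resolved = resolveWorkerOptions({ autostart: false, maxRetries: 5 })

console.log(resolved.autostart) // false
console.log(resolved.maxRetries) // 5
console.log(resolved.retryInterval) // 500
```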

Queue and Worker Events

@racunis/postgresql supports various events for job queues and workers to handle job states and errors effectively. Below is an overview of the events and how you can use them.

Queue Events

These events are triggered by the queue during different stages of job processing.

  • activated: Emitted when a job is activated.
  • completed: Emitted when a job is completed.
  • failed: Emitted when a job fails.
  • error: Emitted when an error occurs.

import { PostgreSqlQueue } from '@racunis/postgresql'
import type { Job } from '@racunis/core'

interface JobPayload {
  type: string
  details: any
}

const main = async () => {
  // Create the queue
  const queue = await PostgreSqlQueue.create<JobPayload>('Queue', {
    connectionString: 'postgresql://user:password@localhost:port/db',
  })

  // Event listener for when a job is activated
  queue.on('activated', ({ job }) => {
    console.log(`Job activated: ${job.id}`)
  })

  // Event listener for when a job is completed
  queue.on('completed', ({ job }) => {
    console.log(`Job completed: ${job.id}`)
  })

  // Event listener for when a job fails
  queue.on('failed', ({ job, error }) => {
    console.log(`Job failed: ${job.id}, Error: ${error.message}`)
  })

  // Event listener for queue errors
  queue.on('error', ({ error }) => {
    console.error(`Queue error: ${error.message}`)
  })

  // Add jobs to the queue
  await queue.add({ type: 'EmailProcessing', details: { recipient: '[email protected]', content: 'Hello, World!' } }, 5)
}

try {
  await main()
}
catch (error) {
  console.error(error)
}
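
The listeners above receive differently shaped payloads depending on the event. The sketch below shows one way to express that mapping in types, using a tiny stand-alone emitter. The QueueEvents map is inferred from the listener signatures in this README. JobLike (with a numeric id) and TypedEmitter are assumptions made for illustration and are not the library's actual types.

```typescript
// Assumed job shape; the real Job type lives in @racunis/core
interface JobLike {
  id: number
}

// Event name to payload shape, inferred from the listeners above
interface QueueEvents {
  activated: { job: JobLike }
  completed: { job: JobLike }
  failed: { job: JobLike, error: Error }
  error: { error: Error }
}

// Minimal illustrative emitter; not the library's implementation
class TypedEmitter<Events> {
  private readonly listeners = new Map<keyof Events, Array<(payload: unknown) => void>>()

  on<K extends keyof Events>(event: K, listener: (payload: Events[K]) => void): this {
    const list = this.listeners.get(event) ?? []
    list.push(listener as (payload: unknown) => void)
    this.listeners.set(event, list)
    return this
  }

  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(payload)
  }
}

const queueEvents = new TypedEmitter<QueueEvents>()
const log: string[] = []

queueEvents.on('completed', ({ job }) => log.push(`Job completed: ${job.id}`))
queueEvents.on('failed', ({ job, error }) => log.push(`Job failed: ${job.id}, Error: ${error.message}`))

queueEvents.emit('completed', { job: { id: 1 } })
queueEvents.emit('failed', { job: { id: 2 }, error: new Error('boom') })

console.log(log)
```

With a map like this, registering a listener for 'error' and destructuring job from its payload would be rejected by the compiler.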

Worker Events

These events are triggered by the worker while processing jobs.

  • waiting: Emitted when the worker is waiting for a job.
  • activated: Emitted when a job is activated.
  • completed: Emitted when a job is completed.
  • failed: Emitted when a job fails.

import { PostgreSqlQueue, PostgreSqlWorker } from '@racunis/postgresql'
import type { Job } from '@racunis/core'

interface JobPayload {
  type: string
  details: any
}

const main = async () => {
  // Create the queue
  const queue = await PostgreSqlQueue.create<JobPayload>('Queue', {
    connectionString: 'postgresql://user:password@localhost:port/db',
  })

  // Create a worker to process jobs from the queue
  const worker = PostgreSqlWorker.create(queue, (job) => {
    const { type, details } = job.payload
    switch (type) {
      case 'EmailProcessing':
        console.log(`Processing Email Job: Recipient - ${details.recipient}, Content - ${details.content}`)
        break
      case 'DataMigration':
        console.log(`Processing Data Migration Job: Source - ${details.source}, Destination - ${details.destination}`)
        break
      case 'ReportGeneration':
        console.log(`Processing Report Generation Job: Report ID - ${details.reportId}, Requested By - ${details.requestedBy}`)
        break
    }
  }, { autostart: false })

  // Event listener for when the worker is waiting for a job
  worker.on('waiting', () => {
    console.log('Worker is waiting for a job...')
  })

  // Event listener for when a job is activated
  worker.on('activated', ({ job }) => {
    console.log(`Job activated: ${job.id}`)
  })

  // Event listener for when a job is completed
  worker.on('completed', ({ job }) => {
    console.log(`Job completed: ${job.id}`)
  })

  // Event listener for when a job fails
  worker.on('failed', ({ job, error }) => {
    console.log(`Job failed: ${job.id}, Error: ${error.message}`)
  })

  // Start the worker
  worker.start()
}

try {
  await main()
}
catch (error) {
  console.error(error)
}
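
Worker events can also feed simple metrics instead of log lines. The following sketch counts how often each event fires. It is written against a structural EventSource interface rather than the real worker class, so attachTally, EventSource and the fake worker used for the demonstration are all hypothetical. Whether the real worker's on method matches this shape exactly is an assumption.

```typescript
type WorkerEventName = 'waiting' | 'activated' | 'completed' | 'failed'

// Structural stand-in for anything exposing an on(event, listener) method
interface EventSource {
  on: (event: WorkerEventName, listener: (payload?: unknown) => void) => unknown
}

// Registers one counting listener per event and returns the live tally object
const attachTally = (source: EventSource): Record<WorkerEventName, number> => {
  const tally: Record<WorkerEventName, number> = { waiting: 0, activated: 0, completed: 0, failed: 0 }
  for (const name of Object.keys(tally) as WorkerEventName[]) {
    source.on(name, () => { tally[name] += 1 })
  }
  return tally
}

// Fake worker used only to demonstrate the helper
const handlers = new Map<WorkerEventName, Array<() => void>>()
const fakeWorker: EventSource = {
  on: (event, listener) => {
    handlers.set(event, [...(handlers.get(event) ?? []), listener])
  },
}
const fire = (event: WorkerEventName): void => {
  for (const handler of handlers.get(event) ?? []) handler()
}

const tally = attachTally(fakeWorker)
fire('activated')
fire('completed')
fire('activated')
fire('failed')

console.log(tally.activated, tally.completed, tally.failed, tally.waiting) // 2 1 1 0
```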

Queue and Worker Control

@racunis/postgresql provides mechanisms to control the execution of queues and workers, as well as methods to manage the state of jobs in the queue.

Getting Job Counts

You can get the count of jobs in various states using the getJobCounts method. This is useful for monitoring and managing the state of your job queue.

const jobCounts = await queue.getJobCounts('waiting', 'active', 'completed', 'failed')
console.log(`Waiting: ${jobCounts.waiting}`)
console.log(`Active: ${jobCounts.active}`)
console.log(`Completed: ${jobCounts.completed}`)
console.log(`Failed: ${jobCounts.failed}`)

This will give you an object with the counts of jobs in the specified states, helping you keep track of the queue's status.
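
For monitoring, the counts are often reduced to a few derived numbers. Below is a minimal sketch, assuming the result has the numeric waiting, active, completed and failed properties used above. The JobCounts interface and the summarize helper are illustrative and not provided by the library.

```typescript
// Shape assumed from the property accesses in the example above
interface JobCounts {
  waiting: number
  active: number
  completed: number
  failed: number
}

interface QueueSummary {
  total: number
  pending: number
  failureRate: number
}

const summarize = (counts: JobCounts): QueueSummary => {
  const finished = counts.completed + counts.failed
  return {
    total: counts.waiting + counts.active + finished,
    // Jobs that still need work
    pending: counts.waiting + counts.active,
    // Share of finished jobs that failed; 0 when nothing has finished yet
    failureRate: finished === 0 ? 0 : counts.failed / finished,
  }
}

const summary = summarize({ waiting: 4, active: 1, completed: 15, failed: 5 })

console.log(summary.total) // 25
console.log(summary.pending) // 5
console.log(summary.failureRate) // 0.25
```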

Starting and Stopping Queues and Workers

You can start and stop job processing in queues and workers using the start and stop methods.

  • Queue Start/Stop: Use start() to begin processing jobs in the queue and stop() to halt job processing.

    await queue.start() // Start processing jobs
    await queue.stop()  // Stop processing jobs
  • Worker Start/Stop: Use start() to begin job processing by the worker and stop() to halt job processing.

    worker.start() // Start the worker
    await worker.stop() // Stop the worker

If you attempt to start a worker on a stopped queue, the worker will not start. Conversely, if you restart a queue that has a previously stopped worker, the worker will automatically start processing jobs again.
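
The two rules in the previous paragraph can be pictured with a toy in-memory model. This is only an illustration of the described behavior, not the library's implementation. ModelQueue and ModelWorker are hypothetical classes, and the assumption that stopping the queue also stops its workers is mine.

```typescript
// Toy model of the start/stop rules; not the library's implementation
class ModelQueue {
  running = false
  readonly workers: ModelWorker[] = []

  start(): void {
    this.running = true
    // Restarting the queue resumes every attached worker
    for (const worker of this.workers) worker.running = true
  }

  stop(): void {
    this.running = false
    // Assumption: stopping the queue also halts its workers
    for (const worker of this.workers) worker.running = false
  }
}

class ModelWorker {
  running = false
  private readonly queue: ModelQueue

  constructor(queue: ModelQueue) {
    this.queue = queue
    queue.workers.push(this)
  }

  start(): void {
    // A worker does not start while its queue is stopped
    if (this.queue.running) this.running = true
  }

  stop(): void {
    this.running = false
  }
}

const modelQueue = new ModelQueue()
const modelWorker = new ModelWorker(modelQueue)

modelWorker.start()
const startedOnStoppedQueue = modelWorker.running // false

modelQueue.start()
const resumedWithQueue = modelWorker.running // true

console.log({ startedOnStoppedQueue, resumedWithQueue })
```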

Draining and Emptying the Queue

@racunis/postgresql provides methods to manage the jobs in the queue:

  • drain(): Deletes all waiting jobs in the queue.

    await queue.drain() // Clear all waiting jobs

  • empty(): Deletes all jobs in all states (waiting, active, completed, and failed).

    await queue.empty() // Clear all jobs in the queue

These methods allow you to maintain control over the job queue, ensuring you can clear jobs as needed.
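
The difference between the two methods is easiest to see on a small data set. The sketch below uses a toy in-memory store in place of the PostgreSQL-backed queue. ModelJobStore and StoredJob are hypothetical names, and only the documented semantics (drain removes waiting jobs, empty removes everything) are modeled.

```typescript
type JobState = 'waiting' | 'active' | 'completed' | 'failed'

interface StoredJob {
  id: number
  state: JobState
}

// Toy in-memory stand-in for the queue's job table
class ModelJobStore {
  private jobs: StoredJob[]

  constructor(jobs: StoredJob[]) {
    this.jobs = [...jobs]
  }

  // Removes only jobs that have not been picked up yet
  drain(): void {
    this.jobs = this.jobs.filter(job => job.state !== 'waiting')
  }

  // Removes every job regardless of state
  empty(): void {
    this.jobs = []
  }

  states(): JobState[] {
    return this.jobs.map(job => job.state)
  }
}

const store = new ModelJobStore([
  { id: 1, state: 'waiting' },
  { id: 2, state: 'active' },
  { id: 3, state: 'completed' },
  { id: 4, state: 'waiting' },
])

store.drain()
const afterDrain = store.states() // ['active', 'completed']

store.empty()
const afterEmpty = store.states() // []

console.log(afterDrain, afterEmpty)
```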

Closing the Queue

The close method allows you to close the queue and release all associated resources. This is particularly useful for clean-up operations and ensuring that all connections are properly terminated.

await queue.close() // Close the queue and release resources

This method stops the queue from processing jobs, closes all associated workers, and releases any resources used by the queue, such as database connections.

Contributing

I appreciate your interest in contributing! Please see the CONTRIBUTING.md file for guidelines on how to contribute.

License

@racunis/postgresql is licensed under the MIT License. See the LICENSE file for more details.