npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

nightcoral

v0.7.0

Published

a tiny inline durable timer

Downloads

15

Readme

nightcoral

NightCoral is a job system in typescript for tiny-to-small servers. It consists of:

  • a worker pool (of threads)
  • a persistent queue (of jobs)
  • persistent timers

It uses sqlite3 for storing the jobs and timers, and optionally reports metrics using crow. Jobs are executed "at least once", meaning any jobs that were in progress during a sudden shutdown (or crash) will be run again when the server restarts. If a job is able to run and complete during a single server session, it may optionally return status information to the primary server thread.

Only one process at a time can run nightcoral; multi-node coordination is a non-goal.

Build

npm install
npm test

Use

Here's a quick example that starts up a pool of 5 worker threads, hands them 10 jobs, and sets an 11th job to execute after one minute:

type Job = { task: string, value: number };

// put persistent storage in `./data/nightcoral.db`. workers will each
// receive their own copy of the `state` object.
const nc = new NightCoral<Job>({
  databaseFilename: "./data/nightcoral.db",
  state: { redisPassword: "welcome" },
});

// start 5 threads, each running "worker.mjs"
await nc.launch("./lib/worker.mjs", 5);

// post 5 jobs, and one timer that will trigger in one minute.
for (let i = 0; i < 10; i++) nc.add({ task: "count", value: i });
nc.addTimer(60_000, { task: "alarm", value: 500 });

API

new NightCoral<J>(options: Partial<NightCoralOptions<J>> = {})

Configure the job system without launching any tasks. The sqlite database will be created if necessary.

There are two important fields in the options object:

  • databaseFilename: string

    Filename for the sqlite database. ":memory:" will use a non-persistent in-memory store that evaporates when the process ends, which may be useful for testing.

  • state: any

    Each worker will receive this state object (passed via JSON) when it starts up.

And there are several others you might want to change:

  • jobIsJson: boolean (true)

    The persistent queue normally converts jobs to their JSON string representation in the database. If your jobs are already strings, set this to false.

  • cacheJobs: number (50)

    The database will batch-read this many jobs at a time, to avoid hitting the database often when the queue is moving fast.

  • logger?: (text: string) => void

  • errorLogger?: (error: Error | undefined, text: string) => void

  • traceLogger?: (text: string) => void

    These functions are called to log events, if defined. The error logger is called on error, and the trace logger is called for detailed operations that usually only matter if you're debugging a problem.

  • notifyError?: (error: WorkerPoolError) => void

    This function is called if a worker thread dies or is killed. It may be useful if you have an event log or an alerting system.

  • describeJob: (job: J) => string (default: JSON.stringify)

    Use this to customize the description of a job in trace logging.

  • metrics?: Metrics

    To track counters and gauges, a crow-metrics object can be used.

  • pingFrequency: number (60000 = 1 minute)

  • pingTimeout: number (30000 = 30 seconds)

    How often (in milliseconds) should we ping workers to make sure they're responsive, and how long should we wait for a response before considering that worker to be dead?

  • workerDeathThreshold: number (10)

  • workerDeathDuration: number (15000 = 15 seconds)

    If workerDeathThreshold workers die within workerDeathDuration, assume something is wrong, and shutdown.

  • fakeWorker?: (job: J) => Promise<void>

    For testing, you can stub out the worker pool completely and use an inline callback.

  • localHandler?: (job: J) => A | null

    If you'd like to handle certain returned jobs locally, instead of pushing them to new workers, define this function and make it return null if it handled the job locally. Otherwise the job will be handed to workers. Usually this is just for tests.

launch(workerCodePath: string, count: number): Promise<void>

Start the worker pool, with count workers executing the javascript file at workerCodePath. The promise is fulfilled once all the workers are active and ready to receive jobs. The worker interface is described below.

add(job: J): QueuedJob<J>

Post a job to the worker pool. The returned object contains its unique id and a delete() method to try to delete the job before it's executed.

addQuery(job: J): QueuedQueryJob<J>

Post a job to the worker pool, like add, but also expect a reply in the reply field of the returned QueuedQueryJob. This is a promise that is fulfilled (with either the reply, or undefined) when the job is complete, or with an error if the worker threw an error.

addTimerAt(expires: number, job: A): TimedJob<A>

Store job until the absolute time in expires. The returned object has a delete() method to cancel the job before the timer expires.

addTimer(timeout: number, job: A): TimedJob<A>

Store job until timeout milliseconds from now. The returned object has a delete() method to cancel the job before the timer expires.

idle(): Promise<void>

Resolves the next time all workers are idle (no jobs are running).

stop(): Promise<void>

Shuts down the pool.

Writing worker code

The workerCodePath passed to launch must be a javascript file with these exported functions:

  • export async function setup<J, S>(state: any, portal: WorkerPortal<J>): Promise<S> (optional)

    If defined, this function is called with the state object passed into nightcoral's constructor, and must return a new object that will be passed to the handler function with each job. This can be used to build global state (like a database connection) that will be used for each job the worker handles.

  • export async function handler<J, S>(job: J, state: S, portal: WorkerPortal<J>): Promise<J | undefined>

    This function is called for each job received. A worker is considered "busy" until this function's promise resolves, and won't be given another job until then. state is whatever was returned from the setup function, if that existed; otherwise, it's the original state from the nightcoral constructor.

    If the handler returns a job (J), it will be used to fulfill the promise returned from the top-level add() function. This can be a useful optimization for higher-level APIs where you'd like to return a successful status message if the background task completes within a short time: add your status field to the job and return it. Also, this will only work if the job is created and executed within the same server execution -- in other words, if the server hasn't crashed or shutdown in the meantime; otherwise, the job will execute when the server restarts, but there will be no one around to hear the reply.

A WorkerPortal is passed to each function, for communicating with the pool's coordinator. It has these fields:

  • workerId: number -- the id of this worker
  • postJob(job: J): void -- post a new job to the queue
  • postJobs(jobs: J[]): void -- post a batch of new jobs to the queue
  • postJobAfter(timeout: number, job: J): void -- post a job to be executed after a delay (on a timer)
  • postJobAt(expires: number, job: J): void -- post a job to be executed at (or after) a specific absolute time

Metrics collected

  • pool-workers: total number of worker threads running
  • pool-workers-idle: number of worker threads available (not currently working)
  • pool-jobs-retired: count of jobs executed since startup
  • pool-job-time: timing distribution of how long jobs take when executed
  • pool-ping-msec: timing distribution of how long workers are taking to respond to periodic pings
  • pool-workers-died: count of workers that have died unexpectedly or were killed for not responding to a ping in time
  • queue_size: number of jobs waiting in the queue (normally 0)
  • queue_processing: number of jobs currently executing
  • queue_latency: milliseconds the last job sat in the queue before being executed
  • timer_count: number of outstanding (not expired) timers
  • timer_idle_workers: number of tasks currently polling timers (normally 1)
  • timer_latency: milliseconds between the last expired timer and executing its job

License

Apache 2 (open-source) license, included in LICENSE.txt.

Authors