nightcoral
v0.7.0
a tiny inline durable timer
nightcoral
NightCoral is a job system in TypeScript for tiny-to-small servers. It consists of:
- a worker pool (of threads)
- a persistent queue (of jobs)
- persistent timers
It uses sqlite3 for storing the jobs and timers, and optionally reports metrics using crow. Jobs are executed "at least once", meaning any jobs that were in progress during a sudden shutdown (or crash) will be run again when the server restarts. If a job is able to run and complete during a single server session, it may optionally return status information to the primary server thread.
Only one process at a time can run nightcoral; multi-node coordination is a non-goal.
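Because jobs run "at least once", a job that was interrupted by a crash may execute a second time after restart, so handlers are generally safer when they are idempotent. The following is a minimal sketch of one way to do that, not part of nightcoral itself: `CreditJob`, `Ledger`, and `applyCredit` are hypothetical names, and the in-memory `Map`/`Set` stand in for whatever durable store a real handler would use.

```typescript
// Hypothetical job shape: each job carries a unique request id.
type CreditJob = { requestId: string; account: string; amount: number };

// Stand-in for durable storage. A real system would keep this in a database,
// so the "already applied" record survives a restart.
class Ledger {
  private balances = new Map<string, number>();
  private applied = new Set<string>();

  // Applies the credit once; returns false if this request was already applied.
  applyCredit(job: CreditJob): boolean {
    if (this.applied.has(job.requestId)) return false;
    this.balances.set(job.account, this.balance(job.account) + job.amount);
    this.applied.add(job.requestId);
    return true;
  }

  balance(account: string): number {
    return this.balances.get(account) ?? 0;
  }
}
```

Running the same job twice against such a ledger changes the balance only once, which is the property a re-executed job needs.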
Build
npm install
npm test
Use
Here's a quick example that starts up a pool of 5 worker threads, hands them 10 jobs, and sets an 11th job to execute after one minute:
import { NightCoral } from "nightcoral"; // assumes NightCoral is a named export

type Job = { task: string, value: number };
// put persistent storage in `./data/nightcoral.db`. workers will each
// receive their own copy of the `state` object.
const nc = new NightCoral<Job>({
  databaseFilename: "./data/nightcoral.db",
  state: { redisPassword: "welcome" },
});
// start 5 threads, each running "worker.mjs"
await nc.launch("./lib/worker.mjs", 5);
// post 10 jobs, and one timer that will trigger in one minute.
for (let i = 0; i < 10; i++) nc.add({ task: "count", value: i });
nc.addTimer(60_000, { task: "alarm", value: 500 });
API
new NightCoral<J>(options: Partial<NightCoralOptions<J>> = {})
Configure the job system without launching any tasks. The sqlite database will be created if necessary.
There are two important fields in the options object:
databaseFilename: string
Filename for the sqlite database. ":memory:" will use a non-persistent in-memory store that evaporates when the process ends, which may be useful for testing.
state: any
Each worker will receive this state object (passed via JSON) when it starts up.
And there are several others you might want to change:
jobIsJson: boolean (default: true)
The persistent queue normally converts jobs to their JSON string representation in the database. If your jobs are already strings, set this to false.
cacheJobs: number (default: 50)
The database will batch-read this many jobs at a time, to avoid hitting the database often when the queue is moving fast.
logger?: (text: string) => void
errorLogger?: (error: Error | undefined, text: string) => void
traceLogger?: (text: string) => void
These functions are called to log events, if defined. The error logger is called on error, and the trace logger is called for detailed operations that usually only matter if you're debugging a problem.
notifyError?: (error: WorkerPoolError) => void
This function is called if a worker thread dies or is killed. It may be useful if you have an event log or an alerting system.
describeJob: (job: J) => string (default: JSON.stringify)
Use this to customize the description of a job in trace logging.
metrics?: Metrics
To track counters and gauges, a crow-metrics object can be used.
pingFrequency: number (default: 60000 = 1 minute)
pingTimeout: number (default: 30000 = 30 seconds)
How often (in milliseconds) to ping workers to make sure they're responsive, and how long to wait for a response before considering a worker dead.
workerDeathThreshold: number (default: 10)
workerDeathDuration: number (default: 15000 = 15 seconds)
If workerDeathThreshold workers die within workerDeathDuration milliseconds, assume something is wrong, and shut down.
fakeWorker?: (job: J) => Promise<void>
For testing, you can stub out the worker pool completely and use an inline callback.
localHandler?: (job: J) => A | null
If you'd like to handle certain returned jobs locally, instead of pushing them to new workers, define this function and make it return null if it handled the job locally. Otherwise the job will be handed to workers. Usually this is just for tests.
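The workerDeathThreshold and workerDeathDuration options describe a "too many deaths in a short window" check. As an illustration of that idea only (this is not nightcoral's actual code, and `DeathTracker` and `recordDeath` are hypothetical names), a sliding-window counter could look like this:

```typescript
// Illustration only: a sliding-window counter for worker deaths.
class DeathTracker {
  private deaths: number[] = [];
  private readonly threshold: number;
  private readonly durationMs: number;

  // Defaults mirror the documented workerDeathThreshold / workerDeathDuration.
  constructor(threshold = 10, durationMs = 15_000) {
    this.threshold = threshold;
    this.durationMs = durationMs;
  }

  // Record a worker death at time `now` (milliseconds). Returns true if
  // `threshold` or more deaths fall within the last `durationMs`.
  recordDeath(now: number): boolean {
    this.deaths.push(now);
    // Drop deaths that have fallen out of the window.
    this.deaths = this.deaths.filter(t => now - t < this.durationMs);
    return this.deaths.length >= this.threshold;
  }
}
```

With the defaults, ten deaths spread over an hour would never trip the check, while ten deaths inside 15 seconds would.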
launch(workerCodePath: string, count: number): Promise<void>
Start the worker pool, with count workers executing the javascript file at workerCodePath. The promise is fulfilled once all the workers are active and ready to receive jobs. The worker interface is described below.
add(job: J): QueuedJob<J>
Post a job to the worker pool. The returned object contains its unique id and a delete() method to try to delete the job before it's executed.
addQuery(job: J): QueuedQueryJob<J>
Post a job to the worker pool, like add, but also expect a reply in the reply field of the returned QueuedQueryJob. This is a promise that is fulfilled (with either the reply, or undefined) when the job is complete, or rejected with an error if the worker threw an error.
addTimerAt(expires: number, job: A): TimedJob<A>
Store job until the absolute time in expires. The returned object has a delete() method to cancel the job before the timer expires.
addTimer(timeout: number, job: A): TimedJob<A>
Store job until timeout milliseconds from now. The returned object has a delete() method to cancel the job before the timer expires.
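The two timer calls differ only in how the expiry is expressed: relative for addTimer, absolute for addTimerAt. A small sketch of computing each form follows. It assumes that expires is a timestamp in milliseconds since the Unix epoch (as returned by `Date.now()`), which the text above does not state explicitly; `nextUtcMidnight` and `expiryFromDelay` are hypothetical helpers.

```typescript
// Returns the epoch-millisecond timestamp of the next UTC midnight after `now`.
function nextUtcMidnight(now: number): number {
  const d = new Date(now);
  // Date.UTC normalizes an out-of-range day, so "+ 1" rolls over months and years.
  return Date.UTC(d.getUTCFullYear(), d.getUTCMonth(), d.getUTCDate() + 1);
}

// Converts a relative delay into the equivalent absolute expiry.
function expiryFromDelay(now: number, timeoutMs: number): number {
  return now + timeoutMs;
}
```

Under that assumption, `nc.addTimerAt(nextUtcMidnight(Date.now()), job)` would schedule a job for the next UTC midnight, and `nc.addTimerAt(expiryFromDelay(Date.now(), 60_000), job)` would behave like `nc.addTimer(60_000, job)`.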
idle(): Promise<void>
Resolves the next time all workers are idle (no jobs are running).
stop(): Promise<void>
Shuts down the pool.
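One plausible way to combine idle() and stop() is to wait for running jobs to finish before shutting the pool down. The sketch below is an assumption about usage rather than documented behavior: `Drainable` is a hypothetical structural type covering only the two methods used, and the text above does not say whether idle() also waits for jobs that are still queued.

```typescript
// Minimal structural type covering only the two methods used here.
interface Drainable {
  idle(): Promise<void>;
  stop(): Promise<void>;
}

// Waits until no jobs are running, then shuts the pool down.
async function drainAndStop(pool: Drainable): Promise<void> {
  await pool.idle();
  await pool.stop();
}
```

A server could call `drainAndStop(nc)` from a shutdown signal handler. Jobs that have not started stay in the persistent queue for the next run.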
Writing worker code
The workerCodePath passed to launch must be a javascript file with these exported functions:
export async function setup<J, S>(state: any, portal: WorkerPortal<J>): Promise<S>
(optional) If defined, this function is called with the state object passed into nightcoral's constructor, and must return a new object that will be passed to the handler function with each job. This can be used to build global state (like a database connection) that will be used for each job the worker handles.
export async function handler<J, S>(job: J, state: S, portal: WorkerPortal<J>): Promise<J | undefined>
This function is called for each job received. A worker is considered "busy" until this function's promise resolves, and won't be given another job until then. state is whatever was returned from the setup function, if that existed; otherwise, it's the original state from the nightcoral constructor.
If the handler returns a job (J), it will be used to fulfill the reply promise returned from the top-level addQuery() function. This can be a useful optimization for higher-level APIs where you'd like to return a successful status message if the background task completes within a short time: add your status field to the job and return it. This only works if the job is created and executed within the same server execution -- in other words, if the server hasn't crashed or shut down in the meantime; otherwise, the job will execute when the server restarts, but there will be no one around to hear the reply.
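The "return a status if the task completes within a short time" pattern can be sketched by racing the reply promise (the reply field described under addQuery) against a timeout. This is an illustration under stated assumptions: `HasReply`, `QueryPool`, and `replyWithin` are hypothetical names, and the interfaces model only the reply field rather than nightcoral's full types.

```typescript
// Minimal structural types; only the `reply` promise is used.
interface HasReply<J> { reply: Promise<J | undefined>; }
interface QueryPool<J> { addQuery(job: J): HasReply<J>; }

// Posts a job and waits up to `ms` milliseconds for the worker's reply.
// Resolves to undefined if no reply arrives in time; the job itself is
// not cancelled and still runs in the background.
async function replyWithin<J>(pool: QueryPool<J>, job: J, ms: number): Promise<J | undefined> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<undefined>(resolve => {
    timer = setTimeout(() => resolve(undefined), ms);
  });
  try {
    return await Promise.race([pool.addQuery(job).reply, timeout]);
  } finally {
    // Avoid keeping the process alive for a timer that no longer matters.
    clearTimeout(timer);
  }
}
```

A request handler could then answer "done" when `replyWithin(nc, job, 200)` yields a job, and "accepted" when it yields undefined.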
A WorkerPortal is passed to each function, for communicating with the pool's coordinator. It has these fields:
- workerId: number -- the id of this worker
- postJob(job: J): void -- post a new job to the queue
- postJobs(jobs: J[]): void -- post a batch of new jobs to the queue
- postJobAfter(timeout: number, job: J): void -- post a job to be executed after a delay (on a timer)
- postJobAt(expires: number, job: J): void -- post a job to be executed at (or after) a specific absolute time
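Putting the pieces above together, a worker file might look roughly like the following sketch. The `WorkerPortal` interface here is a local copy of the fields listed above so the example stands alone; `Job`, `WorkerState`, and the task names are hypothetical. In a real worker file both functions would be exported, as described earlier.

```typescript
// Local copy of the portal shape, so the sketch is self-contained.
interface WorkerPortal<J> {
  workerId: number;
  postJob(job: J): void;
  postJobs(jobs: J[]): void;
  postJobAfter(timeout: number, job: J): void;
  postJobAt(expires: number, job: J): void;
}

type Job = { task: string; value: number };
type WorkerState = { prefix: string; handled: number };

// Builds per-worker state once, from the state object given to the constructor.
async function setup(state: { prefix?: string }, portal: WorkerPortal<Job>): Promise<WorkerState> {
  return { prefix: state.prefix ?? `worker-${portal.workerId}`, handled: 0 };
}

// Called once per job; the worker is busy until the returned promise resolves.
async function handler(job: Job, state: WorkerState, portal: WorkerPortal<Job>): Promise<Job | undefined> {
  state.handled++;
  if (job.task === "count" && job.value > 0) {
    // Chain a follow-up job one second from now, and send no reply.
    portal.postJobAfter(1000, { task: "count", value: job.value - 1 });
    return undefined;
  }
  // Returning a job sends it back to the main thread as the reply.
  return { task: "done", value: state.handled };
}
```

Because each worker calls setup on its own, `handled` counts jobs per worker thread, not across the pool.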
Metrics collected
- pool-workers: total number of worker threads running
- pool-workers-idle: number of worker threads available (not currently working)
- pool-jobs-retired: count of jobs executed since startup
- pool-job-time: timing distribution of how long jobs take when executed
- pool-ping-msec: timing distribution of how long workers are taking to respond to periodic pings
- pool-workers-died: count of workers that have died unexpectedly or were killed for not responding to a ping in time
- queue_size: number of jobs waiting in the queue (normally 0)
- queue_processing: number of jobs currently executing
- queue_latency: milliseconds the last job sat in the queue before being executed
- timer_count: number of outstanding (not expired) timers
- timer_idle_workers: number of tasks currently polling timers (normally 1)
- timer_latency: milliseconds between the last expired timer and executing its job
License
Apache 2 (open-source) license, included in LICENSE.txt.
Authors
- https://messydesk.social/@robey - Robey Pointer