nats-jobs
v0.8.0
Published
Background job processor using NATS
Downloads
540
Maintainers
Readme
NATS Jobs
Background job processing using NATS JetStream for distributing work.
See examples directory for more examples.
Companion libraries
Usage
This library uses debug. To enable:
DEBUG=nats-jobs node myfile.js
msgpackr is the recommended way to encode complex data structures since it's fast, efficient, and can handle serializing and unserializing dates.
Processing jobs
import { JsMsg } from 'nats'
import { setTimeout } from 'node:timers/promises'
import { expBackoff, jobProcessor } from 'nats-jobs'
const def = {
stream: 'ORDERS',
backoff: expBackoff(1000),
async perform(msg: JsMsg) {
console.log(`Started ${msg.info.streamSequence}`)
console.log(msg.data.toString())
// Simulate work
await setTimeout(5000)
console.log(`Completed ${msg.info.streamSequence}`)
},
}
const processor = await jobProcessor()
const myJob = processor.start(jobDef)
// Gracefully handle shutdown
const shutDown = async () => {
await myJob.stop()
process.exit(0)
}
process.on('SIGTERM', shutDown)
process.on('SIGINT', shutDown)
To gracefully shutdown mutliple jobs and close the NATS connection call
stop
from the object returned by jobProcessor
.
const processor = await jobProcessor()
const jobs = [
processor.start(jobDef1),
processor.start(jobDef2),
processor.start(jobDef3),
]
const shutDown = async () => {
// Shuts down jobs 1, 2, and 3 and closes the NATS connection
await processor.stop()
process.exit(0)
}
process.on('SIGTERM', shutDown)
process.on('SIGINT', shutDown)
Publish via NATS
nats pub ORDERS someText
Strategies
Short-lived jobs
For short-lived jobs like sending an email you shouldn't need to enable autoExtendTimeout
. Processing of a message should happen in less than the 10 second ack_wait
default.
Long-lived jobs
Long-lived jobs should use a short ack_wait
with autoExtendAckTimeout
set to true
.
You may also want to use the timeout
option as a sanity check. A timeout call abort on the abort signal just like calling stop
. However, you can differntiate by checking signal.reason
which will be set to timeout
for a timeout and stop
when stop
is called. This allows the job control of how it wants to handle this situation with two likely scenarios:
(1) Finish processing current record and exit.
(2) Register an onabort
callback that logs an error and calls process.exit()
.
Testing
Run the following in the /docker
directory to start up NATS.
docker compose up