@astodi/embedded-queue
v0.0.3
Embedded job/message queue
embedded-queue
embedded-queue is a job/message queue for any platform. It does not require an external data store such as Redis or MySQL.
It currently has the following repository implementations:
- nedb embedded repository,
- vanilla in-memory repository,
- or you can implement your own repository
Installation
npm install embedded-queue
or
yarn add embedded-queue
Basic Usage
import { Queue, Event, InMemoryJobRepository } from 'embedded-queue'

(async () => {
    const queue = await Queue.createQueue(
        new InMemoryJobRepository()
        // or new NedbJobRepository({ /* you can pass the nedb options here */ })
    );

    // set up job processor for "adder" type, concurrency is 1
    queue.process(
        "adder",
        async (job) => job.data.a + job.data.b,
        1
    );

    // handle job complete event
    queue.on(
        Event.Complete,
        (job, result) => {
            console.log("Job Completed.");
            console.log(` job.id: ${job.id}`);
            console.log(` job.type: ${job.type}`);
            console.log(` job.data: ${JSON.stringify(job.data)}`);
            console.log(` result: ${result}`);
        }
    );

    // create "adder" type job
    await queue.createJob({
        type: "adder",
        data: { a: 1, b: 2 },
    });

    // shutdown queue
    setTimeout(async () => { await queue.shutdown(1000); }, 1);
})();
Basic
- Create Queue
- Set Job Processor
- Set Job Event Handler
- Create Job
- Shutdown Queue
Create Queue
You can create a new queue by calling Queue.createQueue(repository). Queue.createQueue returns a Promise; await it so that initialization has finished before you use the queue.
Set Job Processor
A job processor is a function that processes a single job. It is called by a Worker with a Job argument and must return Promise<any>. It can run any kind of work (calculation, network access, etc.) and resolves with the result. Input data is passed through the Job.data object. You can also call Job.setProgress to report progress and Job.addLog to write log entries.
You can set any number of job processors. Each processor is associated with a single job type and processes only jobs of that type.
If you need to process many jobs of the same type concurrently, you can launch any number of job processors for that type.
Finally, the queue.process method signature is queue.process(type, processor, concurrency).
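As a rough illustration of the processor contract described above, the sketch below defines a processor that reads its input from job.data, reports progress, and writes a log entry. The createStubJob helper is hypothetical and exists only so the snippet runs on its own; it merely mimics the data, setProgress, and addLog members named above. In real use, the Worker supplies the job.

```javascript
// Hypothetical stand-in for the Job object that a Worker would pass in.
// It only mimics the members mentioned above: data, setProgress, addLog.
function createStubJob(data) {
  return {
    data,
    progress: 0,
    logs: [],
    async setProgress(completed, total) {
      this.progress = (completed / total) * 100;
    },
    async addLog(message) {
      this.logs.push(message);
    },
  };
}

// A job processor: receives one job and returns a Promise of the result.
async function sumProcessor(job) {
  const numbers = job.data.numbers;
  let sum = 0;
  for (let i = 0; i < numbers.length; i++) {
    sum += numbers[i];
    // Report how many items have been handled so far.
    await job.setProgress(i + 1, numbers.length);
  }
  await job.addLog(`summed ${numbers.length} numbers`);
  return sum; // becomes the resolved value of the returned Promise
}
```

With a real queue, this would presumably be registered as queue.process("sum", sumProcessor, 2), where "sum" is an illustrative type name and 2 is the concurrency.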
Set Job Event Handler
Queue implements EventEmitter. When a job is completed, a job fails, job progress is updated, and so on, you can observe these events by setting handlers with Queue.on(Event, Handler).
| Event | Description | Handler Signature |
|------------------|--------------------------------------------------|---------------------------|
| Event.Enqueue | Job is added to the queue | (job) => void |
| Event.Start | Job processing starts | (job) => void |
| Event.Failure | Job processing fails | (job, error) => void |
| Event.Complete | Job processing completes | (job, result) => void |
| Event.Remove | Job is removed from the queue | (job) => void |
| Event.Error | An error has occurred (outside of a job processor) | (error, job?) => void |
| Event.Progress | Job progress is updated | (job, progress) => void |
| Event.Log | A job log entry is added | (job, message) => void |
| Event.Priority | Job priority is changed | (job, priority) => void |
The Event.Complete handler is the most commonly used; it receives the job result returned by the job processor.
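The handler signatures in the table can be sketched as plain functions. In the minimal example below, results and failures are collected per job id. The two Map names and both handler names are illustrative and not part of the library, and the failure handler assumes that error is an Error object.

```javascript
// Illustrative stores, keyed by job.id (not part of the library).
const results = new Map();
const failures = new Map();

// Matches the Event.Complete signature: (job, result) => void
function onComplete(job, result) {
  results.set(job.id, result);
}

// Matches the Event.Failure signature: (job, error) => void
// Assumes `error` is an Error object with a message property.
function onFailure(job, error) {
  failures.set(job.id, error.message);
}
```

These would be attached with queue.on(Event.Complete, onComplete) and queue.on(Event.Failure, onFailure).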
Create Job
You can create a job by calling Queue.createJob(data). The data argument is an object that contains type, priority and data.
| Field | Type | Description |
|------------|------------|-------------|
| type | string | Identifier used to select the job processor |
| priority | Priority | Queue picks up jobs with higher priority first |
| data | object | Data used by the job processor; you can set any data |
Queue.createJob(data) returns a Promise<Job>; the resulting job is associated with the Queue.
Priority is one of Priority.LOW, Priority.NORMAL, Priority.MEDIUM, Priority.HIGH, Priority.CRITICAL.
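To make the "higher priority first" rule concrete, here is a small self-contained sketch of how the next job could be chosen. It does not use the library: the numeric values in the Priority stand-in are assumptions (the README only says that priority is a Number), and breaking ties by oldest createdAt is also an assumption.

```javascript
// Stand-in with assumed numeric values; not the library's own Priority export.
const Priority = { LOW: 1, NORMAL: 2, MEDIUM: 3, HIGH: 4, CRITICAL: 5 };

// Return the job that should be picked up next: highest priority first,
// oldest createdAt first among equal priorities (tie-break is an assumption).
function pickNextJob(jobs) {
  if (jobs.length === 0) return null;
  return [...jobs].sort(
    (a, b) => b.priority - a.priority || a.createdAt - b.createdAt
  )[0];
}

const pending = [
  { id: "a", type: "adder", priority: Priority.NORMAL, createdAt: new Date(1000) },
  { id: "b", type: "adder", priority: Priority.CRITICAL, createdAt: new Date(3000) },
  { id: "c", type: "adder", priority: Priority.CRITICAL, createdAt: new Date(2000) },
];

console.log(pickNextJob(pending).id); // prints "c"
```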
Shutdown Queue
If you want to stop processing jobs, call Queue.shutdown(timeoutMilliseconds, type) => Promise<void>. Queue begins stopping the running job processors, and once all of them have stopped, the Promise resolves. If stopping a job processor takes longer than timeoutMilliseconds, Queue terminates that job processor and sets Job.state to State.FAILURE.
You can stop only the job processors of a specific type by passing the second argument, type. If undefined is passed, job processors of all types are stopped.
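The timeout behaviour described above can be sketched with Promise.race. This is not the library's implementation, only a minimal model of the idea: wait for running processors, and mark whatever is still active as failed once the timeout elapses. The function name, the shape of the running entries, and the plain string state values are assumptions made for the example. Note that a Promise cannot be cancelled, so the sketch simply stops waiting for slow processors.

```javascript
// Wait for all running processors, but stop waiting after timeoutMilliseconds.
// `running` is an array of { job, promise }, where promise settles when the
// processor for that job finishes. State strings are assumed values.
async function shutdownWithTimeout(running, timeoutMilliseconds) {
  let timedOut = false;
  let timer;
  const timeout = new Promise((resolve) => {
    timer = setTimeout(() => {
      timedOut = true;
      resolve();
    }, timeoutMilliseconds);
  });

  // Record the outcome of each processor, unless the timeout already fired.
  const settled = running.map(({ job, promise }) =>
    promise.then(
      () => { if (!timedOut) job.state = "COMPLETE"; },
      () => { if (!timedOut) job.state = "FAILURE"; }
    )
  );

  await Promise.race([Promise.all(settled), timeout]);
  clearTimeout(timer);

  // Anything still active at this point did not finish in time.
  for (const { job } of running) {
    if (job.state === "ACTIVE") job.state = "FAILURE";
  }
}
```

Calling it with one fast and one slow processor and a short timeout leaves the fast job completed and the slow job failed, which mirrors the behaviour the section describes.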
API
Queue API
- createJob(data): Create a new Job, see above for usage.
- process(type, processor, concurrency): Set a job processor, see above for usage.
- shutdown(timeoutMilliseconds, type): Start shutting down the Queue, see above for usage.
- findJob(id): Search the queue by Job.id. Returns the Job if found, otherwise null.
- listJobs(state): List all jobs in the specified state. If undefined is passed, returns all jobs.
- removeJobById(id): Remove the Job with the specified id from the queue.
- removeJobsByCallback(callback): Remove all jobs for which callback returns true. The callback signature is (job) => boolean.
Job API
- setProgress(completed, total): Set progress; the arguments are converted to a percentage value (completed / total).
- addLog(message): Add a log entry.
- save(): After it is called, the job is put into the associated Queue and waits to be processed by a job processor.
- remove(): Remove the job from the Queue; it will not be processed anymore.
- setPriority(value): Set the priority value.
- isExist(): Returns whether the Job is in the Queue. Returns false before save() is called or after remove() is called, otherwise true.
- Getters
  - id: String that identifies the Job.
  - type: String identifier used to select the job processor.
  - data: Object used by the job processor; you can set any data.
  - priority: Number that determines processing order.
  - createdAt: Date when the job was created.
  - updatedAt: Date when the job was last updated.
  - startedAt: Date when the job processor started processing. Before the job starts, the value is undefined.
  - completedAt: Date when the job processor completed processing. Before the job completes, or if the job failed, the value is undefined.
  - failedAt: Date when an error occurred in the job processor. Before the job finishes, or if the job completed successfully, the value is undefined.
  - state: String that represents the current Job state, one of State.INACTIVE, State.ACTIVE, State.COMPLETE, State.FAILURE.
  - duration: Number giving the processing time of the Job in milliseconds. Before the job completes or fails, the value is undefined.
  - progress: Number giving the Job progress as a percentage. You can set the value by calling Job.setProgress. When the job completes, it is set to 100 automatically.
  - logs: Array of String. You can add a log entry by calling Job.addLog.
Advanced
Unexpected Termination
If your program terminates suddenly without calling Queue.shutdown while processors are still running, the jobs being processed remain State.ACTIVE in the queue. The next time Queue.createQueue is called, these jobs are automatically updated to State.FAILURE.
If you want to reprocess these jobs, call Queue.createJob again with the same parameters.
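One way to re-create such jobs after a restart is sketched below. The helper name requeueFailedJobs is hypothetical. It relies only on listJobs(state) and createJob(data) from the Queue API and on the type, priority, and data getters from the Job API, and it assumes that listJobs returns (or resolves to) an array of jobs. Note that it re-creates every job in the given state, including jobs that failed for ordinary reasons, so you may want to filter the list first.

```javascript
// Re-create every job that is currently in the given failure state.
// `queue` is expected to offer listJobs(state) and createJob(data) as described
// in the Queue API; `failureState` would be State.FAILURE in real use.
async function requeueFailedJobs(queue, failureState) {
  const failedJobs = await queue.listJobs(failureState);
  const recreated = [];
  for (const job of failedJobs) {
    // Same parameters as the original job, as the section above suggests.
    recreated.push(
      await queue.createJob({
        type: job.type,
        priority: job.priority,
        data: job.data,
      })
    );
  }
  return recreated;
}
```

In an application this would typically run once, right after Queue.createQueue has resolved, for example as await requeueFailedJobs(queue, State.FAILURE).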
License
MIT