moleculer-psql-queue
v0.2.0
Published
Task queue service with graphile-worker
Downloads
2
Readme
Job queue mixin for graphile-worker.
Install
$ npm install moleculer-psql-queue
Configuration
Start your PostgreSQL db.
Create an empty db
psql -U postgres -c 'CREATE DATABASE task_queue'
.Replace
task_queue
with your db nameUse graphile-worker CLI to init the schema for the jobs
npx graphile-worker -c "postgres://postgres:postgres@localhost:5444/task_queue" --schema-only
.Set your connection URL (more info: check PSQL docs) and replace
task_queue
with db name that you've defined instep 2)
Usage
Create queue worker service
const PsqlQueueService = require("moleculer-psql-queue");
broker.createService({
name: "task-worker",
mixins: [
PsqlQueueService(
"postgres://postgres:postgres@localhost:5444/task_queue",
// Default opts
{
// Name of the property in service schema.
schemaProperty: "queues",
// Name of the method in Service to create jobs
createJobMethodName: "createJob",
// Name of the property in Service to produce jobs
producerPropertyName: "$producer",
// Name of the property in Service to consume jobs
consumerPropertyName: "$consumer",
// Name of the internal queue that's used to store the job handlers
internalQueueName: "$queue",
// Name of the property in Service settings to register job event handlers
jobEventHandlersSettingsProperty: "jobEventHandlers",
// Optional producer configs: More info: https://github.com/graphile/worker#workerutilsoptions
producerOpts: {},
// Optional worker configs. More info: https://github.com/graphile/worker#runneroptions
queueOpts: {
concurrency: 5,
// Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
noHandleSignals: false,
pollInterval: 1000,
},
}
),
],
queues: {
/**
* @param {Object} payload Message payload
* @param {import('graphile-worker').JobHelpers} helpers graphile-worker
* More info about helpers: https://github.com/graphile/worker#creating-task-executors
*/
"sample.task"(payload, helpers) {
// if (Math.random() > 0.5) {
this.logger.info('New "simple.task" received!', payload);
return;
// } else {
// throw new Error('Random "sample.task" error');
// }
},
"another.task": {
/**
* @param {Object} payload Message payload
* @param {import('graphile-worker').JobHelpers} helpers Postgres helpers
* More info about helpers: https://github.com/graphile/worker#creating-task-executors
*/
process(payload, helpers) {
this.logger.info('New "another.task" job received!', payload);
},
},
},
});
Customize worker logger
const PsqlQueueService = require("moleculer-psql-queue");
broker.createService({
name: "task-worker",
mixins: [
PsqlQueueService(
"postgres://postgres:postgres@localhost:5444/task_queue"
),
],
methods: {
/**
* Replaces Default logger with custom one.
* By default uses Moleculer logger instance
* More info: https://github.com/graphile/worker#logger
*/
initLogger() {
/**
* @param {String} level Log level
* @param {String} message Message to log
* @param {Object} meta Additional metadata
*/
return (level, message, meta) => {
this.loggerQueue[level](message);
};
},
},
// Add Workers here
queues: {},
});
Listen to queue events
const PsqlQueueService = require("moleculer-psql-queue");
broker.createService({
name: "task-worker",
mixins: [
PsqlQueueService(
"postgres://postgres:postgres@localhost:5444/task_queue"
),
],
settings: {
/**
* @type {Record<String, Function>}
* For a complete list of events see: https://github.com/graphile/worker#workerevents
*/
jobEventHandlers: {
/**
* @param {{
* worker: import('graphile-worker').Worker,
* job: import('graphile-worker').Job
* }}
* @this {import('moleculer').Service}
*/
"job:success": function ({ worker, job }) {
this.logger.info(
`Worker ${worker.workerId} completed job ${job.id}`
);
},
},
},
// Add Workers here
queues: {},
});
Create Task
const PsqlQueueService = require("moleculer-psql-queue");
broker.createService({
name: "pub",
mixins: [
PsqlQueueService(
"postgres://postgres:postgres@localhost:5444/task_queue"
),
],
/**
* Service started lifecycle event handler
* @this {import('moleculer').Service}
*/
async started() {
try {
/**
* @param {String} name Task name
* @param {Object} payload Payload to pass to the task
* @param {import('graphile-worker').TaskSpec?} opts
*/
await this.createJob("sample.task", {
id: 1,
name: "simple.task",
});
} catch (error) {
this.logger.error('Error creating "sample.task" job', error);
}
},
});
Advanced Usage
The graphile-worker lib provides some advanced features like administration functions. These functions can be used to manage the queue and can be accessed via the this.$producer
property of the service.
const PsqlQueueService = require("moleculer-psql-queue");
broker.createService({
name: "pub",
mixins: [
PsqlQueueService(
"postgres://postgres:postgres@localhost:5444/task_queue"
),
],
/**
* Service started lifecycle event handler
* @this {import('moleculer').Service}
*/
async started() {
// Add the job via raw graphile-worker client
// For more info check the docs: https://github.com/graphile/worker#administration-functions
this.$producer.addJob("sample.task", {
id: 1,
name: "simple.task",
});
},
});
Test
$ npm test
In development with watching
$ npm run ci
License
The project is available under the MIT license.
Contact
Copyright (c) 2016-2022 MoleculerJS