exframe-etl-queue
v2.3.3
ETL Queue
Initialization
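const { EtlQueue, EtlSteps } = require('exframe-etl-queue');
const { db } = require('./lib/db');
// `logger` is the application's logger instance, created elsewhere.
// EtlSteps.Transform names the ETL step this queue instance works on.
const queue = new EtlQueue(logger, EtlSteps.Transform, db);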
Enqueue
An ETL message only needs to be enqueued during the extraction step. Each message automatically advances to the next ETL step's queue once it has been acknowledged after processing.
// `result` is a change event; the fields destructured here match a MongoDB change stream event.
const {
  fullDocument,
  updateDescription,
  _id: resumeToken,
  operationType,
  clusterTime,
  ns: { coll: collection }
} = result;
await queue.enqueue(context, collection, fullDocument.quoteNumber, resumeToken, {
  operationType,
  operationTimestamp: clusterTime,
  fullDocument,
  updateDescription
});
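The destructuring above can be tried without a database. The following is a minimal sketch, assuming `result` is a MongoDB change stream event. The sample event, its values, and the `toEnqueueArgs` helper are hypothetical and not part of exframe-etl-queue. Note that MongoDB only includes `fullDocument` on update events when the change stream is opened with `fullDocument: 'updateLookup'`, and never on delete events, so the sketch guards against a missing document.

```javascript
// Hypothetical sample of a MongoDB change stream event. All values are made up.
const sampleEvent = {
  _id: { _data: 'example-resume-token' },  // resume token
  operationType: 'update',
  clusterTime: 'example-cluster-time',     // a BSON Timestamp in a real event
  ns: { db: 'exampleDb', coll: 'quotes' },
  fullDocument: { _id: 'abc123', quoteNumber: 'Q-1001', status: 'bound' },
  updateDescription: { updatedFields: { status: 'bound' }, removedFields: [] }
};

// Hypothetical helper: maps a change event to the values passed to enqueue.
// The property names `documentId` and `payload` are labels chosen for this sketch.
function toEnqueueArgs(event) {
  const {
    fullDocument,
    updateDescription,
    _id: resumeToken,
    operationType,
    clusterTime,
    ns: { coll: collection }
  } = event;

  // Delete events (and updates without updateLookup) carry no fullDocument.
  if (!fullDocument) {
    throw new Error(`Change event "${operationType}" has no fullDocument`);
  }

  return {
    collection,
    documentId: fullDocument.quoteNumber,
    resumeToken,
    payload: { operationType, operationTimestamp: clusterTime, fullDocument, updateDescription }
  };
}

const args = toEnqueueArgs(sampleEvent);
console.log(args.collection, args.documentId); // prints "quotes Q-1001"
```

With a real queue, these values would be passed as `queue.enqueue(context, args.collection, args.documentId, args.resumeToken, args.payload)`.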
Listen
The listen method accepts a callback that performs work on the message. The callback should update the message with any new data needed by the downstream processes. When the callback completes, the message is automatically acknowledged and moves to the next process status. If an error occurs, the message is automatically placed in an error state and no further processing occurs.
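// nanoid generates the example target id; require() assumes a CommonJS-compatible nanoid version.
const { nanoid } = require('nanoid');

await queue.listen(context, async (context, message) => {
  // Fields available on the incoming message
  const { documentType, documentId, date, sourceOperation: { operationType } } = message;
  // Attach new data for the downstream steps
  message.targets = [
    { target: nanoid() }
  ];
});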
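The acknowledge-or-error behaviour described above can be illustrated with an in-memory stand-in. This is only a sketch under stated assumptions, not the library's implementation: the `InMemoryStepQueue` class, the step names, and the status strings are all hypothetical, and this version drains the pending messages once, whereas a real listener would presumably keep waiting for new ones.

```javascript
// Hypothetical step order; the real step names come from EtlSteps.
const STEPS = ['extract', 'transform', 'load'];

// Hypothetical in-memory stand-in for a single ETL step queue.
class InMemoryStepQueue {
  constructor(step) {
    this.step = step;
    this.pending = [];   // messages waiting for the callback
    this.advanced = [];  // messages acknowledged and handed to the next step
    this.errored = [];   // messages whose callback threw
  }

  enqueue(message) {
    this.pending.push({ ...message, step: this.step, status: 'pending' });
  }

  // Runs the callback on each pending message, then acknowledges or errors it.
  async listen(callback) {
    while (this.pending.length > 0) {
      const message = this.pending.shift();
      try {
        await callback(message);
        // Success: acknowledge and record the step the message moves to.
        message.status = 'acknowledged';
        message.nextStep = STEPS[STEPS.indexOf(this.step) + 1] || null;
        this.advanced.push(message);
      } catch (err) {
        // Failure: park the message in an error state; it does not advance.
        message.status = 'error';
        message.error = err.message;
        this.errored.push(message);
      }
    }
  }
}

async function demo() {
  const queue = new InMemoryStepQueue('transform');
  queue.enqueue({ documentId: 'Q-1001' });
  queue.enqueue({ documentId: 'Q-1002' });

  await queue.listen(async (message) => {
    if (message.documentId === 'Q-1002') {
      throw new Error('transform failed');
    }
    message.targets = [{ target: 'example-target' }];
  });
  return queue;
}

demo().then((queue) => {
  console.log('advanced:', queue.advanced.map((m) => m.documentId));
  console.log('errored:', queue.errored.map((m) => m.documentId));
});
```

In this sketch the first message ends up acknowledged with `nextStep` set to `'load'`, while the second stays in the error list with the thrown message recorded on it.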