fetchq v4.2.0
Fetchq Node Client
Provides a NodeJS interface to manage a Fetchq database instance.
NOTE: Fetchq does not solve all the problems in the world.
Before using it, read the following paragraphs that explain Fetchq's use case, and take a look at other great alternatives like RabbitMQ or Hangfire.
Table of Contents
- What is Fetchq
- When to use Fetchq?
- When NOT to use Fetchq
- Live Demos
- DB Configuration
- Queues Configuration
- Add Documents to a Queue
- Workers Configuration
- The Handler Function
- Returning Actions
- Context Decoration
- Client Configuration
- Maintenance Configuration
- Logger Configuration
- Initialization & Startup
- A word on init()
- Error Handling
- Workflow API
What is Fetchq?
You can think of Fetchq as a big calendar for running tasks.
With Fetchq, you can push a document into a queue and associate it with a point in time, just a date. It could be now, it could be a year from now, it could be 100 years ago.
Fetchq will then try to execute documents that are due for execution, starting from the oldest one. Well, it's not really Fetchq that executes them: it's a function that you provide. We call that function a worker or a handler.
👉 Fetchq guarantees that a document will NOT be executed before its due date.
When your handler executes a document, it can make decisions based on the document's data and even on its previous execution state.
After your handler does what needs to be done, it can reschedule, drop, complete or kill the document. More about this later in the docs.
NOTE: Here you'll learn to use it with NodeJS, but it really is just a smart way to use Postgres and could be paired with any programming language that can connect to it.
When to use Fetchq?
You should consider using Fetchq when the answer to any of the following questions is yes:
- Is FIFO not the best option?
- Do you need to reschedule document execution?
- Do you need uniqueness of documents in a queue?
- Do you want to keep your costs low? (*)
- Do you need/like to backup/restore your queues?
(*) PostgreSQL performs unbelievably well even with very little resources.
When NOT to use Fetchq?
When you have a massive amount of work that needs to be taken care of by a massive amount of independent workers. In such a case (classic with data producers, such as in an IoT project), RabbitMQ or similar alternatives are a much more suitable option.
PostgreSQL can handle a lot of data, easily billions of items in a single queue, and still operate fast enough. Nevertheless, if you go BIG and don't need repetition, uniqueness or planning of tasks, I'd choose a different tool.
Live Demos
Fetchq works on NodeJS and its only external dependency is a connection to a PostgreSQL instance. Such lightweight requirements make it possible to run fully working, free, in-browser demos leveraging CodeSandbox and ElephantSQL.
Client Demonstration:
Source Code Examples
Fetchq comes shipped with many source code examples that you can find here.
Fetchq as ForrestJS App:
DB Configuration
The Fetchq client library gives you a function that returns a configured client that implements the Fetchq API:

```js
const fetchq = require('fetchq');
const client = fetchq({ config });
```
The only requirement is a running Postgres instance. If the Fetchq extension does not exist, the client will initialize the database for you.
Any new table will be created under the `fetchq_catalog` schema, and all the PSQL functions are created in the default schema (`public`), prefixed as `fetchq_xxx` to avoid collisions.
Using ENV Variables
By default the Fetchq client tries to use the standard Postgres environment variables to set up the connection, so that you don't have to bother with it programmatically:
- PGUSER
- PGPASSWORD
- PGHOST
- PGPORT
- PGDATABASE
From v2.4.0 you can simply define a `PGSTRING` env variable that contains a complete connection URI as documented here.
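As an illustrative sketch, a client that relies entirely on the environment (assuming `PGSTRING` or the standard `PG*` variables are exported in your shell) needs no connection options at all:

```js
// No connection configuration: the client picks up PGSTRING
// (or PGUSER, PGHOST, etc.) from the environment.
const fetchq = require('fetchq');
const client = fetchq({});
```

This keeps credentials out of your codebase, which is handy for containerized deployments.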
Configure the connection programmatically:
You can set the connection's configuration programmatically:
```js
const client = fetchq({
  connectionParams: {
    user: 'dbuser',
    host: 'database.server.com',
    database: 'mydb',
    password: 'secretpassword',
    port: 3211,
  },
});
```
Or you can pass a `connectionString`:

```js
const client = fetchq({
  connectionString: 'postgres://postgres:postgres@localhost:5432/postgres',
});
```
Fetchq will attempt to connect to your Postgres instance multiple times, and you can control this behavior with the `connectionRetry` configuration:
```js
const client = fetchq({
  connectionRetry: {
    retries: 30,
    factor: 1,
    minTimeout: 1 * 1000,
    maxTimeout: 30 * 1000,
  },
});
```
👉 More info about the params here.
Connection Pooling
You can read about pooling in the PG documentation; if you decide to diverge from the default settings, just pass a `pool` option:
```js
const client = fetchq({
  connectionString: 'postgres://postgres:postgres@localhost:5432/postgres',
  pool: { max: 1, ... },
});
```
If you use a free-tier database (e.g. from https://elephantsql.com) your connection settings may be limited, so I suggest you set `pool: { max: 1 }` in such early development phases.
NOTE: the Fetchq client will set up at least 2 connections: one of them is dedicated to the event system, the other is for normal querying.
👉 More info about pooling here.
Raw Queries
Fetchq uses the famous pg library to connect to the Postgres instance. Once your client is up and running, you can issue raw queries as:

```js
await client.pool.query('SELECT NOW()');
```
👉 More info about raw queries here.
Queues Configuration
A Fetchq queue is represented at database level as a set of tables and entries in some other system tables.
A queue collects:
- documents
- logs
- stats
- settings
You can create as many queues as you may need (as long as it is sustainable by your db; it could be in the thousands), representing them as a simple list of configuration objects.
Here is an example that uses all the current default values:
```js
const client = fetchq({
  queues: [
    {
      // name of the queue, used later on to interact with it
      // (must be a valid Postgres table name)
      name: 'q1',
      // when false, any active worker will pause
      isActive: true,
      // speeds up FIFO performance immensely, but uses a bit more CPU
      enableNotifications: true,
      // fail tolerance of the queue, before considering a document dead
      maxAttempts: 5,
      // max log duration in a per-queue logs table
      errorsRetention: '24h',
      // settings of the per-queue maintenance jobs
      maintenance: {
        // document status maintenance
        mnt: { delay: '100ms', duration: '1m', limit: 500 },
        // queue stats snapshots for plotting performance through time
        sts: { delay: '5m', duration: '5m' },
        // computed stats job
        cmp: { delay: '10m', duration: '5m' },
        // errors and metrics cleanup job
        drp: { delay: '10m', duration: '5m' },
      },
    },
  ],
});
```
NOTE: the default values are suitable for most use cases, yielding a responsive queue that takes a metrics snapshot every 5 minutes. If you have a massive amount of data to process, we suggest you increase `mnt.delay` and monitor your PostgreSQL performance to find the best value for it.
enableNotifications
When this option is set to `true`, Fetchq activates triggers and notifications for the queue, and the client subscribes to those notifications to wake up after idle time.
👉 enable when: this is a perfect setting for queues that may stay idle for long periods of time, or for queues that must respond quickly to users' actions.
👉 disable when: queues that need to handle repetitive but not near-real-time-critical tasks may decide not to use this feature and just rely on simple polling. This has proven to be more effective, especially when dealing with massive amounts of data in a queue. In this case, you should also increase the value of `mnt.delay` for this queue.
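As an illustrative sketch (the queue name and timing values here are made up, not defaults), a high-volume polling-only queue could be configured as:

```js
// Notifications disabled: workers rely on polling alone,
// so mnt.delay is raised to reduce pressure on Postgres.
const client = fetchq({
  queues: [
    {
      name: 'bulk-ingest',
      enableNotifications: false,
      maintenance: {
        mnt: { delay: '1s', duration: '1m', limit: 1000 },
      },
    },
  ],
});
```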
Maintenance Settings
Each queue's health relies on a list of maintenance tasks that must be executed in time by each Fetchq Client's maintenance service.
You can fine tune how often those jobs should be executed and therefore fine tune the reactiveness of each queue and the load on the system.
The `mnt` job updates the documents' status: the faster it runs, the more reactive the queue when it comes to executing a scheduled document that became pending. It also increases CPU and I/O, so you must find a good balance based on your needs.
The `sts` job takes snapshots of the queue metrics and stores them into a time-series table that you may want to use for plotting charts and visualizing the queue's status. Run this as often as you need, just be careful because it may produce a lot of data.
The `cmp` job works on the queue's time-series stats table and creates computed metrics, such as how many documents are processed per minute. This job may be heavy.
The `drp` job tries to drop data that is not necessary anymore. It removes old error logs and metrics. This is not a critical job, but it is definitely good to run it every few minutes to keep your database lighter.
Add Documents to a Queue
Once you have defined a working queue, you probably want to add data into it for later processing.
There are two possible ways to add documents into a queue:
- `append()`
- `push()`
Append a Document:
Use the `append` API if you want your document to be processed as soon as possible, but after the current workload.
Signature:

```js
fetchq.doc.append(targetQueue, documentPayload [, options])
```

Example:

```js
const result = await client.doc.append('q1', {
  createdAt: Date.now(),
});

// RESULT:
// {
//   subject: 'xxxx-yyy-ddd'
// }
//
// "subject" is a UUIDv1
```
👉 For a more complete list of examples, please take a look at the integration tests.
Push a Document:
Use the `push` API if you want to be in control of:
- the subject of the document, which is unique for any given queue
- the point in time when the document should be processed
Signature:

```js
fetchq.doc.push(targetQueue, document [, options])
```

Example:

```js
const res = await client.doc.push('q1', {
  subject: 'd1',
  payload: { createdAt: Date.now() },
  nextIteration: '+1 year',
});

// RESULT:
// {
//   queued_docs: 1
// }
```
👉 For a more complete list of examples, please take a look at the integration tests.
Push Multiple Documents:
[[TO BE COMPLETED]]
Upsert a Document:
[[TO BE COMPLETED]]
Workers Configuration
```js
const client = fetchq({
  workers: [
    {
      queue: 'q2',
      name: 'my-first-worker',
      // how many concurrent service instances to run.
      // this is not parallel execution, just concurrency. It speeds things up
      // a lot when workers deal with I/O operations such as disk or network.
      // to achieve real parallelization, use Docker or add worker servers to
      // your cluster.
      concurrency: 1,
      // how many documents to pick in a single query.
      // the more documents, the lower the workload on the database, but also
      // the higher the chance of producing orphans that will eventually reject
      batch: 1,
      // estimated max duration of a batch operation.
      // if the worker doesn't complete within this timeframe, the document
      // will be considered rejected and will accumulate errors
      lock: '5m',
      // idle time between document executions
      delay: 250,
      // idle time after completing a queue
      sleep: 5000,
      // the function that handles a document
      // see next chapter
      handler: () => {},
    },
  ],
});
```
The Handler Function
The worker's handler is an asynchronous function that is triggered by the Fetchq client any time a document is ready for execution.
You can focus on "what to do on a single document" and let Fetchq deal with the complexity of applying it to millions of them, within a single machine, or spread across a cluster of workers.
It receives a `document`, which is a JavaScript object, and should return an `action` (inspired by Redux), which is another object that describes how Fetchq should handle the document itself after your custom logic is completed.
```js
const handler = async (doc, ctx) => {
  // use the builtin logger
  ctx.client.logger.info(`handling ${doc.queue}::${doc.subject}`);

  // forward the document into another queue, with the
  // possibility of simple payload decoration:
  await doc.forward('another-queue', {
    decorate: 'the payload',
  });

  // Run a custom SQL query
  await ctx.client.pool.query(`SELECT NOW()`);

  // Use an `action creator` to describe how you expect Fetchq
  // to handle the document after the custom logic completes.
  return doc.reschedule('+1 week');
};
```
👉 It's important to understand that Fetchq's handler execution is stateful: during the execution of a handler you can make decisions based on previous executions, leveraging the document's internal properties, or manipulating the document's payload.
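As a sketch of this statefulness (the retirement rule here is invented for illustration), a handler can read the document's counters to decide which action to return:

```js
// A handler that retires a document after 10 successful runs,
// and otherwise re-schedules it for the next day.
const handler = async (doc) => {
  if (doc.iterations >= 10) {
    // processed enough times: mark it complete, never run it again
    return doc.complete();
  }
  // keep processing it once a day
  return doc.reschedule('+1 day');
};
```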
The DOC parameter
The `document` parameter (first in order) contains relevant information for the logical execution of the worker function:
| name | type | description |
| --- | --- | --- |
| queue | string | Document's queue name |
| subject | string | Document's unique identifier in the queue |
| payload | object | Document's custom data object (stored as `jsonb`) |
| version | number | Document's version number; describes the payload's signature |
| iterations | number | Successfully-processed counter |
| attempts | number | Failed-processing counter; gets reset after a successful execution |
| created_at | date | Document's first appearance in the queue |
| last_iteration | date | Document's last attempted processing date; could be null |
| next_iteration | date | Document's next planned processing date; used in case of an unhandled exception; can be modified by the `doc.reschedule()` method |
The CTX parameter
The context
parameter gives you access to APIs that are not strictly
related to the document under execution.
| name | type | description |
| --- | --- | --- |
| client | ref | Memory reference to the Fetchq client instance; it gives you full access to the client's API |
| worker | ref | Memory reference to the worker's instance |
| workflow | object | Workflow API; experimental feature |
NOTE: You can freely extend the context that is given to any handler by using the `client.decorateContext()` API or the `decorateContext` setting. Read more under the handler context decoration paragraph.
Returning Actions
The handler function should return an object that defines which action should be performed on the document. In order to facilitate this and avoid misspelling action names, you can use the action creators from the `document` object:
```js
const handler = (doc, ctx) => {
  // pick exactly one of the following actions:
  return doc.reschedule('+1 week');
  return doc.reject('error message...');
  return doc.complete();
  return doc.kill();
  return doc.drop();
};
```
reschedule(nextIteration, [options])
The document will be scheduled for further execution. You should provide a `nextIteration` option that could be a JavaScript Date object or a valid Postgres interval string such as `+1 minute`, `-20y`, ...
```js
return doc.reschedule('+1 week', {
  // decorate the document's payload
  payload: { ...doc.payload, newField: 'hoho' },
  // write a custom persistent log while rescheduling
  refId: 'custom reference',
  message: 'yes, do it again',
  details: { count: 22 },
});
```
drop([options])
The document will be deleted from the queue's table.
```js
return doc.drop({
  // write a custom persistent log before dropping the document
  refId: 'custom reference',
  message: 'dropped a document',
  details: doc,
});
```
NOTE: That means that the same `subject` can be re-queued by a `doc.push()` action.
complete([options])
The document will be marked with `status = 3` and will never be executed again.
```js
return doc.complete({
  // decorate the document's payload
  payload: { ...doc.payload, newField: 'hoho' },
  // write a custom persistent log before marking the document as complete
  refId: 'custom reference',
  message: 'there was no more stuff to do',
  details: { count: 22 },
});
```
NOTE: Because the document itself is retained by the queue's table, any attempt to `doc.push()` it back into the queue will silently fail, returning `queued_docs = 0`.
kill([options])
The document will be marked with `status = -1` and will never be executed again.
```js
return doc.kill({
  // decorate the document's payload
  payload: { ...doc.payload, newField: 'hoho' },
  // write a custom persistent log before killing the document
  refId: 'custom reference',
  message: 'there was NOTHING ELSE to do',
  details: { count: 22 },
});
```
NOTE: Because the document itself is retained by the queue's table, any attempt to `doc.push()` it back into the queue will silently fail, returning `queued_docs = 0`.
reject(errorMessage, [options])
The document will be scheduled for another execution attempt according to the queue's settings and lock duration. The `attempts` counter will increase by one, and if it exceeds the `maxAttempts` threshold set for the queue, the document will be automatically killed (marked as `status = -1`).
You normally use this API within a `catch (err) {}` statement, when you have a clear idea of what went wrong and want to customize the error log.
```js
return doc.reject('I know exactly what went wrong', {
  // Add details to the log error message
  refId: 'custom reference',
  details: { count: 22 },
});
```
NOTE: Any unhandled exception that may happen within the handler's function is considered an implicit rejection and an error log is automatically appended to the queue's logs.
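A minimal sketch of this pattern (the `validatePayload` function below is a made-up stand-in for your own logic):

```js
// Explicit rejection: catch a known failure and customize the error log.
const validatePayload = async (payload) => {
  if (!payload.email) throw new Error('missing email');
};

const handler = async (doc) => {
  try {
    await validatePayload(doc.payload);
    return doc.complete();
  } catch (err) {
    // this produces a custom entry in the queue's error logs
    return doc.reject('invalid document', {
      details: { error: err.message },
    });
  }
};
```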
Context Decoration
More often than not, your workers' handlers need to deal with external APIs or other parts of your application.
Fetchq encourages you to think of your handlers as pure functions, to simplify unit testing and avoid the most common side-effect-based bugs.
A common solution is to provide a custom set of capabilities to your handlers while configuring the Fetchq instance:
```js
fetchq({
  decorateContext: {
    faa: 1,
  },
  workers: [
    {
      queue: 'q1',
      decorateContext: {
        foo: 2,
      },
      handler: (doc, { faa, foo }) => {
        console.log(faa, foo);
        return doc.drop();
      },
    },
  ],
});
```
You can apply this setting at the client level, injecting custom utilities into every worker, or worker by worker by providing the setting within the worker's configuration.
Client Configuration
skipUpsertFetchq
When `true`, the client will not attempt to initialize or upgrade the Fetchq library that is installed on the Postgres db. [default: false]
You can set this from the environment: `FETCHQ_SKIP_UPSERT_FETCHQ=true`
skipUpsertQueues
When `true`, the client will not attempt to upsert the configured queues, nor to update their configuration. [default: false]
You can set this from the environment: `FETCHQ_SKIP_UPSERT_QUEUES=true`
skipMaintenance
When `true`, the client will not participate in the maintenance of the queues. [default: false]
You can set this from the environment: `FETCHQ_SKIP_MAINTENANCE=true`
skipEmitter
When `true`, the client will not create an emitter server, hence it will establish one less connection to the Postgres instance. [default: false]
You can set this from the environment: `FETCHQ_SKIP_EMITTER=true`
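Combining these flags, a sketch of a worker-only process (assuming a separate process is in charge of schema upserts and maintenance) could look like:

```js
// This process only consumes documents; it does not touch the schema
// and does not run maintenance jobs.
const client = fetchq({
  skipUpsertFetchq: true,
  skipUpsertQueues: true,
  skipMaintenance: true,
  workers: [
    /* ...worker definitions... */
  ],
});
```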
Maintenance Configuration
```js
const client = fetchq({
  maintenance: {
    limit: 3,
    delay: 250,
    sleep: 5000,
  },
});
```
Logger Configuration
Fetchq comes with a simple logger utility that writes different levels of information to the console. It is a very minimal implementation inspired by the famous Winston library.
Setting the logLevel
You can provide the value through the environment variable `process.env.LOG_LEVEL` or programmatically via configuration:
```js
fetchq({
  logLevel: 'error',
});
```
Fetchq falls back to the level `error` if nothing else is specified.
Providing a custom logger library
Although the built-in logger is fine for development, it's likely that you will want to bring your own logging library for production. You can do that while setting up the client:
```js
const winston = require('winston');

fetchq({
  logger: {
    instance: winston.createLogger({
      level: 'info',
      // other winston configuration
    }),
  },
});
```
Initialization & Startup
The easiest way to run Fetchq is with the `autoStart` setting:
```js
fetchq({
  ...config,
  autoStart: true,
  onReady: (client) => client.logger.info('Fetchq is ready'),
});
```
In case you want to delay the execution, you can use the `boot()` function:
```js
// Create the client instance
const client = fetchq({ ...config });

// Start the client instance
client.boot().then((client) => client.logger.info('Fetchq is ready'));
```
A manual boot sequence would be obtained with:
```js
const client = fetchq({ ...config });

await client.connect();
await client.init();
await client.start();

client.logger.info('Fetchq is ready');
```
`client.init` will apply all the provided configuration to the Fetchq db:
- initialize your PostgreSQL instance with Fetchq
- create missing queues
- apply queue-related settings
- apply queue-related job settings
- recalculate indexes if needed
- apply maintenance settings
`client.start` will spin up the active services, like:
- queue workers instances
- queue maintenance workers
A word on init()
The `init()` method is useful to distribute the Fetchq configuration programmatically and apply it to the database without messing with SQL and Postgres clients.
The entire initialization happens inside a `BEGIN - COMMIT` block, so as to minimize the risk of race conditions. Nevertheless, we suggest minimizing the number of clients that run this function.
In a common setup, there should be just one single process responsible for running the `init()` API; you can actually think of it as some kind of migration, as both the basic Fetchq schema and the queue definitions are upserted at this point in time.
In case of race conditions, the system will detect the issue and re-attempt the initialization (using promise-retry). You can change the retry behavior by editing the `initializationRetry` setting:
```js
const client = fetchq({
  initializationRetry: {
    retries: 30,
    factor: 1,
    minTimeout: 1 * 1000,
    maxTimeout: 30 * 1000,
  },
});
```
👉 More info about the params here.
Error Handling
Since v3.2.0, the Fetchq client offers hooks to intercept and handle errors as they happen in the system.
Refer to `examples/on-error`.
[[TO BE IMPROVED]]
Workflow API
You can use a workflow to distribute work across one or more workers and await the entire process to finish.
A signup process may involve several steps, performed in a specific order, and each step may fail for many different reasons.
Normally you would write all those steps into an asynchronous route handler that consumes quite a few resources from your user-facing server... That may result in an unresponsive or slow website.
With Fetchq Workflow you can free your main process of any computational burden and enjoy the isolation and horizontal scalability of a queue system!
```js
const workflow = client.createWorkflow({
  queue: 'signup',
  timeout: 1000, // defaults to 20s
  payload: {
    username: 'marcopeg',
  },
});

workflow
  .run()
  .then((res) => console.log('Workflow completed:', res))
  .catch((err) => console.error('Workflow exited:', err));
```
Basically a workflow is a big promise that wraps the execution of one or more workers across your queue processing cluster.
The signup worker may look something like:
```js
const signupWorker = {
  queue: 'signup',
  handler: (doc, { workflow }) => {
    const { username } = doc.payload;

    // Break the workflow in case of errors:
    if (username.length < 5) {
      return workflow.reject('Username too short');
    }

    // Pipe the document into another queue to
    // continue the workflow:
    return workflow.forward('signup_save_user', {
      payload: { validated: true },
    });
  },
};
```
You can also nest workflows one into another in order to parallelize the execution of tasks:
```js
const signupWorker = {
  queue: 'signup_save_user',
  handler: async (doc, { workflow }) => {
    const { username } = doc.payload;

    // Persist the user into the database
    let userRecord = null;
    try {
      userRecord = await db.saveUser(username);
    } catch (err) {
      return workflow.reject(err);
    }

    // Run a few parallel workflows
    const w1 = workflow
      .create({
        queue: 'signup_send_welcome_email',
        payload: userRecord,
      })
      .run();
    const w2 = workflow
      .create({
        queue: 'signup_process_user_icon',
        payload: userRecord,
      })
      .run();
    const w3 = workflow
      .create({
        queue: 'signup_fetch_user_profile',
        payload: userRecord,
      })
      .run();

    // The sub-workflows run in parallel and the work is actually distributed
    // horizontally across your worker cluster.
    //
    // Nevertheless, you can simply await all that work to complete
    // before completing and releasing the main signup workflow.
    try {
      await Promise.all([w1, w2, w3]);
    } catch (err) {
      return workflow.reject(err);
    }

    // Finally complete the workflow
    return workflow.resolve(userRecord);
  },
};
```