pg-transactional-outbox
v0.5.8
A PostgreSQL based transactional outbox and inbox pattern implementation to support exactly once message processing (with at least once message delivery).
pg-transactional-outbox
The `pg-transactional-outbox` library implements the transactional outbox and transactional inbox pattern based on PostgreSQL. It ensures that outgoing messages are only sent when your code successfully commits your transaction. On the receiving side, it ensures that a message is stored exactly once and that your handler marks the message as done only when your code finishes successfully.
Message delivery via the PostgreSQL-based transactional outbox pattern (generated with Midjourney)
Install the library (npmjs | pg-transactional-outbox), e.g. via `npm i pg-transactional-outbox` or `yarn add pg-transactional-outbox`.
Context
A service often needs to update data in its database and send events (when its data was changed) and commands (to request some action in another service). All the database operations can be done in a single transaction. But sending the message must (most often) be done as a separate action. This can lead to data inconsistencies if one or the other fails.
Storing a product order in the Order Service but failing to send an event to the shipping service can lead to products not being shipped - case 1 in the diagram below. This would be the case when the database transaction succeeded but sending the event failed. Sending the event first and only then completing the database transaction does not fix it either. The products might then be shipped but the system would not know the order for which the shipment was done (case 2).
Similar issues exist in the Shipment Service. The service might receive a message and start the shipment but fail to acknowledge the message. The message would then be re-sent and if the operation is not done in an idempotent way there might be two shipments for a single order (case 3). Or the message is acknowledged as handled but submitting the database transaction fails. Then the message is lost and the customer will not receive the goods (case 4).
Potential messaging pitfalls
One way to solve this would be to use distributed transactions with a two-phase commit (2PC). PostgreSQL supports 2PC, but many message brokers and event streaming solutions like RabbitMQ and Kafka do not. And even if they do, 2PC is often not a good fit: distributed transactions need to lock data until all participating services commit their second phase. This often works well for a small number of services. But with many (micro) services communicating with each other it often leads to severe bottlenecks.
The Patterns
When delivering messages we mostly have three different delivery guarantees:
- At most once - which is the most basic variant. It tries to send the message but if it fails the message is lost.
- At least once - this is the case when we guarantee that the message will be delivered. But we make no guarantee on how often we send the message. At least once - but potentially multiple times.
- Exactly once - for the delivery itself, this is most often considered impossible. Multiple mechanisms need to play together to ensure that a message is sent (at least) once but guaranteed to be processed (exactly) once.
The transactional outbox and transactional inbox patterns provide such a combination, which guarantees exactly-once processing (with at-least-once delivery).
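The interplay of at-least-once delivery and deduplication on the receiving side can be illustrated with a small in-memory sketch. It is not the library's API: the `receive` and `sendAtLeastOnce` functions and the "lost acknowledgement" counter are assumptions made up for this illustration.

```typescript
// Illustrative only: an in-memory sender/receiver pair, not the library's API.
type Message = { id: string; payload: string };

const processedIds: Record<string, true> = {}; // stands in for the inbox table
const handled: string[] = []; // business side effects, applied once per id
let deliveries = 0;

// Receiver: remembers each message id and handles a message only once.
function receive(message: Message): void {
  deliveries += 1;
  if (processedIds[message.id]) {
    return; // duplicate delivery: already processed, ignore it
  }
  processedIds[message.id] = true;
  handled.push(message.payload);
}

// Sender: keeps re-sending until an acknowledgement arrives (at least once).
// The first `lostAcks` acknowledgements are dropped to simulate failures.
function sendAtLeastOnce(message: Message, lostAcks: number): void {
  let remainingLostAcks = lostAcks;
  let acknowledged = false;
  while (!acknowledged) {
    receive(message);
    if (remainingLostAcks > 0) {
      remainingLostAcks -= 1; // receiver got it, but the sender never learns
    } else {
      acknowledged = true;
    }
  }
}

// Two acknowledgements are lost, so the message is delivered three times
// but handled only once.
sendAtLeastOnce({ id: 'order-1', payload: 'ship order 1' }, 2);
```

The sender alone can only guarantee that the message arrives at least once. The receiver turns that into exactly-once processing by keeping track of the message IDs it has already handled.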
You might want to use the transactional inbox pattern in scenarios where the reliability and atomicity of message processing are important, such as in an event-driven architecture where the loss or duplication of a message could have significant consequences.
This diagram shows the involved components to implement both the transactional outbox and the transactional inbox pattern. The following chapters explain the components and the interactions in detail.
Components involved in the transactional outbox and inbox pattern implementation
What is the transactional outbox pattern
The transactional outbox pattern is a design pattern that allows you to reliably "send" messages within the context of a database transaction. It is used to ensure that messages are only sent if the transaction is committed and that they are not lost in case of failure.
The transactional outbox pattern uses the following approach to guarantee the exactly-once processing (in combination with the transactional inbox pattern):
1. Some business logic needs to update data in its database and send a message (e.g. with details of the changes).
2. It uses a database transaction to apply all of the business data changes and to insert the details of the message that should be sent into an "outbox" table within the same database transaction. The message includes the necessary information to send the message to the intended recipient(s), such as the message payload and the destination topic or queue. This ensures that either everything is persisted in the database or nothing. The outbox storage component in the above diagram encapsulates the logic to store messages in the outbox table.
3. A background process notices when a new entry appears in the "outbox" table and loads the message data.
4. Now the message can be sent. This can be done via a message broker, an event stream, or any other option. The outbox table entry is then marked as processed only if the message was successfully sent. In case of a message-sending error, or if the outbox entry cannot be marked as processed, the message is sent again. The outbox listener is responsible for this step.
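The steps above can be sketched with plain in-memory data structures. This is a simplified illustration and not how the library is implemented: the `inTransaction` and `relayOutbox` helpers and the `Db` shape are hypothetical, and a real implementation relies on PostgreSQL transactions instead of copying arrays.

```typescript
// In-memory stand-ins for the business table and the outbox table.
type OutboxRow = { id: string; payload: string; processed: boolean };
type Db = { orders: string[]; outbox: OutboxRow[] };

const db: Db = { orders: [], outbox: [] };

// Business change and outbox insert: apply all changes to a copy and swap it
// in only on success, which mimics "either everything is persisted or nothing".
function inTransaction(work: (draft: Db) => void): void {
  const draft: Db = {
    orders: db.orders.slice(),
    outbox: db.outbox.map((row) => ({ ...row })),
  };
  work(draft); // throwing here discards the draft (rollback)
  db.orders = draft.orders; // commit
  db.outbox = draft.outbox;
}

// Background process: pick up unprocessed rows, send them, and mark a row as
// processed only when sending succeeded. Failed rows are retried next run.
function relayOutbox(send: (row: OutboxRow) => void): void {
  db.outbox
    .filter((row) => !row.processed)
    .forEach((row) => {
      try {
        send(row);
        row.processed = true;
      } catch {
        // leave the row unprocessed so that the next run sends it again
      }
    });
}

inTransaction((draft) => {
  draft.orders.push('order-1');
  draft.outbox.push({ id: 'm1', payload: 'order-1 created', processed: false });
});

try {
  inTransaction((draft) => {
    draft.orders.push('order-2');
    draft.outbox.push({ id: 'm2', payload: 'order-2 created', processed: false });
    throw new Error('constraint violation'); // neither order nor message is kept
  });
} catch {
  // rolled back: the database still only contains order-1 and message m1
}

const sent: string[] = [];
let brokerAvailable = false;
const send = (row: OutboxRow): void => {
  if (!brokerAvailable) throw new Error('broker down');
  sent.push(row.payload);
};

relayOutbox(send); // sending fails, m1 stays unprocessed
brokerAvailable = true;
relayOutbox(send); // sending succeeds, m1 is marked as processed
```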
The third step can be implemented via the Polling-Publisher pattern or the Transactional Log Tailing pattern.
- Polling-Publisher: a polling listener queries the outbox table on a (short) interval. When unprocessed messages are found they can be sent.
- Transactional Log Tailing (also called Capture-Based Log Tailing) reads from the transactional log or change stream that contains the changes to the outbox table. For PostgreSQL, this is handled with the Write-Ahead Log (WAL) and logical replication which is depicted in the diagram and described further down in more detail. Using this approach, the transactional outbox pattern can be implemented with minimal impact on the database, as the WAL tailing process does not need to hold locks or block other transactions.
You can read more about the transactional outbox pattern in this microservices.io article.
What is the transactional inbox pattern
The transactional inbox pattern targets the message consumer side of the process. It is a design pattern that allows you to reliably receive messages and process each message exactly once.
The transactional inbox pattern uses the following approach to guarantee the exactly-once processing (in combination with the transactional outbox pattern):
1. A message is sent to a message broker (such as RabbitMQ) and stored in a queue, event stream, or any other location.
2. A background process consumes messages from the queue and inserts them into an "inbox" table in the database. The process uses the unique message identifier to store the message in the inbox as the primary key. This ensures that each message is only written once to this database table even if the same message was delivered multiple times (deduplication). The inbox storage component from the above diagram handles this step.
3. A consumer process receives the messages that were stored in the inbox table and processes them. It uses a transaction to store all the service-relevant data and mark the inbox message in the inbox table as processed. If an error happens during message processing the message can be retried for a configurable amount of attempts. This is done by the inbox listener in the above diagram.
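A minimal in-memory sketch of the inbox steps could look as follows. The `storeInInbox` and `processInbox` functions, the row shape, and the `MAX_ATTEMPTS` value are assumptions for illustration only. In a real setup the handler's data changes and the "processed" flag are written in one database transaction.

```typescript
type InboxRow = {
  id: string;
  payload: string;
  attempts: number;
  processed: boolean;
  abandoned: boolean;
};

const MAX_ATTEMPTS = 3; // assumption for this sketch

// The message id acts as primary key, so a redelivered message
// is not stored a second time (deduplication).
function storeInInbox(inbox: InboxRow[], id: string, payload: string): boolean {
  if (inbox.some((row) => row.id === id)) {
    return false; // duplicate delivery
  }
  inbox.push({ id, payload, attempts: 0, processed: false, abandoned: false });
  return true;
}

// Run the handler and mark the row as processed only on success.
// After MAX_ATTEMPTS failed attempts the row is abandoned instead of retried.
function processInbox(inbox: InboxRow[], handler: (row: InboxRow) => void): void {
  inbox
    .filter((row) => !row.processed && !row.abandoned)
    .forEach((row) => {
      row.attempts += 1;
      try {
        handler(row);
        row.processed = true;
      } catch {
        row.abandoned = row.attempts >= MAX_ATTEMPTS;
      }
    });
}

const inbox: InboxRow[] = [];
const firstStore = storeInInbox(inbox, 'm1', 'order-1 created');
const secondStore = storeInInbox(inbox, 'm1', 'order-1 created'); // redelivery
storeInInbox(inbox, 'm2', 'malformed');

const shipped: string[] = [];
const handler = (row: InboxRow): void => {
  if (row.payload === 'malformed') throw new Error('cannot handle message');
  shipped.push(row.payload);
};

// Each call represents one pass of the inbox listener.
processInbox(inbox, handler);
processInbox(inbox, handler);
processInbox(inbox, handler);
```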
Step three can be implemented again as a Polling-Publisher or via the Transactional Log Tailing approach like for the outbox pattern.
Using PostgreSQL Logical Replication
PostgreSQL logical replication is a method that offers the possibility of replicating data from one PostgreSQL server to another PostgreSQL server or other consumer. It works by streaming the changes that are made to the data in a database in a logical, row-based format, rather than replicating at the physical storage level.
This is achieved by using a feature called "Logical Replication Slot", which allows the primary PostgreSQL server (publisher) to stream changes made to a specific table or set of tables to a replication client. The client can then apply these changes to its own database (effectively replicating the data). Or more generally the client can use those changes for any kind of updates/notifications.
The replication process begins with the creation of a replication slot on the primary database server. A replication slot is a feature on the PostgreSQL server that persists information about the state of replication streams. Replication slots serve to retain WAL (Write-Ahead Log) files on the publisher, ensuring that the logs required for replication are not removed before the subscribing server has received them. The slot keeps track of the last WAL location that was successfully sent to the subscriber, so that upon reconnection after a disconnect, replication can resume from that position without missing any data. In the transactional outbox/inbox pattern case, the outbox and inbox tables are replicated.
A publisher prepares and sends the stream of data changes from specified tables to the subscribers. For the transactional outbox and inbox scenario the outbox and inbox tables are configured for publication. The publisher creates a set of changes that need to be replicated based on inserts, updates, and deletes on the published tables. These changes are sent in the form of "WAL records" (Write-Ahead Log records). Publications are used in conjunction with subscriptions to set up logical replication from the publisher to the subscriber.
When the subscriber (client) connects to the publisher, it specifies the name of the logical replication slot it wants to use. The publisher uses this information to start streaming changes from the point in the WAL that is stored in the replication slot. As the subscriber receives changes, it updates the position of the replication slot on the publisher to keep track of where it is in the stream. The position defines the last data record that the client successfully consumed and acknowledged. It is not possible to acknowledge only specific WAL records - everything up to this position is acknowledged.
In this way, the publisher and subscriber can maintain a consistent position in the replication stream, allowing the subscriber to catch up with any changes that may have occurred while it was disconnected.
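Because an acknowledgement always covers everything up to a position, a client that finishes messages out of order can only acknowledge the end of the unbroken run of finished messages. The following sketch illustrates this under stated assumptions: `parseLsn`, `compareLsn`, and `safeAckPosition` are hypothetical helpers and not part of this library. The only PostgreSQL fact used is that an LSN is printed as two hexadecimal numbers separated by a slash.

```typescript
// An LSN is printed as two hexadecimal numbers separated by a slash,
// e.g. '0/16B3748'. Both parts are kept separately to avoid 64-bit arithmetic.
type Lsn = { high: number; low: number };

function parseLsn(text: string): Lsn {
  const parts = text.split('/');
  if (parts.length !== 2) throw new Error(`Invalid LSN: ${text}`);
  const high = parseInt(parts[0], 16);
  const low = parseInt(parts[1], 16);
  if (isNaN(high) || isNaN(low)) throw new Error(`Invalid LSN: ${text}`);
  return { high, low };
}

// Negative when a is before b, positive when a is after b, zero when equal.
function compareLsn(a: Lsn, b: Lsn): number {
  return a.high !== b.high ? a.high - b.high : a.low - b.low;
}

// Returns the last LSN that is safe to acknowledge: the end of the unbroken
// prefix of finished messages. Acknowledging a later LSN would also
// acknowledge the unfinished messages before it.
function safeAckPosition(received: string[], finished: string[]): string | undefined {
  const ordered = received
    .slice()
    .sort((a, b) => compareLsn(parseLsn(a), parseLsn(b)));
  let ack: string | undefined;
  for (const lsn of ordered) {
    if (finished.indexOf(lsn) === -1) break;
    ack = lsn;
  }
  return ack;
}

const received = ['0/16B3748', '0/16B3800', '0/16B39A0', '1/10'];

// The second message is still running, so only the first one can be acknowledged.
const ackAfterTwo = safeAckPosition(received, ['0/16B3748', '0/16B39A0']);

// Once the second message finished, the position moves up to the third one.
const ackAfterThree = safeAckPosition(received, ['0/16B3748', '0/16B39A0', '0/16B3800']);
```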
Components involved in the transactional outbox and inbox logical replication implementation
Using database polling
The second approach to handling messages that are added to the transactional inbox or outbox is database polling. Your node.js application queries those tables at regular intervals to see if new messages have arrived. When unprocessed outbox messages are found they can be sent via e.g. a message broker. For inbox messages, a message handler is executed to handle the message.
This setup is purely based on the database and does not use logical replication:
Components involved in the transactional outbox and inbox polling implementation
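The selection that a polling listener performs on every interval can be sketched in memory as follows. This is only an illustration of the idea (pick the oldest unprocessed, unlocked rows and lock them for a while). The `nextMessages` function and the row shape are assumptions, and in practice this logic runs inside the database.

```typescript
type QueueRow = {
  id: string;
  createdAt: number; // epoch milliseconds
  lockedUntil: number; // epoch milliseconds, 0 when never locked
  processed: boolean;
};

// Return up to batchSize unprocessed rows that are not locked at "now",
// oldest first, and lock them for lockMs so that another poller skips them.
function nextMessages(
  rows: QueueRow[],
  now: number,
  batchSize: number,
  lockMs: number,
): QueueRow[] {
  const batch = rows
    .filter((row) => !row.processed && row.lockedUntil <= now)
    .sort((a, b) => a.createdAt - b.createdAt)
    .slice(0, batchSize);
  batch.forEach((row) => {
    row.lockedUntil = now + lockMs;
  });
  return batch;
}

const table: QueueRow[] = [
  { id: 'c', createdAt: 300, lockedUntil: 0, processed: false },
  { id: 'a', createdAt: 100, lockedUntil: 0, processed: false },
  { id: 'b', createdAt: 200, lockedUntil: 0, processed: true },
  { id: 'd', createdAt: 400, lockedUntil: 0, processed: false },
];

// First poll takes the two oldest unprocessed rows and locks them.
const firstPoll = nextMessages(table, 1000, 2, 5000).map((row) => row.id);
// A poll shortly after skips the locked rows and picks the remaining one.
const secondPoll = nextMessages(table, 1250, 2, 5000).map((row) => row.id);
// The rows were never marked as processed, so they are offered again
// once their locks have expired.
const afterLockExpiry = nextMessages(table, 7000, 2, 5000).map((row) => row.id);
```

The lock duration is the reason why a failed message may have to wait before it is retried: until the lock expires, no poller will pick it up again.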
Logical replication vs. polling
The logical replication and the polling listeners have different advantages and disadvantages. Please check the following table to get an overview and also the next section about considerations to operate such a solution.
Both listeners can be used interchangeably, with minor differences in the concurrency strategies. Switching from one listener type to the other at development time requires only a few code changes. Switching in production from logical replication to polling is easy. For the other way around, you have to make sure that the replication slot is created upfront. Then let the listener process the latest messages and restart the service. Some messages will be attempted twice, but this is handled by the checks on the processed/abandoned timestamp fields in the database.
| Area | Logical Replication | Polling |
| ------------------ | -------------------------------------------------------------------- | ------------------------------------------------------------------------------------ |
| Scaling | Only a single instance can connect to the replication slot. | Multiple instances can poll from the same table. |
| Lag | Receives new messages immediately. | Must poll on a (short) timeout for new messages. Can be improved with LISTEN/NOTIFY. |
| DB load | Listening to new messages does not use the database itself. | Polling the database for new messages puts load on the database. |
| Setup | Logical replication must be enabled. A replication role is required. | No special settings are needed. |
| Message priority | Will receive messages sequentially. Picking newer messages is hard. | Can implement any selection algorithm based on segments or other fields. |
| Sequential order | Guaranteed sequential ordering same as transactions were committed. | Sequential processing only on created_at date which may not be unique. |
| Quick Retries | Receives messages again immediately when some error occurred. | The locked_until column might not be reset on error. Next retry needs to wait. |
| Scheduled Retries | Processes messages sequentially. May delete and re-add message. | Can use the locked_until field to schedule messages for a later point in time. |
| DB degrading | When a replication slot is not active disk space may run out. | When messages are not deleted the polling will slow down over time. |
| Migration/Failover | To guarantee same LSNs only some migration types are available. | Can use any available migration or failover option. |
Operational notes
General considerations for both approaches:
- Like any other service functionality, the listeners should be monitored. This could be done by checking the number of unprocessed messages in the inbox/outbox, or by sending and receiving a test message and timing the duration.
Logical replication approach
The following points are important when operating the logical replication approach for using the transactional outbox and inbox:
- When the logical replication approach is used there must be a consumer that reads from the WAL and acknowledges the LSNs. If the consumer is down for longer periods the database server must retain the WAL files, which can fill up the disk and bring down the full database server. Good monitoring is mandatory. You can check the maximum WAL size, the retention period, and the WAL file size configuration values. PostgreSQL will do a roll-over of old WAL files if you configure a maximum size. This would mean old message notifications are lost. Removing and adding all messages again in the order of the created_at field could be used as a recovery strategy.
- The name of a replication slot must be unique at the PostgreSQL server level.
- Migrating a database when using the logical replication approach is tricky. It tracks the processing progress based on the LSN numbers of the database. When the database is migrated or a failover happens the replication slot must be manually recreated on the new instance. Possible solutions:
  - Patroni configured slots
  - pg_failover_slots
  - Manually: Create the replication slot on the new database server. Stop your application so no new inbox/outbox messages arrive. Switch the service to the new database. Messages that arrived during that time will be pushed to the listener. But as the messages were marked as processed, the message processing will stop.
- The replication role has built-in permissions to list and delete all replication slots on the database server. If the database server is shared it is not possible to secure against replication slot deletions.
- With the logical replication approach, it is not that important to delete outbox and inbox messages quickly, as the tables are rarely queried other than by their unique ID.
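To monitor how much WAL a replication slot forces the server to retain, you could query the `pg_replication_slots` view and compare the result against a threshold. The sketch below is one possible approach and not part of this library: the view and the `pg_wal_lsn_diff` and `pg_current_wal_lsn` functions exist in PostgreSQL 10 and later, while the `Queryable` type, the helper functions, the sample rows, and the threshold are assumptions for illustration.

```typescript
// Catalog view and functions provided by PostgreSQL (version 10 or later).
const SLOT_LAG_QUERY = `
  SELECT slot_name,
         active,
         pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
  FROM pg_replication_slots
  WHERE slot_type = 'logical'`;

// Numeric values may arrive as strings from the database driver.
type SlotRow = {
  slot_name: string;
  active: boolean;
  retained_bytes: string | number | null;
};

// Hypothetical minimal client interface; adapt it to your database client.
type Queryable = { query: (sql: string) => Promise<{ rows: SlotRow[] }> };

// A slot needs attention when nobody is connected to it or when it
// holds back more WAL than the given threshold.
function slotsNeedingAttention(rows: SlotRow[], maxRetainedBytes: number): string[] {
  return rows
    .filter((row) => !row.active || Number(row.retained_bytes) > maxRetainedBytes)
    .map((row) => row.slot_name);
}

async function checkSlots(db: Queryable, maxRetainedBytes: number): Promise<string[]> {
  const result = await db.query(SLOT_LAG_QUERY);
  return slotsNeedingAttention(result.rows, maxRetainedBytes);
}

// Sample rows to show the evaluation without a database connection.
const sampleRows: SlotRow[] = [
  { slot_name: 'pg_transactional_outbox_slot', active: true, retained_bytes: '1048576' },
  { slot_name: 'pg_transactional_inbox_slot', active: false, retained_bytes: '2048' },
  { slot_name: 'other_slot', active: true, retained_bytes: '9000000000' },
];
const alerts = slotsNeedingAttention(sampleRows, 1024 * 1024 * 1024);
```

The inactive slot and the slot retaining about 9 GB are reported, while the active slot with 1 MB of retained WAL is not.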
Polling approach
The polling approach has fewer special considerations. It puts a higher load on the database but is generally easier to maintain.
- On the polling listener, it is important to delete messages from the outbox and inbox tables to keep polling fast. A tradeoff must be made between the performance gained by deleting messages and the message deduplication possibilities gained by keeping the messages longer.
The pg-transactional-outbox library overview
This library implements the transactional outbox and inbox pattern using a PostgreSQL server. It implements both the Transactional Log Tailing approach via the PostgreSQL Write-Ahead Log (WAL) and the polling listener.
You can use the library in your node.js projects that use a PostgreSQL database as the data store and some message sending/streaming software (e.g. RabbitMQ or Kafka).
Database server
This library was tested with PostgreSQL servers starting from version 14. But it should most likely work with versions starting from version 12.
Polling Listener Setup
No special extensions or settings are required for the polling listener setup.
Logical Replication Setup
For the logical replication setup, the PostgreSQL database server itself must have the `wal_level` configured as `logical`. This enables the use of the WAL to be notified on e.g. new inserts. You should also check that the `max_wal_senders`, the `max_wal_size`, and the `min_wal_size` settings contain values that match your architecture. Setting a large size could consume/max out the disk space. Setting the value too low could lead to lost events in case your WAL consumer is down for a longer duration. An example `postgres.conf` file is shown in the `./infra` folder. This folder also includes a `docker-compose.yml` file to set up a PostgreSQL server with these default values (and a RabbitMQ instance for running the examples).
This library uses the standard PostgreSQL logical decoding plugin `pgoutput`. This plugin has been part of PostgreSQL since version 10 (logical decoding itself was introduced with version 9.4). Alternatives would be `wal2json` or `decoding-json`, but those are not used in this library.
Database setup
To support the transactional outbox and inbox implementation you need to create an outbox and an inbox table in your PostgreSQL database. You should create two database roles: one for the message handler and one for the message listener. You must grant those roles permission to the tables.
The inbox and the outbox tables and the corresponding structure are identical. They can reside in the same database if your service uses both the outbox and the inbox pattern which is often the case for distributed services.
You can manually create the required database structure or (suggested) use this library to help you with this task.
The easiest way is to use the CLI tool to generate the SQL scripts and the `.ENV` settings file for you. You can find the CLI tool in the `./examples/setup/` folder. It will guide you by asking you for the required values.
yarn dev:watch
You can find the example SQL scripts in the following two files which you can also use and adjust.
- `examples/setup/out/example-trx-polling.sql` (for polling listener)
- `examples/setup/out/example-trx-replication.sql` (for replication listener)
If you do not want to run the generated SQL scripts e.g. as part of your application migration you can also set up the database from a node.js application. The library offers you a `DatabaseSetup` helper to create the required tables etc. in your database. You can do this from within your codebase based on the configuration settings that you provide. Both the inbox and outbox structure are created in the same way so you call the same functions but with different configurations.
Logical Replication Setup
If you use the logical replication approach, the database listener role needs the replication permission. As this role has a lot of rights it is not advised to give the replication permission to the same role that reads and mutates the business-relevant data.
NOTE: the replication slot name must be unique per database server! This means that if you use the transactional inbox pattern on multiple databases within the same PostgreSQL server instance, you must use a different replication slot name for each of them. When creating a new replication slot you must run that SQL script in its own database transaction.
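If you create the publication and the replication slot yourself, a small helper can validate the names and produce the statements. This is a sketch under assumptions and not the script that the library's CLI generates: `buildReplicationSetupSql` and `assertValidSlotName` are hypothetical helpers, and publishing only inserts is an assumption that you should adjust to your needs. `CREATE PUBLICATION` and `pg_create_logical_replication_slot` are standard PostgreSQL, and slot names may only contain lower-case letters, numbers, and underscores.

```typescript
// Replication slot names may only contain lower-case letters, numbers, and
// underscores. The length limit of 63 assumes default PostgreSQL build settings.
function assertValidSlotName(name: string): void {
  if (!/^[a-z0-9_]{1,63}$/.test(name)) {
    throw new Error(`Invalid replication slot name: ${name}`);
  }
}

// Conservative check for unquoted identifiers to avoid building broken SQL.
function assertSimpleIdentifier(name: string): void {
  if (!/^[a-z_][a-z0-9_]*$/.test(name)) {
    throw new Error(`Invalid identifier: ${name}`);
  }
}

type ReplicationSetup = {
  schema: string;
  table: string;
  publication: string;
  slot: string;
};

// Returns the statements separately so that the slot creation can be
// executed in its own database transaction.
function buildReplicationSetupSql(setup: ReplicationSetup): string[] {
  [setup.schema, setup.table, setup.publication].forEach(assertSimpleIdentifier);
  assertValidSlotName(setup.slot);
  return [
    `CREATE PUBLICATION ${setup.publication} FOR TABLE ${setup.schema}.${setup.table} WITH (publish = 'insert');`,
    `SELECT pg_create_logical_replication_slot('${setup.slot}', 'pgoutput');`,
  ];
}

const setupStatements = buildReplicationSetupSql({
  schema: 'public',
  table: 'outbox',
  publication: 'pg_transactional_outbox_pub',
  slot: 'pg_transactional_outbox_slot',
});
```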
Polling Listener Setup
For the polling approach, you could use a single database role but it is still advised to use two separate database roles. The roles should not have the `REPLICATION` permission.
Implementing the transactional outbox producer
The following code shows the producer side of the transactional outbox pattern. It shows how to use either the logical replication approach with `initializeReplicationMessageListener` or the polling approach with `initializePollingMessageListener`. Both listeners receive the messages when an outbox message is written to the outbox table. The code also uses the `initializeMessageStorage` generator function to store outgoing messages in the outbox table.
import { randomUUID } from 'node:crypto';
import process from 'node:process';
import { Pool } from 'pg';
import {
  GeneralMessageHandler,
  PollingListenerConfig,
  ReplicationListenerConfig,
  TransactionalMessage,
  createReplicationMutexConcurrencyController,
  getDefaultLogger,
  initializeMessageStorage,
  initializePollingMessageListener,
  initializeReplicationMessageListener,
} from 'pg-transactional-outbox';

(async () => {
  const logger = getDefaultLogger('outbox');

  // Initialize the actual message publisher e.g. publish the message via RabbitMQ
  const messagePublisher: GeneralMessageHandler = {
    handle: async (message: TransactionalMessage): Promise<void> => {
      // In the simplest case the message can be sent via inter process communication
      process.send?.(message);
    },
  };

  // You can also use ENV variables via getOutboxReplicationListenerSettings and
  // getOutboxPollingListenerSettings to create the settings objects.
  const dbListenerConfig = {
    host: 'localhost',
    port: 5432,
    user: 'db_outbox_listener',
    password: 'db_outbox_listener_password',
    database: 'pg_transactional_outbox',
  };
  const baseSettings = {
    dbSchema: 'public',
    dbTable: 'outbox',
    enableMaxAttemptsProtection: false,
    enablePoisonousMessageProtection: false,
  };
  const replicationConfig: ReplicationListenerConfig = {
    outboxOrInbox: 'outbox',
    dbListenerConfig,
    settings: {
      ...baseSettings,
      dbPublication: 'pg_transactional_outbox_pub',
      dbReplicationSlot: 'pg_transactional_outbox_slot',
    },
  };
  const pollingConfig: PollingListenerConfig = {
    outboxOrInbox: 'outbox',
    dbListenerConfig,
    settings: {
      ...baseSettings,
      nextMessagesBatchSize: 5,
      nextMessagesFunctionName: 'next_outbox_messages',
      nextMessagesPollingIntervalInMs: 250,
    },
  };

  // Initialize and start listening for outbox messages. The listener
  // receives all the outbox table inserts from the WAL or via polling. It
  // executes the messagePublisher handle function with every received outbox
  // message. It takes care of the at least once delivery.
  let shutdownListener: () => Promise<void>;
  if (process.env.LISTENER_TYPE === 'replication') {
    const [shutdown] = initializeReplicationMessageListener(
      replicationConfig,
      messagePublisher,
      logger,
      {
        concurrencyStrategy: createReplicationMutexConcurrencyController(),
        messageProcessingTimeoutStrategy: (message: TransactionalMessage) =>
          message.messageType === 'ABC' ? 10_000 : 2_000,
      },
    );
    shutdownListener = shutdown;
  } else {
    const [shutdown] = initializePollingMessageListener(
      pollingConfig,
      messagePublisher,
      logger,
      {
        messageProcessingTimeoutStrategy: (message: TransactionalMessage) =>
          message.messageType === 'ABC' ? 10_000 : 2_000,
      },
    );
    shutdownListener = shutdown;
  }

  // Initialize the message storage function to store outbox messages. It will
  // be called to insert the outgoing message into the outbox table as part of
  // the DB transaction that is responsible for this event.
  const storeOutboxMessage = initializeMessageStorage(
    replicationConfig,
    logger,
  );

  // The actual business logic generates in this example a new movie in the DB
  // and wants to reliably send a "movie_created" message.
  const pool = new Pool({
    host: 'localhost',
    port: 5432,
    user: 'db_outbox_handler',
    password: 'db_outbox_handler_password',
    database: 'pg_transactional_outbox',
  });
  const client = await pool.connect();

  try {
    // The movie and the outbox message must be inserted in the same transaction.
    await client.query('START TRANSACTION ISOLATION LEVEL SERIALIZABLE');
    // Insert the movie (and query/mutate potentially a lot more data)
    const result = await client.query(
      `INSERT INTO public.movies (title) VALUES ('some movie') RETURNING id, title;`,
    );
    // Define the outbox message. randomUUID is imported from node:crypto.
    const message: TransactionalMessage = {
      id: randomUUID(),
      aggregateType: 'movie',
      messageType: 'movie_created',
      aggregateId: result.rows[0].id,
      payload: result.rows[0],
      createdAt: new Date().toISOString(),
    };
    // Store the message in the outbox table
    await storeOutboxMessage(message, client);
    // (Try to) commit the transaction to save the movie and the outbox message
    await client.query('COMMIT');
    client.release();
  } catch (err) {
    // In case of an error roll back the DB transaction - neither movie nor
    // the outbox message will be stored in the DB now.
    await client.query('ROLLBACK');
    client.release(true);
    throw err;
  }
  await shutdownListener();
})();
Please note: This library offers to automatically delete outbox messages from the outbox table. Keeping them for a while can be good to ensure that a message is only generated once. But most often this is more of a concern on the inbox side. Please configure your duration on how long the messages should stay in this table. This can be seconds but also some minutes. You can define different age thresholds for processed messages, abandoned ones, and a general max age setting. To be safe you can add logic to not send messages anymore that are older than these defined durations.
As an alternative you could also create an `outbox_archive` table and write a script to move the messages into that table instead of deleting them.
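The three age thresholds mentioned above (processed, abandoned, and a general maximum age) could be evaluated as in the following sketch. The `shouldDelete` function and the message shape are assumptions for illustration and not the library's cleanup implementation. The threshold values are the defaults of the `TRX_MESSAGE_CLEANUP_*` variables listed further down.

```typescript
// All timestamps are epoch seconds to keep the arithmetic simple.
type StoredMessage = {
  createdAt: number;
  processedAt?: number;
  abandonedAt?: number;
};

type CleanupSettings = {
  processedInSec: number;
  abandonedInSec: number;
  allInSec: number;
};

const cleanupDefaults: CleanupSettings = {
  processedInSec: 604_800, // 7 days
  abandonedInSec: 1_209_600, // 14 days
  allInSec: 5_184_000, // 60 days
};

// A message is deleted when it exceeds the threshold that matches its state,
// or in any case once it is older than the general maximum age.
function shouldDelete(message: StoredMessage, now: number, settings: CleanupSettings): boolean {
  if (message.processedAt !== undefined && now - message.processedAt > settings.processedInSec) {
    return true;
  }
  if (message.abandonedAt !== undefined && now - message.abandonedAt > settings.abandonedInSec) {
    return true;
  }
  return now - message.createdAt > settings.allInSec;
}

const DAY = 86_400;
const now = 10_000_000;

const processedLongAgo = shouldDelete(
  { createdAt: now - 9 * DAY, processedAt: now - 8 * DAY },
  now,
  cleanupDefaults,
); // processed 8 days ago: delete
const processedRecently = shouldDelete(
  { createdAt: now - 2 * DAY, processedAt: now - 1 * DAY },
  now,
  cleanupDefaults,
); // processed 1 day ago: keep
const abandonedRecently = shouldDelete(
  { createdAt: now - 11 * DAY, abandonedAt: now - 10 * DAY },
  now,
  cleanupDefaults,
); // abandoned 10 days ago: keep
const neverHandled = shouldDelete({ createdAt: now - 61 * DAY }, now, cleanupDefaults); // older than 60 days: delete
```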
Outbox Storage
The outbox storage is used to store the message that should later be sent in the `outbox` database table. Calling `initializeMessageStorage` returns a function that stores outbox messages. This function takes the following parameters:
- `message`: the outbox message with a unique message ID and other relevant data. It contains the message payload that is used on the receiving side to act on such a message.
- `client`: the database client with an open database transaction to handle both the business logic database changes as well as store the outgoing message within that transaction.
Outbox Listener
The `initializeReplicationMessageListener` is used to create and start the logical replication-based listener that gets notified when a new outbox message is written to the outbox table. It takes the message publisher instance as an input parameter and calls it to send out messages. Other parameters are the `config` object, a `logger` instance and an optional strategies object to fine-tune and customize specific aspects of the outbox listener.
The second option is to use the `initializePollingMessageListener` to use the database polling approach to query for unprocessed outbox messages. It uses the same handler to send out the message and the same logger. It has a (partly) different configuration object and can optionally define also different strategies.
You can build the configuration object from your code. Alternatively, you can use `process.env` variables to provide the configuration values.
The easiest way to generate the .ENV files is to use the CLI tool which also generates the SQL scripts file. You can find the CLI tool in the `./examples/setup/` folder.
yarn dev:watch
You can find the example ENV files here:
- `examples/setup/out/example-trx-polling.env` (for polling listener)
- `examples/setup/out/example-trx-replication.env` (for replication listener)
All ENV variables can use one of the following three prefixes:
- `TRX_OUTBOX_<variable>` - those variables are used to build the outbox-specific settings for the desired listener.
- `TRX_INBOX_<variable>` - those variables are used to build the inbox-specific settings for the desired listener.
- `TRX_<variable>` - those variable values are used for both the outbox and inbox settings when no outbox or inbox-specific value is provided.
| <PREFIX> + Variable Name | Type | Default | Description |
| ---------------------------------------------- | ------- | -------- | -------------------------------------------------------------------------------------------------------------------------------------------------- |
| TRX_DB_SCHEMA | string | "public" | The database schema name where the table is located. |
| TRX_MESSAGE_PROCESSING_TIMEOUT_IN_MS | number | 15000 | Stop the message handler after this time has passed. |
| TRX_MAX_ATTEMPTS | number | 5 | The maximum number of attempts to handle a message. With max 5 attempts a message is handled once initially and up to four more times for retries. |
| TRX_MAX_POISONOUS_ATTEMPTS | number | 3 | The maximum number of times a message should be attempted which was started but did not finish (neither error nor success). |
| TRX_MESSAGE_CLEANUP_INTERVAL_IN_MS | number | 300000 | Time in milliseconds between the execution of the old message cleanups. Set it to zero to disable automatic message cleanup. |
| TRX_MESSAGE_CLEANUP_PROCESSED_IN_SEC | number | 604800 | Delete messages that were successfully processed after X seconds. |
| TRX_MESSAGE_CLEANUP_ABANDONED_IN_SEC | number | 1209600 | Delete messages that could not be processed after X seconds. |
| TRX_MESSAGE_CLEANUP_ALL_IN_SEC | number | 5184000 | Delete all old messages after X seconds. |
| TRX_OUTBOX_DB_TABLE | string | "outbox" | The name of the database outbox table. |
| TRX_OUTBOX_ENABLE_MAX_ATTEMPTS_PROTECTION | boolean | false | Enable the max attempts protection. |
| TRX_OUTBOX_ENABLE_POISONOUS_MESSAGE_PROTECTION | boolean | false | Enable the max poisonous attempts protection. |
| TRX_INBOX_DB_TABLE | string | "inbox" | The name of the database inbox table. |
| TRX_INBOX_ENABLE_MAX_ATTEMPTS_PROTECTION | boolean | true | Enable the max attempts protection. |
| TRX_INBOX_ENABLE_POISONOUS_MESSAGE_PROTECTION | boolean | true | Enable the max poisonous attempts protection. |
The replication listener approach supports the following variables in addition to the above ones:
| <PREFIX> + Variable Name | Type | Default | Description |
| ----------------------------------- | ------ | ---------------------------------- | ---------------------------------------------------------------------------------------------------- |
| TRX_RESTART_DELAY_IN_MS | number | 250 | When there is a message handling error, how long the listener should wait to restart the processing. |
| TRX_RESTART_DELAY_SLOT_IN_USE_IN_MS | number | 10000 | If the replication slot is in use, how long the listener should wait to connect again. |
| TRX_OUTBOX_DB_PUBLICATION | string | "transactional_outbox_publication" | The name of the PostgreSQL publication that should be used for the outbox. |
| TRX_OUTBOX_DB_REPLICATION_SLOT | string | "transactional_outbox_slot" | The name of the PostgreSQL replication slot that should be used for the outbox. |
| TRX_INBOX_DB_PUBLICATION | string | "transactional_inbox_publication" | The name of the PostgreSQL publication that should be used for the inbox. |
| TRX_INBOX_DB_REPLICATION_SLOT | string | "transactional_inbox_slot" | The name of the PostgreSQL replication slot that should be used for the inbox. |
The polling listener approach supports the following variables in addition to the above ones:
| <PREFIX> + Variable Name | Type | Default | Description |
| ---------------------------------------- | ------ | ---------------------- | ------------------------------------------------------------------------------------ |
| TRX_NEXT_MESSAGES_FUNCTION_SCHEMA | string | "public" | The database schema of the next messages function. |
| TRX_NEXT_MESSAGES_BATCH_SIZE | number | 5 | The (maximum) amount of messages to retrieve in one query. |
| TRX_NEXT_MESSAGES_LOCK_IN_MS | number | 5000 | How long the retrieved messages should be locked before they can be retrieved again. |
| TRX_NEXT_MESSAGES_POLLING_INTERVAL_IN_MS | number | 500 | How often should the next messages function be executed. |
| TRX_OUTBOX_NEXT_MESSAGES_FUNCTION_NAME | string | "next_outbox_messages" | The database function name to get the next batch of outbox messages. |
| TRX_INBOX_NEXT_MESSAGES_FUNCTION_NAME | string | "next_inbox_messages" | The database function name to get the next batch of inbox messages. |
An example ENV file can then be:
TRX_DB_SCHEMA=messaging
TRX_MESSAGE_PROCESSING_TIMEOUT_IN_MS=30000
TRX_OUTBOX_DB_TABLE=outbox
TRX_OUTBOX_ENABLE_MAX_ATTEMPTS_PROTECTION=false
TRX_OUTBOX_ENABLE_POISONOUS_MESSAGE_PROTECTION=false
TRX_INBOX_DB_TABLE=inbox
TRX_INBOX_ENABLE_MAX_ATTEMPTS_PROTECTION=true
TRX_INBOX_MAX_ATTEMPTS=5
TRX_INBOX_ENABLE_POISONOUS_MESSAGE_PROTECTION=true
TRX_INBOX_MAX_POISONOUS_ATTEMPTS=3
...
Message Publisher
The message publisher is responsible for the actual transport of the message to
the recipient(s). It receives the outbox message with all the fields, including
the metadata. Based on the metadata the message publisher should send out the
message. The message publisher logic or the messaging system it uses is
responsible for guaranteeing at-least-once message delivery. You can find a
RabbitMQ-based implementation in the examples folder.
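As a rough illustration of the paragraph above, the following sketch routes a message based on its metadata and retries until the transport acknowledges it. The `SketchOutboxMessage` shape, the `routingKey` metadata field, the `SketchTransport` type, and the `publishWithRetry` function are hypothetical names for this example and not part of the library's API. A real transport would also be asynchronous.

```typescript
// Hypothetical message shape for this sketch (not the library's type).
interface SketchOutboxMessage {
  id: string;
  messageType: string;
  payload: unknown;
  metadata?: { routingKey?: string };
}

// A transport returns true when the broker acknowledged the message.
type SketchTransport = (routingKey: string, body: string) => boolean;

// Sends the message to the routing key taken from the metadata and retries
// until the transport acknowledges it or the attempts are used up.
function publishWithRetry(
  message: SketchOutboxMessage,
  transport: SketchTransport,
  maxAttempts: number,
): number {
  const routingKey = message.metadata?.routingKey ?? 'default';
  const body = JSON.stringify({
    id: message.id,
    messageType: message.messageType,
    payload: message.payload,
  });
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    if (transport(routingKey, body)) {
      return attempt; // acknowledged: number of attempts that were needed
    }
  }
  // Throwing signals the caller that the message was not sent.
  throw new Error(`Message ${message.id} was not acknowledged`);
}
```

Because a retry can happen after the broker already received the message, the consumer may see the same message more than once. This is why the receiving side deduplicates on the message `id`.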
Implementing the transactional inbox consumer
A minimalistic code example for the consumer of the published message using the
transactional inbox pattern is included below. One central function is
`initializeMessageStorage`. It is used by the actual message receiver, like a
RabbitMQ-based message handler, to store the incoming message (which was based
on an outbox message) in the inbox table.
The other central functions are `initializeReplicationMessageListener` and
`initializePollingMessageListener`. The replication listener uses one database
connection based on a user with replication permission to receive notifications
when a new inbox message is created. The polling listener queries the database
regularly for new inbox messages.
The code uses a second database connection to open a transaction, load the inbox message from the database and lock it, execute the message handler queries/mutations, and finally mark the inbox message as processed in the database.
import { Pool } from 'pg';
import {
DatabaseClient,
IsolationLevel,
PollingListenerConfig,
ReplicationListenerConfig,
TransactionalMessage,
TransactionalMessageHandler,
createReplicationMultiConcurrencyController,
ensureExtendedError,
executeTransaction,
getDefaultLogger,
initializeMessageStorage,
initializePollingMessageListener,
initializeReplicationMessageListener,
} from 'pg-transactional-outbox';
/** The main entry point of the message consumer. */
(async () => {
const logger = getDefaultLogger('inbox');
// You can also use ENV variables via getInboxReplicationListenerSettings and
// getInboxPollingListenerSettings to create the settings objects.
// This configuration is used to start a transaction that locks and updates
// the corresponding row in the inbox table. This connection
// will also be used in the message handler so every select and data change is
// part of the same database transaction. The inbox database row is then
// marked as "processed" when everything went fine.
const dbHandlerConfig = {
host: 'localhost',
port: 5432,
user: 'db_inbox_handler',
password: 'db_inbox_handler_password',
database: 'pg_transactional_inbox',
};
// Configure the replication role to receive notifications when a new inbox
// row was added to the inbox table. This role must have the replication
// permission.
const dbListenerConfig = {
host: 'localhost',
port: 5432,
user: 'db_inbox_listener',
password: 'db_inbox_listener_password',
database: 'pg_transactional_inbox',
};
const baseSettings = {
dbSchema: 'public',
dbTable: 'inbox',
enableMaxAttemptsProtection: true,
enablePoisonousMessageProtection: true,
};
const replicationConfig: ReplicationListenerConfig = {
outboxOrInbox: 'inbox',
dbHandlerConfig,
dbListenerConfig,
settings: {
...baseSettings,
dbPublication: 'pg_transactional_inbox_pub',
dbReplicationSlot: 'pg_transactional_inbox_slot',
},
};
const pollingConfig: PollingListenerConfig = {
outboxOrInbox: 'inbox',
dbListenerConfig,
settings: {
...baseSettings,
nextMessagesBatchSize: 5,
nextMessagesFunctionName: 'next_inbox_messages',
nextMessagesPollingIntervalInMs: 250,
},
};
// Create the database pool to store the incoming inbox messages
const pool = new Pool(replicationConfig.dbHandlerConfig);
pool.on('error', (err) => {
logger.error(ensureExtendedError(err, 'DB_ERROR'), 'PostgreSQL pool error');
});
// Initialize the inbox message storage to store incoming messages in the inbox
const storeInboxMessage = initializeMessageStorage(replicationConfig, logger);
// Initialize the message receiver e.g. based on RabbitMQ
// In the simplest scenario use the inter process communication:
process.on('message', async (message: TransactionalMessage) => {
await executeTransaction(
await pool.connect(),
async (client): Promise<void> => {
await storeInboxMessage(message, client);
},
IsolationLevel.ReadCommitted,
);
});
// Define an optional concurrency strategy to handle messages with the message
// type "ABC" in parallel while handling other messages sequentially based on
// their `segment` field value.
const concurrencyStrategy = createReplicationMultiConcurrencyController(
(message) => {
switch (message.messageType) {
case 'ABC':
return 'full-concurrency';
default:
return 'segment-mutex';
}
},
);
// Declare the message handler
const movieCreatedHandler: TransactionalMessageHandler = {
aggregateType: 'movie',
messageType: 'movie_created',
handle: async (
message: TransactionalMessage,
client: DatabaseClient,
): Promise<void> => {
// Executes the message handler logic using the same database
// transaction as the inbox message acknowledgement.
const { payload } = message;
if (
typeof payload === 'object' &&
payload !== null &&
'id' in payload &&
typeof payload.id === 'string' &&
'title' in payload &&
typeof payload.title === 'string'
) {
await client.query(
`INSERT INTO public.published_movies (id, title) VALUES ($1, $2)`,
[payload.id, payload.title],
);
}
},
handleError: async (
error: Error,
message: TransactionalMessage,
_client: DatabaseClient,
retry: boolean,
): Promise<void> => {
if (!retry) {
// Potentially send a compensating message to adjust other services e.g. via the Saga Pattern
logger.error(
error,
`Giving up processing message with ID ${message.id}.`,
);
}
},
};
// Initialize and start the inbox listener
let shutdownListener: () => Promise<void>;
if (process.env.LISTENER_TYPE === 'replication') {
const [shutdown] = initializeReplicationMessageListener(
replicationConfig,
[movieCreatedHandler],
logger,
{
concurrencyStrategy,
messageProcessingTimeoutStrategy: (message: TransactionalMessage) =>
message.messageType === 'ABC' ? 10_000 : 2_000,
messageProcessingTransactionLevelStrategy: (
message: TransactionalMessage,
) =>
message.messageType === 'ABC'
? IsolationLevel.ReadCommitted
: IsolationLevel.RepeatableRead,
},
);
shutdownListener = shutdown;
} else {
const [shutdown] = initializePollingMessageListener(
pollingConfig,
[movieCreatedHandler],
logger,
{
messageProcessingTimeoutStrategy: (message: TransactionalMessage) =>
message.messageType === 'ABC' ? 10_000 : 2_000,
messageProcessingTransactionLevelStrategy: (
message: TransactionalMessage,
) =>
message.messageType === 'ABC'
? IsolationLevel.ReadCommitted
: IsolationLevel.RepeatableRead,
},
);
shutdownListener = shutdown;
}
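// Stop the listener when the service shuts down. Calling the shutdown function
// right away would stop the listener immediately.
process.once('SIGTERM', () => void shutdownListener());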
})();
Please note: This library can automatically delete inbox messages from the inbox table. Keeping them for a while is required to ensure that a message is only processed once (it could otherwise be processed again due to multiple deliveries or replay attacks). Please configure how long the messages should stay in this table. This can be a few minutes but also some days. You can define different age thresholds for processed messages, abandoned ones, and a general max age setting. To be safe you can add logic to not process messages anymore that are older than these defined durations.
As an alternative you could also create an `inbox_archive` table and write a script to move the messages into that table instead of deleting them.
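The safety check mentioned above (not processing messages that are older than the retention duration) could look roughly like the sketch below. The function name `isMessageExpired` and the `maxAgeInMs` parameter are made up for this example. It only relies on the `createdAt` field being an ISO 8601 timestamp.

```typescript
// Returns true when a message is older than the allowed age and should be
// skipped. `createdAt` is the ISO 8601 timestamp from the message.
function isMessageExpired(
  createdAt: string,
  maxAgeInMs: number,
  now: Date,
): boolean {
  const createdAtMs = Date.parse(createdAt);
  if (isNaN(createdAtMs)) {
    // Treat unparsable timestamps as expired to stay on the safe side.
    return true;
  }
  return now.getTime() - createdAtMs > maxAgeInMs;
}
```

A message handler could call such a check first and return early for expired messages, so that a late duplicate cannot be processed after its original inbox row was already deleted.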
Message receiver
The message receiver is the counterpart to the message publisher. It receives the transferred outbox message and has to store it in the inbox table. It is the responsibility of the receiver to guarantee that the message is written to the inbox table via the inbox storage functionality. It can try it multiple times if needed. You can find a RabbitMQ-based implementation in the examples folder.
Message Storage
When a message is received, the message receiver must use the inbox storage
functionality to store the message. `initializeMessageStorage` is used to create
an inbox storage instance. The inbox storage takes the incoming message and
stores it in the inbox table.
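To illustrate why storing by message `id` makes repeated deliveries harmless, here is a small in-memory sketch. The `SketchInboxTable` class and `SketchInboxRow` type are hypothetical stand-ins for the PostgreSQL inbox table. How the library implements this in SQL is not shown here.

```typescript
interface SketchInboxRow {
  id: string;
  payload: unknown;
}

// In-memory stand-in for the inbox table, keyed by the message id.
class SketchInboxTable {
  private rows: Record<string, SketchInboxRow> = {};

  // Stores the message unless a row with the same id already exists.
  // Returns true if a new row was written and false for a duplicate.
  store(message: SketchInboxRow): boolean {
    if (Object.prototype.hasOwnProperty.call(this.rows, message.id)) {
      return false;
    }
    this.rows[message.id] = message;
    return true;
  }

  // Number of stored messages.
  count(): number {
    return Object.keys(this.rows).length;
  }
}
```

With this behavior the message receiver can safely retry the store operation, or receive the same message twice from the broker, without creating a second inbox entry.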
Inbox Listener
`initializeReplicationMessageListener` is used to create and start the logical
replication-based listener that gets notified when a new inbox message is
written to the inbox table. It takes the message handlers as an input parameter
and calls the matching handler to process each message. Other parameters are the
`config` object, a `logger` instance, and an optional strategies object to
fine-tune and customize specific aspects of the inbox listener.
It uses the same logic and settings as the outbox listener. Please check that section for further explanations and available configuration values.
The inbox listener starts the actual message handling logic. This is most often more complex and longer running. Therefore it is suggested to always enable the maximum attempts protection and the poisonous message protection.
Message Handler
The message handler is a component that defines how to process messages. You can
provide a general one (`GeneralMessageHandler`) that handles all messages. This
is often used for the outbox message handler which sends all messages to the
same message broker. It has the following interface:
- `handle`: A function that contains the custom business logic to handle an inbox message. It receives two parameters: `message` and `client`. The `message` parameter is an object that contains the message id, payload, and metadata. The `client` parameter is a database client that is part of a transaction to safely handle the inbox message. The function should return a promise that resolves when the message is successfully processed, or rejects with an error if the message cannot be processed. The error will cause the message to be retried later.
- `handleError`: An optional function that contains the custom business logic to handle an error that was caused by the `handle` function. It receives four parameters: `error`, `message`, `client`, and `attempts`. The `error` parameter is the error that was thrown in the `handle` function. The `message` parameter is the same as in the `handle` function. The `client` parameter is a database client that is part of a new transaction to safely handle the error. The `attempts` parameter is an object that contains the current and maximum number of times the message will be attempted. The function should return a promise that resolves to a flag that defines if the message should be retried (`transient_error`) or not (`permanent_error`).
You can also provide specific handlers (`TransactionalMessageHandler`) where
each handles a specific aggregate type and message type. In addition to the
above interface it includes the following properties:
- `aggregateType`: The name of the aggregate root type that the message belongs to. For example, `movie`, `customer`, `product`, etc.
- `messageType`: The name of the message type that the handler can handle. For example, `movie_created`, `customer_updated`, `restock_product`, etc.
The message handler is used by the transactional outbox and inbox listeners to
process messages in a reliable and consistent way. The service will invoke the
`handle` function of the appropriate handler for each message in the inbox, and
handle any errors or retries using the `handleError` function if provided.
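The selection of "the appropriate handler" can be pictured with the following sketch. It uses simplified, synchronous stand-in types (`SketchMessage`, `SketchHandler`) and a hypothetical `selectHandler` function. The library's own handlers are asynchronous and also receive a database client.

```typescript
interface SketchMessage {
  id: string;
  aggregateType: string;
  messageType: string;
  payload: unknown;
}

interface SketchHandler {
  aggregateType: string;
  messageType: string;
  handle: (message: SketchMessage) => void;
}

// Finds the handler whose aggregate type and message type both match.
function selectHandler(
  handlers: SketchHandler[],
  message: SketchMessage,
): SketchHandler | undefined {
  for (const handler of handlers) {
    if (
      handler.aggregateType === message.aggregateType &&
      handler.messageType === message.messageType
    ) {
      return handler;
    }
  }
  return undefined;
}
```

What should happen when no handler matches (for example skipping the message) is a design decision that this sketch leaves to the caller.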
Message format
Messages are the means to transport information in a structured way from the message producer to the message consumer.
Both the outbox and inbox message have the following properties:
| Field Name | Field Type | Description |
| ---------------- | ---------- | -------------------------------------------------------------------------------------- |
| id | string | The unique identifier of the message. Used to ensure a message is only processed once. |
| aggregateType | string | The type of the aggregate root (in DDD context) to which this message is related. |
| aggregateId | string | The unique identifier of the aggregate. |
| messageType | string | The type name of the event or command. |
| segment | string | Way to group messages for parallel message execution. |
| concurrency | string | Can be 'sequential' or 'parallel' for message processing. |
| payload | object | The message payload with the details for an event or instructions for a command. |
| metadata | object | Optional metadata used for the actual message transfer. |
| lockedUntil | string | The date and time in ISO 8601 UTC format until the message is locked (polling only). |
| createdAt | string | The date and time in ISO 8601 UTC format when the message was created. |
| startedAttempts | number | The number of times a message was attempted to be processed. |
| finishedAttempts | number | The number of times a message was processed (successfully or with a caught error). |
| processedAt | string | The date and time in ISO 8601 UTC format when the message was processed. |
| abandonedAt | string | The date and time in ISO 8601 UTC format when the message was abandoned. |
Strategies
The strategy pattern is a behavioral design pattern that enables selecting an algorithm at runtime. Instead of implementing a single algorithm directly, the listeners receive run-time instructions as to which algorithm should be used. This allows you to customize different parts of the message handling for concurrency, retries, poisonous message handling, etc. By defining a common interface for different scenarios you can use either existing code or write your custom implementations.
Message handler strategies
Message processing timeout strategy
The `messageProcessingTimeoutStrategy` allows you to define a message-based
timeout on how long the message is allowed to be processed (in milliseconds).
This allows you to give more time to process "expensive" messages while still
processing others on a short timeout. By default, it uses the configured
`messageProcessingTimeoutInMs`.
Message processing transaction level strategy
The `messageProcessingTransactionLevelStrategy` lets you define the transaction
isolation level per message. Some message processing may have higher isolation
level requirements than others. If no custom strategy is provided it uses the
default database transaction level via `BEGIN`.
Message processing DB client strategy
Messages are processed in a database transaction that checks in the outbox/inbox table whether the message was already processed and loads the retry-specific data. It also handles the business-logic related database work (especially for the inbox).
Some message handlers may require a database role with elevated permissions
while others can use more restricted users. With this strategy, you can return a
database client from your desired Pool in the `getClient` function. When the
replication listener is shut down it will call the `shutdown` function where you
can close your database pool and run other cleanup logic.
Message retry strategy
When processing a message an error can be thrown. The transactional listener
catches that error and needs to decide if the message should be processed
again or not. The `messageRetryStrategy` offers the possibility to customize
this decision. By default, the `defaultMessageRetryStrategy` is used. It will
retry the message up to the configured value in the `maxAttempts` setting
(including the initial attempt).
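The "up to `maxAttempts`, including the initial attempt" rule can be written as a one-line check. The sketch below is an interpretation of that sentence, not the library's implementation. The function name `shouldRetryMessage` is hypothetical, and it assumes that `finishedAttempts` already counts the attempt that just failed.

```typescript
// Decides whether a failed message gets another attempt.
// Assumption: `finishedAttempts` counts the attempts completed so far,
// including the one that just failed. `maxAttempts` includes the initial
// attempt, so with maxAttempts = 5 there are at most four retries.
function shouldRetryMessage(
  finishedAttempts: number,
  maxAttempts: number,
): boolean {
  return finishedAttempts < maxAttempts;
}
```

With `maxAttempts` set to 5, the check allows a retry after the first four failures and gives up after the fifth.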
Poisonous message retry strategy
A poisonous message is a message that causes the service to crash repeatedly and prevents other messages from being processed. To avoid this situation, the service tracks the number of times a message is started and finished, regardless of the outcome (success or error). If the service crashes while processing a message, it will retry the message until it succeeds or reaches a maximum number of attempts.
You can customize the behavior of the service by changing the following options:
- `settings.maxPoisonousAttempts`: This is the maximum number of times the service will retry a message before marking it as poisonous.
- `poisonousMessageRetryStrategy`: This is a function that determines whether a message should be retried or not, based on the started and finished counts. The default function is `(started, finished) => started - finished >= maxPoisonousAttempts`, but you can implement your own logic and pass it as an argument to the service constructor.
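The bookkeeping behind this protection can be simulated with two counters. The sketch below evaluates the expression quoted above and treats a `true` result as "the limit is reached", which is an assumption about how the result is interpreted. `SketchAttemptCounters`, `reachedPoisonousLimit`, and `simulateAttempt` are hypothetical names.

```typescript
interface SketchAttemptCounters {
  startedAttempts: number;
  finishedAttempts: number;
}

// Evaluates the quoted expression: true once the number of started but never
// finished attempts reaches the configured limit.
function reachedPoisonousLimit(
  counters: SketchAttemptCounters,
  maxPoisonousAttempts: number,
): boolean {
  const unfinished = counters.startedAttempts - counters.finishedAttempts;
  return unfinished >= maxPoisonousAttempts;
}

// Simulates one processing attempt. The started counter is always increased.
// A crash leaves the attempt unfinished, any other outcome finishes it.
function simulateAttempt(
  counters: SketchAttemptCounters,
  crashes: boolean,
): SketchAttemptCounters {
  return {
    startedAttempts: counters.startedAttempts + 1,
    finishedAttempts: crashes
      ? counters.finishedAttempts
      : counters.finishedAttempts + 1,
  };
}
```

Attempts that end with a caught error still count as finished, so only crashes (where the process dies before the attempt is recorded as finished) widen the gap between the two counters.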
Replication listener strategies
Replication listener concurrency strategy
The outbox and inbox listeners process messages that are stored in their corresponding tables. When they process the messages, you can influence the level of concurrency of the listeners. The default concurrency controller will use a mutex to guarantee sequential message processing. Concurrency strategies are only used for the replication listener. The polling listener solves this on the polling query side.
The following pre-built controllers are available, but you can also write your own:
- `createFullConcurrencyController`: this controller allows the parallel processing of messages without guarantees on the processing order.
- `createMutexConcurrencyController`: this controller guarantees sequential message processing across all messages.
- `createSemaphoreConcurrencyController`: this controller allows the processing of messages in parallel up to a configurable number.
- `createReplicationSegmentMutexConcurrencyController`: this controller enables sequential message processing based on the message "segment" discriminator. The controller still guarantees sequential message processing but only across messages with the same segment value.
- `createMultiConcurrencyController`: this is a combined concurrency controller. You can define which of the above controllers should be used for different kinds of messages.
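The idea behind the segment-based controller can be shown without any locking code: messages with the same `segment` form one ordered queue, and separate queues are independent of each other. The sketch below only groups messages and is not how the library implements its controllers. `groupBySegment` and the `_default` fallback key for messages without a segment are assumptions of this example.

```typescript
interface SketchSegmentMessage {
  id: string;
  segment?: string;
}

// Groups message ids by segment while keeping arrival order within a segment.
function groupBySegment(
  messages: SketchSegmentMessage[],
): Record<string, string[]> {
  // An object without a prototype avoids clashes with keys like "constructor".
  const queues: Record<string, string[]> = Object.create(null);
  for (const message of messages) {
    const key = message.segment ?? '_default';
    if (!queues[key]) {
      queues[key] = [];
    }
    queues[key].push(message.id);
  }
  return queues;
}
```

Each resulting queue would be worked off one message at a time, while different queues could be processed in parallel. A full mutex corresponds to a single queue for all messages, and full concurrency to one queue per message.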
Replication listener restart time strategy
When the outbox or inbox listener fails due to an error it is restarted. The
`listenerRestartStrategy` is used to define how long it should wait before it
attempts to start again. It allows you to decide (based on the error) to log or
track the caught error.
The `defaultListenerRestartStrategy` checks if the error is a PostgreSQL error.
If the PostgreSQL error is about the replication slot being in use, it logs a
trace entry and waits for the configured `restartDelaySlotInUseInMs` time.
Otherwise, it logs an error entry and waits for the configured
`restartDelayInMs`.
The `defaultListenerAndSlotRestartStrategy` uses the same logic as the
`defaultListenerRestartStrategy`. In addition, it checks if a PostgreSQL error
is about the replication slot not existing (e.g. after a DB failover). Then it
tries to create the replication slot with the connection details of the
replication user and waits for the configured `restartDelayInMs` to restart the
listener.
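The delay selection described above can be sketched as follows. PostgreSQL uses SQLSTATE `55006` (`object_in_use`) when a replication slot is already active for another connection. Whether the library checks exactly this code is an assumption, and `selectRestartDelay` and `SketchRestartSettings` are hypothetical names.

```typescript
interface SketchRestartSettings {
  restartDelayInMs: number;
  restartDelaySlotInUseInMs: number;
}

// SQLSTATE for "object_in_use", reported when the replication slot is
// already active for another connection.
const SLOT_IN_USE_SQLSTATE = '55006';

// Picks the waiting time before the listener is restarted.
function selectRestartDelay(
  error: { code?: string },
  settings: SketchRestartSettings,
): number {
  return error.code === SLOT_IN_USE_SQLSTATE
    ? settings.restartDelaySlotInUseInMs
    : settings.restartDelayInMs;
}
```

The longer delay for a slot that is in use fits the situation where another instance of the service currently holds the slot, so frequent reconnect attempts would not help.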
Polling listener strategies
Polling listener batch size strategy
The `PollingListenerBatchSizeStrategy` defines how many messages should be
loaded at once.
The default `defaultPollingListenerBatchSizeStrategy` returns the configured
`nextMessagesBatchSize` value. However, for the first few calls, until the batch
size is reached, it returns a batch size of one. This protects against poisonous
messages: if 5 messages were taken during startup, all 5 would be marked as
poisonous if one of them fails.
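A ramp-up of this kind can be sketched with a small closure. The exact ramp-up rule of the default strategy is not spelled out here, so the sketch assumes that the first `configuredBatchSize` calls return 1 and every later call returns the configured value. `createRampUpBatchSize` is a hypothetical name.

```typescript
// Creates a batch size function that starts with single-message batches.
// Assumption: the first `configuredBatchSize` calls return 1, later calls
// return the configured value.
function createRampUpBatchSize(configuredBatchSize: number): () => number {
  let calls = 0;
  return () => {
    if (calls < configuredBatchSize) {
      calls++;
      return 1;
    }
    return configuredBatchSize;
  };
}
```

Loading single messages right after startup means that a crash caused by one message only increases the started attempts counter of that one message.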
Extensions
You can (exponentially) increase the time after which a message can be retried again. You can set the locked_until to some configurable time/factor in the future for the polling listener approach. This can be done in the message error handler.
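One possible way to compute such a value is sketched below. The function `nextLockedUntil` and its parameters `baseDelayInMs` and `maxDelayInMs` are hypothetical and not settings of the library. The delay doubles with every finished attempt and is capped at a maximum.

```typescript
// Computes the next `locked_until` value with an exponential backoff.
// The first failure waits `baseDelayInMs`, the second twice as long, and so
// on, but never longer than `maxDelayInMs`.
function nextLockedUntil(
  finishedAttempts: number,
  baseDelayInMs: number,
  maxDelayInMs: number,
  now: Date,
): string {
  const exponent = Math.max(0, finishedAttempts - 1);
  const delay = Math.min(maxDelayInMs, baseDelayInMs * Math.pow(2, exponent));
  // ISO 8601 UTC string, matching the format of the lockedUntil field.
  return new Date(now.getTime() + delay).toISOString();
}
```

In the message error handler, the returned value could then be written to the `locked_until` column of the message row using the database client that the handler receives.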
Instead of just deleting old messages, you could insert them into an "outbox_archive" and "inbox_archive" table when they are deleted from the main tables. This would help to debug production issues and allow you to copy messages back to their main tables to do another retry.
This could be done via something like this:
CREATE OR REPLACE FUNCTION trg_outbox_archive()
RETURNS trigger AS
$BODY$
BEGIN
INSERT INTO public.outbox_archive SELECT (OLD).*;
RETURN NULL;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;
DROP TRIGGER IF EXISTS after_delete_outbox ON public.outbox;
CREATE TRIGGER after_delete_outbox
AFTER DELETE
ON public.outbox
FOR EACH ROW
EXECUTE PROCEDURE trg_outbox_archive();
Testing
The library has a large set of unit tests alongside the files that implement the
actual logic. You can run the unit tests from the `lib` folder via `yarn test`
and `yarn test:coverage`.
The `__tests__` folder contains integration tests that test the functionality of
the outbox and inbox listener implementation. The tests use the testcontainers
library to start up an actual PostgreSQL server. They test the functionality
against an actual database and include resilience tests in which the test
container is stopped and restarted to test the library against an unreliable
database instance. You can simply run the `test` script to execute the tests.
You can manually test the implementation using the `examples/rabbitmq` producer
and consumer implementations. Copy the `.env.template` files as `.env` files and
adjust these files if needed. Especially the `LISTENER_TYPE` variable is useful
to test the replication vs. polling listener approach. To test the two example
applications you can use `yarn dev:watch`.