@castore/in-memory-message-bus-adapter
v1.25.3
DRY Castore MessageBus definition using EventEmitter
In Memory Message Bus Adapter
DRY Castore MessageBus definition using Event Emitters.
📥 Installation
# npm
npm install @castore/in-memory-message-bus-adapter
# yarn
yarn add @castore/in-memory-message-bus-adapter
This package has @castore/core as a peer dependency, so you will have to install it as well:
# npm
npm install @castore/core
# yarn
yarn add @castore/core
👩‍💻 Usage
The simplest way to use this adapter is to use the attachTo static method:
// 👇 Note: EventEmitter is a native NodeJS library
// Outside of NodeJS (like browsers) you can use the event-emitter package
import { EventEmitter } from 'events';
import { InMemoryMessageBusAdapter } from '@castore/in-memory-message-bus-adapter';
const eventEmitter = new EventEmitter();
const messageBusAdapter = InMemoryMessageBusAdapter.attachTo(
  appMessageBus,
  { eventEmitter }, // <= Constructor arguments
);
This will make your messageBusAdapter inherit from your appMessageBus types while plugging them together 🙌
You can also instantiate one on its own, but notice the code duplication:
import type { MessageBusMessage } from '@castore/core';
import { InMemoryMessageBusAdapter } from '@castore/in-memory-message-bus-adapter';
const messageBusAdapter = new InMemoryMessageBusAdapter<
  MessageBusMessage<typeof appMessageBus>
>({ eventEmitter });
appMessageBus.messageBusAdapter = messageBusAdapter;
👂 Add listeners
Similarly to event emitters, the inMemoryMessageBusAdapter exposes an on method that takes two arguments:
- A filter pattern to optionally specify an eventStoreId and an event type to listen to (NotificationEventBus and StateCarryingEventBus only), and whether replayed events should be included
- An async callback to execute if the message matches the filter pattern
// 👇 Listen to all messages
messageBusAdapter.on({}, async message => {
  // 🙌 Correctly typed!
  const { eventStoreId, event } = message;
});
// 👇 Listen only to pokemons messages
messageBusAdapter.on({ eventStoreId: 'POKEMONS' }, async message => {
  // 🙌 Correctly typed!
  const { eventStoreId, event } = message;
});
// 👇 Listen only to POKEMON_APPEARED messages
messageBusAdapter.on(
  { eventStoreId: 'POKEMONS', eventType: 'POKEMON_APPEARED' },
  async message => {
    // 🙌 Correctly typed!
    const { eventStoreId, event } = message;
  },
);
// 👇 Include replayed events
messageBusAdapter.on(
  { eventStoreId: 'POKEMONS', eventType: 'POKEMON_APPEARED', onReplay: true },
  async message => {
    // 🙌 Correctly typed!
    const { eventStoreId, event } = message;
  },
);
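To make the filter semantics concrete, here is a minimal sketch of how a message could be checked against a filter pattern. It is not the adapter's actual implementation: matchesFilterPattern, the simplified Message type (with the event type read from event.type) and the isReplay flag are all hypothetical, and the sketch assumes that a listener without onReplay: true never receives replayed messages while a listener with it receives both regular and replayed ones.

```typescript
// Simplified, hypothetical shapes (assumptions, not the types exported by Castore)
type Message = { eventStoreId: string; event: { type: string } };
type FilterPattern = {
  eventStoreId?: string;
  eventType?: string;
  onReplay?: boolean;
};

// Hypothetical helper: returns true if the listener registered with `filter`
// should receive `message`.
const matchesFilterPattern = (
  filter: FilterPattern,
  message: Message,
  isReplay = false,
): boolean => {
  // Assumption: replayed messages only reach listeners that opted in
  if (isReplay && filter.onReplay !== true) return false;
  // An omitted field in the filter matches any value
  if (
    filter.eventStoreId !== undefined &&
    filter.eventStoreId !== message.eventStoreId
  ) {
    return false;
  }
  if (
    filter.eventType !== undefined &&
    filter.eventType !== message.event.type
  ) {
    return false;
  }
  return true;
};

const pokemonAppeared: Message = {
  eventStoreId: 'POKEMONS',
  event: { type: 'POKEMON_APPEARED' },
};

console.log(matchesFilterPattern({}, pokemonAppeared)); // true
console.log(matchesFilterPattern({ eventStoreId: 'TRAINERS' }, pokemonAppeared)); // false
console.log(matchesFilterPattern({ eventStoreId: 'POKEMONS' }, pokemonAppeared, true)); // false
console.log(
  matchesFilterPattern({ eventStoreId: 'POKEMONS', onReplay: true }, pokemonAppeared, true),
); // true
```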
For more control, the callback has access to more context through its second argument:
messageBusAdapter.on(
  ...,
  async (message, context) => {
    const { eventStoreId, event } = message;
    const {
      // 👇 See "Retry policy" section below
      attempt,
      retryAttemptsLeft,
      // 👇 If event is replayed
      replay,
    } = context;
  },
);
The same callback can be re-used with different filter patterns. If a message matches several of them, the callback will still be triggered only once:
const logSomething = async () => {
  console.log('Received message!');
};
messageBusAdapter.on({ eventStoreId: 'POKEMONS' }, logSomething);
messageBusAdapter.on(
  { eventStoreId: 'POKEMONS', eventType: 'POKEMON_APPEARED' },
  logSomething,
);
await appMessageBus.publishMessage(pokemonAppearedEvent);
// 👇 Console output (only once):
// "Received message!"
Listeners cannot be removed for now.
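One way to picture the "triggered only once" behaviour described above is to deduplicate the matched callbacks by reference before running them. The sketch below is only an illustration of that idea, not the adapter's internals: the Listener type and the dedupeListeners helper are hypothetical names.

```typescript
// Hypothetical listener shape (an assumption, simplified from the adapter's typed callbacks)
type Listener = (message: unknown) => Promise<void>;

// Hypothetical helper: a Set keeps each function reference at most once,
// so a callback matched by several filter patterns is only kept one time.
const dedupeListeners = (matchedListeners: Listener[]): Listener[] =>
  Array.from(new Set(matchedListeners));

const logSomething: Listener = async () => {
  console.log('Received message!');
};

// The same callback was matched by two different filter patterns
const matchedListeners = [logSomething, logSomething];
const listenersToRun = dedupeListeners(matchedListeners);

console.log(listenersToRun.length); // 1
```

In this sketch, deduplication relies on function identity: two separately written inline arrow functions with the same body would count as two distinct listeners.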
♻️ Retry policy
This adapter will retry failed message handling on a per-listener basis. You can specify a retry policy other than the default one via its constructor arguments:
- retryAttempts (?number = 2): The maximum number of retry attempts for a message in case of listener execution failure. If all the retries fail, the error is logged with console.error, and the message is ignored.
- retryDelayInMs (?number = 30000): The minimum delay in milliseconds between a listener execution failure and its first retry.
- retryBackoffRate (?number = 2): A factor applied to the retryDelayInMs at each subsequent retry.
const messageBusAdapter = InMemoryMessageBusAdapter.attachTo(appMessageBus, {
  eventEmitter,
  retryAttempts: 3,
  retryDelayInMs: 10000,
  retryBackoffRate: 1.5,
});
// 👇 Alternatively
const messageBusAdapter = new InMemoryMessageBusAdapter<
  MessageBusMessage<typeof appMessageBus>
>({
  eventEmitter,
  retryAttempts: 3,
  retryDelayInMs: 10000,
  retryBackoffRate: 1.5,
});
For instance, if a message is listened to by two listeners A and B, with listener A continuously failing, the sequence of code execution (with the default retry policy) will look like this:
- Listener A execution: ❌ Failure
- Listener B execution: ✅ Success
- 30 seconds of delay
- Listener A execution: ❌ Failure
- 60 seconds of delay (30x2)
- Listener A execution: ❌ Failure
- No more retry attempt, error is logged
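The delays in the sequence above follow from the three retry parameters: the first retry waits at least retryDelayInMs, and each subsequent retry multiplies the previous delay by retryBackoffRate. The sketch below computes that schedule. RetryPolicy and computeRetryDelays are hypothetical names used for illustration only; they are not exported by this package.

```typescript
// Hypothetical type mirroring the constructor arguments described above
interface RetryPolicy {
  retryAttempts: number;
  retryDelayInMs: number;
  retryBackoffRate: number;
}

// Default values as documented above
const defaultRetryPolicy: RetryPolicy = {
  retryAttempts: 2,
  retryDelayInMs: 30000,
  retryBackoffRate: 2,
};

// Hypothetical helper: returns the minimum delay (in ms) before each retry, in order.
// Retry number n (starting at 0) waits retryDelayInMs * retryBackoffRate^n.
const computeRetryDelays = ({
  retryAttempts,
  retryDelayInMs,
  retryBackoffRate,
}: RetryPolicy): number[] =>
  Array.from(
    { length: retryAttempts },
    (_, retryIndex) => retryDelayInMs * Math.pow(retryBackoffRate, retryIndex),
  );

console.log(computeRetryDelays(defaultRetryPolicy)); // [ 30000, 60000 ]
console.log(
  computeRetryDelays({
    retryAttempts: 3,
    retryDelayInMs: 10000,
    retryBackoffRate: 1.5,
  }),
); // [ 10000, 15000, 22500 ]
```

With the default policy this gives the 30 and 60 second delays listed above. Since retryDelayInMs is documented as a minimum delay, these values are lower bounds rather than exact timings.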