@boringnode/bus
v0.7.0
A simple and lean driver-based service bus implementation
@boringnode/bus is a service bus implementation for Node.js. It is designed to be simple and easy to use.
Currently, it supports the following transports: in-memory, Redis, and MQTT.
Installation
npm install @boringnode/bus
Usage
The module exposes a manager that can be used to register buses.
import { BusManager } from '@boringnode/bus'
import { redis } from '@boringnode/bus/transports/redis'
import { mqtt } from '@boringnode/bus/transports/mqtt'
import { memory } from '@boringnode/bus/transports/memory'
const manager = new BusManager({
  default: 'main',
  transports: {
    main: {
      transport: memory(),
    },
    redis: {
      transport: redis({
        host: 'localhost',
        port: 6379,
      }),
    },
    mqtt: {
      transport: mqtt({
        host: 'localhost',
        port: 1883,
      }),
    },
  }
})
Once the manager is created, you can subscribe to channels and publish messages.
manager.subscribe('channel', (message) => {
console.log('Received message', message)
})
manager.publish('channel', 'Hello world')
By default, the manager uses the transport named in the default option. You can select a different transport with the use method.
manager.use('redis').publish('channel', 'Hello world')
manager.use('mqtt').publish('channel', 'Hello world')
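To make the delegation concrete, here is a minimal sketch of how a manager could route calls to a default bus while still letting callers pick a named one. It is not the library's implementation: SketchBus, SketchManager, and the list-of-names config are hypothetical and exist only for illustration.

```typescript
// Hypothetical sketch: none of these classes come from @boringnode/bus.
type Handler = (message: string) => void

class SketchBus {
  name: string
  private handlers = new Map<string, Handler[]>()

  constructor(name: string) {
    this.name = name
  }

  subscribe(channel: string, handler: Handler): void {
    const list = this.handlers.get(channel) ?? []
    list.push(handler)
    this.handlers.set(channel, list)
  }

  // Delivers the message to local subscribers and returns how many received it.
  publish(channel: string, message: string): number {
    const list = this.handlers.get(channel) ?? []
    for (const handler of list) handler(message)
    return list.length
  }
}

class SketchManager {
  private defaultName: string
  private names: string[]
  private buses = new Map<string, SketchBus>()

  constructor(config: { default: string; transports: string[] }) {
    this.defaultName = config.default
    this.names = config.transports
  }

  // Resolves a bus by name, creating it on first use; falls back to the default.
  use(name?: string): SketchBus {
    const key = name ?? this.defaultName
    if (!this.names.includes(key)) {
      throw new Error(`Unknown transport "${key}"`)
    }
    let bus = this.buses.get(key)
    if (!bus) {
      bus = new SketchBus(key)
      this.buses.set(key, bus)
    }
    return bus
  }

  // Calls made on the manager itself go to the default bus.
  subscribe(channel: string, handler: Handler): void {
    this.use().subscribe(channel, handler)
  }

  publish(channel: string, message: string): number {
    return this.use().publish(channel, message)
  }
}
```

In this sketch, calling publish on the manager is the same as calling use() with no argument and publishing on the result, which is the behavior the paragraph above describes.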
Without the manager
If you don't need multiple buses, you can create a single bus directly by importing the transports and the Bus class.
import { Bus } from '@boringnode/bus'
import { RedisTransport } from '@boringnode/bus/transports/redis'
const transport = new RedisTransport({
  host: 'localhost',
  port: 6379,
})
const bus = new Bus(transport, {
  retryQueue: {
    retryInterval: '100ms'
  }
})
Retry Queue
The bus also supports a retry queue. When a message fails to be published, for example because your Redis server is down, it is moved to the retry queue.
const manager = new BusManager({
  default: 'main',
  transports: {
    main: {
      transport: redis({
        host: 'localhost',
        port: 6379,
      }),
      retryQueue: {
        retryInterval: '100ms'
      }
    },
  }
})
// 'main' is the only transport registered in this configuration
manager.use('main').publish('channel', 'Hello World')
The message will be moved to the retry queue and will be retried every 100ms.
The retry queue accepts the following options.
export interface RetryQueueOptions {
  // Enable the retry queue (default: true)
  enabled?: boolean
  // Remove duplicate messages from the retry queue (default: true)
  removeDuplicates?: boolean
  // The maximum size of the retry queue (default: null)
  maxSize?: number | null
  // The interval between each retry (default: false)
  retryInterval?: Duration | false
}
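The following sketch shows one way these options could interact. It is an illustration under stated assumptions, not the library's code: SketchRetryQueue, SketchRetryOptions, and the choice to reject new messages when the queue is full are all hypothetical, and the real package may handle a full queue differently.

```typescript
// Hypothetical sketch: these names are not part of @boringnode/bus.
type Publish = (channel: string, payload: string) => Promise<void>

interface SketchRetryOptions {
  removeDuplicates: boolean
  maxSize: number | null
}

class SketchRetryQueue {
  private options: SketchRetryOptions
  private items: { channel: string; payload: string }[] = []

  constructor(options: SketchRetryOptions) {
    this.options = options
  }

  size(): number {
    return this.items.length
  }

  // Returns false when the message is dropped (duplicate, or queue full).
  // Rejecting on a full queue is an assumption made for this sketch.
  enqueue(channel: string, payload: string): boolean {
    const { removeDuplicates, maxSize } = this.options
    const isDuplicate = this.items.some(
      (item) => item.channel === channel && item.payload === payload
    )
    if (removeDuplicates && isDuplicate) return false
    if (maxSize !== null && this.items.length >= maxSize) return false
    this.items.push({ channel, payload })
    return true
  }

  // Re-publishes queued messages in order and stops at the first failure,
  // so anything that still cannot be sent stays queued for the next attempt.
  async process(publish: Publish): Promise<void> {
    while (this.items.length > 0) {
      const next = this.items[0]
      try {
        await publish(next.channel, next.payload)
      } catch {
        return
      }
      this.items.shift()
    }
  }
}
```

A retry interval would then amount to calling process on a timer, for example with setInterval, for as long as the queue is not empty.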
Test helpers
The module also provides test helpers that make it easier to test code that relies on the bus. First, you can use the MemoryTransport to create a bus that uses an in-memory transport.
You can also use the ChaosTransport to simulate a transport that fails randomly, in order to test the resilience of your code.
import { Bus } from '@boringnode/bus'
import { ChaosTransport } from '@boringnode/bus/test_helpers'
// Assumed path, mirroring the RedisTransport import shown earlier
import { MemoryTransport } from '@boringnode/bus/transports/memory'
const buggyTransport = new ChaosTransport(new MemoryTransport())
const bus = new Bus(buggyTransport)
/**
 * From now on, every attempt to publish a message makes the transport
 * throw an error.
 */
buggyTransport.alwaysThrow()
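The idea behind such a helper can be sketched without the library: wrap a working transport and let the test decide when publishing fails. Everything below is hypothetical (SketchTransport, RecordingTransport, FlakyTransport, stopThrowing); only the alwaysThrow name mirrors the example above, and the failure here is a deterministic toggle rather than a random one.

```typescript
// Hypothetical sketch: not the ChaosTransport implementation.
interface SketchTransport {
  publish(channel: string, payload: string): Promise<void>
}

// Stand-in for an in-memory transport that records what was published.
class RecordingTransport implements SketchTransport {
  published: string[] = []

  async publish(channel: string, payload: string): Promise<void> {
    this.published.push(`${channel}:${payload}`)
  }
}

// Wraps another transport and fails on demand.
class FlakyTransport implements SketchTransport {
  private inner: SketchTransport
  private failing = false

  constructor(inner: SketchTransport) {
    this.inner = inner
  }

  alwaysThrow(): void {
    this.failing = true
  }

  stopThrowing(): void {
    this.failing = false
  }

  async publish(channel: string, payload: string): Promise<void> {
    if (this.failing) {
      throw new Error('Simulated transport failure')
    }
    await this.inner.publish(channel, payload)
  }
}
```

A test can then switch the wrapper into failing mode, assert that the code under test queues or reports the error, switch it back, and assert that the message eventually reaches the inner transport.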