amqplib-base-adapter
v1.2.6
Published
base adapter form amqplib
Downloads
10
Readme
Overview
Base adapter for amqplib
With all connection logic and logs!
As logger it uses winston.
You can find producer and consumer example in /examples directory
Config
In the BaseConsumerConfig
and BaseProducerConfig
you could pass an array or an object to rmq
const ExampleConfig = {
...
rmq: [
{
host: env.RMQ_CLUSTER_ADDRESS_1,
password: env.RMQ_CLUSTER_PASSWORD_1,
port: env.RMQ_CLUSTER_PORT_1,
username: env.RMQ_CLUSTER_USERNAME_1,
},
{
host: env.RMQ_CLUSTER_ADDRESS_2,
password: env.RMQ_CLUSTER_PASSWORD_2,
port: env.RMQ_CLUSTER_PORT_2,
username: env.RMQ_CLUSTER_USERNAME_2,
},
],
...
};
Consumer example
const ConsumerExampleConfig: BaseConsumerConfig = {
queue: 'example.queue',
exchange: 'example_exchange',
exchangeType: 'topic',
routingKey: 'example_route',
prefetch: 1,
rmq: {
host: env.RMQ_CLUSTER_ADDRESS,
password: env.RMQ_CLUSTER_PASSWORD,
port: env.RMQ_CLUSTER_PORT,
username: env.RMQ_CLUSTER_USERNAME,
},
environment: env.ENVIRONMENT,
};
class ConsumerExample extends BaseConsumer {
async handleMessage(message: any) {
try {
this.logger.info({ message });
} catch (err) {
this.logger.error('[ConsumerExample] error handle message', err);
}
}
onClose() {
this.logger.error('[ConsumerExample] Connection closed, reconnecting', { errorCode: this.errorCode });
}
onError(error: any) {
this.logger.error('[ConsumerExample] Connection error', error, { errorCode: this.errorCode });
}
onConnectionFailed(error: Error) {
this.logger.error('[ConsumerExample] Connection failed:', error);
}
}
const consumerExample = new ConsumerExample(ConsumerExampleConfig);
void consumerExample.run();
Producer example
const ProducerExampleConfig = {
exchange: 'example_exchange',
exchangeType: 'topic',
routingKey: 'example_route',
rmq: {
host: env.RMQ_CLUSTER_ADDRESS,
password: env.RMQ_CLUSTER_PASSWORD,
port: env.RMQ_CLUSTER_PORT,
username: env.RMQ_CLUSTER_USERNAME,
},
environment: env.ENVIRONMENT,
};
class ProducerExample extends BaseProducer {
async publish() {
try {
const message: Buffer = Buffer.from(JSON.stringify({
test: 'testdata1',
}));
const result = this.channel?.publish(this.exchange, this.routingKey, message);
this.logger.info('[ProducerExample] publish result: ', { result, message });
} catch (error) {
this.logger.error('[ProducerExample] error publish messages', error);
}
}
onClose() {
this.logger.error('[ProducerExample] Connection closed, reconnecting', { errorCode: this.errorCode });
}
onError(error: any) {
this.logger.error('[ProducerExample] Connection error', error, { errorCode: this.errorCode });
}
onConnectionFailed(error: Error) {
this.logger.error('[ProducerExample] Connection failed:', error);
}
}
const producerExample = new ProducerExample(ProducerExampleConfig);
void producerExample.run();