nestjs-nsq-transporter
v2.0.0
Published
## Overview
Downloads
129
Readme
Nestjs Transporter For NSQ
Overview
This library provides a nestjs transporter by taking NSQ as its message broker. We could simply take the concept of a transporter as an event-driven framework as follows:
- As a message producer, I could send a message to NSQ to various topics by emitting various events based on the topic names.
- As a message consumer, I could receive a message by subscribing to those emitted events.
For more details on the overall architecture, please kinldy refer to this article
Restriction
This library has not implemented the request-response communication style and currently it only supports the event-dispatching style.
Produce a message
Add the nsq client provider in your module's definition:
import { DynamicModule, Module } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { ClientNsq, NsqOptions } from 'nestjs-nsq-transporter';
@Module({
providers: [
{
provide: 'NSQ_CLIENT',
useFactory: (): ClientProxy => new ClientNsq(options),
}
SomeService,
],
})
export class SomeModule {}
Then you can inject the ClientNsq instance and use it to emit the message event:
import { Injectable } from '@nestjs/common';
@Injectable()
export class SomeService {
constructor(@Inject('NSQ_CLIENT') private nsqProducer: ClientNsq) {}
sendMessage(topic: string, msg: any) {
return this.nsqProducer.emit(topic, msg);
}
}
Notes: You can not blindly pass any key/value as the 2nd argument(msg
) of the emit
function because there are two special reserved keys: meta
and options
. The meta
is used by the default OutboundEventSerializer
to attach extra informations; options
is for setting the options for the nsq message producing, see the sample below.
this.nsqProducer.emit(topic,
{ foo: 1, bar: 2, meta: { component: 'XYZ'}, options: {retry: { retries: 3 } }
);
// The `emit` above will send a nsq message with payload as follow with 3 times retry strategy.
{ data: { foo: 1, bar: 2 }, meta: { component: 'XYZ'} }
Receive a message
Connect to the nsq microservice in your app and specify the nsq options as follows:
import { MicroserviceOptions } from '@nestjs/microservices';
import { serverNsq } from 'nestjs-nsq-transporter';
app.connectMicroservice<MicroserviceOptions>({
strategy: new ServerNsq(options),
});
Then use @EventPattern to mark a function as the handler for messages from specific event pattern:
import { Injectable } from '@nestjs/common';
import { EventPattern, Ctx, Payload } from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern({
topic: 'topic1',
channel: 'channel1',
options: { // optional
maxAttempts: 3
}
})
messageHandlerForTopic1(@Payload() payload: any, @Ctx() context: NsqContext)
// Handle messages
// Notes: if you throw an Error from this messageHandler, it's better to use `RpcException` so that we have explicit log in nsq-transporter.
}
}
Definition of NsqOptions
The options
that passed to either ServerNsq
or ClientNsq
has the type as NsqOptions and the available fields are as folows:
| Field Name | Is Required | Type | Description | Example |
| :------------------: | :---------: | :---------------------------------------------: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------------------------: |
| lookupdHTTPAddresses | No | string[]
| http address list for nsq lookupds | ['http://localhost:4161']
|
| strategy | No | 'round_robin' or 'fan_out'
| message sending strategy | round_robin
|
| discardHandler | No | (arg: any) => void
| handler function to process when message is discarded | (arg: any) => console.log(arg)
|
| maxInFlight | No | number
| The maximum number of messages to process at once | 1
|
| requeueDelay | No | number
| The default amount of time (milliseconds) a message requeued should be delayed by before being dispatched by nsqd. | 90000
|
| lookupdPollInterval | No | number
| The frequency in seconds for querying lookupd instances. | 60
|
| maxAttempts | No | number
| The number of times a given message will be attempted (given to MESSAGE handler) before it will be handed to the DISCARD handler and then automatically finished | 3
|
| serializer | No | Serializer in @nestjs/microservices
| The instance of Serializer
class which provides a serialize
method to serialize the outbound message | serialize(value: any) => value
|
| deserializer | No | ConsumerDeserializer in @nestjs/microservices
| The instance of ConsumerDeserializer
class which provides a deserialize
method to deserialize the inbound message | deserialize(value: any) => value
|