@dev5c32373043/nestjs-pulsar
v1.0.0
Apache Pulsar module for Nest.js framework
Description
Apache Pulsar module for Nest.
Based on https://www.npmjs.com/package/nest-pulsar with dependency updates and small improvements.
Installation
$ npm i @dev5c32373043/nestjs-pulsar
Note
The Pulsar Node.js client library is built on the C++ client library, so you must install the Pulsar C++ client library before installing the Node.js client. For more details, see pulsar-client-node on GitHub or the Pulsar Node.js client page in the Apache Pulsar documentation.
Getting started
Once the installation process (npm install) is complete, we can import the PulsarModule into the root AppModule.
import { Module } from '@nestjs/common';
import { PulsarModule } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forRoot({
      serviceUrl: 'pulsar://localhost:6650',
    }),
  ],
})
export class AppModule {}
The forRoot() method supports all the configuration properties exposed by the Client class constructor from the pulsar-client package.
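As a rough illustration of passing more than serviceUrl, the sketch below builds an options object from defaults plus overrides. The interface is a local stand-in for a small subset of the pulsar-client client configuration, and buildClientOptions() and the default values are hypothetical names and numbers chosen for this example, not part of the package.

```typescript
// Local stand-in for a subset of the pulsar-client client configuration,
// declared here so the sketch compiles without the native library.
interface ClientOptionsSketch {
  serviceUrl: string;
  operationTimeoutSeconds?: number;
  ioThreads?: number;
  tlsAllowInsecureConnection?: boolean;
}

// Hypothetical helper: merges overrides onto defaults picked for illustration.
function buildClientOptions(
  overrides: Partial<ClientOptionsSketch> = {},
): ClientOptionsSketch {
  const defaults: ClientOptionsSketch = {
    serviceUrl: 'pulsar://localhost:6650',
    operationTimeoutSeconds: 30,
    ioThreads: 1,
  };
  return { ...defaults, ...overrides };
}

const options = buildClientOptions({
  serviceUrl: 'pulsar+ssl://broker.example.com:6651', // example host, not a real broker
  ioThreads: 4,
});
```

An object shaped like this could then be handed to PulsarModule.forRoot(options) in place of the inline literal.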
Note
forRoot() injects the Pulsar Client provider globally.
Next, let's look at another module, let's say the UsersModule.
Once the Pulsar Client is configured, you can inject the needed Producer, Consumer or Reader using the forFeature() method:
import { Module } from '@nestjs/common';
import { PulsarModule, MessageId } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forFeature('producer', 'my-producer', {
      topic: 'my-topic',
    }),
    PulsarModule.forFeature('consumer', 'my-consumer', {
      topic: 'my-topic',
      subscription: 'my-sub',
    }),
    PulsarModule.forFeature('reader', 'my-reader', {
      topic: 'my-topic',
      startMessageId: MessageId.earliest(),
    }),
  ],
})
export class UsersModule {}
Warning
The producer, consumer or reader name (second parameter) is mandatory. Do not register multiple producers, consumers or readers under the same name; otherwise they will be overridden.
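To see why a repeated name is overridden rather than rejected, here is a minimal sketch that models provider registration as a Map keyed by a token built from the feature type and name. The token format and both helper functions are hypothetical; the real token is an internal detail of the package, and treating the type as part of the key is an assumption based on getFeatureToken() taking both arguments.

```typescript
type FeatureType = 'producer' | 'consumer' | 'reader';

// Hypothetical token format, for illustration only.
function sketchToken(type: FeatureType, name: string): string {
  return `${type}:${name}`;
}

// A container keyed by token keeps only the last provider registered per key.
const registry = new Map<string, { topic: string }>();
registry.set(sketchToken('producer', 'my-producer'), { topic: 'orders' });
registry.set(sketchToken('producer', 'my-producer'), { topic: 'payments' }); // replaces 'orders'
registry.set(sketchToken('consumer', 'my-producer'), { topic: 'orders' }); // separate key in this model

// Hypothetical guard: returns every (type, name) pair that appears more than once.
function findDuplicateFeatures(features: Array<[FeatureType, string]>): string[] {
  const seen = new Set<string>();
  const duplicates = new Set<string>();
  for (const [type, name] of features) {
    const token = sketchToken(type, name);
    if (seen.has(token)) duplicates.add(token);
    seen.add(token);
  }
  return Array.from(duplicates);
}
```

A check like findDuplicateFeatures() could run in a unit test over the names your modules register, so a silent override shows up as a test failure.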
The third parameter of the forFeature() method supports all the configuration properties exposed by the following Pulsar Client factory methods:
producer: the feature configuration object corresponds to the client.createProducer() configuration object.
consumer: the feature configuration object corresponds to the client.subscribe() configuration object.
reader: the feature configuration object corresponds to the client.createReader() configuration object.
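The mapping above can be written down as a small lookup table, which also makes it easy to check a configuration object before it reaches the client. This is a sketch only: the required-field lists are an assumption taken from the examples in this README, and missingFields() is a hypothetical helper, not something the package exports.

```typescript
// Client factory method that each feature type maps to.
const factoryMethodByFeature = {
  producer: 'createProducer',
  consumer: 'subscribe',
  reader: 'createReader',
} as const;

type FeatureType = keyof typeof factoryMethodByFeature;

// Assumed minimum fields per feature, based on the examples in this README.
const requiredFields: Record<FeatureType, string[]> = {
  producer: ['topic'],
  consumer: ['topic', 'subscription'],
  reader: ['topic', 'startMessageId'],
};

// Hypothetical pre-flight check: lists required fields missing from a config object.
function missingFields(type: FeatureType, config: Record<string, unknown>): string[] {
  return requiredFields[type].filter((field) => config[field] === undefined);
}

const problems = missingFields('consumer', { topic: 'my-topic' });
```

Here problems contains 'subscription', because the consumer configuration was given a topic but no subscription name.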
This module uses the forFeature() method to define which features (producer, consumer or reader) are registered in the current scope. With that in place, we can inject the Producer, Consumer and Reader Pulsar objects into the UsersService using the @InjectPulsar() decorator:
import { Injectable } from '@nestjs/common';
import { InjectPulsar, Producer, Consumer, Reader } from '@dev5c32373043/nestjs-pulsar';

@Injectable()
export class UsersService {
  constructor(
    @InjectPulsar('producer', 'my-producer')
    private readonly producer: Producer,
    @InjectPulsar('consumer', 'my-consumer')
    private readonly consumer: Consumer,
    @InjectPulsar('reader', 'my-reader')
    private readonly reader: Reader,
  ) {}

  // Serialize the payload to JSON and publish it as a Buffer.
  async publish(data: any) {
    await this.producer.send({ data: Buffer.from(JSON.stringify(data)) });
  }

  // Receive one message, parse it, then acknowledge it.
  async consume(timeout: number = 1000) {
    const rawMessage = await this.consumer.receive(timeout); // timeout is optional
    const data = JSON.parse(rawMessage.getData().toString());
    await this.consumer.acknowledge(rawMessage);
    return data;
  }

  // Read the next message, or return undefined when none is available.
  async read() {
    if (!this.reader.hasNext()) return;
    const rawMessage = await this.reader.readNext();
    const data = JSON.parse(rawMessage.getData().toString());
    return data;
  }
}
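The service above calls JSON.parse() directly, which throws on a malformed payload before the message is acknowledged. One possible way to make that explicit is a pair of small helpers like the following sketch; encodeJson(), decodeJson() and the Decoded type are hypothetical names introduced here, not part of the package.

```typescript
// Serializes an object or array to the Buffer that producer.send({ data }) expects.
function encodeJson(value: Record<string, unknown> | unknown[]): Buffer {
  return Buffer.from(JSON.stringify(value), 'utf8');
}

type Decoded<T> = { ok: true; value: T } | { ok: false; error: string };

// Parses a message payload without throwing, so the caller can decide
// whether to acknowledge, skip or redeliver a malformed message.
function decodeJson<T>(payload: Buffer): Decoded<T> {
  try {
    return { ok: true, value: JSON.parse(payload.toString('utf8')) as T };
  } catch (err) {
    return { ok: false, error: err instanceof Error ? err.message : String(err) };
  }
}

const roundTrip = decodeJson<{ id: number }>(encodeJson({ id: 7 }));
const broken = decodeJson<unknown>(Buffer.from('not json'));
```

In consume(), the result of decodeJson(rawMessage.getData()) could be inspected before calling acknowledge(), which keeps the decision about bad messages in one place. Note that the cast to T is not validated at runtime.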
If you want to use the producer, consumer or reader outside of the module which imports PulsarModule.forFeature(), you'll need to re-export the providers generated by it. You can do this by exporting the whole module, like this:
import { Module } from '@nestjs/common';
import { PulsarModule, MessageId } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forFeature('producer', 'my-producer', {
      topic: 'my-topic',
    }),
    PulsarModule.forFeature('consumer', 'my-consumer', {
      topic: 'my-topic',
      subscription: 'my-sub',
    }),
    PulsarModule.forFeature('reader', 'my-reader', {
      topic: 'my-topic',
      startMessageId: MessageId.earliest(), // same reader config as in the earlier example
    }),
  ],
  exports: [PulsarModule],
})
export class UsersModule {}
Async configuration
You may want to pass your module options asynchronously instead of statically. In this case, use the forRootAsync() method:
import { Module } from '@nestjs/common';
import { PulsarModule } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forRootAsync({
      useFactory: () => ({
        serviceUrl: 'pulsar://localhost:6650',
      }),
    }),
  ],
})
export class AppModule {}
Our factory behaves like any other asynchronous provider (e.g., it can be async and it's able to inject dependencies through inject):
import { Module } from '@nestjs/common';
// Assumes ConfigModule and ConfigService come from the @nestjs/config package.
import { ConfigModule, ConfigService } from '@nestjs/config';
import { PulsarModule } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        serviceUrl: config.get('SERVICE_URL'),
      }),
    }),
  ],
})
export class AppModule {}
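Because config.get('SERVICE_URL') may return undefined when the variable is not set, a factory might validate the value before handing it to the client. The following is a sketch under assumptions: resolveServiceUrl() is a hypothetical helper, and accepting only the pulsar:// and pulsar+ssl:// schemes is a choice made for this example rather than a rule of the client.

```typescript
// Hypothetical helper for a forRootAsync() factory: fails fast when the URL
// is missing or does not use one of the schemes this sketch allows.
function resolveServiceUrl(get: (key: string) => string | undefined): string {
  const url = get('SERVICE_URL');
  if (!url) {
    throw new Error('SERVICE_URL is not set');
  }
  if (!/^pulsar(\+ssl)?:\/\//.test(url)) {
    throw new Error(`SERVICE_URL must start with pulsar:// or pulsar+ssl://, got: ${url}`);
  }
  return url;
}

// Stand-in for a configuration service, backed by a plain object.
const fakeEnv: Record<string, string | undefined> = {
  SERVICE_URL: 'pulsar://localhost:6650',
};
const serviceUrl = resolveServiceUrl((key) => fakeEnv[key]);
```

Inside useFactory, the lookup function would wrap the injected service, for example resolveServiceUrl((key) => config.get(key)), so a misconfigured environment fails at startup instead of at the first send or receive.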
Multiple clients
Some projects require multiple Pulsar clients. This can also be achieved with this module. To work with multiple clients, first create the clients. In this case, client naming becomes mandatory.
import { Module } from '@nestjs/common';
import { PulsarModule } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forRoot({
      serviceUrl: 'pulsar://localhost:6650',
    }),
    PulsarModule.forRoot(
      {
        serviceUrl: 'pulsar://other.client:6650',
      },
      'other-client', // client name
    ),
  ],
})
export class AppModule {}
Warning
If you don't set the name for a client, its name is set to default. Please note that you shouldn't have multiple clients without a name, or with the same name, otherwise they will get overridden.
If you are using PulsarModule.forRootAsync(), you also have to set the client name the same way:
import { Module } from '@nestjs/common';
import { PulsarModule } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forRootAsync({
      useFactory: () => ({
        serviceUrl: 'pulsar://localhost:6650',
      }),
    }),
    PulsarModule.forRootAsync(
      {
        useFactory: () => ({
          serviceUrl: 'pulsar://other.client:6650',
        }),
      },
      'other-client', // client name
    ),
  ],
})
export class AppModule {}
Testing
When it comes to unit testing an application, we usually want to avoid making a real Pulsar connection, keeping our test suites independent and their execution as fast as possible. But our classes might depend on producers, consumers or readers that are created from the client instance. How do we handle that? The solution is to create mocks. To achieve that, we set up custom providers. Each registered producer, consumer or reader is automatically represented by an auto-generated token.
The @dev5c32373043/nestjs-pulsar package exposes the getFeatureToken() function, which returns a prepared token based on a given feature type and name.
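import { Module } from '@nestjs/common';
import { getFeatureToken } from '@dev5c32373043/nestjs-pulsar';
import { UsersService } from './users.service'; // path is an assumption

// Minimal test double; stub only the Consumer methods your service calls.
const mockConsumer = {
  receive: async () => ({ getData: () => Buffer.from('{}') }),
  acknowledge: async () => null,
};

@Module({
  providers: [
    UsersService,
    {
      provide: getFeatureToken('consumer', 'my-consumer'),
      useValue: mockConsumer,
    },
  ],
})
export class UsersModule {}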
Now a substitute mockConsumer will be used as the Consumer named my-consumer. Whenever any class asks for my-consumer using the @InjectPulsar() decorator, Nest will use the registered mockConsumer object.
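For tests that need more than a fixed stub, a queue-backed fake is one option. The sketch below is an illustration under assumptions: MessageLike and ConsumerLike are local structural stand-ins for the parts of the Pulsar Message and Consumer used in this README, FakeConsumer is a hypothetical test helper, and the standalone consume() function mirrors UsersService.consume() so the example runs without Nest or a broker.

```typescript
// Structural stand-ins for the parts of Message and Consumer the service touches.
interface MessageLike {
  getData(): Buffer;
}
interface ConsumerLike {
  receive(timeout?: number): Promise<MessageLike>;
  acknowledge(message: MessageLike): Promise<null>;
}

// In-memory fake: serves queued payloads and records acknowledgements.
class FakeConsumer implements ConsumerLike {
  readonly acknowledged: MessageLike[] = [];
  private readonly queue: Buffer[];

  constructor(payloads: Buffer[]) {
    this.queue = [...payloads];
  }

  async receive(_timeout?: number): Promise<MessageLike> {
    const payload = this.queue.shift();
    // An empty queue stands in for a receive timeout in this sketch.
    if (!payload) throw new Error('no message available');
    return { getData: () => payload };
  }

  async acknowledge(message: MessageLike): Promise<null> {
    this.acknowledged.push(message);
    return null;
  }
}

// Same logic as UsersService.consume(), written against the structural type.
async function consume(consumer: ConsumerLike, timeout = 1000): Promise<unknown> {
  const rawMessage = await consumer.receive(timeout);
  const data = JSON.parse(rawMessage.getData().toString());
  await consumer.acknowledge(rawMessage);
  return data;
}
```

A FakeConsumer instance could be supplied as the useValue for getFeatureToken('consumer', 'my-consumer'), after which a test can assert both on the parsed result and on fake.acknowledged.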
License
Nest Pulsar is MIT licensed.