@scaliolabs/nestjs-kafka
v1.3.0
Published
A Kafka wrapper around `kafkajs` that adds Kafka support to NestJs following the `Orchestrator and Explorer` pattern, with minimum boilerplate code needed.
Readme
@scaliolabs/nestjs-kafka
Overview
The @scaliolabs/nestjs-kafka
is a Kafka wrapper around kafkajs
that adds Kafka support to NestJs following the Orchestrator and Explorer
pattern. It provides functionality for integrating with Kafka, a distributed streaming platform, with minimum boilerplate code needed.
Features
- @SubscribeTo: Process a Kafka topic with your function.
- ProducerService: Sends messages to Kafka topics
All
kafkajs
configuration flags are still exposed, allowing you to customize your Kafka initialization without the package getting in your way
Installation
To begin using it, we first install the required dependencies.
yarn add @scaliolabs/nestjs-kafka kafkajs
Configuring the Module
// app.module.ts
import { KafkaModule } from '@atomik-core'
// Basic Config
config: IIKafkaConfig = {
brokers: ['127.0.0.1:9092'],
clientId: 'vitamin-seats',
groupId: 'my-group-id',
}
// Customized Config
config: IIKafkaConfig = {
brokers: ['127.0.0.1:9092'],
clientId: 'vitamin-seats',
groupId: 'my-group-id',]
consumerConfig: {
allowAutoTopicCreation: true,
},
producerConfig: {
allowAutoTopicCreation: true,
},
kafkaConfig: {
requestTimeout: 5000
}
}
@Module({
imports: [
KafkaModule.register(config),
],
})
export class AppModule {}
Additional options can be found in the KafkaJS documentation.
Setup Typings
We suggest, as a best practice, to keep the messages you pass to Kafka and your topics strongly typed, so we encourage you to setup your typings and integrating with our package namespace.
/**
* Topics
*/
export enum TopicsEnum {
MY_TOPIC = 'my-topic',
}
/**
* Message Types
*/
export type IMyMessage = { name: string }
Producing Messages
To send messages to Kafka, you need to inject the KafkaProducerService
into your service constructor, create a KafkaPayload
and send it.
The sendMessage
functions accepts one or multiple messages.
@Injectable()
export class MyService {
constructor(private readonly producer: KafkaProducerService) {}
someMethod() {
const message = KafkaPayload.create<IMyMessage>({
body: { name: 'Franz' },
})
this.producer.sendMessage(TopicsEnum.MY_TOPIC, message)
}
}
When creating a message, you can provide some advanced options to tailor your usage to specific needs. All of those information will also be available when consuming that same message.
| Property | Type | Description |
| -------------------------- | --------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------- |
| body | T
| The message payload of generic type T
. |
| messageType (optional) | string
| A string representing the type of the message. This is useful if you want to have different types of messages in the same topic. |
| key (optional) | string
| A string representing the key of the message. This is used to guarantee that all messages with the same key will be sent to the same partition. |
| partition (optional) | number
| A number indicating the partition number where the message will be sent. |
| headers (optional) | Record<string, any>
| An object containing key-value pairs representing the headers of the message. This is used to add metadata to the message. |
Consuming Messages
To consume messages, just must annotate your class with @KafkaListener()
and your method with the decorator @ListenTopic(topicName, options)
.
Your function will then receive two parameters: A IKafkaConsumedMessage
and a IKafkaContext
, that can be used to further interact with your message processing.
@Injectable()
@KafkaListener()
export class MyService {
constructor(private readonly producer: KafkaProducerService) {}
@ListenTopic('my-topic')
async processMyTopic({ body }: IKafkaConsumedMessage<IMyMessage>, context: IKafkaContext) {
console.log(body.name)
}
}
Decorator Options
The IKafkaDecoratorOptions
type defines the configuration options for a Kafka decorator. This interface is used to specify various settings related to Kafka topic consumption.
| Property | Type | Description |
| ---------------------------- | --------- | ---------------------------------------------------------------------------------------------------------------- |
| fromBeginning | boolean
| If set to true
, the consumer will start reading messages from the beginning of the topic. Defaults to false
. |
| useInstanceConsumerGroup | boolean
| If set to true
, the consumer will use an instance-specific consumer group. Defaults to false
. |
| maxRetries | number
| Quantity of retries this consumer should do. Set to 0 to disable retries. Defaults to 0
. |
| retryFactor | number
| Factor the retry timeout should be multiplied by on each retry. Defaults to 2
. |
| retryMinTimeout | number
| Minimum time in milliseconds to wait before retrying. Defaults to 1000ms
. |
| retryMaxTimeout | number
| Maximum time in milliseconds to wait before retrying. Defaults to 5000ms
. |
| retryRandomize | boolean
| Randomize the retry timeout with a number between 1 and 2. Defaults to true
. |
The decorator also accept all properties from original kafkajs
message consumption implementation.
KafkaContext Interface
The IKafkaContext
interface defines the structure for Kafka context objects. It includes the following properties and methods:
| Property | Type | Description |
| ---------------- | ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| topic | string
| The name of the Kafka topic. |
| partition | number
| The partition number within the Kafka topic. |
| heartbeat | () => void
| A method to send a heartbeat signal. It's useful for long running processors to signal to Kafka that you're still processing, otherwise Kafka might auto remove your consumer from the pool. |
| pause | () => () => void
| A method to pause the Kafka consumer. This method returns another function that can be called to resume the consumer. |
| retryAttempt | number
| Number indicating the retry count of this processing. First time executing, this value is 0 |
DLQ Support
Dead Letter Queue (DLQ) support allows you to handle messages that cannot be processed successfully. When a message fails to be processed after a certain number of retries, it can be sent to a DLQ for further inspection and handling. This ensures that problematic messages do not block the processing of other messages and provides a mechanism for dealing with errors in a controlled manner.
To enable DLQ support, you need to configure the KafkaModule
with DLQ settings.
Example configuration:
config: IIKafkaConfig = {
dlqTopic?: (originalTopic: string, messageType?: string) => 'my-dlq-topic',
//..other config options go here
}
In this example, messages that fail to process will be sent to the my-dlq-topic
for further analysis and handling.
Since dlqTopic
is a function, you can add conditional logic to decide which DLQ you want to send based on your topic name
or message type
.
Future Ideas
- Add support to Kafka Schemas using Confluent Schema Registry
License
This project is licensed under the MIT License. See the LICENSE file for details.
Support
If you encounter any issues or have questions, feel free to open an issue on our GitHub repository.
Acknowledgements
We would like to thank the contributors of KafkaJS for their excellent work on the Kafka client for Node.js.
Contact
For more information, please contact us at [email protected].