@scaliolabs/nestjs-kafka

v1.3.0

Overview

@scaliolabs/nestjs-kafka is a Kafka wrapper around kafkajs that adds Kafka support to NestJS following the Orchestrator and Explorer pattern. It integrates with Kafka, a distributed streaming platform, with minimal boilerplate code.

Features

  • @KafkaListener() / @ListenTopic(): consume Kafka topics with your class methods.
  • KafkaProducerService: sends messages to Kafka topics.

All kafkajs configuration flags are still exposed, allowing you to customize your Kafka initialization without the package getting in your way.

Installation

To begin using it, we first install the required dependencies.

yarn add @scaliolabs/nestjs-kafka kafkajs

Configuring the Module

// app.module.ts

import { KafkaModule } from '@scaliolabs/nestjs-kafka'

// Basic Config
const config: IIKafkaConfig = {
	brokers: ['127.0.0.1:9092'],
	clientId: 'vitamin-seats',
	groupId: 'my-group-id',
}

// Customized Config
const config: IIKafkaConfig = {
	brokers: ['127.0.0.1:9092'],
	clientId: 'vitamin-seats',
	groupId: 'my-group-id',
	consumerConfig: {
		allowAutoTopicCreation: true,
	},
	producerConfig: {
		allowAutoTopicCreation: true,
	},
	kafkaConfig: {
		requestTimeout: 5000
	}
}

@Module({
	imports: [
		KafkaModule.register(config),
	],
})
export class AppModule {}

Additional options can be found in the KafkaJS documentation.

Setup Typings

As a best practice, we suggest keeping the messages you pass to Kafka and your topics strongly typed, so we encourage you to set up your typings and integrate them with the package's types.

/**
 * Topics
 */

export enum TopicsEnum {
	MY_TOPIC = 'my-topic',
}

/**
 * Message Types
 */

export type IMyMessage = { name: string }

Producing Messages

To send messages to Kafka, you need to inject the KafkaProducerService into your service constructor, create a KafkaPayload and send it.

The sendMessage function accepts one or more messages.

@Injectable()
export class MyService {
	constructor(private readonly producer: KafkaProducerService) {}

	someMethod() {
		const message = KafkaPayload.create<IMyMessage>({
			body: { name: 'Franz' },
		})

		this.producer.sendMessage(TopicsEnum.MY_TOPIC, message)
	}
}

When creating a message, you can provide some advanced options to tailor your usage to specific needs. All of this information will also be available when consuming that same message.

| Property | Type | Description |
| --- | --- | --- |
| body | T | The message payload of generic type T. |
| messageType (optional) | string | A string representing the type of the message. This is useful if you want to have different types of messages in the same topic. |
| key (optional) | string | A string representing the key of the message. This is used to guarantee that all messages with the same key will be sent to the same partition. |
| partition (optional) | number | A number indicating the partition number where the message will be sent. |
| headers (optional) | Record<string, any> | An object containing key-value pairs representing the headers of the message. This is used to add metadata to the message. |
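As a sketch, the optional properties above can be combined when building a payload with KafkaPayload.create (shown earlier). The messageType and key values here are hypothetical:

```typescript
const message = KafkaPayload.create<IMyMessage>({
	body: { name: 'Franz' },
	messageType: 'user-created', // hypothetical tag for mixed-type topics
	key: 'user-42',              // same key -> same partition, preserving order
	headers: { source: 'my-service' },
})

this.producer.sendMessage(TopicsEnum.MY_TOPIC, message)
```

Setting a key is the usual way to guarantee per-entity ordering without pinning an explicit partition number.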

Consuming Messages

To consume messages, you must annotate your class with @KafkaListener() and your method with the @ListenTopic(topicName, options) decorator. Your function will then receive two parameters: an IKafkaConsumedMessage and an IKafkaContext, which can be used to further interact with your message processing.

@Injectable()
@KafkaListener()
export class MyService {
	constructor(private readonly producer: KafkaProducerService) {}

	@ListenTopic('my-topic')
	async processMyTopic({ body }: IKafkaConsumedMessage<IMyMessage>, context: IKafkaContext) {
		console.log(body.name)
	}
}

Decorator Options

The IKafkaDecoratorOptions type defines the configuration options for the consumer decorator and is used to specify various settings related to Kafka topic consumption.

| Property | Type | Description |
| --- | --- | --- |
| fromBeginning | boolean | If set to true, the consumer will start reading messages from the beginning of the topic. Defaults to false. |
| useInstanceConsumerGroup | boolean | If set to true, the consumer will use an instance-specific consumer group. Defaults to false. |
| maxRetries | number | Quantity of retries this consumer should do. Set to 0 to disable retries. Defaults to 0. |
| retryFactor | number | Factor the retry timeout should be multiplied by on each retry. Defaults to 2. |
| retryMinTimeout | number | Minimum time in milliseconds to wait before retrying. Defaults to 1000ms. |
| retryMaxTimeout | number | Maximum time in milliseconds to wait before retrying. Defaults to 5000ms. |
| retryRandomize | boolean | Randomize the retry timeout with a number between 1 and 2. Defaults to true. |
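As a sketch (assuming the decorators and types are exported from @scaliolabs/nestjs-kafka), a listener that replays a topic from the beginning with bounded, backed-off retries might look like:

```typescript
import { Injectable } from '@nestjs/common'
import { KafkaListener, ListenTopic, IKafkaConsumedMessage, IKafkaContext } from '@scaliolabs/nestjs-kafka'

@Injectable()
@KafkaListener()
export class ReplayService {
	// Options taken from the table above: read the topic from the start,
	// retry failures up to 3 times, backing off between 1s and 5s.
	@ListenTopic('my-topic', {
		fromBeginning: true,
		maxRetries: 3,
		retryFactor: 2,
		retryMinTimeout: 1000,
		retryMaxTimeout: 5000,
	})
	async process({ body }: IKafkaConsumedMessage<{ name: string }>, context: IKafkaContext) {
		console.log(body.name)
	}
}
```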

The decorator also accepts all properties from the original kafkajs message-consumption implementation.

KafkaContext Interface

The IKafkaContext interface defines the structure for Kafka context objects. It includes the following properties and methods:

| Property | Type | Description |
| --- | --- | --- |
| topic | string | The name of the Kafka topic. |
| partition | number | The partition number within the Kafka topic. |
| heartbeat | () => void | A method to send a heartbeat signal. Useful for long-running processors to signal to Kafka that you're still processing, otherwise Kafka might auto-remove your consumer from the pool. |
| pause | () => () => void | A method to pause the Kafka consumer. This method returns another function that can be called to resume the consumer. |
| retryAttempt | number | Number indicating the retry count of this processing. On the first execution, this value is 0. |
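For long-running handlers, heartbeat and pause from the context can be combined. This sketch assumes a hypothetical doSlowWork helper; everything else follows the interface above:

```typescript
@Injectable()
@KafkaListener()
export class SlowTopicService {
	@ListenTopic('my-topic')
	async process({ body }: IKafkaConsumedMessage<IMyMessage>, context: IKafkaContext) {
		// Pause the consumer while we work; the returned function resumes it.
		const resume = context.pause()
		try {
			context.heartbeat() // tell Kafka we're still alive during slow work
			await doSlowWork(body) // hypothetical long-running operation
		} finally {
			resume()
		}
	}
}
```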

DLQ Support

Dead Letter Queue (DLQ) support allows you to handle messages that cannot be processed successfully. When a message fails to be processed after a certain number of retries, it can be sent to a DLQ for further inspection and handling. This ensures that problematic messages do not block the processing of other messages and provides a mechanism for dealing with errors in a controlled manner.

To enable DLQ support, you need to configure the KafkaModule with DLQ settings.

Example configuration:

const config: IIKafkaConfig = {
	dlqTopic: (originalTopic: string, messageType?: string) => 'my-dlq-topic',
	// ...other config options go here
}

In this example, messages that fail to process will be sent to my-dlq-topic for further analysis and handling.

Since dlqTopic is a function, you can add conditional logic to decide which DLQ the message should be sent to, based on your topic name or message type.
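Because dlqTopic receives both the original topic name and the optional message type, the routing logic can be written as a plain function. The topic names here are hypothetical:

```typescript
// Route failed payment messages to a dedicated DLQ; everything else
// falls back to a per-topic dead letter queue.
const dlqTopic = (originalTopic: string, messageType?: string): string => {
	if (messageType === 'payment') {
		return 'payments-dlq'
	}
	return `${originalTopic}-dlq`
}
```

The function is then passed as the dlqTopic option in the module configuration.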

Future Ideas

License

This project is licensed under the MIT License. See the LICENSE file for details.

Support

If you encounter any issues or have questions, feel free to open an issue on our GitHub repository.

Acknowledgements

We would like to thank the contributors of KafkaJS for their excellent work on the Kafka client for Node.js.

Contact

For more information, please contact us at [email protected].