# NGFT Kafka-NestJS
## Get started

Before you can use the kafka-nestjs module, you need to install it. Make sure that you can access the Google Artifact Registry.

Install the module:

```sh
npm i @ngft/kafka-nestjs
```
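If the `@ngft` scope is not configured yet, access typically means mapping the scope to the Artifact Registry npm repository and refreshing credentials. The sketch below uses placeholders (`<LOCATION>`, `<PROJECT>`, `<REPOSITORY>` are project-specific values, not part of this package's docs); `google-artifactregistry-auth` is Google's credential helper for npm:

```shell
# Sketch: map the @ngft scope to your Artifact Registry npm repository.
# <LOCATION>, <PROJECT>, <REPOSITORY> are placeholders for your project's values.
echo "@ngft:registry=https://<LOCATION>-npm.pkg.dev/<PROJECT>/<REPOSITORY>/" >> .npmrc

# Refresh Artifact Registry credentials in .npmrc (Google's helper package).
npx google-artifactregistry-auth
```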
## Example / Lib Tester App

See `./example`.
## Module setup

In your AppModule (or any other module where you want to use it), you need to import it:

```ts
@Module({
  imports: [
    // ..
    KafkaModule.forRoot(/* configurations */),
    // ..
  ],
  // ...
})
export class AppModule {}
```
The `KafkaModule` expects these configurations to run correctly:

- General Kafka Config
- Schema Registry Config
- Consumer Kafka Config
- Producer Kafka Config
### Required Configuration
#### General Kafka Config

See interface `KafkaConfig`.

| Property | Description |
| -------- | ----------- |
| brokers  | An array of brokers you want to connect to |
| isActive | Enable/disable the whole Kafka service |
| ssl      | Whether you want to use TLS encryption |
| [...]    | ... all valid kafka.js `KafkaConfig` attributes |
#### Schema Registry Config

See interface `KafkaRegistryConfig`.

| Property | Description |
| -------- | ----------- |
| host     | Schema registry URI |
#### Consumer Kafka Config

See interface `KafkaConsumerConfig`.

| Property | Description |
| -------- | ----------- |
| isActive | Enable/disable event consumption |
| groupId  | Service group identifier |
| debug    | Verbose mode that logs every event received |
| fromBeginning | Starts consuming from the beginning of the topic |
| isBlocking | Retries an event in case of an exception |
| [...]    | ... all valid kafka.js `ConsumerConfig` attributes |
#### Producer Kafka Config

See interface `KafkaProducerConfig`.

| Property | Description |
| -------- | ----------- |
| isActive | Enable/disable event publishing |
| debug    | Verbose mode that logs every event published |
| [...]    | ... all valid kafka.js `ProducerConfig` attributes |
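Put together, the four configuration objects might look like the sketch below. The property names follow the tables above; all concrete values (broker addresses, registry host, group id) are illustrative assumptions, not defaults of this library:

```typescript
// Illustrative values only — adjust brokers, host, and groupId for your environment.
const kafkaConfig = {
  isActive: true,
  brokers: ['kafka-1:9092', 'kafka-2:9092'], // brokers to connect to
  ssl: true,                                 // use TLS encryption
};

const registryConfig = {
  host: 'http://schema-registry:8081', // schema registry URI
};

const kafkaConsumerConfig = {
  isActive: true,
  groupId: 'example-service', // service group identifier
  debug: false,               // log every event received when true
  fromBeginning: false,       // do not replay the topic from the start
  isBlocking: true,           // retry an event if the handler throws
};

const kafkaProducerConfig = {
  isActive: true,
  debug: false, // log every event published when true
};
```

Objects like these can then be passed to `KafkaModule.forRoot(kafkaConfig, registryConfig, kafkaProducerConfig, kafkaConsumerConfig)`.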
Hint: alternatively, you can provide your existing Kafka configuration objects as arguments to `KafkaModule.forRoot`:
```ts
@Module({
  imports: [
    // ..
    KafkaModule.forRoot(
      kafkaConfig,
      registryConfig,
      kafkaProducerConfig,
      kafkaConsumerConfig,
    ),
    // ..
  ],
  // ..
})
export class AppModule {}
```
## Usage

### Event Consumption
- Extend the consumer class with `AbstractKafkaHandler`
- Call `super()` in the class constructor
- Apply the annotation `@KafkaEventHandler` to a method:
  - `@KafkaEventHandler` registers the method for all specified Kafka event types
  - `@KafkaEventHandler` can be used multiple times
  - The first argument is the desired topic
  - The second argument is the desired event type
- The method will be called with the following parameters, depending on the event type:
  - ngft (schema-based): `NgftMessageHandlerPayload<T extends KafkaPayload>`
```ts
@Injectable()
export class AircraftConsumer extends AbstractKafkaHandler {
  constructor() {
    super()
  }

  @KafkaEventHandler(kafkaTopic.aircraft, AIRCRAFT_CREATED)
  @KafkaEventHandler(kafkaTopic.aircraft, AIRCRAFT_UPDATED)
  async onAircraftUpsert(
    data: KafkaEvent<AircraftUpsert>,
    metadata: KafkaMetadata,
    rawMessage: RawKafkaMessage,
  ): Promise<void> {
    console.log(data)
  }

  @KafkaEventHandler(/event.*/, /.*/)
  async catchAllNgftEvents(
    data: KafkaEvent<AircraftUpsert>,
    // schema-based events will also provide the schemaId in metadata
    metadata: SchemaBasedKafkaMetadata,
    rawMessage: RawKafkaMessage,
  ): Promise<void> {
    console.log(data)
  }
}
```
### Event Publishing
```ts
@Injectable()
export class Service {
  constructor(
    private readonly producer: KafkaProducerService,
  ) {
    const dummydata = {
      id: 123,
      name: 'Hi there',
      createdAt: new Date(),
    };
    const event = {
      meta: {
        topicKey: 'event.my-shiny-topic',
        eventType: 'my-event-type',
        createdAt: dummydata.createdAt.toISOString(),
        eventKey: 'my-event-identifier',
      },
      data: dummydata,
    } as KafkaPublishableMessage;
    this.producer.sendMessage(event);
  }
}
```
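Building the `meta`/`data` envelope by hand is easy to get subtly wrong (e.g. the `createdAt` serialization), so a small helper can be worth it. This is a hedged sketch: the interfaces and the `buildMessage` function below merely mirror the shape used in the example above and are not the library's own types or API:

```typescript
// Sketch of the envelope shape used above; not the library's actual types.
interface MessageMeta {
  topicKey: string;
  eventType: string;
  createdAt: string; // ISO-8601 timestamp
  eventKey: string;
}

interface PublishableMessage<T> {
  meta: MessageMeta;
  data: T;
}

// Hypothetical helper that builds a publishable message envelope.
function buildMessage<T>(
  topicKey: string,
  eventType: string,
  eventKey: string,
  data: T,
  createdAt: Date = new Date(),
): PublishableMessage<T> {
  return {
    meta: {
      topicKey,
      eventType,
      createdAt: createdAt.toISOString(),
      eventKey,
    },
    data,
  };
}
```

The event from the example could then be created with `buildMessage('event.my-shiny-topic', 'my-event-type', 'my-event-identifier', dummydata)` and passed to `producer.sendMessage(...)`.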
## Release Cycle

kafka-nestjs is released automatically using semantic-release. Whenever a commit is merged into `main`, a release is published according to the detected changes.