@pestras/micro-kafka
v1.1.1
Published
pestras microservice plugin for kafka messaging support
Downloads
1
Maintainers
Readme
Pestras Micro Kafka
Pestras microservice plugin for akafka messaging support.
install
npm i @pestras/micro @pestras/micro-kafka
Plug In
import { SERVICE, Micro } from '@pestras/micro';
import { MicroKafka } from '@pestras/micro-kafka;
Micro.plugin(new MicroKafka(config));
@SERVICE()
class test {}
Micro.start(Test);
MicroKafka Parameters:
Name | Type | Default | Description ---- | ---- | ---- | ---- kafkaConfig | KafkaConfig | Required | see KafkaJs Client consumerConfig | ConsumerConfig | null | see KafkaJs Consumer ConsumerRunConfig | ConsumerRunConfig | null | see KafkaJs Consumer
Consumer:
Micro-kafka helps us consume topics by using TOPIC Decorator just like the following
import { SERVICE, Micro } from '@pestras/micro';
import { MicroKafka, TOPIC, EachMessagePayload } from '@pestras/micro-kafka';
let kafka = new Microakafka(config);
Micro.plugin(kafka);
@SERVICE()
class test implements MicroKafkaEvents {
@TOPIC("topicName", { subscribeOptions: { fromBeginning: true } })
topicHandler(data: EachMessagePayload) {
// our code
}
}
Micro.start(test);
We can run multiple hooks before handling the topic.
@SERVICE()
class test implements MicroKafkaEvents {
// hooks can be async
async auth(data: EachMessageData, handlerName: string) {
// some code
// returning false will terminate the workflow
return true;
}
validate(data: EachMessageData, handlerName: string) {
// validation code
return true;
}
@TOPIC("topicName", {
hooks: ["auth", "vallidate"]
subscribeOptions: { fromBeginning: true }
})
topicHandler(data: EachMessagePayload) {
// our code
}
}
Hooks must return or resolve (async) to true on success or false on failure.
Multible Topics
Multible topics can be used on the same handler.
@SERVICE()
class test implements MicroKafkaEvents {
@TOPIC("topicName", { subscribeOptions: { fromBeginning: true } })
@TOPIC("otherTopic")
topicHandler(data: EachMessagePayload) {
// our code
}
}
Micro.start(test);
Sub Services
// comments.service.ts
import { TOPIC } from '@pestras/micro-nats';
import { Client, Payload} from 'ts-nats';
export class Comments {
validate(data: EachMessagePayload, handlerName: string) { return true }
@TOPIC('newComment', {
// auth hook from the main service
// validate hook from the local service (sub service)
hooks: ['auth', 'validate']
})
create(data: EachMessagePayload) {
}
}
// main.ts
Micro.plugin(new MicroNats());
@SERVICE()
class Articles {
onInit() {
Micro.store.someSharedValue = "shared value";
}
async auth(data: EachMessagePayload, handlerName: string) {
return true;
}
validate(data: EachMessagePayload, handlerName: string) {
return true;
}
@TOPIC('newArticle', {
// both hooks from the main service
hooks: ['auth', 'validate']
})
create(data: EachMessagePayload) {
}
}
// pass sub services as an array to the second argument of Micro.start method
Micro.start(Articles, [Comments]);
- Local hooks has the priority over main service hooks.
- Subservices have their own lifecycle events.
Create Producer:
By default micro-kafka does not create producer unless we ask for it.
import { SERVICE, Micro } from '@pestras/micro';
import { MicroKafka, MicroKafkaEvents, TOPIC, EachMessagePayload } from '@pestras/micro-kafka';
let kafka = new Microakafka(config);
kafka.createProducer(producerConfig);
Micro.plugin(kafka);
@SERVICE()
class test implements MicroKafkaEvents {
@TOPIC('newArticle', {
// both hooks from the main service
hooks: ['auth', 'validate']
})
create(data: EachMessagePayload) {
// kafka.producer.send(...)
}
}
Micro.start(Test);
lifecycle Events
onCunsomerConnected:
Called when kafka consumer is connected successfully
import { SERVICE, Micro } from '@pestras/micro';
import { MicroKafka, MicroKafkaEvents } from '@pestras/micro-kafka';
let kafka = new Microakafka(config);
Micro.plugin(kafka);
@SERVICE()
class test implements MicroKafkaEvents {
onCunsomerConnected() {}
}
onProducerConnected:
Called when kafka consumer is connected successfully
import { SERVICE, Micro } from '@pestras/micro';
import { MicroKafka, MicroKafkaEvents, Producer } from '@pestras/micro-kafka';
let kafka = new Microakafka(config);
kafka.createProducer(producerConfig);
Micro.plugin(kafka);
@SERVICE()
class test implements MicroKafkaEvents {
onProducerConnected(producer: Producer) {
}
}
Thank you.