pc-seneca-kafka-transport
v1.1.12
A transport plugin for seneca to send and receive messages via kafka topics
seneca-kafka-transport
This plugin lets you send messages to a kafka topic and listen for messages from kafka topics.
Communication is async by design, so no response is ever expected.
Under the hood it uses https://github.com/oleksiyk/kafka to communicate with kafka.
Sending messages
Messages are sent as a JSON string to the Kafka topic specified in the client.
Only the actual message is sent, with no seneca metadata!
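As a rough illustration of what "no seneca metadata" could mean in practice, here is a minimal sketch. It assumes that seneca metadata lives in properties whose names end in `$` (such as `meta$`); the helper name `toKafkaPayload` is hypothetical and is not part of this plugin's API.

```javascript
// Hypothetical helper: not part of the plugin's public API.
function toKafkaPayload(msg) {
  const clean = {};
  for (const key of Object.keys(msg)) {
    // Assumption: seneca metadata is stored in properties ending in "$".
    if (!key.endsWith('$')) {
      clean[key] = msg[key];
    }
  }
  // The payload written to the topic is a plain JSON string.
  return JSON.stringify(clean);
}

const payload = toKafkaPayload({
  cmd: 'register',
  type: 'user',
  name: 'alice',
  meta$: { id: 'abc/123' }
});
console.log(payload);
```

The plugin's actual cleaning step may differ; the point is only that consumers on the topic see plain business data.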
Receiving messages
We define the Kafka topic we are interested in and a groupId, so Kafka can keep track of our service offset for the specified topic.
We also define a pin to indicate the message pattern we are interested in.
This is because we do not expect a 1:1 mapping between kafka topics and seneca patterns.
Every time a message is received, the plugin first acknowledges the receipt to kafka (an at-most-once pattern), which advances the offset for our groupId. Then, if the message content matches the pinned pattern, the plugin simply calls seneca.act with the message.
If maxTime
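The receive flow described in this section (acknowledge first, then match, then act) can be sketched roughly as follows. This is a simplified model, not the plugin's implementation: `commitOffset` and `act` are hypothetical callbacks standing in for the Kafka offset commit and `seneca.act`, and the matcher only handles exact values and `*`, which is narrower than seneca's real pattern matching.

```javascript
// Parse a pin string such as 'cmd:register,type:*' into { cmd: 'register', type: '*' }.
function parsePin(pin) {
  const pattern = {};
  for (const pair of pin.split(',')) {
    const [key, value] = pair.split(':').map((part) => part.trim());
    pattern[key] = value;
  }
  return pattern;
}

// Simplified matcher: every pinned key must be present and equal; '*' matches any value.
function matchesPin(msg, pattern) {
  return Object.keys(pattern).every(
    (key) => key in msg && (pattern[key] === '*' || String(msg[key]) === pattern[key])
  );
}

// commitOffset and act are hypothetical stand-ins for the Kafka commit and seneca.act.
function handleKafkaMessage(rawValue, pin, commitOffset, act) {
  commitOffset();                        // 1. acknowledge first (at-most-once)
  const msg = JSON.parse(rawValue);      // 2. the payload is a JSON string
  if (matchesPin(msg, parsePin(pin))) {  // 3. dispatch only when the pin matches
    act(msg);
    return true;
  }
  return false;                          // acknowledged but not processed
}

const events = [];
handleKafkaMessage('{"cmd":"register","type":"user"}', 'cmd:register,type:user',
  () => events.push('commit'), () => events.push('act'));
handleKafkaMessage('{"cmd":"delete","type":"user"}', 'cmd:register,type:user',
  () => events.push('commit'), () => events.push('act'));
console.log(events);
```

Note that the second message is committed even though nothing handles it, which is why unmatched messages are not redelivered.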
Usage
This is the most basic usage. It expects kafka on localhost on its default port, 9092.
Consumer
require('seneca')()
  .use('pc-seneca-kafka-transport')
  .add('cmd:register,type:user', (msg, reply) => {
    // TRIGGER BUSINESS LOGIC FOR NEW USER REGISTRATION
    reply()
  })
  .listen({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user'
  });
A consumer will open a consumer connection to the specified topic and will consume any message that matches the pin.
Producer
const client = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .client({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user'
  });
A producer will send any message that matches the pin to the configured kafka topic.
Global kafka Options
By default this plugin will try to connect to a kafka broker on localhost:9092. It uses https://github.com/oleksiyk/kafka under the hood, so please refer there for details.
To set global kafka options you can pass an options object to the .use declaration that contains the following properties:
{
  global: {},
  producer: {},
  consumer: {}
}
global can be any property that is common to both producer and consumer!
require('seneca')()
  .use('pc-seneca-kafka-transport', {
    global: {
      connectionString: 'kafka-host1:9092,kafka-host2:9092'
    }
  });
Transport Options
A Kafka producer requires the topic to connect to and the pin pattern of the messages to send to this topic.
You can pass any options supported by the no-kafka library.
The producer object will be merged into the kafka global options provided in the .use call.
const client = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .client({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user',
    producer: {
    }
  });
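To make the merge of options concrete, here is a minimal sketch of one plausible precedence: global options first, then the role-wide options from .use, then the per-call object. The helper `effectiveOptions` is hypothetical, the precedence order is an assumption rather than documented behavior, and `clientId` is used only as an illustrative option name.

```javascript
// Hypothetical helper illustrating the merge; the plugin's real precedence may differ.
function effectiveOptions(pluginOptions, role, transportOptions) {
  return Object.assign(
    {},
    pluginOptions.global,   // shared by producer and consumer
    pluginOptions[role],    // role-wide options passed to .use()
    transportOptions[role]  // per-client / per-listen object (assumed to win)
  );
}

const pluginOptions = {
  global: { connectionString: 'kafka-host1:9092,kafka-host2:9092' },
  producer: {},
  consumer: {}
};

const producerOptions = effectiveOptions(pluginOptions, 'producer', {
  type: 'kafka',
  kafkaTopic: 'user',
  producer: { clientId: 'user-producer' } // illustrative option name only
});
console.log(producerOptions);
```

Under these assumptions a producer created with an empty producer object simply inherits the global connection settings.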
A Kafka consumer requires the topic to connect to and the pin pattern of the messages it is listening for.
You can pass any options supported by the no-kafka library.
The consumer object will be merged into the kafka global options provided in the .use call.
If discardIfOlderThan is set, whenever the kafka listener receives a message that has a timestamp property older than the configured value, the message will be discarded.
If discardIfOlderThan is not set, or the message does not have a timestamp property, all messages will be processed normally.
discardIfOlderThan uses https://www.npmjs.com/package/timestring, so it supports any keywords supported by timestring.
const server = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .listen({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user',
    consumer: {
      groupId: 'userManager'
    },
    discardIfOlderThan: '5 minutes'
  });
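The discard rule can be sketched as a small predicate. This is an illustration under assumptions, not the plugin's code: it assumes the payload's timestamp property is a number of milliseconds since the epoch, and that the configured string (for example '5 minutes') has already been converted to milliseconds, which the timestring package can do. The function name `shouldDiscard` is hypothetical.

```javascript
// Hypothetical predicate; maxAgeMs is the parsed discardIfOlderThan value in milliseconds
// (for example '5 minutes' would correspond to 300000).
function shouldDiscard(msg, maxAgeMs, now = Date.now()) {
  if (maxAgeMs == null) return false;                  // option not set: process normally
  if (typeof msg.timestamp !== 'number') return false; // no timestamp: process normally
  return now - msg.timestamp > maxAgeMs;               // older than the limit: discard
}

const fiveMinutes = 5 * 60 * 1000;
const now = 1700000000000; // fixed clock so the example is deterministic

console.log(shouldDiscard({ timestamp: now - 10 * 60 * 1000 }, fiveMinutes, now)); // stale
console.log(shouldDiscard({ timestamp: now - 60 * 1000 }, fiveMinutes, now));      // fresh
console.log(shouldDiscard({ cmd: 'register' }, fiveMinutes, now));                 // no timestamp
```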
It's important to understand that every message received from the specified topic will always be acknowledged to kafka, and only a message whose pattern matches the specified pin will trigger the corresponding seneca act.
This means that you can't have multiple listen calls with different pins on the same topic, as they will interfere with each other and you can lose messages.
Instead, you should define a wildcard pin that covers all the message patterns on the topic and call the appropriate act to process each message.
If a wildcard is not enough, you can simply pass an array of pins. In this case use the prop pins instead of pin.
Another option, if you have multiple listen calls, is to make sure each one defines its own groupId.
Please note also that if you intend to run multiple instances of the same consumer service for HA, you need to make sure to set the groupId (shared by those instances) to avoid consuming the same message multiple times.
const server = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .listen({
    type: 'kafka',
    pin: 'cmd:register,type:*',
    kafkaTopic: 'user',
    consumer: {
      groupId: 'userManager'
    }
  });
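For the array form, a listen configuration might look like the sketch below. The second pattern, 'cmd:delete,type:user', is a made-up example, and the listen call is left commented out so the snippet runs without seneca or a kafka broker installed.

```javascript
// Listen options using the "pins" prop (array) instead of "pin" (single pattern).
const listenOptions = {
  type: 'kafka',
  pins: [
    'cmd:register,type:user',
    'cmd:delete,type:user' // hypothetical second pattern, for illustration only
  ],
  kafkaTopic: 'user',
  consumer: {
    groupId: 'userManager'
  }
};

// With seneca and the plugin installed, the options would be passed like this:
// require('seneca')()
//   .use('pc-seneca-kafka-transport')
//   .listen(listenOptions);
console.log(listenOptions.pins);
```

Keeping all patterns for a topic in one listen call means a single consumer acknowledges each message once, which avoids the interference described above.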
Issues
At the moment we are using no-kafka, whose producer implementation has a bug and does not set the message timestamp. We are bypassing the issue by adding the timestamp to the payload instead.
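The workaround amounts to stamping the payload before it is serialized. A minimal sketch follows, assuming the timestamp is stored as milliseconds since the epoch under a `timestamp` property and that a timestamp already present on the message is kept; the helper name `withTimestamp` is hypothetical and the plugin's exact format may differ.

```javascript
// Hypothetical helper: add a timestamp to the payload unless the message already has one.
function withTimestamp(msg, now = Date.now()) {
  // Properties of msg are copied last, so an existing msg.timestamp is preserved.
  return Object.assign({ timestamp: now }, msg);
}

const stamped = withTimestamp({ cmd: 'register', type: 'user' }, 1700000000000);
console.log(JSON.stringify(stamped));
```

A consumer configured with discardIfOlderThan can then compare this payload field against the current time.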