A middleware that validates Kafka topic's messages for Circe.
npm install @parkhub/circe
It wraps node-rdkafka and adds some friendly features: middleware execution, parsing of received messages, and protection from any future breaking changes.
Some of the options are high-level, so make sure to brush up on your Kafka knowledge before trying out any of those features!
Consumer API
createConsumer({ connection, groupId, topicCfgs, globalCfgs })
- connection(String, REQUIRED) The connection string used to reach the Kafka service.
- groupId(String, REQUIRED) The Consumer Group this application belongs to.
- topicCfgs(Object, optional) Topic configurations to be applied to any topics subscribed via this consumer.
- globalCfgs(Object, optional) Global configurations to be applied to this consumer.
  - DEFAULTS: { event_cb: true }
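As a rough illustration of how these options might combine, the sketch below layers caller-supplied globalCfgs over the documented default. The helper name buildConsumerCfgs is hypothetical, and mapping connection and groupId onto the librdkafka settings metadata.broker.list and group.id is an assumption about what circe does internally, not something this README confirms.

```javascript
// Hypothetical helper, NOT part of circe's API.
// Assumption: connection and groupId map onto the librdkafka settings
// 'metadata.broker.list' and 'group.id'.
const DEFAULT_GLOBAL_CFGS = { event_cb: true };

function buildConsumerCfgs({ connection, groupId, globalCfgs = {} } = {}) {
  if (typeof connection !== 'string' || connection === '') {
    throw new TypeError('connection is required and must be a string');
  }
  if (typeof groupId !== 'string' || groupId === '') {
    throw new TypeError('groupId is required and must be a string');
  }
  // Defaults go first so caller-supplied settings can override them.
  return {
    ...DEFAULT_GLOBAL_CFGS,
    ...globalCfgs,
    'metadata.broker.list': connection,
    'group.id': groupId
  };
}

const cfgs = buildConsumerCfgs({
  connection: 'kafka:9092',
  groupId: 'MY_GROUP_ID',
  globalCfgs: { 'enable.auto.commit': false }
});
console.log(cfgs);
```

Spreading the defaults before globalCfgs means a caller can still switch event_cb off explicitly if they need to.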
import circe from '@parkhub/circe';
// Using async/await
(async function startConsumer() {
  const consumer = await circe.createConsumer({
    connection: 'kafka:9092',
    groupId: 'MY_GROUP_ID'
  });
  consumer.subscribe({ topic: 'MY_TOPIC' });
  const handler = message => console.log(message);
  consumer.consume({ handler });
  await consumer.disconnect();
})();
// Using Promises
circe.createConsumer({ connection: 'kafka:9092', groupId: 'MY_GROUP_ID' })
  .then(consumer => {
    consumer.subscribe({ topic: 'MY_TOPIC' });
    const handler = message => console.log(message);
    consumer.consume({ handler });
    return consumer.disconnect();
  })
  .then(() => console.log('DONE!'))
  .catch(err => console.log('ERROR!', err));
Consumer Client API
subscribe({ topic }) - Subscribes this consumer to one or more topics BUT DOES NOT begin consuming from them. You cannot add topics by calling this method more than once; you must call .unsubscribe() first.
- topic(String[]|String, REQUIRED) - The topic(s) this consumer should subscribe to.
consumer.subscribe({ topic: 'MY_TEST_TOPIC' });
// OR
consumer.subscribe({ topic: ['MY_TEST_TOPIC', 'MY_OTHER_TOPIC'] });
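Since topic accepts either a single string or an array of strings, a caller-side helper can normalize both shapes before subscribing. The sketch below is only an illustration; normalizeTopics is a hypothetical name and is not part of circe.

```javascript
// Hypothetical helper, not part of circe: accepts 'A' or ['A', 'B'] and
// always returns an array of non-empty topic names.
function normalizeTopics(topic) {
  const topics = Array.isArray(topic) ? topic : [topic];
  const allValid = topics.every(t => typeof t === 'string' && t !== '');
  if (topics.length === 0 || !allValid) {
    throw new TypeError('topic must be a non-empty string or an array of non-empty strings');
  }
  return topics;
}

console.log(normalizeTopics('MY_TEST_TOPIC'));
console.log(normalizeTopics(['MY_TEST_TOPIC', 'MY_OTHER_TOPIC']));
```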
consume({ handler, middleware }) - Begins consuming the subscribed topics using the handler and middleware (if configured).
- handler(Function, REQUIRED) - A function that accepts a message object matching the node-rdkafka message structure, EXCEPT that the "value" property is replaced by a "message" property holding the already-parsed contents of the value Buffer. If the value is a JSON string, you will receive a JS object.
- middleware(Function[], optional) - Functions that match the format of a valid circe middleware function. Check out the repo for predefined middleware or create your own! (NOTE: The order of the middleware matters, and the arity of each middleware must match the one after it.) These middleware will receive the consumed MESSAGE.
- Pre-middleware - these get applied BEFORE any of your middleware
  - buffer to message - Transforms the value property into a message property and parses it
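To make the buffer-to-message step concrete, here is a minimal sketch of a transform in that spirit. It is not circe's actual implementation: bufferToMessage is a hypothetical name, and falling back to the raw string when the value is not valid JSON is an assumption.

```javascript
// Hypothetical transform, not circe's real pre-middleware.
// Replaces the raw "value" Buffer with a parsed "message" property.
function bufferToMessage(kafkaMessage) {
  const { value, ...rest } = kafkaMessage;
  const text = Buffer.isBuffer(value) ? value.toString('utf8') : String(value);
  let message;
  try {
    message = JSON.parse(text); // JSON payloads become JS values
  } catch (err) {
    message = text; // Assumption: non-JSON payloads stay plain strings
  }
  return { ...rest, message };
}

const raw = {
  topic: 'MY_TOPIC',
  partition: 0,
  offset: 42,
  value: Buffer.from(JSON.stringify({ test: 'message' }))
};
const parsed = bufferToMessage(raw);
console.log(parsed);
```

Note that JSON.parse also accepts bare numbers and literals, so a payload such as 123 would arrive as a number rather than a string under this sketch.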
const handler = (message) => console.log(message);
const middleware = [(message, next) => {
  console.log('inside middleware', message);
  next(message); // hand the message on to the next middleware or the handler
}];
consumer.consume({ handler, middleware });
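The note about ordering and arity is easier to see with a small runner. The sketch below shows one way a chain of (message, next) functions could be executed so that whatever one middleware passes to next becomes the arguments of the following one. runMiddleware is a hypothetical name; circe's internal mechanism may differ.

```javascript
// Hypothetical runner, not circe's actual code.
// Each middleware receives the current arguments plus a next() callback;
// the arguments given to next() are what the following step receives.
function runMiddleware(middleware, handler, ...initialArgs) {
  const dispatch = (index, args) => {
    if (index === middleware.length) {
      return handler(...args); // end of the chain: call the handler
    }
    const next = (...nextArgs) => dispatch(index + 1, nextArgs);
    return middleware[index](...args, next);
  };
  return dispatch(0, initialArgs);
}

const seen = [];
const chain = [
  (message, next) => { seen.push('first'); next({ ...message, checked: true }); },
  (message, next) => { seen.push('second'); next(message); }
];
const finalHandler = message => seen.push(message);

runMiddleware(chain, finalHandler, { test: 'message' });
console.log(seen);
```

Because each step forwards its own arguments, a middleware that calls next with a different number of arguments than the following function expects will break the chain, which is why arity has to line up.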
disconnect() - RETURNS a PROMISE. The promise will fulfill once the client is fully disconnected.
consumer.disconnect()
  .then(() => console.log('DISCONNECTED'))
  .catch((err) => console.log('Error disconnecting', err));
unsubscribe() - Unsubscribes from the currently subscribed topic(s). (NOTE: This only unsubscribes from the topics. If you then subscribe to other topics, THE CONSUMER WILL USE THE SAME HANDLER ALREADY DEFINED. This feels like a shortcoming of the base library, and there is an issue pending response. If you need a consumer with new topics and a new handler, it's best to disconnect and create a new consumer client.)
consumer.subscribe({ topic: 'TOPIC' });
consumer.consume({ handler: ({ topic }) => console.log(topic) }); // Any messages to the topic are now being handled.
consumer.unsubscribe();
consumer.subscribe({ topic: 'ANOTHER_TOPIC' }); // Now the handler defined is going to only trigger for this topic!
addListener(...args) - Add a listener to this consumer. You can use any of the events available for the consumer listed in node-rdkafka documentation. (NOTE: Some of these events require a configuration setting to be set when creating your client)
consumer.addListener('event.log', result => console.log(result));
Producer API
import circe from '@parkhub/circe';
// Using async/await
(async function startProducer() {
  const producer = await circe.createProducer({
    connection: 'kafka:9092'
  });
  producer.publish({
    publishCfgs: {
      topic: 'TEST_TOPIC',
      message: {
        test: 'message'
      }
    }
  });
  await producer.disconnect();
})();
// Using Promises
circe
  .createProducer({ connection: 'kafka:9092' })
  .then((producer) => {
    producer.publish({
      publishCfgs: {
        topic: 'TEST_TOPIC',
        message: {
          test: 'message'
        }
      }
    });
    return producer.disconnect();
  })
  .then(() => console.log('DONE!'))
  .catch(err => console.log('ERROR!', err));
publish({ publishCfgs, middleware }) - Publishes a message to a topic.
- publishCfgs(Object, REQUIRED) - Configurations used to publish your message
  - topic(String, REQUIRED) - Topic to publish this message to
  - message(String|Object, REQUIRED) - Message to publish
  - partition(number, optional) - A specific partition for the message.
  - key(String, optional) - Specify a key for this message
  - timestamp(number, optional) - Timestamp (EPOCH in milliseconds, i.e. Date.now()) to set for this message
  - opaqueToken(token, optional) - An opaque token that gets passed along with your delivery reports.
- middleware(Function[], optional) - Functions that match the format of a valid circe middleware function. Check out the repo for predefined middleware or create your own! (NOTE: The order of the middleware matters, and the arity of each middleware must match the one after it.) These will receive the publishCfgs.
- Post-middleware - These get applied AFTER any defined middleware you use
  - Message to Buffer - Transforms your message into a buffer
  - timestamp - Attaches a timestamp (if none is already defined)
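The two post-middleware steps listed above can be pictured with the sketch below. It is an illustration under assumptions rather than circe's implementation: messageToBuffer and attachTimestamp are hypothetical names, and serializing objects with JSON.stringify is assumed.

```javascript
// Hypothetical transforms, not circe's real post-middleware.

// Turns a string or object message into a Buffer.
// Assumption: objects are serialized as JSON.
function messageToBuffer(publishCfgs) {
  const { message } = publishCfgs;
  const text = typeof message === 'string' ? message : JSON.stringify(message);
  return { ...publishCfgs, message: Buffer.from(text, 'utf8') };
}

// Adds a timestamp only when the caller did not provide one.
function attachTimestamp(publishCfgs, now = Date.now) {
  return publishCfgs.timestamp === undefined
    ? { ...publishCfgs, timestamp: now() }
    : publishCfgs;
}

const prepared = attachTimestamp(
  messageToBuffer({ topic: 'TEST_TOPIC', message: { test: 'message' } })
);
console.log(prepared.message.toString('utf8'), prepared.timestamp);
```

Passing the clock in as a parameter keeps the timestamp step easy to test with a fixed value.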
producer.publish({
  publishCfgs: {
    topic: 'TEST_TOPIC',
    message: {
      test: 'message'
    },
    timestamp: Date.now()
  },
  middleware: [(publishCfgs, next) => {
    console.log('publish configurations', publishCfgs);
    next(publishCfgs); // hand the configurations on to the next step
  }]
});
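Because a publish middleware receives the publishCfgs, one natural use is checking them against the field list documented for publish. The sketch below is a hypothetical validator written for illustration; validatePublishCfgs is not a circe export, and the exact rules (for example, requiring an integer partition) are assumptions.

```javascript
// Hypothetical validator, not part of circe. Returns a list of problems;
// an empty list means the configurations look usable.
function validatePublishCfgs(publishCfgs = {}) {
  const errors = [];
  const { topic, message, partition, key, timestamp } = publishCfgs;

  if (typeof topic !== 'string' || topic === '') {
    errors.push('topic must be a non-empty string');
  }
  const isObject = typeof message === 'object' && message !== null;
  if (typeof message !== 'string' && !isObject) {
    errors.push('message must be a string or an object');
  }
  // Assumption: partitions are integers and timestamps are finite numbers.
  if (partition !== undefined && !Number.isInteger(partition)) {
    errors.push('partition must be an integer');
  }
  if (key !== undefined && typeof key !== 'string') {
    errors.push('key must be a string');
  }
  if (timestamp !== undefined && !Number.isFinite(timestamp)) {
    errors.push('timestamp must be a number of milliseconds');
  }
  return errors;
}

// Usage as a middleware-shaped function: only continue when valid.
const validateMiddleware = (publishCfgs, next) => {
  const errors = validatePublishCfgs(publishCfgs);
  if (errors.length > 0) {
    throw new Error(`Invalid publishCfgs: ${errors.join('; ')}`);
  }
  next(publishCfgs);
};

console.log(validatePublishCfgs({ topic: 'TEST_TOPIC', message: { test: 'message' } }));
console.log(validatePublishCfgs({ message: 42, partition: 1.5 }));
```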
disconnect() - RETURNS a PROMISE. The promise will fulfill once the client is fully disconnected.
producer.disconnect()
  .then(() => console.log('DISCONNECTED'))
  .catch((err) => console.log('Error disconnecting', err));
addListener(...args) - Add a listener to this producer. You can use any of the events available for the producer listed in node-rdkafka documentation. (NOTE: Some of these events require a configuration setting to be set when creating your client)
producer.addListener('event.log', result => console.log(result));