bf-lib-kafkacouch
v1.0.12
Published
Block-5 Kafka Couch Library
Downloads
1
Readme
Description
This Library should handle all interactions between kafka and couch
Data Asumptions,
It Monitor's created and updated,
- Created assume's no document exists, and creates a new one
- Updated assumes Document exits
- message has an id field, that is the id for the document
#Usage
import { taskSchema } from "bf-legal-task";
import { KafkaCouchWatcher } from "lib-kafkacouch";
const TaskWatcher = new KafkaCouchWatcher({
subject: "task",
couch: `workflow-task_management`,
kafka: { topic: `WORKFLOW.TASK_MANAGEMENT`, groupId: "app-asdf" },
bodyValidation: taskSchema,
id: "id",
filter: (body, type, header) => {
return true;
},
map: (body, type, header) => {
return body;
}
});
//After a commit happens
TaskWatcher.on("updated", async (body, type, header) => {
console.log(packet);
});
Without couch ( for command's ext... )
import { taskSchema } from "bf-legal-task";
import { KafkaCouchWatcher } from "lib-kafkacouch";
const TaskWatcher = new KafkaCouchWatcher({
subject: "task",
couch: false,
kafka: { topic: `WORKFLOW.TASK_MANAGEMENT`, groupId: "app-asdf" },
bodyValidation: taskSchema,
filter: (body, type, header) => {
return true;
},
map: (body, type, header) => {
return body;
}
});
//After a commit happens
TaskWatcher.on("updated", async (body, type, header) => {
console.log(packet);
});
Config Stuff
Couch by default get's it's creds from enviromental variables
const COUCH_USER: string = process.env.COUCH_USER || "admin";
const COUCH_PASSWORD: string = process.env.COUCH_PASSWORD || "password";
const COUCH_PORT: string = process.env.COUCH_PORT || "5984";
const COUCH_HOST: string = process.env.COUCH_HOST || "couchdb";
const COUCH_PROTO: string = process.env.COUCH_PROTO || "http";
KafkaCouchWatcher.kafka
| Key | Description | Required | | ------------------------ | ----------------------------------- | -------- | | topic | Kafka Topic | true | | groupId | Group ID ( for Round Robin Groups ) | true | | host | Kafka Host | false | | sessionTimeout | | false | | protocol | | false | | fromOffset | | false | | protocol | | false | | commitOffsetsOnFirstJoin | | false | | outOfRangeOffset | | false | | onRebalance | | false |
const k = {
topic: `WORKFLOW.TASK_MANAGEMENT`,
host: process.env.ZOOKEEPER_HOST || "zookeeper:2181",
groupId: process.env.KAFKA_GROUP_ID || "default-groupa",
sessionTimeout: 15000,
protocol: ["roundrobin"],
fromOffset: "earliest",
commitOffsetsOnFirstJoin: true,
outOfRangeOffset: "earliest",
onRebalance: (isAlreadyMember: any, callback: any) => {
callback();
} // or null
};