kafka-bridge
v0.6.2
Published
npm package kafka-bridge for easy integration and use in your Node.js projects that work together with ApacheKafka
Downloads
3
Readme
npm package "kafka-bridge"
Description
kafka-bridge - это небольшой модуль который предоставляет готовые программные решения для более простого взаимодействия с некоторыми функциями ApacheKafka. В данный момент данный проект подойдет для ведения журналов логов или даже сбора статистики по запросам.
Начнем работу:
Установка npm пакета:
npm i kafka-bridge
Использование:
Создание клиента kafka, через которого можно обратиться к Consumer или Producer
//Импортируем необходимые модули
import { KafkaClient } from 'kafka-bridge';
//Получите необходимые вам пути брокеров
const urls: string[] = ['10.10.10.10:8080','10.10.10.10:8081'];
//Задайте clientId
const clientId: string = 'MyClientId';
//Задайте levelLog для логирования в stdout
//Возможны варианты:
//NOTHING = 0,
//ERROR = 1,
//WARN = 2,
//INFO = 4,
//DEBUG = 5,
const logLevel: number = 5; (default = 4)
const isTopic: boolean = true; //Обозначает флаг, который решает необходимость создания топика, если его нет
//По умолчанию 'false' - топик не будет создан, если его нет
//Создайте экземпляр клиента
const client: KafkaClient = new KafkaClient(urls, clientId, isTopic?, logLevel?);
Используя клиент kafka, вы можете создать Consumer или Producer
Create consumer
//Импортируем необходимые модули
import { ConsumerKafka, KafkaClient } from 'kafka-bridge';
//Создайте экземпляр клиента
const client KafkaClient = new KafkaClient(urls, clientId, logLevel?);
//Создание Consumer:
//Укажите groupId для вашего consumer
const groupId: string = 'MyGroupId';
//Укажите topics для вашего consumer
const topics: string[] = ['myTopic1', 'myTopic2'];
const consumer: ConsumerKafka = client.createConsumer(groupId, topics);
Методы которые можно использовать в Consumer
await consumer.kafkaConsumer(handler: IMessageHandler);
//В отдельном файле создайте
class Handler implements IMessageHandler {
//Опишите необходимую вам обработку
async messageHandler (logData: MessageLog): Promise<void> {
for(let i = 0; i < logData.steps.length; i++) {
logData.steps[i].dataStep.result = JSON.stringify(logData.steps[i].dataStep.result);
}
RequestDB.sendDB(logData);
}
}
export default new Handler();
//Импортируйте созданный класс
import Handler from './handler/handler';
const topics: string[] = ['myTopic1', 'myTopic2'];
const consumer: ConsumerKafka = client.createConsumer(groupId, topics);
await consumer.kafkaConsumer(Handler);
//Данная функция начинает прослушивание всех топиков из topics
await consumer.kafkaConsumerSync(handler: IMessageHandlerSync);
В отличае от 'kafkaConsumer()' 'kafkaConsumerSync()' использует IMessageHandlerSync который не требует await
//В отдельном файле создайте
class Handler implements IMessageHandlerSync {
//Опишите необходимую вам обработку
async messageHandler (logData: MessageLog): void {
concole.log(logData);
}
}
export default new Handler();
import Handler from './handler/handler';
const topics: string[] = ['myTopic1', 'myTopic2'];
const consumer: ConsumerKafka = client.createConsumer(groupId, topics);
await consumer.kafkaConsumerSync(Handler);
await consumer.disconnectConsumer();
const consumer: ConsumerKafka = client.createConsumer(groupId, topics);
await consumer.disconnectConsumer(); // Позволяет отключить consumer вручную при небходимости
//Например:
process.on('SIGINT', async() => {
await consumer.disconnectConsumer();
process.exit(1);
});
Create producer
Параметры метода:
topic: string - Название топика куда producer будет слать сообщения
serverName: string - Название сервера с которого собираются логи
//Импортируем необходимые модули
import { ProducerKafka, KafkaClient } from 'kafka-bridge';
//Создайте экземпляр клиента
const client KafkaClient = new KafkaClient(urls, clientId, logLevel?);
//Создание Producer:
//Укажите topic для вашего producer
const topic: string = 'myTopic1';
const serverName: string = 'myService',
const producer: ProducerKafka = client.createProducer(topic, serverName);
Методы которые можно использовать в Producer
await producer.addLog(logs: Step[], dataStep: DataStep, logLevel: string)
Парамерты метода:
logs - массив с типом Step, который определяется в начале выполнения того или иного алгоритма шаги которого необходимо зафиксировать в этом массиве
Step = {
level: string,
dataStep: DataStep
}
dataStep - данные, которые описывают основлню суть шага выполнения (является частью Step)
DataStep = {
data: string,
result?: any - необязательный параметр
}
logLevel - название уровня шага в зависимости от его контекста (error, info, debug и т.д.)
Функция позволяет вести подробный просмотр каждого шага программы там где это нужно (например: запрос к БД или запрос к другому серверу)
const producer: ProducerKafka = client.createProducer(topic);
const logs: Step[] = [];
try {
const requestDB = pg.query('SELECT * FROM Users');
producer.addLog(logs, { data: 'Выполнение запроса на получение всех пользователей', result: requestDB }, 'debug')
} catch(error) {
producer.addLog(logs, { data : 'Ошибка получения данных из таблицы Users' result: error, 'error' } , error)
} finally {
producer.addLog(logs, { data: 'Закрытие соединения с БД' }, 'debug')
}
//В результате logs может быть следующим:
// logs = [
// {
// "level": "debug",
// "dataStep": [
// {
// "data": "Выполнение запроса на получение всех пользователей",
// "result": undefined,
// }
// ],
// },
// {
// "level": "error",
// "dataStep": [
// {
// "data": "Ошибка получения данных из таблицы Users",
// "result": error get data from DB,
// }
// ],
// },
// {
// "level": "debug",
// "dataStep": [
// {
// "data": "Закрытие соединения с БД",
// "result": [],
// }
// ],
// }
// ]
await producer.sendLog(req: any, res: Response, body: any | string, stepsLog: Step[]);
Параметры метода:
req - { Request } from 'express' +
export interface ExtendedRequest extends Request {
time?: any;
}
res - { Response } from 'express'
body - any | string - тело ответа от сервера
stepsLog - Step[] - заполненный данными шагов выполнения алгоритма массив
Функция выполняет заполнение массива шагов выполнения алгоритма stepsLog
import { DataStep, MessageLog, Step } from "kafka-bridge";
const producer: ProducerKafka = client.createProducer(topic, isTopic);
const logs: MessageLog[] = [];
controller(req, res) {
const body = {
error: true,
message: 'Error message',
DATA: {
result: NO
}
warning: 'warning'
}
await producer.sendLog(req, res, body, logs);//Выполнение функции заполненния массива
}
await producer.disconnectProducer();
Позволяет отключить producer вручную при небходимости
//Пример использования
const consumer: ProducerKafka = client.createProducer(topic, isTopic);
process.on('SIGINT', async() => {
await consumer.disconnectProducer(); //Функция отключения
process.exit(1);
});
Версия проекта
v0.6.2