kafdecor-js
v1.0.8
Published
Uma biblioteca para integração com Kafka usando decoradores e configuração modular.
Downloads
10
Readme
KafDecorJS
Uma biblioteca para integração simples e eficaz do Kafka. Esta biblioteca permite que você consuma mensagens do Kafka usando decoradores e funções, facilitando a configuração e o gerenciamento de consumidores Kafka em projetos Node.js com Express.
Conteúdo
- Introdução
- Instalação
- Configuração do Servidor Express
- Uso de Decoradores para Processamento de Mensagens
- Registro de Funções para Processamento de Mensagens
- Exemplo Completo
- Exemplo com Commit Manual de Offset
- Contribuição
- Licença
Introdução
Esta biblioteca facilita a integração do Kafka, permitindo que você use decoradores e funções para processar mensagens Kafka de maneira simples e organizada. Ideal para desenvolvedores que desejam uma configuração rápida e uma API intuitiva para consumir mensagens Kafka.
Instalação
Para começar a usar a biblioteca, instale as dependências necessárias com o npm:
npm install kafdecor-js kafkajs
Ou com o yarn:
yarn add kafdecor-js kafkajs
Configuração do Servidor Express
Crie um servidor Express básico e configure a integração com o Kafka:
import express from "express";
import { KafkaRegistry, IKafkaMessage } from "kafdecor-js";
const app = express();
const port = 3002;
app.listen(port, async () => {
console.log(`Servidor rodando em http://localhost:${port}`);
await KafkaRegistry.start({
brokers: ["localhost:9092"],
clientId: "express-test",
});
});
Uso de Decoradores para Processamento de Mensagens
Use decoradores para associar métodos de uma classe a tópicos Kafka:
import { KafkaListener, IKafkaMessage } from "kafdecor-js";
export class Controller {
@KafkaListener({ topic: "test-topic", groupId: "test-group" })
static handleMessage(payload: IKafkaMessage) {
console.log("Mensagem recebida do Kafka (test-topic):", payload.message);
}
}
Registro de Funções para Processamento de Mensagens
Você também pode registrar funções diretamente para processar mensagens:
import { KafkaRegistry, IKafkaMessage } from "kafdecor-js";
function exampleFunctionKafka(payload: IKafkaMessage) {
console.log(
"Mensagem recebida do Kafka (test-function-topic):",
JSON.stringify(payload.message)
);
}
app.listen(port, async () => {
console.log(`Servidor rodando em http://localhost:${port}`);
KafkaRegistry.register(exampleFunctionKafka, {
groupId: "test-function-groupId",
topic: "test-function-topic",
});
await KafkaRegistry.start({
brokers: ["localhost:9092"],
clientId: "express-test",
});
});
Exemplo Completo
Aqui está um exemplo completo combinando tudo o que foi discutido:
import express from "express";
import { KafkaRegistry, KafkaListener, IKafkaMessage } from "kafdecor-js";
const app = express();
const port = 3002;
export class Controller {
@KafkaListener({ topic: "test-topic", groupId: "test-group" })
static handleMessage(payload: IKafkaMessage) {
console.log("Mensagem recebida do Kafka (test-topic):", payload.message);
}
}
function exampleFunctionKafka(payload: IKafkaMessage) {
console.log(
"Mensagem recebida do Kafka (test-function-topic):",
JSON.stringify(payload.message)
);
}
app.listen(port, async () => {
console.log(`Servidor rodando em http://localhost:${port}`);
KafkaRegistry.register(exampleFunctionKafka, {
groupId: "test-function-groupId",
topic: "test-function-topic",
});
await KafkaRegistry.start({
brokers: ["localhost:9092"],
clientId: "express-test",
});
});
Exemplo com Commit Manual de Offset
Neste exemplo, adicionamos o commit manual de offset, útil para garantir que a mensagem só seja marcada como consumida após o processamento ser concluído com sucesso:
import express, { Request, Response } from "express";
import { KafkaRegistry, IKafkaMessage } from "kafdecor-js";
const app = express();
const port = 3002;
// Exemplo usando função de callback com commit manual
function create(payload: IKafkaMessage) {
console.log(
"Mensagem recebida do Kafka (test-function-topic):",
payload.message.value
);
// Fazendo o commit manual do offset
payload.ctx.consumer.commitOffsets([
{
topic: payload.ctx.topic, // O tópico da mensagem
partition: payload.ctx.partition, // A partição da mensagem
offset: (parseInt(payload.message.offset) + 1).toString(), // O próximo offset
},
]);
}
app.listen(port, async () => {
// Registra a função com commit manual de offset
KafkaRegistry.register(create, {
topic: "test-function-topic",
groupId: "group-id",
});
await KafkaRegistry.start({
brokers: ["localhost:9092"],
clientId: "express-test",
config: {
autoCommit: false, // Desativa o auto-commit automático
},
});
console.log(`Servidor rodando em http://localhost:${port}`);
});
Contribuição
Se você deseja contribuir para a biblioteca, sinta-se à vontade para abrir uma issue ou enviar um pull request. Toda ajuda é bem-vinda!
Licença
Este projeto está licenciado sob a MIT License - veja o arquivo LICENSE para mais detalhes.