npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

legman-kafka

v1.0.0

Published

Kafka Consumer and Producer for Legman streams

Downloads

54

Readme

Legman-Kafka

Legman for Kafka is a simple library build for streaming in strictly object mode with Legman. You can consumer Kafka messages as a stream and write it as objects into a Legman stream or produce message objects from a Legman stream into Kafka.

How to use

At first you have to install this module and Legman into your application:

npm i --save legman legman-kafka
# OR
yarn add legman legman-kafka

After that you can import and use Legman in your code.

Using Legman Kafka as a consumer in typescript

import Legman from "legman";
import { LegmanKafkaConsumer } from "legman-kafka";

interface IExampleKafkaPayload {
    action: string;
    id: string;
    // ...
}

const loggerLeg = new Legman({app: "Identifier for my application"});
const kafkaLeg = new LegmanKafkaConsumer(
    ["my_kafka_topic", "another_kafka_topic"],
    {
        noptions: {
            "group.id": "example-consumer",
            "metadata.broker.list": "kafka:9092",
        },
    },
);
kafkaLeg.connect()
    .then(() => console.log("connected to Kafka"))
    .catch((err) => console.error("error while connecting", err));
const processLog = loggerLeg.influx({context: "processing"});

kafkaLeg
    .filter((message: IExampleKafkaPayload) => message.action === "create")
    .on("data", async (message: IExampleKafkaPayload) => {
        const logger = processLog.influx({correlationId: message.id});
        logger.write({msg: "Start processing"});
        await someProcessingFn(message);
        logger.end({msg: "Processing finished"});
    });

Using Legman Kafka as a consumer in JavaScript

const Legman = require("legman");
const { LegmanKafkaConsumer } = require("legman-kafka");

const loggerLeg = new Legman({app: "Identifier for my application"});
const kafkaLeg = new LegmanKafkaConsumer(
    ["my_kafka_topic", "another_kafka_topic"],
    {
        noptions: {
            "group.id": "example-consumer",
            "metadata.broker.list": "kafka:9092",
        },
    },
);
kafkaLeg.connect()
    .then(() => console.log("connected to Kafka"))
    .catch((err) => console.error("error while connecting", err));
const processLog = loggerLeg.influx({context: "processing"});

kafkaLeg
    .filter((message) => message.action === "create")
    .on("data", async (message) => {
        const logger = processLog.influx({correlationId: message.id});
        logger.write({msg: "Start processing"});
        await someProcessingFn(message);
        logger.end({msg: "Processing finished"});
    });

Using Legman Kafka as a producer in TypeScript

import Legman from "legman";
import { LegmanKafkaProducer } from "legman-kafka";

interface IExampleKafkaPayload {
    action: string;
    id: string;
    // ...
}

const kafkaProducerLeg = new LegmanKafkaProducer({
    "my_kafka_topic",
    {
        noptions: {
            "client.id": "example-producer",
            "metadata.broker.list": "kafka:9092",
        },
    },
});
const createLeg = new Legman({action: "create"});
const deleteLeg = new Legman({action: "delete"});

createLeg.pipe(kafkaProducerLeg);
deleteLeg.pipe(kafkaProducerLeg);

kafkaProducerLeg.connect()
    .then(() => console.log("connected to Kafka"))
    .catch((err) => console.error("error while connecting", err));

Using Legman Kafka as a producer in JavaScript

const Legman = require("legman");
const { LegmanKafkaProducer } = require("legman-kafka");

const kafkaProducerLeg = new LegmanKafkaProducer({
    "my_kafka_topic",
    {
        noptions: {
            "client.id": "example-producer",
            "metadata.broker.list": "kafka:9092",
        },
    },
});
const createLeg = new Legman({action: "create"});
const deleteLeg = new Legman({action: "delete"});

createLeg.pipe(kafkaProducerLeg);
deleteLeg.pipe(kafkaProducerLeg);

kafkaProducerLeg.connect()
    .then(() => console.log("connected to Kafka"))
    .catch((err) => console.error("error while connecting", err));

Script tasks

  • transpile: Transpiles the library from TypeScript into JavaScript with type declarations
  • lint: Lints your code against the recommend TSLint ruleset.
  • test: Transpiles, lints and runs software-tests with coverage.
  • leakage: Transpiles, lints and runs software-tests with leakage tests.
  • docker:lint: Runs the lint task in a docker environment.
  • docker:test: Runs the test task in a docker environment.
  • docker:leakage: Runs the leakage task in a docker environment.
  • docker:example: Runs a simple example in a docker environment.

License

This module is under ISC license copyright 2018 by Arne Schubert