npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

kafka-sagas

v17.1.0

Published

Build sagas that consume from a kafka topic

Downloads

3,262

Readme

Kafka Sagas 🌼

Kafka-sagas is a package that allows you to use eerily similar semantics to Redux-Sagas built on top of KafkaJS. With Kafka-Sagas, Kafka topics are treated as streams that a saga can dispatch actions into, as well as tapped for particular actions to initiate a saga.

Usage

1. Install

npm install --save kafka-sagas

2. Make sure peer dependencies are installed

npm install --save kafkajs

API Reference

What's A Saga?

A saga is a generator function that receives a payload from a topic and runs some effects as a response. Effects performed by the saga will all be executed within the same transaction as the initiating action.

Example:

const topics = {
    BEGIN: 'BEGIN',
    STARTED: 'STARTED',
    COMPLETED: 'COMPLETED',
    FAILED: 'FAILED'
};

const saga = function*<Payload>(
    {
        topic,
        transaction_id,
        payload
    }: {
        topic: string;
        transaction_id: string;
        payload: Payload;
    },
    context
) {
    const {effects} = context;

    console.log(`${topic} message received`, {
        transaction_id
    });

    try {
        yield effects.put(topics.STARTED, payload); // This will put send an action to the STARTED topic with our current transaction_id.

        const result = yield effects.callFn(async function() {
            const {data} = await axios.post('/status');
            return data;
        });

        yield effects.put(topics.COMPLETED, result); // This will put send an action to the COMPLETED topic with our current transaction_id.

        console.log(`${topic} message processed`, {
            transaction_id
        });
    } catch (error) {
        yield effects.put(topics.FAILED, {
            // This will put send an action to the FAILED topic with our current transaction_id.
            error: {
                name: error.name,
                message: error.message,
                stack: error.stack
            }
        });
    }
};

What's A Consumer?

A consumer, in this realm, is a Kafka consumer. You may choose to have one or many consumers within a single group. In order to do so, simply create another TopicSagaConsumer with the same topic.

What's An Action?

An action is an event sent to a saga consumer that includes information about the topic, transactionId, and a payload. Under the hood, actions are just specialized kafka messages.

What's an Effect?

An effect is a side-effect a saga may perform within a transaction. Effects may be either intercepted by or stubbed out by using middleware.

What's A Transaction?

A transaction is a string of events that share a transaction_id. By being in the same transaction, we are able to create consumers under-the-hood to other topics while only receiving messages from those topics that are in the current transaction we are working within.

Advanced

Communication between sagas

The following diagram illustrates how 3 independently deployed sagas can interact and react to each other. 3 sagas communicate

Production speed

Due to this bug, the underlying producer batches messages into sets of 10,000 and sends a batch of 10,000 messages per second. This isn't currently configurable, but it is my understanding that this should be no trouble for a Kafka cluster. This means PUT effects may take up to a second to resolve. See the ThrottledProducer class to understand the finer workings of the producer.

Auto Topic Creation

By default, a TopicSagaConsumer will automatically create a topic if it attempts to subscribe to nonexistent one. If you would like to control how topics are created by both the primary consumer and underlying consumers and producers, instantiate the TopicSagaConsumer with your own TopicAdministrator instance.

The following example creates three topics with 10 partitions each:

const topic = 'some_topic_that_does_not_exist_yet';

const topicAdministrator = new TopicAdministrator(kafka, {
    numPartitions: 10
});

const topicConsumer = new TopicSagaConsumer({
    kafka,
    topic,
    topicAdministrator,
    *saga(_, {effects: {put, actionChannel}}) {
        /**
         * A new topic (with 10 partitions) is created here using the provided topicAdministrator.
         */
        yield put('some_other_non_existent_topic');

        /**
         * A new topic (again, with 10 partitions) is created here as well.
         */
        const channel = yield actionChannel('a_third_nonexistent_topic');
    }
});

/**
 * The some_topic_that_does_not_exist_yet topic is created during the consumer startup.
 */
await topicConsumer.run();

The topics in the above example will be created in the following order, since the saga won't execute until messages are flowing in:

  1. some_topic_that_does_not_exist_yet
  2. some_other_non_existent_topic
  3. a_third_nonexistent_topic

Concurrency

By instantiating multiple TopicSagaConsumer instances, you are able consume from the same topic concurrently, given there are partitions to support the number of consumers. This is a scenario you would encountere if you were running multiple Kubernetes pods each of which instantiate a single consumer. In the future, concurrency as a config will be available.