KafkaTS

KafkaTS is an Apache Kafka client library for Node.js. It provides both a low-level API for communicating directly with an Apache Kafka cluster and high-level APIs for publishing and subscribing to Kafka topics.

Supported Kafka versions: 3.6 and later

Installation

npm install kafka-ts

Quick start

Create a Kafka client

export const kafka = createKafkaClient({
    clientId: 'my-app',
    bootstrapServers: [{ host: 'localhost', port: 9092 }],
});

Consuming messages

const consumer = await kafka.startConsumer({
    groupId: 'my-consumer-group',
    topics: ['my-topic'],
    onBatch: (messages) => {
        console.log(messages);
    },
});

Producing messages

export const producer = kafka.createProducer();

await producer.send([{ topic: 'my-topic', key: 'key', value: 'value' }]);

Low-level API

const cluster = kafka.createCluster();
await cluster.connect();

// Look up cluster metadata to find the current controller node.
const { controllerId } = await cluster.sendRequest(API.METADATA, {
    allowTopicAutoCreation: false,
    includeTopicAuthorizedOperations: false,
    topics: [],
});

// Send a CREATE_TOPICS request directly to the controller node.
await cluster.sendRequestToNode(controllerId)(API.CREATE_TOPICS, {
    validateOnly: false,
    timeoutMs: 10_000,
    topics: [
        {
            name: 'my-topic',
            numPartitions: 10,
            replicationFactor: 3,
            assignments: [],
            configs: [],
        },
    ],
});

await cluster.disconnect();

Graceful shutdown

process.once('SIGTERM', async () => {
    await consumer.close(); // waits for the consumer to finish processing the last batch and disconnects
    await producer.close();
});

See the examples directory for more detailed usage.

Logging

By default, KafkaTS logs using a JSON logger. The logger can be replaced globally by calling the setLogger method (see src/utils/logger.ts).
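
As an illustration, a custom logger might be wired in like this. setLogger is named in the README, but the exact Logger shape (method names and arguments) is an assumption here, so check src/utils/logger.ts for the real interface:

import { setLogger } from 'kafka-ts';

// Hypothetical logger shape; verify against the Logger type in src/utils/logger.ts.
setLogger({
    debug: (message, meta) => console.debug(message, meta),
    info: (message, meta) => console.info(message, meta),
    warn: (message, meta) => console.warn(message, meta),
    error: (message, meta) => console.error(message, meta),
});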

Retries

By default, KafkaTS retries onBatch up to 5 times with an exponential backoff delay (see src/utils/retrier.ts). If all retries fail, the consumer is restarted.

If you want to skip failed messages or implement a DLQ-like mechanism, you can override the retrier in startConsumer() and execute your own logic in onFailure.

For example, if you simply want to skip the failing messages:

import { createExponentialBackoffRetrier } from 'kafka-ts';

await kafka.startConsumer({
    // ...
    retrier: createExponentialBackoffRetrier({ onFailure: () => {} }), // no-op onFailure skips the failed batch
});
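
If instead you want a DLQ-like flow, the same onFailure hook can forward the failed batch to a dead-letter topic. This is only a sketch: the assumption that onFailure receives the error and the failed messages is not confirmed by the README, so check src/utils/retrier.ts for the actual signature:

const dlqProducer = kafka.createProducer();

await kafka.startConsumer({
    // ...
    retrier: createExponentialBackoffRetrier({
        // Assumed signature; verify against src/utils/retrier.ts.
        onFailure: async (error, messages) => {
            // Forward the failed batch to a dead-letter topic and continue consuming.
            await dlqProducer.send(messages.map((m) => ({ ...m, topic: 'my-topic-dlq' })));
        },
    }),
});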

Partitioning

By default, messages are partitioned by the message key, or round-robin if the key is null or undefined. The partition can be overridden with the partition property on the message. You can also override the default partitioner per producer instance: kafka.createProducer({ partitioner: customPartitioner }).

A simple example of how to partition messages by the value of the x-partition-key message header:

import type { Partitioner } from 'kafka-ts';
import { defaultPartitioner } from 'kafka-ts';

const myPartitioner: Partitioner = (context) => {
    // Reuse the default partitioner, but substitute the header value for the message key.
    const partition = defaultPartitioner(context);
    return (message) => partition({ ...message, key: message.headers?.['x-partition-key'] });
};

const producer = kafka.createProducer({ partitioner: myPartitioner });

await producer.send([{ topic: 'my-topic', value: 'value', headers: { 'x-partition-key': '123' } }]);

Motivation

The existing low-level libraries (e.g. node-rdkafka) are bindings to librdkafka, which doesn't give enough control over the consumer logic. The existing high-level libraries (e.g. kafkajs) are missing a few crucial features.

New features compared to kafkajs

  • Static consumer membership - Rebalancing during rolling deployments causes delays. Supplying a groupInstanceId in addition to the groupId lets a restarted instance skip rebalancing and keep consuming the partitions in its existing assignment (see the sketch after this list).
  • Consuming messages without consumer groups - When you don't need the consumer to track partition offsets, you can create a consumer without a groupId and always start consuming from either the earliest or the latest partition offset (see the fromTimestamp example under kafka.startConsumer() below).
  • Low-level API requests - It's possible to communicate directly with the Kafka cluster using the Kafka API protocol.
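
A minimal sketch of static membership based on the options above; the groupInstanceId value here is a hypothetical stable per-instance identifier (e.g. a pod name) and must stay the same across restarts of that instance:

const consumer = await kafka.startConsumer({
    groupId: 'my-consumer-group',
    // Stable per-instance id, so a restarted instance reclaims its old
    // partition assignment instead of triggering a group rebalance.
    groupInstanceId: 'my-app-instance-0',
    topics: ['my-topic'],
    onBatch: (messages) => {
        console.log(messages);
    },
});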

Configuration

createKafkaClient()

| Name             | Type                   | Required | Default | Description                                          |
| ---------------- | ---------------------- | -------- | ------- | ---------------------------------------------------- |
| clientId         | string                 | false    | null    | The client id used for all requests.                 |
| bootstrapServers | TcpSocketConnectOpts[] | true     |         | List of kafka brokers for initial cluster discovery. |
| sasl             | SASLProvider           | false    |         | SASL provider.                                       |
| ssl              | TLSSocketOptions       | false    |         | SSL configuration.                                   |
| requestTimeout   | number                 | false    | 60000   | Request timeout in milliseconds.                     |

Supported SASL mechanisms

  • PLAIN: saslPlain({ username, password })
  • SCRAM-SHA-256: saslScramSha256({ username, password })
  • SCRAM-SHA-512: saslScramSha512({ username, password })

Custom SASL mechanisms can be implemented following the SASLProvider interface. See src/auth for examples.
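
Putting these options together, a client configured for SCRAM over TLS might look like the sketch below. The host name and credentials are placeholders, and passing an empty TLSSocketOptions object to enable TLS with defaults is an assumption:

import { createKafkaClient, saslScramSha512 } from 'kafka-ts';

export const kafka = createKafkaClient({
    clientId: 'my-app',
    bootstrapServers: [{ host: 'broker-1.example.com', port: 9093 }],
    sasl: saslScramSha512({ username: 'my-user', password: 'my-password' }),
    ssl: {}, // TLSSocketOptions; assumed that an empty object enables TLS defaults
});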

kafka.startConsumer()

| Name                   | Type                          | Required | Default                         | Description                                                                           |
| ---------------------- | ----------------------------- | -------- | ------------------------------- | ------------------------------------------------------------------------------------- |
| topics                 | string[]                      | true     |                                 | List of topics to subscribe to                                                        |
| groupId                | string                        | false    | null                            | Consumer group id                                                                     |
| groupInstanceId        | string                        | false    | null                            | Consumer group instance id                                                            |
| rackId                 | string                        | false    | null                            | Rack id                                                                               |
| isolationLevel         | IsolationLevel                | false    | IsolationLevel.READ_UNCOMMITTED | Isolation level                                                                       |
| sessionTimeoutMs       | number                        | false    | 30000                           | Session timeout in milliseconds                                                       |
| rebalanceTimeoutMs     | number                        | false    | 60000                           | Rebalance timeout in milliseconds                                                     |
| maxWaitMs              | number                        | false    | 5000                            | Fetch long poll timeout in milliseconds                                               |
| minBytes               | number                        | false    | 1                               | Minimum number of bytes to wait for before returning a fetch response                 |
| maxBytes               | number                        | false    | 1_048_576                       | Maximum number of bytes to return in the fetch response                               |
| partitionMaxBytes      | number                        | false    | 1_048_576                       | Maximum number of bytes to return per partition in the fetch response                 |
| allowTopicAutoCreation | boolean                       | false    | false                           | Allow kafka to auto-create topic when it doesn't exist                                |
| fromTimestamp          | bigint                        | false    | -1                              | Start consuming messages from timestamp (-1 = latest offsets, -2 = earliest offsets)  |
| onBatch                | (batch: Message[]) => Promise | true     |                                 | Callback executed when a batch of messages is received                                |
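
Following on from the table, here is a sketch of a consumer without a consumer group that reads from the earliest offsets; note that fromTimestamp is a bigint:

const consumer = await kafka.startConsumer({
    topics: ['my-topic'],
    // No groupId: partition offsets aren't tracked, so pick a starting point explicitly.
    fromTimestamp: -2n, // -2 = earliest offsets, -1 = latest offsets (the default)
    onBatch: (messages) => {
        console.log(messages);
    },
});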

kafka.createProducer()

| Name                   | Type        | Required | Default            | Description                                                                      |
| ---------------------- | ----------- | -------- | ------------------ | -------------------------------------------------------------------------------- |
| allowTopicAutoCreation | boolean     | false    | false              | Allow kafka to auto-create topic when it doesn't exist                           |
| partitioner            | Partitioner | false    | defaultPartitioner | Custom partitioner function. By default, a Java-compatible partitioner is used.  |

producer.send(messages: Message[], options?: { acks?: -1 | 1 })

| Name      | Type                   | Required | Default | Description                                                                                                                |
| --------- | ---------------------- | -------- | ------- | --------------------------------------------------------------------------------------------------------------------------- |
| topic     | string                 | true     |         | Topic to send the message to                                                                                               |
| partition | number                 | false    | null    | Partition to send the message to. By default partitioned by key. If key is also missing, partition is assigned round-robin |
| timestamp | bigint                 | false    | null    | Message timestamp in milliseconds                                                                                          |
| key       | Buffer \| null         | false    | null    | Message key                                                                                                                |
| value     | Buffer \| null         | true     |         | Message value                                                                                                              |
| headers   | Record<string, string> | false    | null    | Message headers                                                                                                            |
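
In the options argument, acks controls acknowledgement semantics as in the Kafka protocol: acks: 1 waits for the leader broker only, while acks: -1 waits for all in-sync replicas. For example:

await producer.send(
    [{ topic: 'my-topic', key: 'key', value: 'value' }],
    { acks: 1 }, // leader-only acknowledgement; -1 waits for all in-sync replicas
);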