🦀 Kafka Crab JS 🦀

A lightweight, flexible, and reliable Kafka client for JavaScript/TypeScript, built on Rust and librdkafka for high performance and a rich feature set.

Features

  • 🦀 Simple and intuitive API
  • 🚀 High-performance message processing
  • 🔄 Automatic reconnection handling
  • 🎯 Type-safe interfaces (TypeScript support)
  • ⚡ Async/await support
  • 🛠️ Configurable consumer and producer options
  • 📊 Stream processing support
  • 📦 Message batching capabilities
  • 🔍 Comprehensive error handling

Table of Contents

  1. Installation
  2. Quick Start
  3. Stream Processing
  4. Producer Examples
  5. Reconnecting Kafka Consumer
  6. Examples
  7. Configuration
  8. Best Practices
  9. Contributing
  10. License

Installation

npm install kafka-crab-js
# or
yarn add kafka-crab-js

Quick Start

Basic Consumer Setup

import { KafkaClient } from 'kafka-crab-js';

async function run() {
  const kafkaClient = new KafkaClient({
    brokers: 'localhost:29092',
    clientId: 'foo-client',
    logLevel: 'debug',
    brokerAddressFamily: 'v4',
  });

  // Create a consumer bound to a consumer group
  const consumer = kafkaClient.createConsumer({
    groupId: 'foo-group',
  });

  await consumer.subscribe([{ topic: 'foo' }]);

  // Receive a single message
  const message = await consumer.recv();
  const { payload, partition, offset } = message;
  console.log({
    partition,
    offset,
    value: payload.toString()
  });

  consumer.unsubscribe();
}

await run();
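
The example above receives a single message. For continuous consumption, a loop over the same recv() API should work; this is a sketch, assuming recv() resolves once per incoming message (the stop condition here is arbitrary):

// Hedged sketch (inside run(), replacing the single recv() call above).
// Assumes recv() resolves once per incoming message.
let count = 0;
while (count < 10) {
  const message = await consumer.recv();
  console.log('Received:', message.payload.toString());
  count++;
}
consumer.unsubscribe();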

Basic Producer Setup

import { KafkaClient } from 'kafka-crab-js';

const kafkaClient = new KafkaClient({
  brokers: 'localhost:29092',
  clientId: 'my-client-id',
  logLevel: 'info',
  brokerAddressFamily: 'v4',
});

const producer = kafkaClient.createProducer({ configuration: { 'message.timeout.ms': '5000' } });

const message = {
  id: 1,
  name: "Sample Message",
  timestamp: new Date().toISOString()
};

const result = await producer.send({
  topic: 'my-topic',
  messages: [{
    payload: Buffer.from(JSON.stringify(message))
  }]
});

const errors = result.map(r => r.error).filter(Boolean);
if (errors.length > 0) {
  console.error('Error sending message:', errors);
} else {
  console.log('Message sent. Offset:', result);
}
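
In this client, send() resolves with one result entry per message; the example above checks each entry's error field to detect per-message delivery failures.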

Stream Processing

Stream Consumer Example

import { KafkaClient } from 'kafka-crab-js';

const kafkaClient = new KafkaClient({
  brokers: 'localhost:29092',
  clientId: 'my-client-id',
  logLevel: 'info',
  brokerAddressFamily: 'v4',
});

const kafkaStream = kafkaClient.createStreamConsumer({
  groupId: 'my-group-id',
  enableAutoCommit: true,
});

await kafkaStream.subscribe([{ topic: 'foo' }, { topic: 'bar' }])

kafkaStream.on('data', (message) => {
  console.log('>>> Message received:', { payload: message.payload.toString(), offset: message.offset, partition: message.partition, topic: message.topic })
  if (message.offset > 10) {
    kafkaStream.destroy();
  }
})

kafkaStream.on('close', () => {
  kafkaStream.unsubscribe();
  console.log('Stream ended')
})
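
As an alternative to the 'data' event, the stream consumer may also support Node.js async iteration if it behaves like a standard Readable; that is an assumption, not documented API:

// Hedged sketch: consume via async iteration instead of the 'data' event.
// Assumes the stream consumer implements the standard async-iterator protocol.
for await (const message of kafkaStream) {
  console.log('Message:', message.topic, message.partition, message.offset);
  if (message.offset > 10) break; // breaking out of the loop destroys the stream
}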

Producer Examples

Batch Message Production

import { KafkaClient } from 'kafka-crab-js';

const kafkaClient = new KafkaClient({
  brokers: 'localhost:29092',
  clientId: 'my-client-id',
  brokerAddressFamily: 'v4',
});
const producer = kafkaClient.createProducer({});

const messages = Array.from({ length: 100 }, (_, i) => ({
  payload: Buffer.from(JSON.stringify({
    _id: i,
    name: `Batch Message ${i}`,
    timestamp: new Date().toISOString()
  }))
}));

try {
  const result = await producer.send({
    topic: 'my-topic',
    messages
  });
  console.log('Batch sent. Offset:', result);
  console.assert(result.length === 100);
} catch (error) {
  console.error('Batch error:', error);
}

Producer with Keys and Headers

import { KafkaClient } from 'kafka-crab-js';

const kafkaClient = new KafkaClient({
  brokers: 'localhost:29092',
  clientId: 'my-client-id',
  brokerAddressFamily: 'v4',
});

async function produceWithMetadata() {
  const producer = kafkaClient.createProducer({});

  try {
    await producer.send({
      topic: 'my-topic',
      messages: [{
        key: 'user-123',
        payload: Buffer.from(JSON.stringify({
          userId: 123,
          action: 'update'
        })),
        headers: {
          'correlation-id': 'txn-123',
          'source': 'user-service'
        }
      }]
    });
  } catch (error) {
    console.error('Error:', error);
  }
}

await produceWithMetadata();

Reconnecting Kafka Consumer

import { KafkaClient } from 'kafka-crab-js'

const kafkaClient = new KafkaClient({
  brokers: 'localhost:29092',
  clientId: 'reconnect-test',
  logLevel: 'debug',
  brokerAddressFamily: 'v4',
  configuration: {
    'auto.offset.reset': 'earliest',
  },
})

/**
 * Creates and configures a new Kafka stream consumer
 */
async function createConsumer() {
  const kafkaStream = kafkaClient.createStreamConsumer({
    groupId: 'reconnect-test',
    enableAutoCommit: true,
  })
  await kafkaStream.subscribe([
    { topic: 'foo' },
    { topic: 'bar' },
  ])
  return kafkaStream
}

/**
 * Starts a Kafka consumer with auto-restart capability
 */
async function startConsumer() {
  let counter = 0
  let retryCount = 0
  const MAX_RETRIES = 5
  const RETRY_DELAY = 5000 // 5 seconds

  async function handleRetry() {
    if (retryCount < MAX_RETRIES) {
      retryCount++
      console.log(
        `Attempting to restart consumer (attempt ${retryCount}/${MAX_RETRIES}) in ${RETRY_DELAY / 1000} seconds...`,
      )
      setTimeout(setupConsumerWithRetry, RETRY_DELAY)
    } else {
      console.error(`Maximum retry attempts (${MAX_RETRIES}) reached. Stopping consumer.`)
      process.exit(1)
    }
  }

  async function setupConsumerWithRetry() {
    try {
      const kafkaStream = await createConsumer()
      retryCount = 0 // Reset retry count on successful connection

      console.log('Starting consumer')

      kafkaStream.on('data', (message) => {
        counter++
        console.log('>>> Message received:', {
          counter,
          payload: message.payload.toString(),
          offset: message.offset,
          partition: message.partition,
          topic: message.topic,
        })
      })

      kafkaStream.on('error', async (error) => {
        console.error('Stream error:', error)
        handleRetry()
      })

      kafkaStream.on('close', () => {
        console.log('Stream ended')
        try {
          kafkaStream.unsubscribe()
        } catch (unsubError) {
          console.error('Error unsubscribing:', unsubError)
        }
      })
    } catch (error) {
      console.error('Error setting up consumer:', error)
      handleRetry()
    }
  }

  await setupConsumerWithRetry()
}

await startConsumer()

Examples

You can find more examples in the example folder of this project.

Configuration

Configuration properties

KafkaConfiguration

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| brokers | string | | List of brokers to connect to |
| clientId | string | | Client id to use for the connection |
| securityProtocol | SecurityProtocol | | Security protocol to use (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) |
| logLevel | string | info | Log level for the client |
| brokerAddressFamily | string | "v4" | Address family to use for the connection (v4, v6) |
| configuration | Map<string, string> | {} | Additional configuration options for the client. See librdkafka |

You can see the available options here: librdkafka.
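
For example, librdkafka options can be forwarded through the configuration map; this is a sketch, with standard librdkafka property names and illustrative values:

import { KafkaClient } from 'kafka-crab-js';

// The configuration map is forwarded to librdkafka unchanged; the keys below
// are standard librdkafka properties, and the values are only illustrative.
const kafkaClient = new KafkaClient({
  brokers: 'localhost:29092',
  clientId: 'my-client-id',
  brokerAddressFamily: 'v4',
  configuration: {
    'auto.offset.reset': 'earliest',
    'session.timeout.ms': '6000',
  },
});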

Best Practices

Error Handling

  • Always wrap async operations in try-catch blocks
  • Implement proper error logging and monitoring
  • Handle both operational and programming errors separately

Performance

  • Use batch operations for high-throughput scenarios
  • Configure appropriate batch sizes and compression (see the sketch below)
  • Monitor and tune consumer group performance
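
Batching and compression are tuned through librdkafka producer properties, passed via the configuration map described above; a hedged sketch, given a kafkaClient as in the earlier examples:

// Hedged sketch: producer throughput tuning via standard librdkafka
// properties (values are illustrative, not recommendations).
const producer = kafkaClient.createProducer({
  configuration: {
    'compression.codec': 'lz4',     // compress message batches on the wire
    'batch.num.messages': '10000',  // cap messages per batch
    'linger.ms': '5',               // wait briefly to fill larger batches
  },
});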

Message Processing

  • Validate message formats before processing (see the sketch below)
  • Implement proper serialization/deserialization
  • Handle message ordering when required
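
For instance, a defensive deserialization helper might look like this (a minimal sketch; message.payload is a Buffer, as in the examples above):

// Minimal sketch: parse and validate a message payload defensively.
function parsePayload(message) {
  try {
    const value = JSON.parse(message.payload.toString('utf8'));
    if (typeof value !== 'object' || value === null) return null;
    return value;
  } catch (error) {
    console.error('Skipping malformed message:', {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
    });
    return null;
  }
}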

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.


Built with Rust