npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

syrup-kafka

v0.0.9

Published

This is the kafka util used inside for the syrup project.

Downloads

43

Readme

syrup-kafka

node.js 에서 kafka 사용시 컨슈머와 프로듀서를 쉽게 생성하고 관리해주는 유틸 모듈 입니다.

Motivation

syrup-kafkakafkajs 오픈소스 라이브러리를 기반으로 제작된 kafka 라이브러리 입니다. 기존 kafkajs 라이브러리의 컨슈머, 프로듀서 기능과 더불어 독립 실행 컨슈머 생성을 간단하게 구성 가능하며 redis 를 활용한 현재 컨슈머 상태 저장 기능을 지원합니다. 이 문서에서는 토픽, 프로듀서, 컨슈머, 컨슈머 그룹, 파티션 등과 같은 kafka 의 기본적인 개념은 이해하고 있다고 가정하고 이러한 내용들을 따로 다루고 있지는 않습니다. Kafka의 기본 개념과 용어에 관해서는 Kafka가 제공하는 문서인 IntroductionMain Concpets and Terminology 를 참고해 주세요.

Installation

npm install syrup-kafka

Quick Start

빠른 시작 예제를 ./examples/ 폴더에서 확인할수 있습니다. ./examples/*.js 에는 producerconsumer 생성 예제가 있습니다.

소스코드를 수정없이 바로 사용하는 경우에 const kafka = require('../lib/index.js') 부분을 const kafka = require('syrup-kafka') 로 수정해야 합니다.

Usage

syrup-kafka를 사용하는 예시 코드는 다음과 같습니다.

const kafka = require('syrup-kafka')

const startMain = async () => {
    const redisConfig = {
        host: "localhost",
        port: 6379,
        dbNumber: 0,
        key: "kafka-consumer-exmaple-key",
    }

    const kafkaConfig = {
        clientId: `example-client`,
        brokers: [
            {
                host: "localhost",
                port: 9092,
            },
        ],
    }

    const producerconfig = {
        allowAutoTopicCreation: false,
        transactionTimeout: 30000,
    }

    await kafka.init(redisConfig, kafkaConfig, producerconfig)

    // - 메시지 전송 및 소비 실행

    // 프로세스 종료 전 kafka.exit() 함수를 호출 합니다.
    kafka.exit((error) => {
        // kafka.exit 처리 이후 수행되여야 할 코드
    })
}

startMain()

syrup-kafka 에서 사용되는 producer 역할을 하는 함수인 send, consumer 를 생성하는 setConsumer 함수를 호출하기 전, 반드시 init 함수를 호출하여야 합니다. init 함수를 선행하지 않고 send, setConsumer 함수를 호출하면 throw - exception 이벤트가 발생합니다.

Table of contents

Config

아래는 kafkajsredis 사용을 위한 Config 객체 정의 방법을 설명합니다.

Redis Config

const redisConfig = {
    host: "localhost",
    port: 6379,
    dbNumber: 0,
    key: "kafka-consumer-exmaple-key",
    username: "username",
    password: "password",
}

각 옵션에 대한 설명은 다음과 같습니다.

  • host redis 연결시 필요한 host 주소 입니다. 미입력시 기본값 localhost
  • port redis 연결시 필요한 port 번호 입니다. 미입력시 기본값 6379
  • dbNumber consumer 정보가 저장되는 DB 번호 입니다. 미입력시 기본값 0
  • key consumer 정보가 저장되는 key 이름 입니다. 미입력시 기본값 kafka-consumer-info
  • username redis ACL 유저 이름
  • password redis ACL 암호 혹은 이전 --requirepass 암호

Create Consumerconsumer 를 생성하면, consumer 에 대한 정보가 Redis Config 에 정의된 key 에 hash 자료구조로 저장됩니다.

HGETALL key 등의 명령어로 현재 생성된 consumer 정보를 확인 가능하며, hash 자료구조 안에서의 key 는 Kafka Config 에서 정의한 clientId 로 설정됩니다.

아래는 kafka-consumer-info key 를 가진 hash 자료구조 내에서 consumer-server-1 key 의 예시 내용입니다.

{
  "topicA": {
    "partitions": [
      1
    ],
    "groupId": "topicA-server:1-group",
    "memberId": "consumer-server-1-a796c4eb-3655-4e2e-b00e-3f40a94680ab",
    "leaderId": "consumer-server-1-a796c4eb-3655-4e2e-b00e-3f40a94680ab",
    "isLeader": true,
    "groupProtocol": "custom-assigner"
  },
  "topicB": {
    "groupId": "topicB-consumer-group",
    "memberId": "consumer-server-1-cb3ce39f-e325-427c-b02c-8d0837595ec8",
    "leaderId": "consumer-server-0-52b26999-34d6-409a-8e00-ef5f075e7716",
    "isLeader": false,
    "groupProtocol": "RoundRobinAssigner"
  }
}

위 예시에서 다음과 같은 정보를 확인할 수 있습니다.

  • 객체에서 key(field) 는 토픽의 이름입니다. Kafka Config 에서 clientIdconsumer-server-1 로 정의한 서버는 topicA, topicB 에 대한 토픽을 수신합니다.
  • topicA 토픽을 소비하는 consumergroupProtocol 값이 custom-assigner 이므로 Custom Consumer 로 생성되었습니다.
  • topicA 토픽에서 1번 파티션을 지정하여 메시지를 소비합니다.

위의 예시에서 topicA key 에 대한 value 는 topicA 토픽을 소비하는 consumer 정보를 나타낼 뿐이며, key 의 개수가 2개 (topicA, topicB) 라고 해서 consumer 가 2개 생성되는 것은 아닙니다. 실제 생성되는 consumer 의 개수는 Create Consumer 에서 생성한 컨슈머 그룹 ID 를 정의한 개수만큼 생성됩니다.

Kafka Config

const kafkaConfig = {
    clientId: `example-client`,
    brokers: [
        {
            host: "localhost",
            port: 9092,
        },
        {
            host: "localhost",
            port: 9093,
        },
    ],
    ssl: {
        rejectUnauthorized: false,
        ca: [fs.readFileSync('/my/custom/ca.crt', 'utf-8')],
        key: fs.readFileSync('/my/custom/client-key.pem', 'utf-8'),
        cert: fs.readFileSync('/my/custom/client-cert.pem', 'utf-8')
    },
    sasl: {
        mechanism: 'plain', // scram-sha-256 or scram-sha-512
        username: 'my-username',
        password: 'my-password'
    }
}

각 옵션에 대한 설명은 다음과 같습니다.

  • clientId 프로세스를 구별하기 위한 식별 id 입니다. 해당값은 필수이며 미입력시 exception 이 발생합니다.
  • brokers kafka 연결시 필요한 broker 목록 입니다. 배열이 없가나 배열 아이템 개수가 0인경우 기본적으로 localhost:9092 로 연결을 시도합니다.
  • brokers.host broker host 주소 입니다. 유효하지 않은 값을 입력시 exception 오류가 발생할 수 있습니다.
  • brokers.port broker port 번호 입니다. 유효하지 않은 값을 입력시 exception 오류가 발생할 수 있습니다.
  • ssl TLS 통신을 위한 ssl 설정 속성 입니다. 사용하지 않을시 값을 생략 가능합니다.
  • sasl kafka SASL 클라이언트 인증을 위한 속성 입니다. 사용하지 않을시 값을 생략 가능합니다.

sslsasl 적용에 대한 내용은 각 링크를 확인해 주세요

Producer Config

const producerconfig = {
    allowAutoTopicCreation: false,
    transactionTimeout: 30000,
}

메시지를 발행할때 사용되는 producer 의 옵션입니다. init 함수 호출시 세번째 인자로 사용되며 생략할 수 있습니다. 각 옵션에 대한 설명은 다음과 같습니다.

| 옵션 | 설명 | 기본값 | | ---------------------- | --- | ---- | | createPartitioner | 커스텀 파티셔너 생성시 사용됩니다. | null | retry | 재시도 정책 설정시 사용됩니다. | null | metadataMaxAge | metadata 에 대한 변경이 오랜시간 감지되지 않아도 metadata 를 강제로 새로 고치는 밀리초 시간 | 300000 (5분) | allowAutoTopicCreation | 존재하지 않는 topic 에 대해 message 를 발행하는 경우 자동으로 topic 을 생성할지에 대한 여부 | true | transactionTimeout | 트랜잭션 상태 업데이트를 기다리는 최대 시간, broker transaction.max.timeout.ms 의 설정 보다 크면 오류 InvalidTransactionTimeout 와 함께 요청이 실패합니다. | 60000

idempotentmaxInFlightRequests 와 같은 설명되지 않은 추가 속성에 대한 자세한 사항은 해당 문서 를 참고해주세요.

Create Consumer

syrup-kafka 에서 consumer 를 생성하는 방법은 setConsumer 함수를 이용하는 것입니다. 아래는 기본 컨슈머인 RoundRobin Consumer 와 특정 파티션을 지정해서 메시지를 소비하는 Custom Consumer 를 정의하는 방법을 설명합니다.

컨슘된 메시지 수신은 EACH_MESSAGE 이벤트를 참고해주세요.

RoundRobin Consumer

const kafka = require('syrup-kafka')

// - kafka.init 함수 호출이 필요합니다.
// - ...

const consumerGroup = {
    "topic-consumer-group-id": {
        isPartitionAssign: false,
        fromBeginning: true,
        topics: [
            {
                name: `example-topic`,
            },
        ],
    }
}

await kafka.setConsumer(consumerGroup)

// - ...

RoundRobin 컨슈머를 생성하는 일부 소스코드 입니다. isPartitionAssign 속성을 false 로 선언시에 기본적으로 해당 컨슈머는 RoundRobin 컨슈머로 생성되며 컨슈머 그룹 아이디 는 위 코드 topic-consumer-group-id 에서 확인할 수 있듯이 속성 key 로 지정하면 됩니다. 이때 key 에 대한 value 는 컨슈머 그룹에 참여할 컨슈머의 정보를 포함합니다.

  • isPartitionAssign 파티션을 직접 지정하여 메시지를 소비할지 여부
  • fromBeginning 컨슈머 그룹에서 오프셋 커밋이 없는 경우 메시지를 파티션 처음부터 소비할지 여부
  • topics 컨슈머가 소비할 토픽정보 배열
  • topics.name 메시지를 소비할 토픽의 이름

Custom Consumer

기본 RoundRobin Consumer 를 사용하는 것이 아닌, 사용자가 직접 토픽의 특정 파티션을 지정하여 소비하는 경우가 필요한 경우 Custom Consumer 를 사용할 수 있습니다.

const kafka = require('syrup-kafka')

// - kafka.init 함수 호출이 필요합니다.
// - ...
const consumerGroup = {
    `topic-consumer-group-id`: {
        isAssignPartition: true,
        fromBeginning: true,
        topics: [
            {
                name: `topicA`,
                partitions: [0],
            },
        ]
    }
}

// - ...

커스텀 컨슈머(독립 실행 컨슈머)를 생성하는 일부 소스코드 입니다. isPartitionAssign 속성을 true 로 선언시 독립 실행 컨슈머 타입으로 생성되며 컨슈머 그룹 IDRoundRobin Consumer 에서 정의한 것과 동일한 방식으로 속성 key 로 지정하면 됩니다. 위 에제 코드에서 컨슈머 그룹 IDtopic-consumer-group-id 가 되며, 이때 value 는 컨슈머 그룹에 참여할 컨슈머의 정보를 포함합니다.

  • isPartitionAssign 파티션을 직접 지정하여 메시지를 소비할지 여부
  • fromBeginning 컨슈머 그룹에서 오프셋 커밋이 없는 경우 메시지를 파티션 처음부터 소비할지 여부
  • topics 컨슈머가 소비할 토픽정보 배열
  • topics.name 메시지를 소비할 토픽의 이름
  • topics.partitions 메시지를 소비할 파티션 번호 배열

Custom Consumer 로 컨슈머를 생성한 경우 각기 다른 서버에서 생성된 컨슈머들이 동일한 컨슈머 그룹 에 속해 있더라도 topics.partitions 에 정의한 파티션의 메시지를 각자 모두 소비합니다.

Create Producer

producer 객체의 생성 시점은 send 함수 실행시 syrup-kafka 라이브러리 내부에 producer 객체가 존재지 않는 경우 싱글턴 패턴으로 생성되며 kafka broker 에 연결 됩니다. 메시지의 전송은 Send Message 에서 확인 가능합니다.

Send Message

const kafka = require('syrup-kafka')

// - kafka.init 함수 호출이 필요합니다.
// - ...

// 일반적인 메시지 전송
await kafka.send({
    topic: "topicA",
    value: "test message",
})

// key 가 포함된 형태의 메시지 전송
await kafka.send({
    topic: "topicA",
    key: "key1",
    value: "test message",
})

// header 정보를 포함한 형태의 메시지 전송
await kafka.send({
    topic: "topicA",
    headers: {
        example: "header example",
    }
})

// Object 형태의 메시지 전송
await kafka.send({
    topic: "topicA",
    value: {
        example: "example",
    }
})

// 토픽의 특정 파티션에 메시지 전송
await kafka.send({
    topic: "topicB",
    partitions: [0, 1],
    value: "test message",
})

// client id 를 직접 지정하여 독립 실행 컨슈머에게 메시지 전송
await kafka.send({
    topic: "topicC",
    arrival: `standalone-consumer`,
    value: "test message",
})

// partitions 와 arrival 동시에 선언시 arrival 는 무시됨
// topicD 토픽의 0번 파티션에 메시지 생성
await kafka.send({
    topic: "topicD",
    partitions: [0],
    arrival: `standalone-consumer`, // 속성이 무시됨
    value: "test message",
})

// 옵션을 사용하여 메시지 전송
const options = {
    acks: -1,
    timeout: 30000,
    compression: CompressionTypes.None,
}

await kafka.send({
    topic: "topicE",
    value: "test message",
    options: options,
})

// - ...

send 함수를 사용하여 메시지를 토픽에 전송할수 있습니다. send 함수에서 사용되는 객체의 속성은 topic partitions arrival key value options 입니다. 각 속성의 의미는 다음과 같습니다.

  • topic 메시지 전송할 토픽 이름입니다. 값이 반드시 있어하며 문자열 형태여야 합니다. 값이 없거나 공백인 경우, 문자열이 아닌경우 throw - exception 이 발생합니다.
  • partitions 파티션을 지정하여 메시지를 전송하고자 할때 사용합니다. number 배열 형태로 데이터를 입력해야하며, 해당 값이 존재하지 않는 경우 기본 파티셔너를 통해 메시지가 전송됩니다.
  • arrival 전송한 데이터를 소비하고자 하는 컨슈머가 Custom Consumer 로 정의되어 있는경우 Kafka Config 에서 등록한 clientIdarrival 에 등록시 파티션 번호를 명시하지 않아도 해당 Custom Consumer 가 소비하는 파티션으로 메시지를 전송합니다.
  • headers 전송하려는 메시지의 해더 정보 입니다. Object 형색이며 departure key 와 arrival key 값은 예약어 이므로 사용할 수 없습니다.
  • key 전송하려는 key 값 입니다.
  • value 전송하려는 메시지 내용 입니다. string 혹은 buffer 타입이며 Object 타입으로 전송하는 경우 자동으로 JSON.stringify 형태로 전송됩니다.
  • options 전송하려는 메시지의 옵션 입니다. 옵션을 사용하지 않을시 생략 가능합니다. options 에 대한 각 속성은 여기 를 참고해 주세요.

partitionsarrival 속성을 동시에 선언한 경우에는 partitions 속성이 우선권을 가지며 arrival 은 무시됩니다.

key 의 추가적인 설명

key 는 메시지를 보낼 파티션을 결정하는데 사용됩니다. 이는 동일한 관련된 메시지가 순서대로 처리되도록 하는데 필요하며 예를 들어, orderIdkey 로 사용시 주문과 관련된 모든 메시지가 순서대로 처리되도록 할 수 있습니다.

기본적으로 producer 는 다음 로직에 따라 메시지를 배포하도록 구성됩니다.

  • 메시지에 파티션이 지정되어 있는 경우 해당 파티션을 선택
  • 파티션이 지정되지 않았지만 키가 있는 경우 키의 해시 (murmur2) 기반으로 파티션을 선택
  • 파티션이나 키가 없으면 라운드 로빈 방식으로 파티션을 선택

Message send options

| 속성 | 설명 | 기본값 | | --- | --- | ---- | | acks | 필요한 ack 수를 제어 -1 (all) leader 파티션과 모든 follow 파티션이 메시지를 수신하였는지 확인합니다. 0 broker 에게 메시지 전달이 정상적으로 되었는지만 확인합니다. 1 leader 파티션에게 메시지가 제대로 전달되었는지를 확인합니다. follow 파티션은 확인하지 않습니다. | -1 | | timeout | 응답 대기시간 | 30000 | | compression | 압축 코덱 | CompressionTypes.None |

Callback Event

Callback 함수를 등록하여 특정 이벤트에 대한 수신이 가능합니다. init 함수 이전/이후 순서가 중요하진 않지만 init 함수 호출 이전에 Callback 함수를 등록하는것을 권장합니다.

아래는 Callback 함수의 종류와 등록 방법을 설명합니다.

const kafka = require('syrup-kafka')

// - ...

kafka.on(kafka.CALLBACK_TYPE.ERROR, async (message) => {

})

kafka.on(kafka.CALLBACK_TYPE.KAFKA_LOG, async (level, entry) => {

})

kafka.on(kafka.CALLBACK_TYPE.EACH_MESSAGE, async (topic, partition, message) => {

})

kafka.on(kafka.CALLBACK_TYPE.JOIN_GROUP, async (payLoad) => {

})

kafka.on(kafka.CALLBACK_TYPE.CONSUMERS_LOAD_COMPLETE, async (consumers) => {

})

// - ...

// kafka.init 함수 호출

// - ...
  • ERROR 라이브러리 동작 오류 발생시
  • KAFKA_LOG kafkajs 라이브러리의 로그를 로깅하고자 할 때
  • EACH_MESSAGE 등록된 컨슈머에 의해 메시지가 소비될 때
  • JOIN_GROUP 컨슈머가 컨슈머 그룹에 참가를 완료했을 때
  • CONSUMERS_LOAD_COMPLETE 정의된 모든 컨슈머들이 각 컨슈머 그룹에 참가를 완료했을 때

ERROR

const kafka = require('syrup-kafka')

// - ...

kafka.on(kafka.CALLBACK_TYPE.ERROR, async (message) => {
    console.error(message)
})

// - ...

syrup-kafka 라이브러리 동작 오류 발생시 이 이벤트가 호출 되며 message 는 string 타입 입니다. KAFKA_LOG 이벤트의 error 수준의 로그 메시지도 이 콜백에서 함께 수신됩니다.

KAFKA_LOG

const kafka = require('syrup-kafka')

// - ...

kafka.on(kafka.CALLBACK_TYPE.KAFKA_LOG, async (level, entry) => {
    console.log(`level: ${level}, entry:\n${JSON.stringify(entry, null, 2)}`)
})

// - ...

syrup-kafka 내부에서 사용되는 kafka 라이브러리인 kafkajs 라이브러리에 대한 로깅이 필요한 경우 이 이벤트로 수신 가능합니다.

level 은 다음 값을 가집니다.

| level | label | 추가 설명 | | ----- | --------- | ------------------------------------------ | | 0 | NOTHING | | | 1 | ERROR | 이 경우, ERROR 이벤트가 함께 발생합니다. | | 2 | WARN | | | 4 | INFO | | | 5 | DEBUG | |

entry 는 다음과 같은 속성을 가지며 예를들어 이러한 값들이 올수 있습니다.

  • level 4,
  • label 'INFO', // NOTHING, ERROR, WARN, INFO, or DEBUG
  • timestamp '2017-12-29T13:39:54.575Z'
  • logger 'kafkajs'
  • message 'Started'

더 자세한 kafkajs logger 에 대한 내용은 해당 문서 를 참고해주세요.

EACH_MESSAGE

const kafka = require('syrup-kafka')

// - ...

kafka.on(kafka.CALLBACK_TYPE.EACH_MESSAGE, async (topic, partition, message) => {
    console.log(`${JSON.stringify({
        topic: topic,
        partition: partition,
        headers: message.headers,
        key: message.key,
        value: message.value,
    }, null, 2)}`)
})

// - ...

컨슈머 그룹에 참가한 컨슈머들이 메시지를 수신했을때 이벤트가 호출 됩니다.

  • topic 메시지가 수신된 토픽 이름 string
  • partition 메시지가 수신된 파티션 번호 number
  • message 수신된 메시지 정보 object

message 객체의 속성을 정의하고 있습니다.

  • key 파티셔닝에 사용되는 key 값. 자세한 내용은 key 의 추가적인 설명 을 참고해 주세요.
  • value 메시지 내용. send-message 에서 value 를 Object 타입으로 전송한경우 Object 타입으로 자동 파싱됩니다. JSON.parse
  • headers 메시지의 헤더 정보 object

headers 에는 다음 정보가 포함되어 있습니다.

  • departure 메시지가 생성된 프로듀서의 clientId
  • arrival 메시지가 도착할 컨슈머가 정의된 clientId (지정된 경우)

JOIN_GROUP

const kafka = require('syrup-kafka')

// - ...

kafka.on(kafka.CALLBACK_TYPE.JOIN_GROUP, async (payLoad) => {
    console.log(JSON.stringify(payLoad, null, 2))
})

// - ...

컨슈머가 컨슈머 그룹에 참가를 성공한 경우 이벤트가 호출 됩니다. 매개변수 payLoadkafkajs 내부에 정의되어 있는 ConsumerGroupJoinEvent 타입 입니다.

예를 들어 컨슈머를 3개 정의한 경우 각 컨슈머들이 정의된 컨슈머 그룹에 정상적으로 참여에 성공한 경우 해당 이벤트는 3번 호출 됩니다.

ConsumerGroupJoinEvent 타입은 다음 속성이 있습니다.

  • duration number
  • groupId string
  • isLeader boolean
  • leaderId string
  • groupProtocol string
  • memberId string
  • memberAssignment [key: string]: number[]

CONSUMERS_LOAD_COMPLETE

const kafka = require('syrup-kafka')

// - ...

kafka.on(kafka.CALLBACK_TYPE.CONSUMERS_LOAD_COMPLETE, async (consumers) => {
    // for (const consumer of consumers) {
    //     const topcis = consumer.topics
    //     const groupId = consumer.groupId
    //     const fromBeginning = consumer.fromBeginning
    //     const kafkaConsumer = consumer.kafkaConsumer
    //     ...
    // }
})

// - ...

Create Consumer 에서 생성한 모든 컨슈머들이 정상적으로 컨슈머 그룹에 참가 완료 되었을때 이 이벤트가 호출 됩니다.

매갸변수로 전달되는 consumers 는 consumer[] 배열이며 consumer 객체에 대한 속성 정보는 다음과 같습니다

  • topics 해당 컨슈머가 구독중인 토픽 목록
  • groupId 해당 컨슈머가 속한 그룹 id
  • fromBeginning 해당 컨슈머가 메시지를 처음부터 소비하는지에 대한 여부
  • kafkaConsumer kafkajs consumer 객체

Disconnection & Exit

const kafka = require('syrup-kafka')

// - ...

kafka.exit((error) => {
    // kafka.exit 처리 이후 수행되여야 할 코드
})

// - ...

redis, kafka connection 종료 및 redis 에 저장된 컨슈머 정보 삭제를 위해 exit() 함수 호출이 필요합니다.

예를 들어 다음과 같이 exit 함수를 호출할 수 있습니다.

const kafka = require('syrup-kafka')

// - ...

await kafka.init(redisConfig, kafkaConfig)

// - ...

process.stdin.resume()

const exitHandler = (options, exitCode) => {
    kafka.exit((error) => {
        if (error) console.error(error)

        if (options.cleanup) console.log('clean');
        if (exitCode || exitCode === 0) console.log(`exitCode: ${exitCode}`);
        if (options.exit) process.exit();
    })
}

process.on('exit', exitHandler.bind(null, { cleanup: true }));
process.on('SIGINT', exitHandler.bind(null, { exit: true }));
process.on('SIGUSR1', exitHandler.bind(null, { exit: true }));
process.on('SIGUSR2', exitHandler.bind(null, { exit: true }));
process.on('uncaughtException', exitHandler.bind(null, { exit: true }));