@chronark/upstash-kafka
v1.1.1
Published
Serverless kafka client for upstash
Downloads
13
Maintainers
Readme
upstash-kafka
Serverless kafka client for upstash
This project is written using the deno runtime and then transpiled to node and published as a package on npm.
Requirements
Either deno 1.x or node 14.x and higher
Installation
Deno
import { Kafka } from "https://deno.land/x/upstash_kafka/mod.ts"
Node
npm install @chronark/upstash-kafka
yarn add @chronark/upstash-kafka
pnpm add @chronark/upstash-kafka
You get the idea.
Quickstart
Auth
- Go to upstash and select your database.
- Copy the
REST API
secrets at the bottom of the page
import { Kafka } from "@chronark/upstash-kafka";
const kafka = new Kafka({
url: "<UPSTASH_KAFKA_REST_URL>",
username: "<UPSTASH_KAFKA_REST_USERNAME>",
password: "<UPSTASH_KAFKA_REST_PASSWORD>",
});
Produce a single message
const p = kafka.producer();
const message = { hello: "world" }; // Objects will get serialized using `JSON.stringify`
const res = await p.produce("<my.topic>", message);
const res = await p.produce("<my.topic>", message, {
partition: 1,
timestamp: 12345,
key: "<custom key>",
headers: [{ key: "traceId", value: "85a9f12" }],
});
Produce multiple messages.
The same options from the example above can be set for every message.
const p = kafka.producer();
const res = await p.produceMany([
{
topic: "my.topic",
value: "my message",
// ...options
},
{
topic: "another.topic",
value: "another message",
// ...options
},
]);
Consume
The first time a consumer is created, it needs to figure out the group coordinator by asking the Kafka brokers and joins the consumer group. This process takes some time to complete. That's why when a consumer instance is created first time, it may return empty messages until consumer group coordination is completed.
const c = kafka.consumer();
const messages = await c.consume({
consumerGroupId: "group_1",
instanceId: "instance_1",
topics: ["test.topic"],
autoOffsetReset: "earliest",
});
More examples can be found in the docstring
Commit manually
While consume
can handle committing automatically, you can also use
Consumer.commit
to manually commit.
const consumerGroupId = "mygroup";
const instanceId = "myinstance";
const topic = "my.topic";
const c = kafka.consumer();
const messages = await c.consume({
consumerGroupId,
instanceId,
topics: [topic],
autoCommit: false,
});
for (const message of messages) {
// message handling logic
await c.commit({
consumerGroupId,
instanceId,
offset: {
topic: message.topic,
partition: message.partition,
offset: message.offset,
},
});
}
Fetch
You can also manage offsets manually by using Consumer.fetch
const c = kafka.consumer();
const messages = await c.fetch({
topic: "greeting",
partition: 3,
offset: 42,
timeout: 1000,
});
Examples
There is a minimal working example application available in /example as well as various examples in the docstrings of each method.
Contributing
Setup
- Create a kafka instance on upstash docs
- Create the following topics:
blue
,red
,green
docs - Create
.env
file with your kafka secretscp .env.example .env
Running tests
make test
Building for node
make build
A /npm
folder will be created with the built node module. As part of the build
process the tests are run against your installed node version. To disable this,
you can configure the build pipeline in /cmd/build.ts
// ...
await build({
test: false, // <-- add this
// ... remaining config
});