redis-x-stream
v3.2.3
An async iterable interface for redis streams
Create async iterables that emit Redis stream entries. Requires Redis 5 or greater.
Getting Started
import { RedisStream } from 'redis-x-stream'
import Redis from 'ioredis'

const myStream = 'my-stream'
await populate(myStream, 1e5)

// Count the entries read back from the stream
let i = 0
for await (const [streamName, [id, keyvals]] of new RedisStream(myStream)) {
  i++
}
console.log(`read ${i} stream entries from ${myStream}`)

// Write `count` entries to `stream`, then close the writer connection
async function populate(stream, count) {
  const writer = new Redis({ enableAutoPipelining: true })
  await Promise.all(
    Array.from(Array(count), (_, j) => writer.xadd(stream, '*', 'index', j))
  )
  writer.quit()
  await new Promise(resolve => writer.once('close', resolve))
  console.log(`wrote ${count} stream entries to ${stream}`)
}
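Each iteration yields a `[streamName, [id, keyvals]]` tuple. As a minimal sketch, and assuming `keyvals` is a flat array of alternating field names and values (the layout ioredis returns for stream entries; check the API Docs to confirm this for your configuration), a hypothetical helper such as `keyvalsToObject` can turn an entry into a plain object:

```javascript
// Hypothetical helper, not part of redis-x-stream.
// Assumes `keyvals` looks like [field, value, field, value, ...].
function keyvalsToObject(keyvals) {
  const entry = {}
  // Step through the array two items at a time; a trailing unpaired item is ignored
  for (let i = 0; i + 1 < keyvals.length; i += 2) {
    entry[String(keyvals[i])] = keyvals[i + 1]
  }
  return entry
}

// Hand-written sample in the assumed layout, destructured like the loop above
const [sampleStream, [sampleId, sampleKeyvals]] = [
  'my-stream',
  ['1-0', ['index', '42', 'kind', 'demo']],
]
console.log(sampleStream, sampleId, keyvalsToObject(sampleKeyvals))
```

Inside the `for await` loop, the same call would be `keyvalsToObject(keyvals)`.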
Usage
See the API Docs for available options.
Advanced Usage
Task Processing
If a cluster of processes reads entries from the same streams, you will likely want to use Redis consumer groups, so that each entry is delivered to only one consumer in the group.
A task processing application may look like the following:
import { RedisStream } from 'redis-x-stream'
// Semaphore is not defined in this example; any counting semaphore
// exposing acquire() and release() is assumed.

const control = {
  /* some control event emitter */
}
const stream = new RedisStream({
  streams: ['my-stream'],
  group: 'my-group',
  // e.g. a k8s StatefulSet hostname or a Cloud Foundry instance index
  consumer: 'tpc_' + process.env.SOME_ORDINAL_IDENTIFIER,
  block: Infinity,
  count: 10,
  deleteOnAck: true,
})
// Caps the number of tasks in flight at 11
const lock = new Semaphore(11)
const release = lock.release.bind(lock)

control.on('new-source', (streamName) => {
  // Add an additional source stream to a blocked stream
  stream.addStream(streamName)
})
control.on('shutdown', async () => {
  // drain will process all claimed entries (the PEL) and stop iteration
  await stream.drain()
})

async function tryTask(stream, streamName, id, entry) {
  // ...process entry...
  stream.ack(streamName, id)
}

for await (const [streamName, [id, keyvals]] of stream) {
  await lock.acquire()
  // Start the task without awaiting it; the permit is released when it settles
  void tryTask(stream, streamName, id, keyvals).finally(release)
}
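The task processing example uses a `Semaphore` with `acquire()` and `release()` but does not say where it comes from. The class below is a minimal sketch of one possible counting semaphore, written for illustration only; it is an assumption, not something redis-x-stream is known to provide, and any library with the same two methods should work in its place. The `demo` function is likewise hypothetical and only shows that the number of concurrent tasks stays at or below the permit count.

```javascript
// Illustrative counting semaphore (an assumption; not part of redis-x-stream)
class Semaphore {
  constructor(permits) {
    this.permits = permits // permits currently available
    this.waiters = [] // resolve callbacks of pending acquire() calls, FIFO
  }

  // Resolves as soon as a permit is available
  acquire() {
    if (this.permits > 0) {
      this.permits--
      return Promise.resolve()
    }
    return new Promise((resolve) => this.waiters.push(resolve))
  }

  // Hands the permit to the oldest waiter, or returns it to the pool
  release() {
    const next = this.waiters.shift()
    if (next) next()
    else this.permits++
  }
}

// Runs 6 short tasks behind a 2-permit semaphore and reports peak concurrency
async function demo() {
  const lock = new Semaphore(2)
  let running = 0
  let peak = 0
  const task = async () => {
    await lock.acquire()
    running++
    peak = Math.max(peak, running)
    await new Promise((resolve) => setTimeout(resolve, 5))
    running--
    lock.release()
  }
  await Promise.all(Array.from({ length: 6 }, () => task()))
  return peak
}

demo().then((peak) => console.log('peak concurrency:', peak))
```

In the loop above, awaiting `lock.acquire()` before starting each task is what applies backpressure: iteration pauses once all permits are taken and resumes when a running task settles and releases one.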