# QStream

Topic-Based Messaging Queue on top of Redis Streams
## Example

### Producer

```js
const streams = require('@openmessage/qstream')();

streams.publish('your-topic', data);
```

See more at examples.
### Consumer

```js
const streams = require('@openmessage/qstream')();

const group = await streams.group('your-topic', 'group/queue name');

group.consume(async (data) => {
  console.log({ data });
  return true;
});
```

See more at examples.
## Usage

### Connection

```js
const QStream = require('@openmessage/qstream');

const qstream = QStream(redisUrl);
```

- `redisUrl`: a string in valid Redis URL format
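For example, connecting to a local Redis instance might look like the sketch below (the URL is illustrative; use whatever your deployment exposes):

```js
const QStream = require('@openmessage/qstream');

// Example only: a default local Redis listens at redis://localhost:6379.
const qstream = QStream('redis://localhost:6379');
```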
## Publish/Produce/Emit

```js
qstream.publish('your-topic', data);
```

- `data`: any valid JavaScript object; primitive values are not supported.

With extra args, like `maxLen`, the stream will be capped to the specified length:

```js
qstream.publish('your-topic', data, 10);
```

Or with an approximate `maxLen`:

```js
qstream.publish('your-topic', data, '~10');
```

By default, streams are capped to approximately 10000 entries (`MAXLEN ~ 10000`). If you don't want your stream to be capped, explicitly set the last argument of `publish` to `null`.
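For instance, a sketch of an uncapped publish (the payload object here is purely illustrative):

```js
// Passing null as the last argument disables the default MAXLEN ~ 10000 cap,
// so the stream is allowed to grow without bound.
qstream.publish('your-topic', { event: 'user.created', id: 42 }, null);
```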
## Consumer Group

```js
const group = await streams.group('your-topic', 'consumer-group/queue-name');
```

Consumers in the same consumer group will load-balance jobs among them.
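A rough sketch of that behavior, assuming the file below (file name and group name are illustrative) is started in two or more processes; messages published to the topic should then be split between the running workers:

```js
// worker.js -- start this file in several processes to share the load.
const streams = require('@openmessage/qstream')();

(async () => {
  const group = await streams.group('your-topic', 'workers');

  group.consume(async (data) => {
    console.log('processed', data);
    return true; // same acknowledgement-style return as in the examples above
  });
})();
```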
## Subscribe/Consume/Listen

```js
group.consume(async (data) => {
  console.log({ data });
  return true;
});
```

The function passed to the `consume` method can be async (i.e. return a promise).

```js
group.consume(console.log, 10);
```

As a second parameter, `consume` accepts the number of concurrent jobs; it defaults to 1.
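Putting the two together, here is a sketch of a consumer that processes up to 5 jobs at a time, using the `group` created above (the handler and the concurrency value are illustrative):

```js
const handler = async (data) => {
  // Simulate some asynchronous work before finishing the job.
  await new Promise((resolve) => setTimeout(resolve, 100));
  console.log('done with', data);
  return true;
};

// Second argument: number of jobs processed concurrently (defaults to 1).
group.consume(handler, 5);
```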
## Debug

This lib uses [debug](https://www.npmjs.com/package/debug) to trace processing:

```sh
DEBUG=qstream:* npm start
```
## Roadmap

- [x] Add proper logging (debug)
- [ ] Add linting
- [ ] Add Tests
- [ ] Add CI / CD
- [ ] Handle unacked messages (XCLAIM, XPENDING)
- [ ] Add pub/sub case (fanout)
- [ ] Add timeline case
- [ ] Add documentation for history rebuild
- [ ] Improve docs