@streamerson/core
v0.0.66
Published
> For Doing Cool & Idiomatic Things with Streams, Maybe
Downloads
29
Readme
@streamerson/core
For Doing Cool & Idiomatic Things with Streams, Maybe
Overview
This package was created as a core SDK for my smoother-brained endeavours, some of which is documented in the Streamerson Examples. Give it a read, or skip it-- this package exists on its own and might be helpful.
Everything here is built for internal usage in the rest of the @streamerson
packages. However, those higher-level packages mostly wrap this core code, meaning that if my more use-case-driven tooling (the other monorepo packages) aren't of interest to you, maybe the low-level components here will help you build something too.
The idea for the exported code of this package is essentially to achieve the following:
- wrap Redis clients behind interfaces
- wrap reading/writing logic behind real Streams (Read/Writables)
- provide a set of un-opinionated tools for manipulating these Read/Writables
- instrument some basic utilities and helper functions to distribute to the rest of the monorepo
Table of Contents
Notes
Some quick notes:
- All Typescript
- Meant for
node v20+
- All the current code relies on
redis
at a version higher than6
. - In the future, the
Topic
interface will be extended to work with more than Redis streams probably. I have my sights onAWS:SQS
first, because cheap and fast and Kinesis sucks. - Unfinished -- believe the package version being sub-
1
. - Lots of high-hanging fruit for low-level optimizations around asynchronicity, network connection pooling, ser/des, memory management, potential for hanging promises, etc. This is a fairly crunchy project and I've just been getting to these things as I have time.
Installation
- Install the core SDK in your package of choice:
yarn add @streamerson/core
- Import some stuff, get streamin'. The following example will connect to Redis and begin listening for events with a
type
hello
on a streamTopic
, responding in kind with some JSON:
Example
import {StreamingDataSource, Topic} from '@streamerson/core';
export const readChannel = new StreamingDataSource(/* optional options */);
await readChannel.connect();
for await (const event of readChannel.getReadStream({
// ... optional options, like batch size and timeouts
topic: new Topic('my-example-topic')
})) {
readChannel.logger.info(event, 'Received event!')
// Do something with my streamed event?
// We could even `.pipe()` this event to a Writable.
}
- And you're ... streaming with gas? Of course, this is relying on default connection settings and the existence of a Redis server. In this monorepo, there are tools for starting a Redis Docker image, and there are connection options built into the parameters of all the functions for connecting to a non-defaulted Redis instance.
API
RedisDataSource
- a base implementation of a data-source (any interface capable of)
- client (getter for a client for data connection)
- control (getter for a client for orchestration)
- connect() (connect to the datasource)
- disconnect() (disconnect from the datasource)
StreamingDataSource
- an extension of the RedisDataSource, which implements streaming protocols:
- writeToStream({ ... }) (write to a stream using the data-source)
- getReadStream({ ... }) (get a Readable for a stream)
- getWriteStream({ ... }) (get a Writable for a stream)
- iterateStream({ ... }) (get an Iterable that reads from a stream)
- set() (set a key, for orchestration purposes)
- get() (get a key, for orchestration purposes)
- ... and more
Promise Tracker
- a general purpose utility for using the
await
keyword to cede control until a future event has occurred on a stream. You could generally use.once('event')
, but due to memory management concerns I have exposed a utility with an interface as follows:- tracker.promise('event')
- tracker.cancel('event')
- tracker.cancelAll()
Stream Awaiter
- a general purpose utility for call-and-response along two streams. After a message with a given ID is dispatched to one stream, generate a promise that will resolve when the second stream receives a message with a matching ID, using the methods:
- streamAwaiter.dispatch('some-id') (a promise for a response with 'some-id')
- streamAwaiter.readResponseStream() (begin reading incoming responses)
Utils
- A collection of utilities for internal Streamerson use. Outside of that context, use them at your own peril:
- ids
- guuid() (generate a GUUID)
- keys
- keyGenerator() (generate keys with standard markup for stream & key identifiers)
- shardDecorator() (decorate IDs with a shard for logical partitioning)
- consumerProducerDecorator() (decorate IDs with a consumer group for group partioning)
- stream configuration
- buildStreamConfiguration() _(generate valid configurations for other builders)**
- time
- MS_TO_SECONDS() (converts milliseconds to seconds)
- SECONDS_TO_MS() (converts seconds to milliseconds)
- HOURS_TO_MS() (converts hours to milliseconds)
- topic
- new Topic({ /* options */ }) (creates a Topic, which should probably be a first-class citizen of the core package but for now resides here)
- ids
API Reference
:factory: StreamingDataSource
A remote source capable of retrieving stream records from a Redis instance.
Methods
:gear: writeToStream
A low-level implementation wrapping a Redis Stream Write operation
| Method | Type |
| ---------- | ---------- |
| writeToStream
| (outgoingStream: string, incomingStream: string, messageType: MessageType, messageId: string, message: string, sourceId: string, shard?: string) => Promise<string>
|
Parameters:
outgoingStream
: The stream ID to target in RedisincomingStream
: Maybe, a stream ID to reply tomessageType
: The type of the eventmessageId
: The ID of the messagemessage
: The message payloadsourceId
: The ID of the sourceshard
: Maybe, the shard to target
:gear: setResponseType
Sets the MessageType
field default for outgoing messages
| Method | Type |
| ---------- | ---------- |
| setResponseType
| (type: string) => void
|
Parameters:
type
: TheMessageType
for outgoing messages
:gear: addStreamId
Adds a stream to the set for consumption
| Method | Type |
| ---------- | ---------- |
| addStreamId
| (streamId: string) => void
|
Parameters:
streamId
: the key of the stream to ingest
:gear: hasStreamId
Checks whether a stream is set for consumption
| Method | Type |
| ---------- | ---------- |
| hasStreamId
| (streamId: string) => boolean
|
Parameters:
streamId
: the key of the stream to check
:gear: removeStreamId
Removes a stream from the set for consumption
| Method | Type |
| ---------- | ---------- |
| removeStreamId
| (streamId: string) => void
|
Parameters:
streamId
: the key of the stream to remove
:gear: getReadStream
| Method | Type |
| ---------- | ---------- |
| getReadStream
| (options: { topic: Topic; shard?: string; } or GetReadStreamOptions) => Readable and { readableObjectMode: true; }
|
:gear: getWriteStream
Get a Writable
stream, for which written objects will be written to the remote
| Method | Type |
| ---------- | ---------- |
| getWriteStream
| (options: { topic: Topic; shard?: string; } or { stream: string; responseChannel?: string; shard?: string; }) => Writable and { writableObjectMode: true; }
|
Parameters:
options
: : The Topic to publish messages to