@streamerson/core

v0.0.66

> For Doing Cool & Idiomatic Things with Streams, Maybe

Overview

This package was created as a core SDK for my smoother-brained endeavours, some of which are documented in the Streamerson Examples. Give it a read or skip it; this package exists on its own and might be helpful.

Everything here is built for internal usage in the rest of the @streamerson packages. However, those higher-level packages mostly wrap this core code, meaning that if my more use-case-driven tooling (the other monorepo packages) isn't of interest to you, maybe the low-level components here will help you build something too.

The idea for the exported code of this package is essentially to achieve the following:

  • wrap Redis clients behind interfaces
  • wrap reading/writing logic behind real Streams (Read/Writables)
  • provide a set of un-opinionated tools for manipulating these Read/Writables
  • instrument some basic utilities and helper functions to distribute to the rest of the monorepo
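
To make the second goal concrete, here is a minimal sketch of hiding a batch-fetching client behind a real Node.js Readable. Everything named here (StreamRecord, FetchBatch, toReadable) is hypothetical and only stands in for whatever the package does internally; the one real API used is Readable.from from node:stream.

```typescript
import { Readable } from 'node:stream';

// Hypothetical record shape; not the package's real type.
interface StreamRecord {
    id: string;
    payload: string;
}

// Hypothetical stand-in for a Redis client call: resolves with the next batch,
// or with an empty array once the source is drained.
type FetchBatch = () => Promise<StreamRecord[]>;

// Wrap the batch fetcher in an object-mode Readable.
export function toReadable(fetchBatch: FetchBatch): Readable {
    async function* records() {
        while (true) {
            const batch = await fetchBatch();
            // Assumption for the sketch: an empty batch ends the stream.
            // A long-lived reader would block or poll again instead.
            if (batch.length === 0) return;
            yield* batch;
        }
    }
    // Readable.from() produces an object-mode stream from an (async) iterable.
    return Readable.from(records());
}
```

Because the result is an ordinary Readable, it works with `for await`, `.pipe()`, and `pipeline()` like any other Node.js stream, and consumers never touch the client directly.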

Notes

Some quick notes:

  • All TypeScript
  • Meant for Node.js v20+
  • All the current code relies on Redis at a version higher than 6.
  • In the future, the Topic interface will probably be extended to work with more than Redis streams. I have my sights on AWS SQS first, because it's cheap and fast and Kinesis sucks.
  • Unfinished: the sub-1.0 package version means what it says.
  • Lots of high-hanging fruit for low-level optimizations around asynchronicity, network connection pooling, ser/des, memory management, potential for hanging promises, etc. This is a fairly crunchy project and I've just been getting to these things as I have time.

Installation

  • Install the core SDK in your package of choice:
yarn add @streamerson/core
  • Import some stuff, get streamin'. The following example connects to Redis and begins reading events from a stream Topic, logging each one as it arrives:

Example

readable-stream.example.ts

import {StreamingDataSource, Topic} from '@streamerson/core';

export const readChannel = new StreamingDataSource(/* optional options */);

await readChannel.connect();

for await (const event of readChannel.getReadStream({
    // ... optional options, like batch size and timeouts
    topic: new Topic('my-example-topic')
})) {
    readChannel.logger.info(event, 'Received event!');
    // Do something with my streamed event?
    // We could even `.pipe()` this event to a Writable.
}
  • And you're ... streaming with gas? Of course, this is relying on default connection settings and the existence of a Redis server. In this monorepo, there are tools for starting a Redis Docker image, and there are connection options built into the parameters of all the functions for connecting to a non-defaulted Redis instance.

API

RedisDataSource

  • a base implementation of a data-source, meaning any interface capable of providing:
    • client (getter for a client for data connection)
    • control (getter for a client for orchestration)
    • connect() (connect to the datasource)
    • disconnect() (disconnect from the datasource)
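
As a rough picture of that shape, the four members above could be written down as an interface plus an in-memory stand-in. This is a sketch under assumptions: DataSource, FakeClient and InMemoryDataSource are hypothetical names, and the real RedisDataSource talks to Redis rather than flipping a flag.

```typescript
// Hypothetical interface mirroring the four members listed above.
interface DataSource<Client> {
    readonly client: Client;   // client used for data traffic
    readonly control: Client;  // separate client used for orchestration
    connect(): Promise<void>;
    disconnect(): Promise<void>;
}

// In-memory stand-in for a Redis connection, for illustration only.
class FakeClient {
    connected = false;
}

class InMemoryDataSource implements DataSource<FakeClient> {
    private readonly dataClient = new FakeClient();
    private readonly controlClient = new FakeClient();

    get client(): FakeClient {
        return this.dataClient;
    }

    get control(): FakeClient {
        return this.controlClient;
    }

    async connect(): Promise<void> {
        this.dataClient.connected = true;
        this.controlClient.connected = true;
    }

    async disconnect(): Promise<void> {
        this.dataClient.connected = false;
        this.controlClient.connected = false;
    }
}
```

One plausible reason for keeping two clients: a blocking stream read ties up its connection for as long as it waits, so orchestration calls need a connection of their own.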

StreamingDataSource

  • an extension of the RedisDataSource, which implements streaming protocols:
    • writeToStream({ ... }) (write to a stream using the data-source)
    • getReadStream({ ... }) (get a Readable for a stream)
    • getWriteStream({ ... }) (get a Writable for a stream)
    • iterateStream({ ... }) (get an Iterable that reads from a stream)
    • set() (set a key, for orchestration purposes)
    • get() (get a key, for orchestration purposes)
    • ... and more

Promise Tracker

  • a general purpose utility for using the await keyword to cede control until a future event has occurred on a stream. You could generally use .once('event'), but due to memory management concerns I have exposed a utility with an interface as follows:
    • tracker.promise('event')
    • tracker.cancel('event')
    • tracker.cancelAll()
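
For a feel of how such a tracker might work, here is a minimal sketch built on Node's EventEmitter. It is a hypothetical re-implementation, not the package's code: the class name, the constructor argument and the rejection-on-cancel behaviour are all assumptions. The point it illustrates is the memory concern: cancelling removes the listener instead of leaving it attached forever.

```typescript
import { EventEmitter } from 'node:events';

interface PendingEntry {
    listener: (value: unknown) => void;
    reject: (reason: Error) => void;
}

// Hypothetical tracker; limited to one pending promise per event name.
class PromiseTracker {
    private readonly emitter: EventEmitter;
    private readonly pending = new Map<string, PendingEntry>();

    constructor(emitter: EventEmitter) {
        this.emitter = emitter;
    }

    // Resolve with the first value emitted for `event`.
    promise(event: string): Promise<unknown> {
        return new Promise((resolve, reject) => {
            const listener = (value: unknown) => {
                this.pending.delete(event);
                resolve(value);
            };
            this.pending.set(event, { listener, reject });
            this.emitter.once(event, listener);
        });
    }

    // Detach the listener and reject the waiting promise (assumed behaviour).
    cancel(event: string): void {
        const entry = this.pending.get(event);
        if (!entry) return;
        this.emitter.off(event, entry.listener);
        this.pending.delete(event);
        entry.reject(new Error(`cancelled: ${event}`));
    }

    cancelAll(): void {
        for (const event of [...this.pending.keys()]) {
            this.cancel(event);
        }
    }
}
```

A caller would `await tracker.promise('event')` in one place and call `tracker.cancel('event')` from a timeout or shutdown path, handling the rejection where the await lives.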

Stream Awaiter

  • a general purpose utility for call-and-response along two streams. After a message with a given ID is dispatched to one stream, generate a promise that will resolve when the second stream receives a message with a matching ID, using the methods:
    • streamAwaiter.dispatch('some-id') (a promise for a response with 'some-id')
    • streamAwaiter.readResponseStream() (begin reading incoming responses)
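
The call-and-response idea can be sketched with a map from message ID to a pending resolver. This is an illustration only: ResponseMessage and ResponseAwaiter are hypothetical, and unlike the real readResponseStream(), this version takes the response source as an argument so the example stays self-contained.

```typescript
// Hypothetical response shape; the package's real message type may differ.
interface ResponseMessage {
    messageId: string;
    payload: string;
}

class ResponseAwaiter {
    private readonly waiting = new Map<string, (message: ResponseMessage) => void>();

    // Returns a promise for the response carrying `messageId`.
    dispatch(messageId: string): Promise<ResponseMessage> {
        return new Promise((resolve) => {
            this.waiting.set(messageId, resolve);
        });
    }

    // Drain an async source of responses, resolving any promise with a matching ID.
    async readResponseStream(responses: AsyncIterable<ResponseMessage>): Promise<void> {
        for await (const message of responses) {
            const resolve = this.waiting.get(message.messageId);
            if (!resolve) continue; // nobody is waiting for this ID
            this.waiting.delete(message.messageId);
            resolve(message);
        }
    }
}
```

A production version would also want timeouts so that a response that never arrives does not leave its entry in the map indefinitely.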

Utils

  • A collection of utilities for internal Streamerson use. Outside of that context, use them at your own peril:
    • ids
      • guuid() (generate a GUUID)
    • keys
      • keyGenerator() (generate keys with standard markup for stream & key identifiers)
      • shardDecorator() (decorate IDs with a shard for logical partitioning)
      • consumerProducerDecorator() (decorate IDs with a consumer group for group partitioning)
    • stream configuration
      • buildStreamConfiguration() (generate valid configurations for other builders)
    • time
      • MS_TO_SECONDS() (converts milliseconds to seconds)
      • SECONDS_TO_MS() (converts seconds to milliseconds)
      • HOURS_TO_MS() (converts hours to milliseconds)
    • topic
      • new Topic({ /* options */ }) (creates a Topic, which should probably be a first-class citizen of the core package but for now resides here)
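
To give a sense of scale for these helpers, here is a guess at what the time converters and a shard decorator could look like. The converter names come from the list above, but their bodies are assumptions (for instance, whether the real MS_TO_SECONDS rounds is not stated), and decorateWithShard with its "#" separator is entirely hypothetical.

```typescript
// Assumed implementations of the time helpers named above.
const SECONDS_TO_MS = (seconds: number): number => seconds * 1000;
const MS_TO_SECONDS = (ms: number): number => ms / 1000;
const HOURS_TO_MS = (hours: number): number => hours * 60 * 60 * 1000;

// Hypothetical shard decorator: appends "#<shard>" to an ID when a shard is given.
// The real shardDecorator() may use a different format entirely.
function decorateWithShard(id: string, shard?: string): string {
    return shard ? `${id}#${shard}` : id;
}
```

Keeping the shard inside the key means two shards of the same logical stream become two distinct keys, which is the logical partitioning the list refers to.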

API Reference

_API.md

🏭 StreamingDataSource

A remote source capable of retrieving stream records from a Redis instance.

Methods

⚙️ writeToStream

A low-level implementation wrapping a Redis Stream Write operation

| Method | Type |
| ---------- | ---------- |
| writeToStream | (outgoingStream: string, incomingStream: string, messageType: MessageType, messageId: string, message: string, sourceId: string, shard?: string) => Promise<string> |

Parameters:

  • outgoingStream: The stream ID to target in Redis
  • incomingStream: Maybe, a stream ID to reply to
  • messageType: The type of the event
  • messageId: The ID of the message
  • message: The message payload
  • sourceId: The ID of the source
  • shard: Maybe, the shard to target
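
As an illustration of how these parameters could map onto a Redis stream write, the sketch below builds the argument list for an XADD command. The field names (messageId, replyTo, payload, and so on) and the shard suffix are assumptions; this README does not document the actual wire format. What is certain is the command shape: XADD takes a key, an entry ID (`*` asks Redis to generate one) and field/value pairs.

```typescript
// Hypothetical message shape mirroring the parameters listed above.
interface OutgoingMessage {
    outgoingStream: string;
    incomingStream?: string;
    messageType: string;
    messageId: string;
    message: string;
    sourceId: string;
    shard?: string;
}

// Build the raw arguments for a Redis XADD call.
function buildXaddArgs(m: OutgoingMessage): string[] {
    // Assumed key format: "<stream>#<shard>" when a shard is supplied.
    const key = m.shard ? `${m.outgoingStream}#${m.shard}` : m.outgoingStream;
    const fields: Array<[string, string | undefined]> = [
        ['messageId', m.messageId],
        ['messageType', m.messageType],
        ['sourceId', m.sourceId],
        ['replyTo', m.incomingStream],
        ['payload', m.message],
    ];
    const args = ['XADD', key, '*'];
    for (const [name, value] of fields) {
        if (value !== undefined) args.push(name, value); // skip optional fields that are unset
    }
    return args;
}
```

XADD replies with the ID of the new entry, which would be consistent with the `Promise<string>` return type shown in the table.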

⚙️ setResponseType

Sets the MessageType field default for outgoing messages

| Method | Type |
| ---------- | ---------- |
| setResponseType | (type: string) => void |

Parameters:

  • type: The MessageType for outgoing messages

⚙️ addStreamId

Adds a stream to the set for consumption

| Method | Type |
| ---------- | ---------- |
| addStreamId | (streamId: string) => void |

Parameters:

  • streamId: the key of the stream to ingest

⚙️ hasStreamId

Checks whether a stream is set for consumption

| Method | Type |
| ---------- | ---------- |
| hasStreamId | (streamId: string) => boolean |

Parameters:

  • streamId: the key of the stream to check

⚙️ removeStreamId

Removes a stream from the set for consumption

| Method | Type |
| ---------- | ---------- |
| removeStreamId | (streamId: string) => void |

Parameters:

  • streamId: the key of the stream to remove
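
The three stream-ID methods above describe set semantics, which a plain Set captures in a few lines. StreamIdRegistry is a hypothetical name for illustration; how the real class stores its stream IDs is not stated in this README.

```typescript
// Hypothetical registry of stream keys marked for consumption.
class StreamIdRegistry {
    private readonly streamIds = new Set<string>();

    addStreamId(streamId: string): void {
        this.streamIds.add(streamId); // adding the same key twice is a no-op
    }

    hasStreamId(streamId: string): boolean {
        return this.streamIds.has(streamId);
    }

    removeStreamId(streamId: string): void {
        this.streamIds.delete(streamId);
    }
}
```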

⚙️ getReadStream

| Method | Type |
| ---------- | ---------- |
| getReadStream | (options: { topic: Topic; shard?: string; } or GetReadStreamOptions) => Readable and { readableObjectMode: true; } |

⚙️ getWriteStream

Get a Writable stream, for which written objects will be written to the remote

| Method | Type |
| ---------- | ---------- |
| getWriteStream | (options: { topic: Topic; shard?: string; } or { stream: string; responseChannel?: string; shard?: string; }) => Writable and { writableObjectMode: true; } |

Parameters:

  • options: The Topic to publish messages to
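
For intuition about what an object-mode Writable over a remote looks like, here is a minimal sketch using only node:stream. WriteFn and toWritable are hypothetical; the write function stands in for whatever call actually sends a record to Redis.

```typescript
import { Writable } from 'node:stream';

// Hypothetical stand-in for the call that sends one record to the remote.
type WriteFn = (record: unknown) => Promise<void>;

// Build an object-mode Writable that forwards every written object to `write`.
function toWritable(write: WriteFn): Writable {
    return new Writable({
        objectMode: true,
        write(chunk, _encoding, callback) {
            // Signal completion (or failure) only after the remote write settles,
            // so backpressure reflects how fast the remote accepts records.
            write(chunk).then(
                () => callback(),
                (error) => callback(error),
            );
        },
    });
}
```

Since the result is a standard Writable, any object-mode Readable can be piped into it with `.pipe()` or `pipeline()`.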