npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@proximaone/stream-client-js

v1.3.1

Published

Proxima Streams client library

Downloads

10

Readme

StreamDB Client JS

Installation

StreamDB Client requires Node.js v12+ to run.

Install @proximaone/stream-client-js package.

yarn add @proximaone/stream-client-js

Install the dependencies and devDependencies.

yarn install

Relevant Links

To get a simple look at streams without using the client you can check out these quick links:

  • Stream Registry: [https://streams.proxima.one]
  • Streams API Documentation: [https://streams.api.proxima.one]

Client Usage

Reading the stream from the beginning

The most simple scenario is just consuming all the events from the stream.

const client = new ProximaStreamClient();
const name = "proxima.eth-main.blocks.1_0";
const pauseable = await client.streamEvents(name, Offset.zero);
pauseable.observable.forEach(event => {
    console.log({
        offset: event.offset,
        payload: decodeJson(event.payload),
        undo: event.undo,
        timestamp: event.timestamp,
    });
});

Full code

Reading the stream starting from given position

Sometimes you may want to skip first events of the stream and start reading from a certain point. How do you get the offset to pass to the streamEvents call?

One option would be to find it through the UI (TODO: add link). However it may be more convenient to do it directly from the code. For this you have to use StreamRegistryClient which provides access to information about the streams themselves. You can find the offset by events's height or timestamp (see Concepts).

const registryClient = new StreamRegistryClient();
const offsetByHeight = await registryClient.findOffset(name, 1000000);
assert(offsetByHeight);
const offsetByTime = await registryClient.findOffset(name, undefined, 1667131199000);
assert(offsetByTime);
const pauseable = await client.streamEvents(name, offsetByHeight);

Finding all existing streams

Another useful function of StreamRegistryClient is fetching a list of all streams. You can get the first event of each stream like this.

const client = new ProximaStreamClient();
const registryClient = new StreamRegistryClient();
for (const stream of await registryClient.getStreams()) {
    const encoding = stream.metadata.labels["encoding"];
    if (encoding != "json") {
        console.log(`Stream ${stream.name} has unknown encoding: ${encoding}`);
        return;
    }
    const events = await client.fetchEvents(stream.name, Offset.zero, 1, "next");
    const event = events.pop()!;
    console.log(`Stream: ${stream.name} (${stream.metadata.description})`);
    console.log(decodeJson(event.payload));
}

Full code

Handling backpressure

If you are processing events slower than read them they are accumulating in memory possibly causing too high memory consumption. In order to automatically pause reading new events during consumption you can use a BufferedStreamReader helper.

const bufferSize = 10000;
const chunkSize = 1000;
const streamReader = BufferedStreamReader.fromStream(pauseable, bufferSize);
while (true) {
    const chunk = await streamReader.read(chunkSize);
    if (chunk === undefined) {
        console.log("Completed");
        break;
    }
    assert(chunk.length <= chunkSize);
    console.log(`Processing batch from ${chunk[0].offset} to ${chunk[chunk.length - 1].offset}...`);
    await sleep(500);
}

Full code

Concepts

In order to fully understand the client design you should first learn some facts about the event streams themselves.

Fork illustration

Handling reorgs

First, the streams are immutable (append-only). That means that after the event was produced to the stream it can never be removed from it even if no one has consumed it yet. But the sources of most our streams' events are blockchains which are not immutable. After publishing events coming from some blocks "X" and "Y" those blocks could be "forked" rewriting history with blocks "B", "C" and "D" instead. As we can't remove published events from the stream, we publish "undo" events ("-Y" and "-X" on the illustration) which logically cancel the effect of previous events. Each "undo" event corresponds to the last uncancelled event in the stream, has the same payload and has undo property set to true.

Offset

Offset is some unique pointer to the place in event history. Offset actually points not to the event but to the state between the events. Every stream has initial offset Offset.zero.

Height

We call a height of an offset a number of events directly preceding it, without considering forks. Hence every regular event increases height by one and every undo event decreases it by one.

Timestamp

Each event has a timestamp which is the time when the event actually happened. For example, when the block was mined in a blockchain. It may differ from a time when that event got produced to the stream. Timestamp of an offset is the timestamp of the event preceding it.

Aside from epoch time in milliseconds a timestamp may contain additional parts which help to strictly order the events with equal time. For example, parts list may contain index of transaction inside a block and index of emitted log inside a transaction.

Reading direction

When you try to read stream events starting from a given offset most client methods will require specifying a direction of reading which can be either next or last. It controls the direction of consumption.

  • The next mode starts reading events that happened after the given offset, in the order they were produced. Note that because the offset points between the events the first produced event in this mode may have timestamp greater than the timestamp of the given offset.
  • The last mode starts reading events backwards from the given offset. Note that because the offset points between the events the first produced event in this mode will have a timestamp equal to the timestamp of the given offset.

Internals

Instead of having a single backend service the client actually communicates to multiple services:

  • Stream registry which stores streams metadata and endpoints of StreamDBs for a given stream.
  • StreamDB instances which store stream events.

For accessing them there are two client classes:

  • StreamRegistryClient which only accesses stream registry and can fetch metadata.
  • ProximaStreamClient which makes requests to both stream registry and StreamDBs. It fetches a list of StreamDB endpoints from the registry and uses it to automatically connect to the correct instances and switch between them when necessary.

Client connections