@kaxline/streamr-client

v5.4.5

JavaScript client library for Streamr

By using this client, you can easily interact with the Streamr API from JavaScript-based environments, such as browsers and node.js. You can, for example, subscribe to real-time data in streams, produce new data to streams, and create new streams. The client uses websockets for producing and consuming messages to/from streams. It should work in all modern browsers.

Please see the API Docs for more detailed documentation.

Breaking changes notice

  • Support for unsigned data will be dropped in the second half of 2021 or in 2022. This means that every data point will require a signature using the publisher's private key.

TOC

Installation · Usage · API Docs · Client options · Authentication · Managing subscriptions · Stream API · Subscription options · Data Unions · Utility functions · Events · Stream Partitioning · Logging · NPM Publishing

Installation

The client is available on npm and can be installed simply by:

npm install streamr-client

Node v14 or higher is recommended if you intend to use the client in a Node environment, for example inside a script.

Usage

Here are some quick examples. More detailed examples for the browser and node.js can be found here.

If you don't have an Ethereum account you can use the utility function StreamrClient.generateEthereumAccount(), which returns the address and private key of a fresh Ethereum account.

Creating a StreamrClient instance

const client = new StreamrClient({
    auth: {
        privateKey: 'your-private-key'
    }
})

When using Node.js remember to import the library with:

import { StreamrClient } from 'streamr-client';

Subscribing to real-time events in a stream

const sub = await client.subscribe({
    stream: 'streamId',
    partition: 0, // Optional, defaults to zero. Use for partitioned streams to select partition.
    // optional resend options here
}, (message, metadata) => {
    // This is the message handler which gets called for every incoming message in the stream.
    // Do something with the message here!
})

Resending historical data

const sub = await client.resend({
    stream: 'streamId',
    resend: {
        last: 5,
    },
}, (message) => {
    // This is the message handler which gets called for every received message in the stream.
    // Do something with the message here!
})

See "Subscription options" for resend options

Programmatically creating a stream

const stream = await client.createStream({
    id: '/foo/bar', // or 0x1234567890123456789012345678901234567890/foo/bar or mydomain.eth/foo/bar
})
console.log(`Stream ${stream.id} has been created!`)

// Optional: to enable historical data resends, add the stream to a storage node
await stream.addToStorageNode(StorageNode.STREAMR_GERMANY)

// Do something with the stream, for example call stream.publish(message)
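The comment on the id field indicates that a path such as /foo/bar ends up under the creator's Ethereum address. A minimal sketch of that expansion, assuming plain string concatenation; resolveStreamId is a hypothetical helper for illustration and is not part of the client:

```javascript
// Hypothetical helper: expand a path-style id into a full stream id.
// Assumption: ids starting with "/" are paths; anything else is already a full id.
function resolveStreamId(idOrPath, ownerAddress) {
    if (idOrPath.startsWith('/')) {
        return `${ownerAddress}${idOrPath}`
    }
    return idOrPath
}

const owner = '0x1234567890123456789012345678901234567890'
console.log(resolveStreamId('/foo/bar', owner))
console.log(resolveStreamId('mydomain.eth/foo/bar', owner))
```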

Publishing data points to a stream

// Here's our example data point
const msg = {
    temperature: 25.4,
    humidity: 10,
    happy: true
}

// Publish using the stream id only
await client.publish('my-stream-id', msg)

// The first argument can also be the stream object
await client.publish(stream, msg)

// Publish with a specific timestamp as a Date object (default is now)
await client.publish('my-stream-id', msg, new Date(54365472))

// Publish with a specific timestamp in ms
await client.publish('my-stream-id', msg, 54365472)

// Publish with a specific timestamp as a ISO8601 string
await client.publish('my-stream-id', msg, '2019-01-01T00:00:00.123Z')

// Publish with a specific partition key (read more about partitioning further down this readme)
await client.publish('my-stream-id', msg, Date.now(), 'my-partition-key')

// For convenience, stream.publish(...) equals client.publish(stream, ...)
await stream.publish(msg)
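The publish calls above accept the timestamp as a Date, a number of milliseconds, or an ISO 8601 string. If you want to normalize timestamps yourself before publishing, a sketch could look like this; toEpochMs is a hypothetical helper and does not claim to mirror the client's internal conversion:

```javascript
// Hypothetical helper: convert any of the three timestamp forms to epoch milliseconds.
function toEpochMs(timestamp = Date.now()) {
    if (timestamp instanceof Date) {
        return timestamp.getTime()
    }
    if (typeof timestamp === 'number') {
        return timestamp
    }
    if (typeof timestamp === 'string') {
        const ms = Date.parse(timestamp)
        if (Number.isNaN(ms)) {
            throw new Error(`Invalid timestamp string: ${timestamp}`)
        }
        return ms
    }
    throw new Error(`Unsupported timestamp type: ${typeof timestamp}`)
}

console.log(toEpochMs(new Date(54365472)))
console.log(toEpochMs('2019-01-01T00:00:00.123Z'))
```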

API Docs

The API docs are automatically generated from the TypeScript source code. They can also be rebuilt locally via:

npm run docs

Client options

| Option | Default value | Description |
| :--- | :--- | :--- |
| url | wss://streamr.network/api/v1/ws | Address of the Streamr websocket endpoint to connect to. |
| restUrl | https://streamr.network/api/v1 | Base URL of the Streamr REST API. |
| auth | {} | Object that can contain different information to authenticate. More details below. |
| publishWithSignature | 'auto' | Determines if data points published to streams are signed or not. Possible values are: 'auto', 'always' and 'never'. Signing requires auth.privateKey or auth.ethereum. 'auto' will sign only if one of them is set. 'always' will throw an exception if none of them is set. |
| verifySignatures | 'auto' | Determines under which conditions signed and unsigned data points are accepted or rejected. 'always' accepts only signed and verified data points. 'never' accepts all data points. 'auto' verifies all signed data points before accepting them and accepts unsigned data points only for streams not supposed to contain signed data. |
| autoConnect | true | If set to true, the client connects automatically on the first call to subscribe(). Otherwise an explicit call to connect() is required. |
| autoDisconnect | true | If set to true, the client automatically disconnects when the last stream is unsubscribed. Otherwise the connection is left open and can be disconnected explicitly by calling disconnect(). |
| orderMessages | true | If set to true, the subscriber handles messages in the correct order, requests missing messages and drops duplicates. Otherwise, the subscriber processes messages as they arrive without any check. |
| maxPublishQueueSize | 10000 | Only in effect when autoConnect = true. Controls the maximum number of messages to retain in internal queue when client has disconnected and is reconnecting to Streamr. |
| publisherGroupKeys | {} | Object defining the group key as a hex string used to encrypt for each stream id. |
| publisherStoreKeyHistory | true | If true, the client will locally store every key used to encrypt messages at some point. If set to false, the client will not be able to answer subscribers asking for historical keys during resend requests. |
| subscriberGroupKeys | {} | Object defining, for each stream id, an object containing the group key used to decrypt for each publisher id. Not needed if keyExchange is defined. |
| keyExchange | {} | Defines RSA key pair to use for group key exchange. Can define publicKey and privateKey fields as strings in the PEM format, or stay empty to generate a key pair automatically. Can be set to null if no key exchange is required. |
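The publishWithSignature rules in the table can be read as a small decision function. The sketch below only restates the table; shouldSign is a hypothetical name and is not the client's own implementation:

```javascript
// Hypothetical helper restating the publishWithSignature rules from the table.
// Returns true if a data point would be signed, false if not.
function shouldSign(publishWithSignature, auth = {}) {
    // Signing requires auth.privateKey or auth.ethereum.
    const canSign = Boolean(auth.privateKey || auth.ethereum)
    switch (publishWithSignature) {
        case 'never':
            return false
        case 'auto':
            return canSign
        case 'always':
            if (!canSign) {
                throw new Error("publishWithSignature 'always' requires auth.privateKey or auth.ethereum")
            }
            return true
        default:
            throw new Error(`Unknown publishWithSignature value: ${publishWithSignature}`)
    }
}

console.log(shouldSign('auto', { privateKey: 'your-private-key' }))
console.log(shouldSign('auto', {}))
```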

Authentication options

Note: Authenticating with an API key has been deprecated. Cryptographic keys/wallets are the only supported authentication method.

If you don't have an Ethereum account you can use the utility function StreamrClient.generateEthereumAccount(), which returns the address and private key of a fresh Ethereum account.

Authenticating with Ethereum also automatically creates an associated Streamr user, even if it doesn't already exist. Under the hood, the client will cryptographically sign a challenge to authenticate you as a Streamr user:

const client = new StreamrClient({
    auth: {
        privateKey: 'your-private-key'
    }
})

Authenticating with an Ethereum private key contained in an Ethereum (web3) provider:

const client = new StreamrClient({
    auth: {
        ethereum: window.ethereum,
    }
})

Authenticating with a pre-existing session token (used internally by the Streamr app):

const client = new StreamrClient({
    auth: {
        sessionToken: 'session-token'
    }
})

To extract the session token from an authenticated client:

const bearerToken = await client.session.getSessionToken()

Then for example,

    axios({
        headers: {
            Authorization: `Bearer ${bearerToken}`,
        },
        ...
    })

Note, session tokens expire after four hours and may need to be refreshed.
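Because the token expires, code that reuses it for REST calls may want to refetch it shortly before the four hours are up. A minimal sketch of such a cache follows; needsRefresh, createTokenCache and the one-minute safety margin are assumptions made for illustration, not features of the client:

```javascript
const FOUR_HOURS_MS = 4 * 60 * 60 * 1000

// Hypothetical helper: true once the token is within marginMs of its expiry.
function needsRefresh(fetchedAtMs, nowMs, ttlMs = FOUR_HOURS_MS, marginMs = 60 * 1000) {
    return nowMs - fetchedAtMs >= ttlMs - marginMs
}

// Hypothetical wrapper: caches the token returned by fetchToken and refetches when stale.
function createTokenCache(fetchToken, now = Date.now) {
    let token = null
    let fetchedAt = 0
    return async function getToken() {
        if (token === null || needsRefresh(fetchedAt, now())) {
            token = await fetchToken()
            fetchedAt = now()
        }
        return token
    }
}

// Usage with an authenticated client (not executed here):
// const getToken = createTokenCache(() => client.session.getSessionToken())
// const bearerToken = await getToken()
```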

Connecting

By default the client will automatically connect and disconnect as needed; ideally you should not need to manage connection state explicitly. Specifically, it will automatically connect when you publish or subscribe, and automatically disconnect once all subscriptions are removed and no messages were recently published. This behaviour can be disabled using the autoConnect and autoDisconnect options when creating a new StreamrClient. Explicit calls to either connect() or disconnect() will disable all autoConnect and autoDisconnect functionality, but they can be re-enabled by calling enableAutoConnect() or enableAutoDisconnect().

Calls that need a connection, such as publish or subscribe, will fail with an error if you are disconnected and autoConnect is disabled.

| Name | Description |
| :--- | :--- |
| connect() | Safely connects if not connected. Returns a promise. Resolves immediately if already connected. Only rejects if an error occurs during connection. |
| disconnect() | Safely disconnects if not already disconnected, clearing all subscriptions. Returns a Promise. Resolves immediately if already disconnected. Only rejects if an error occurs during disconnection. |
| enableAutoConnect(enable = true) | Enables autoConnect if it wasn't already enabled. Does not connect immediately. Use enableAutoConnect(false) to disable autoConnect. |
| enableAutoDisconnect(enable = true) | Enables autoDisconnect if it wasn't already enabled. Does not disconnect immediately. Use enableAutoDisconnect(false) to disable autoDisconnect. |

const client = new StreamrClient({
    auth: {
        privateKey: 'your-private-key'
    },
    autoConnect: false,
    autoDisconnect: false,
})

await client.connect()

Managing subscriptions

| Name | Description |
| :--- | :--- |
| subscribe(options, callback) | Subscribes to a stream. Messages in this stream are passed to the callback function. See below for subscription options. Returns a Promise resolving a Subscription object. |
| unsubscribe(Subscription) | Unsubscribes the given Subscription. Returns a promise. |
| unsubscribeAll(streamId) | Unsubscribes all Subscriptions for streamId. Returns a promise. |
| getSubscriptions() | Returns a list of all active Subscriptions on this client. Returns a promise. |

Message handler callback

The second argument to client.subscribe(options, callback) is the callback function that will be called for each message as it arrives. Its arguments are as follows:

| Argument | Description |
| :--- | :--- |
| payload | A JS object containing the message payload itself |
| streamMessage | The whole StreamMessage object containing various metadata, for example streamMessage.getTimestamp() etc. |

const sub = await client.subscribe({
    streamId: 'my-stream-id',
}, (payload, streamMessage) => {
    console.log({
        payload, streamMessage
    })
})

Subscription Options

Note that only one of the resend options can be used for a particular subscription. The default functionality is to resend nothing, only subscribe to messages from the subscription moment onwards.

| Name | Description |
| :--- | :--- |
| stream | Stream id to subscribe to |
| partition | Partition number to subscribe to. Defaults to partition 0. |
| resend | Object defining the resend options. Below are examples of its contents. |
| groupKeys | Object defining the group key as a hex string for each publisher id of the stream. |

// Resend N most recent messages
const sub1 = await client.subscribe({
    streamId: 'my-stream-id',
    resend: {
        last: 10,
    }
}, onMessage)

// Resend from a specific message reference up to the newest message
const sub2 = await client.subscribe({
    streamId: 'my-stream-id',
    resend: {
        from: {
            timestamp: 12345,
            sequenceNumber: 0, // optional
        },
        publisher: 'publisherId', // optional
        msgChainId: 'msgChainId', // optional
    }
}, onMessage)

// Resend a limited range of messages
const sub3 = await client.subscribe({
    streamId: 'my-stream-id',
    resend: {
        from: {
            timestamp: 12345,
            sequenceNumber: 0, // optional
        },
        to: {
            timestamp: 54321,
            sequenceNumber: 0, // optional
        },
        publisher: 'publisherId', // optional
        msgChainId: 'msgChainId', // optional
    }
}, onMessage)

If you choose one of the above resend options when subscribing, you can listen on the completion of this resend by doing the following:

const sub = await client.subscribe(options)
sub.on('resent', () => {
    console.log('All caught up and received all requested historical messages! Now switching to real time!')
})
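Since only one resend option may be used per subscription, it can help to check an options object before passing it to subscribe. The sketch below classifies a resend object into the three forms shown earlier; resendKind is a hypothetical helper and the error messages are made up for illustration:

```javascript
// Hypothetical helper: classify a resend object as 'none', 'last', 'from' or 'range',
// and reject combinations that mix more than one resend option.
function resendKind(resend) {
    if (resend == null) {
        return 'none' // default: no resend, real-time messages only
    }
    const hasLast = resend.last !== undefined
    const hasFrom = resend.from !== undefined
    const hasTo = resend.to !== undefined
    if (hasLast && (hasFrom || hasTo)) {
        throw new Error('Use either "last" or "from"/"to", not both')
    }
    if (hasTo && !hasFrom) {
        throw new Error('"to" requires "from"')
    }
    if (hasLast) {
        return 'last'
    }
    if (hasFrom) {
        return hasTo ? 'range' : 'from'
    }
    return 'none'
}

console.log(resendKind({ last: 10 }))
console.log(resendKind({ from: { timestamp: 12345 }, to: { timestamp: 54321 } }))
```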

Stream API

All the below functions return a Promise which gets resolved with the result.

| Name | Description |
| :--- | :--- |
| getStream(streamId) | Fetches a stream object from the API. |
| listStreams(query) | Fetches an array of stream objects from the API. For the query params, consult the API docs. |
| getStreamByName(name) | Fetches a stream which exactly matches the given name. |
| createStream([properties]) | Creates a stream with the given properties. For more information on the stream properties, consult the API docs. If you specify id, it can be a full streamId or a path (e.g. /foo/bar will create a stream with id <your-ethereum-address>/foo/bar if you have authenticated with a private key) |
| getOrCreateStream(properties) | Gets a stream with the id or name given in properties, or creates it if one is not found. |
| publish(streamId, message, timestamp, partitionKey) | Publishes a new message to the given stream. |

Stream object

All the below functions return a Promise which gets resolved with the result.

| Name | Description |
| :--- | :--- |
| update() | Updates the properties of this stream object by sending them to the API. |
| delete() | Deletes this stream. |
| getPermissions() | Returns the list of permissions for this stream. |
| hasPermission(operation, user) | Returns a permission object, or null if no such permission was found. Valid operation values for streams are: stream_get, stream_edit, stream_delete, stream_publish, stream_subscribe, and stream_share. user is the username of a user, or null for public permissions. |
| grantPermission(operation, user) | Grants the permission to do operation to user, which are defined as above. |
| revokePermission(permissionId) | Revokes a permission identified by its id. |
| detectFields() | Updates the stream field config (schema) to match the latest data point in the stream. |
| publish(message, timestamp, partitionKey) | Publishes a new message to this stream. |

Data Unions

This library provides functions for working with Data Unions. Please see the API Docs for auto-generated documentation on each Data Union endpoint.

To deploy a new DataUnion with default deployment options:

const dataUnion = await client.deployDataUnion()

To get an existing (previously deployed) DataUnion instance:

const dataUnion = client.getDataUnion(dataUnionAddress)

Or to verify untrusted (e.g. user) input, use:

const dataUnion = await client.safeGetDataUnion(dataUnionAddress)

Admin Functions

Admin functions require xDai tokens on the xDai network. To get xDai you can either use a faucet or you can reach out on the Streamr Discord #dev channel.

Adding members using admin functions is not at feature parity with the member function join. The newly added member will not be granted publish permissions to the streams inside the Data Union. This will need to be done manually using streamr.grantPermission(stream_publish, user). Similarly, after removing a member using the admin function removeMembers, the publish permissions will need to be removed in a secondary step using revokePermission(permissionId).

| Name | Returns | Description |
| :--- | :--- | :--- |
| createSecret([name]) | string | Create a secret for a Data Union |
| addMembers(memberAddressList) | Transaction receipt | Add members |
| removeMembers(memberAddressList) | Transaction receipt | Remove members from Data Union |
| setAdminFee(newFeeFraction[, ethersOptions]) ** | Transaction receipt | newFeeFraction is a Number between 0.0 and 1.0 (inclusive) |
| withdrawAllToMember(memberAddress[, options]) | Transaction receipt * | Send all withdrawable earnings to the member's address |
| withdrawAllToSigned(memberAddress, recipientAddress, signature[, options]) | Transaction receipt * | Send all withdrawable earnings to the address signed off by the member (see example below) |
| withdrawAmountToSigned(memberAddress, recipientAddress, amountTokenWei, signature[, options]) | Transaction receipt * | Send some of the withdrawable earnings to the address signed off by the member |

* The return value type may vary depending on the given options that describe the use case.

** ethersOptions that setAdminFee takes can be found as "overrides" documented in docs.ethers.io.

Here's how to deploy a Data Union contract with 30% Admin fee and add some members:

import { StreamrClient } from 'streamr-client'

const client = new StreamrClient({
    auth: { privateKey },
})

const dataUnion = await client.deployDataUnion({
    adminFee: 0.3,
})
const receipt = await dataUnion.addMembers([
    "0x1234567890123456789012345678901234567890",
    "0x1234567890123456789012345678901234567891",
    "0x1234567890123456789012345678901234567892",
])

Member functions

| Name | Returns | Description |
| :--- | :--- | :--- |
| join([secret]) | JoinRequest | Join the Data Union (if a valid secret is given, the promise waits until the automatic join request has been processed) |
| part() | Transaction receipt | Leave the Data Union |
| isMember(memberAddress) | boolean | |
| withdrawAll([options]) | Transaction receipt * | Withdraw funds from Data Union |
| withdrawAllTo(recipientAddress[, options]) | Transaction receipt * | Donate/move your earnings to recipientAddress instead of your memberAddress |
| signWithdrawAllTo(recipientAddress) | Signature (string) | Signature that can be used to withdraw all available tokens to given recipientAddress |
| signWithdrawAmountTo(recipientAddress, amountTokenWei) | Signature (string) | Signature that can be used to withdraw a specific amount of tokens to given recipientAddress |
| transportMessage(messageHash[, pollingIntervalMs[, retryTimeoutMs]]) | Transaction receipt | Send the mainnet transaction to withdraw tokens from the sidechain |

* The return value type may vary depending on the given options that describe the use case.

Here's an example of how to sign off on a withdrawal to (any) recipientAddress (NOTE: this requires no gas!):

import { StreamrClient } from 'streamr-client'

const client = new StreamrClient({
    auth: { privateKey },
})

const dataUnion = client.getDataUnion(dataUnionAddress)
const signature = await dataUnion.signWithdrawAllTo(recipientAddress)

Later, anyone (e.g. Data Union admin) can send that withdraw transaction to the blockchain (and pay for the gas)

import { StreamrClient } from 'streamr-client'

const client = new StreamrClient({
    auth: { privateKey },
})

const dataUnion = client.getDataUnion(dataUnionAddress)
const receipt = await dataUnion.withdrawAllToSigned(memberAddress, recipientAddress, signature)

The messageHash argument to transportMessage will come from the withdraw function with the specific options. The following is equivalent to the above withdraw line:

const messageHash = await dataUnion.withdrawAllToSigned(memberAddress, recipientAddress, signature, {
    payForTransport: false,
    waitUntilTransportIsComplete: false,
}) // only pay for sidechain gas
const receipt = await dataUnion.transportMessage(messageHash) // only pay for mainnet gas

Query functions

These are available for everyone and anyone, to query publicly available info from a Data Union:

| Name | Returns | Description |
| :--- | :--- | :--- |
| getStats() | {activeMemberCount, totalEarnings, ...} | Get Data Union's statistics |
| getMemberStats(memberAddress) | {earnings, proof, ...} | Get member's stats |
| getWithdrawableEarnings(memberAddress) | BigNumber | Withdrawable DATA tokens in the DU |
| getAdminFee() | Number between 0.0 and 1.0 (inclusive) | Admin's cut from revenues |
| getAdminAddress() | Ethereum address | Data union admin's address |
| getVersion() | 0, 1 or 2 | 0 if the contract is not a data union |

Here's an example of how to get a member's withdrawable token balance (in "wei", where 1 DATA = 10^18 wei):

import { StreamrClient } from 'streamr-client'

const dataUnion = new StreamrClient().getDataUnion(dataUnionAddress)
const withdrawableWei = await dataUnion.getWithdrawableEarnings(memberAddress)
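To display such a balance in whole DATA rather than wei, the value has to be divided by 10^18 without losing precision. A small sketch using native BigInt; formatData is a hypothetical helper, and it assumes the value is non-negative and that its toString() yields a decimal string (as BigNumber objects commonly do):

```javascript
const WEI_PER_DATA = 10n ** 18n

// Hypothetical helper: format a non-negative wei amount as a decimal DATA string.
// Accepts anything whose toString() is a decimal integer (string, BigInt, BigNumber-like).
function formatData(wei) {
    const value = BigInt(wei.toString())
    const whole = value / WEI_PER_DATA
    // Pad the remainder to 18 digits, then drop insignificant trailing zeros.
    const fraction = (value % WEI_PER_DATA).toString().padStart(18, '0').replace(/0+$/, '')
    return fraction ? `${whole}.${fraction}` : `${whole}`
}

console.log(formatData('1500000000000000000')) // 1.5
console.log(formatData('2000000000000000000')) // 2
```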

Withdraw options

The functions withdrawAll, withdrawAllTo, withdrawAllToMember, withdrawAllToSigned, withdrawAmountToSigned all can take an extra "options" argument. It's an object that can contain the following parameters:

| Name | Default | Description |
| :--- | :--- | :--- |
| sendToMainnet | true | Whether to send the withdrawn DATA tokens to mainnet address (or sidechain address) |
| payForTransport | true | Whether to pay for the withdraw transaction signature transport to mainnet over the bridge |
| waitUntilTransportIsComplete | true | Whether to wait until the withdrawn DATA tokens are visible in mainnet |
| pollingIntervalMs | 1000 (1 second) | How often requests are sent to find out if the withdraw has completed |
| retryTimeoutMs | 60000 (1 minute) | When to give up when waiting for the withdraw to complete |
| gasPrice | network estimate | Ethereum Mainnet transaction gas price to use when transporting tokens over the bridge |

These withdraw transactions are sent to the sidechain, so gas price shouldn't be manually set (fees will hopefully stay very low), but a little bit of sidechain native token is nonetheless required.

The return values from the withdraw functions also depend on the options.

If sendToMainnet: false, other options don't apply at all, and sidechain transaction receipt is returned as soon as the withdraw transaction is done. This should be fairly quick in the sidechain.

The use cases corresponding to the different combinations of the boolean flags:

| payForTransport | waitUntilTransportIsComplete | Returns | Effect |
| :--- | :--- | :--- | :--- |
| true | true | Transaction receipt | (default) Self-service bridge to mainnet, client pays for mainnet gas |
| true | false | Transaction receipt | Self-service bridge to mainnet (but skip the wait that double-checks the withdraw succeeded and tokens arrived to destination) |
| false | true | null | Someone else pays for the mainnet gas automatically, e.g. the bridge operator (in this case the transaction receipt can't be returned) |
| false | false | AMB message hash | Someone else pays for the mainnet gas, but we need to give them the message hash first |
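The combinations above, together with the sendToMainnet rule, can be summarized in code. The function below only restates the documented behaviour as labels; withdrawReturnKind is a hypothetical name and nothing here calls the client:

```javascript
// Hypothetical helper: describe what a withdraw call returns for a given options object.
// Defaults follow the withdraw options table (all three flags default to true).
function withdrawReturnKind({
    sendToMainnet = true,
    payForTransport = true,
    waitUntilTransportIsComplete = true,
} = {}) {
    if (!sendToMainnet) {
        // The other options don't apply; the sidechain receipt is returned.
        return 'sidechain transaction receipt'
    }
    if (payForTransport) {
        return 'transaction receipt'
    }
    return waitUntilTransportIsComplete ? 'null' : 'AMB message hash'
}

console.log(withdrawReturnKind())
console.log(withdrawReturnKind({ payForTransport: false, waitUntilTransportIsComplete: false }))
```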

Deployment options

deployDataUnion can take an options object as the argument. It's an object that can contain the following parameters:

| Name | Type | Default | Description |
| :--- | :--- | :--- | :--- |
| owner | Address | *you | Owner / admin of the newly created Data Union |
| joinPartAgents | Address[] | *you, Streamr Core | Able to add and remove members to/from the Data Union |
| dataUnionName | string | Generated | NOT stored anywhere, only used for address derivation |
| adminFee | number | 0 (no fee) | Must be between 0...1 (inclusive) |
| sidechainPollingIntervalMs | number | 1000 (1 second) | How often requests are sent to find out if the deployment has completed |
| sidechainRetryTimeoutMs | number | 60000 (1 minute) | When to give up when waiting for the deployment to complete |
| confirmations | number | 1 | Blocks to wait after Data Union mainnet contract deployment to consider it final |
| gasPrice | BigNumber | network estimate | Ethereum Mainnet gas price to use when deploying the Data Union mainnet contract |

*you here means the address of the authenticated StreamrClient (that corresponds to the auth.privateKey given in constructor)

Streamr Core is added as a joinPartAgent by default so that joining with secret works using the member function join. If you don't plan to use join for "self-service joining", you can leave out Streamr Core agent by calling deployDataUnion e.g. with your own address as the sole joinPartAgent:

const dataUnion = await client.deployDataUnion({
    joinPartAgents: [yourAddress],
    adminFee,
})

dataUnionName option exists purely for the purpose of predicting the addresses of Data Unions not yet deployed. Data Union deployment uses the CREATE2 opcode which means a Data Union deployed by a particular address with particular "name" will have a predictable address.

Utility functions

| Name | Returns | Description |
| :--- | :--- | :--- |
| * generateEthereumAccount() | {address, privateKey} | Generates a random Ethereum account |
| getTokenBalance(address) | BigNumber | Mainnet DATA token balance |
| getSidechainTokenBalance(address) | BigNumber | Sidechain DATA token balance |

* The static function StreamrClient.generateEthereumAccount() generates a new Ethereum private key and returns an object with fields address and privateKey. Note that this private key can be used to authenticate to the Streamr API by passing it in the authentication options, as described earlier in this document.

Events

The client and the subscriptions can fire events as detailed below. You can bind to them using on.

| Name | Description |
| :--- | :--- |
| on(eventName, function) | Binds a function to an event called eventName |
| once(eventName, function) | Binds a function to an event called eventName. It gets called once and then removed. |
| removeListener(eventName, function) | Unbinds the function from events called eventName |
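The difference between on, once and removeListener is easiest to see in a runnable example. The sketch below uses Node's built-in EventEmitter as a stand-in, on the assumption that the client and subscription objects behave the same way for these three methods; it does not use the Streamr client itself:

```javascript
const { EventEmitter } = require('events')

// Stand-in for a client or subscription object (assumption: same on/once/removeListener semantics).
const emitter = new EventEmitter()
const calls = []

const onConnected = () => calls.push('on')
emitter.on('connected', onConnected)
emitter.once('connected', () => calls.push('once'))

emitter.emit('connected') // both handlers run
emitter.emit('connected') // only the `on` handler runs; the `once` handler was removed
emitter.removeListener('connected', onConnected)
emitter.emit('connected') // no handlers left

console.log(calls) // [ 'on', 'once', 'on' ]
```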

Events on the StreamrClient instance

| Name | Handler Arguments | Description |
| :--- | :--- | :--- |
| connected | | Fired when the client has connected (or reconnected). |
| disconnected | | Fired when the client has disconnected (or paused). |
| error | Error | Fired when the client encounters an error e.g. connection issues |

// The StreamrClient emits various events
client.on('connected', () => {
    // note no need to wait for this before doing work,
    // with autoconnect enabled the client will happily establish a connection for you as required.
    console.log('Yeah, we are connected now!')
})

Events on the Subscription object

| Name | Handler Arguments | Description |
| :--- | :--- | :--- |
| unsubscribed | | Fired when an unsubscription is acknowledged by the server. |
| resent | ResendResponseResent | Fired after resending when the subscription has finished resending and message has been processed |
| error | Error object | Reports errors, for example problems with message content |

Stream Partitioning

Partitioning (sharding) enables streams to scale horizontally. This section describes how to use partitioned streams via this library. To learn the basics of partitioning, see the docs.

Creating partitioned streams

By default, streams only have 1 partition when they are created. The partition count can be set to any positive number (1-100 is reasonable). An example of creating a partitioned stream using the JS client:

const stream = await client.createStream({
    name: 'My partitioned stream',
    partitions: 10,
})
console.log(`Stream created: ${stream.id}. It has ${stream.partitions} partitions.`)

Publishing to partitioned streams

In most use cases, a user wants related events (e.g. events from a particular device) to be assigned to the same partition, so that the events retain a deterministic order and reach the same subscriber(s) to allow them to compute stateful aggregates correctly.

The library allows the user to choose a partition key, which simplifies publishing to partitioned streams by not requiring the user to assign a partition number explicitly. The same partition key always maps to the same partition. In an IoT use case, the device id can be used as partition key; in user interaction data it could be the user id, and so on.

The partition key can be given as an argument to the publish methods, and the library assigns a deterministic partition number automatically:

await client.publish('my-stream-id', msg, Date.now(), msg.vehicleId)

// or, equivalently
await stream.publish(msg, Date.now(), msg.vehicleId)
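To illustrate why the same partition key always lands in the same partition, here is a sketch of a deterministic key-to-partition mapping: hash the key, then take the result modulo the partition count. The FNV-1a hash is an illustrative choice made for this example; the hash the client actually uses is not stated here and may differ, so the partition numbers below will not necessarily match the library's.

```javascript
// 32-bit FNV-1a hash over the UTF-16 code units of a string (illustrative choice only).
function fnv1a(str) {
    let hash = 0x811c9dc5
    for (let i = 0; i < str.length; i++) {
        hash ^= str.charCodeAt(i)
        hash = Math.imul(hash, 0x01000193) >>> 0 // multiply by the FNV prime, keep unsigned 32-bit
    }
    return hash
}

// Hypothetical helper: map a partition key to a partition number in [0, partitionCount).
function partitionFor(partitionKey, partitionCount) {
    if (!Number.isInteger(partitionCount) || partitionCount < 1) {
        throw new Error('partitionCount must be a positive integer')
    }
    return fnv1a(String(partitionKey)) % partitionCount
}

// The same key maps to the same partition on every call.
console.log(partitionFor('vehicle-42', 10) === partitionFor('vehicle-42', 10)) // true
```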

Subscribing to partitioned streams

By default, the JS client subscribes to the first partition (partition 0) in a stream. The partition number can be explicitly given in the subscribe call:

const sub = await client.subscribe({
    stream: 'my-stream-id',
    partition: 4, // defaults to 0
}, (payload) => {
    console.log('Got message %o', payload)
})

Or, to subscribe to multiple partitions, if the subscriber can handle the volume:

const handler = (payload, streamMessage) => {
    console.log('Got message %o from partition %d', payload, streamMessage.getStreamPartition())
}

await Promise.all([2, 3, 4].map(async (partition) => {
    await client.subscribe({
        stream: 'my-stream-id',
        partition,
    }, handler)
}))

Logging

The Streamr JS client library supports debug for logging.

In node.js, start your app like this: DEBUG=StreamrClient* node your-app.js

In the browser, set localStorage.debug = 'StreamrClient*'

For Developers

Publishing to npm is automated via GitHub Actions. Follow the steps below to publish latest or beta.

For more technical documentation on the Data Unions API, see the JS Client API Docs. These can also be rebuilt locally via:

npm run docs

Publishing latest

  1. Update the version with npm version [patch|minor|major]. Use semantic versioning https://semver.org/. The files package.json and package-lock.json will be updated automatically, and an appropriate git commit and tag created.

  2. git push --follow-tags

  3. Wait for GitHub Actions to run the tests

  4. If the tests pass, GitHub Actions will publish the new version to npm

Publishing beta

  1. Update the version with npm version [prepatch|preminor|premajor] --preid=beta. Use semantic versioning https://semver.org/. The files package.json and package-lock.json will be updated automatically, and an appropriate git commit and tag created.

  2. git push --follow-tags

  3. Wait for GitHub Actions to run the tests

  4. If the tests pass, GitHub Actions will publish the new version to npm