planetscale-stream-ts

v1.1.0

Run Vitess VStream & Messaging streams on PlanetScale instances, in Typescript.

This package exports two classes for streaming data from a PlanetScale database:

  • PlanetScaleMessagingStream: For reading a Vitess Messaging stream
  • PlanetScaleVStream: For reading a Vitess VStream (change data capture) stream


PlanetScaleMessagingStream

Offers a method, stream(), which returns an async iterable for consuming messages from a Vitess Messaging stream.

Note that messages need to be acknowledged, otherwise they will be redelivered. Use the ack() method to acknowledge messages.

This class uses PlanetScale’s psdb gRPC API, which is a slimmed-down version of the Vitess queryservice. It is an alpha API with little documentation that has not been publicized in any real way, and I’m unsure of its intended purpose, so use it with caution.

See the Vitess documentation for more information on Vitess Messaging, including instructions on how to create a messaging table.

Parameters

Constructor

| Parameter | Description |
| :------------------- | :------------------------------------------------------------ |
| db_config | Database connection config |
| db_config.host | PlanetScale host |
| db_config.database | PlanetScale database name |
| db_config.username | PlanetScale branch username |
| db_config.password | PlanetScale branch password |
| table_name | The name of the messaging table from which to stream messages |
| table_primary_key | The name of the primary key field in the messaging table |

Method: stream()

The stream() method uses named parameters:

| Parameter | Description |
| :----------------- | :---------------------------------------------------------------------------------------- |
| read_duration_ms | (Optional) The duration for which the stream will be read. Omit to stream indefinitely. |

Method: ack()

The ack() method uses one positional parameter:

| Parameter | Description |
| :-------- | :----------------------------------------------------- |
| keys | An array of message primary key values to acknowledge. |

Usage

import { PlanetScaleMessagingStream } from 'planetscale-stream-ts';

const messenger = new PlanetScaleMessagingStream({
    db_config: {
        host: 'aws.connect.psdb.cloud',
        database: 'my_db',
        username: 'my_user',
        password: '<secret>',
    },
    table_name: 'my_message',
    table_primary_key: 'id',
});

const stream = messenger.stream({ read_duration_ms: 30 * 1000 });

for await (const { messages } of stream) {
    // Log out messages
    console.dir(messages, { depth: null });

    // Acknowledge messages using primary key values
    const keys = messages.map(r => r.id);
    void messenger.ack(keys);
}
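Because unacknowledged messages are redelivered, it can be useful to acknowledge only the messages whose processing succeeded. The following is a minimal sketch of that pattern, not part of the package: `ackHandled`, `MessageRow`, `MessageBatch` and `fakeStream` are hypothetical names, the row shape assumes the primary key is exposed as `id` (as in the usage above), and a stand-in stream replaces the real class so that the sketch runs without a database.

```typescript
// Hypothetical shapes for illustration; they mirror the usage above but are
// not taken from the package's type definitions.
type MessageRow = { id: number; [column: string]: unknown };
type MessageBatch = { messages: MessageRow[] };

// Handle each message and acknowledge only those whose handler succeeded.
// Failed messages are left unacknowledged so that they are redelivered.
async function ackHandled(
    stream: AsyncIterable<MessageBatch>,
    handle: (row: MessageRow) => Promise<void>,
    ack: (keys: number[]) => Promise<unknown>,
): Promise<{ acked: number[]; failed: number[] }> {
    const acked: number[] = [];
    const failed: number[] = [];
    for await (const { messages } of stream) {
        const ok: number[] = [];
        for (const row of messages) {
            try {
                await handle(row);
                ok.push(row.id);
            } catch {
                failed.push(row.id);
            }
        }
        if (ok.length > 0) {
            await ack(ok);
            acked.push(...ok);
        }
    }
    return { acked, failed };
}

// Stand-in stream so the sketch runs without a database connection.
async function* fakeStream(): AsyncGenerator<MessageBatch> {
    yield { messages: [{ id: 1 }, { id: 2 }] };
    yield { messages: [{ id: 3 }] };
}
```

With the real class, the stream returned by `messenger.stream()` and a wrapper such as `keys => messenger.ack(keys)` could presumably be passed in, assuming `ack()` returns a promise.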

Example

[Screencap: output of the messaging example script]


PlanetScaleVStream

Offers a method, stream(), which returns an async iterable for consuming change events (inserts, updates and deletes) from a Vitess VStream.

This class uses the psdbconnect gRPC API, which is used by, for example, the Connect Airbyte adapter. This API is in alpha, so use it with caution.

See the Vitess documentation for more information on VStream.

Parameters

Constructor

Note that this db_config includes a use_replica boolean.

| Parameter | Description |
| :---------------------- | :------------------------------------------------- |
| db_config | Database connection config |
| db_config.host | PlanetScale host |
| db_config.database | PlanetScale database name |
| db_config.username | PlanetScale branch username |
| db_config.password | PlanetScale branch password |
| db_config.use_replica | Whether to use the branch replica |
| table_name | The name of the table from which to stream changes |

Method: stream()

The stream() method uses named parameters:

| Parameter | Description |
| :----------------- | :---------------------------------------------------------------------------------------- |
| starting_cursor | The table cursor from which the stream will be read. |
| read_duration_ms | (Optional) The duration for which the stream will be read. Omit to stream indefinitely. |
| stop_position | (Optional) The VGtid position at which to stop. |

Determining the starting cursor

The TableCursor encodes the keyspace, shard, and VGtid position from which the stream will begin.

| Parameter | Description |
| :--------- | :----------------------------------------------- |
| keyspace | The keyspace from which to stream changes. |
| shard | The shard from which to stream changes. |
| position | The VGtid position from which to stream changes. |

The position parameter has two special values:

  • undefined: Stream will start from the start of the binlog
    • PlanetScale retains binlog records for 3 days, by default
      • Run SHOW VARIABLES LIKE 'binlog_expire_logs_seconds' to confirm
  • "current": Stream will start from the current moment
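The retention window is worth keeping in mind when a stream is started from a previously saved position, since a position older than the retained binlog can presumably no longer be read. As a rough sketch, the value of `binlog_expire_logs_seconds` can be converted to days and compared against the age of a saved checkpoint. `retentionDays` and `isWithinRetention` are hypothetical helpers, not part of this package.

```typescript
// Convert the value of `binlog_expire_logs_seconds` (a string, as returned by
// SHOW VARIABLES) into days. 259200 seconds corresponds to 3 days.
function retentionDays(binlogExpireLogsSeconds: string): number {
    const seconds = Number.parseInt(binlogExpireLogsSeconds, 10);
    if (!Number.isFinite(seconds) || seconds < 0) {
        throw new Error(`Unexpected value: ${binlogExpireLogsSeconds}`);
    }
    return seconds / 86400;
}

// Hypothetical check: was a checkpoint saved at `savedAt` still inside the
// retention window at time `now`?
function isWithinRetention(savedAt: Date, now: Date, retentionSeconds: number): boolean {
    const ageSeconds = (now.getTime() - savedAt.getTime()) / 1000;
    return ageSeconds >= 0 && ageSeconds <= retentionSeconds;
}
```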

Keyspace and shard values can be found by querying the database:

  • SHOW KEYSPACES: Lists keyspaces
  • SHOW VITESS_SHARDS: Lists shards in each keyspace, using format {keyspace}/{shard}
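Since SHOW VITESS_SHARDS reports each shard as {keyspace}/{shard}, a small helper can split such a value into the two fields a cursor needs. This is only an illustrative sketch; `parseShardName` and `ShardRef` are hypothetical names and not part of this package.

```typescript
type ShardRef = { keyspace: string; shard: string };

// Split a "{keyspace}/{shard}" string at the first slash.
// Shard names such as "-", "-80" or "80-" are kept verbatim.
function parseShardName(name: string): ShardRef {
    const slash = name.indexOf('/');
    if (slash <= 0 || slash === name.length - 1) {
        throw new Error(`Expected "{keyspace}/{shard}", got: ${name}`);
    }
    return {
        keyspace: name.slice(0, slash),
        shard: name.slice(slash + 1),
    };
}
```

For an unsharded keyspace, `parseShardName('my_keyspace/-')` yields the keyspace `my_keyspace` and the shard `-`, which match the values used in the usage example below.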

Usage

import { PlanetScaleVStream, TableCursor } from 'planetscale-stream-ts';

const vstream = new PlanetScaleVStream({
    db_config: {
        host: 'aws.connect.psdb.cloud',
        database: 'my_db',
        username: 'my_user',
        password: '<secret>',
        use_replica: true,
    },
    table_name: 'my_table',
});

const stream = vstream.stream({
    starting_cursor: new TableCursor({
        keyspace: 'my_keyspace',
        shard: '-',
        position: 'current',
    }),
    read_duration_ms: 30 * 1000,
});

for await (const { cursor, inserts, updates, deletes } of stream) {
    // Log out stream cursor position (VGtid)
    console.log('streamed up to:', cursor?.position);

    // Log out changes
    console.dir({ mod: 'INSERTS', data: inserts }, { depth: null });
    console.dir({ mod: 'UPDATES', data: updates }, { depth: null });
    console.dir({ mod: 'DELETES', data: deletes }, { depth: null });
}
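The cursor position logged in the loop above could also be persisted, so that a later run can pass it as the `position` of its starting cursor instead of "current". The sketch below shows one way to do that under stated assumptions: `consumeWithCheckpoint`, `Cursor`, `ChangeBatch` and `fakeChanges` are hypothetical names, the batch shape only mirrors the destructuring used above, and a stand-in stream is used so that the code runs without a database.

```typescript
// Hypothetical shapes mirroring the destructured values in the usage above.
type Cursor = { keyspace: string; shard: string; position?: string };
type ChangeBatch = {
    cursor?: Cursor;
    inserts: unknown[];
    updates: unknown[];
    deletes: unknown[];
};

// Consume a change stream, saving the latest position after each batch.
// Returns the total number of changes seen.
async function consumeWithCheckpoint(
    stream: AsyncIterable<ChangeBatch>,
    saveCheckpoint: (position: string) => Promise<void>,
): Promise<number> {
    let changes = 0;
    for await (const { cursor, inserts, updates, deletes } of stream) {
        changes += inserts.length + updates.length + deletes.length;
        const position = cursor?.position;
        if (position !== undefined) {
            await saveCheckpoint(position);
        }
    }
    return changes;
}

// Stand-in stream with made-up positions, for illustration only.
async function* fakeChanges(): AsyncGenerator<ChangeBatch> {
    yield {
        cursor: { keyspace: 'my_keyspace', shard: '-', position: 'gtid-1' },
        inserts: [{ id: 1 }],
        updates: [],
        deletes: [],
    };
    yield {
        cursor: { keyspace: 'my_keyspace', shard: '-', position: 'gtid-2' },
        inserts: [],
        updates: [{ id: 1 }],
        deletes: [{ id: 2 }],
    };
}
```

Saving the checkpoint only after a batch has been processed means a crash mid-batch leads to that batch being read again on restart rather than skipped, so downstream handling should tolerate duplicates.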

Example

[Screencap: output of the VStream example script]


Using the examples

This repository includes two example scripts, one for each class, in the examples/ folder.

Before running an example, copy the .env.template file to .env and set the correct environment variables. Both of the examples additionally require some configuration values to be set in the files themselves, indicated by the @todo comments.
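As an illustration of how values from a .env file might be turned into a db_config object, here is a minimal sketch. The variable names `PS_HOST`, `PS_DATABASE`, `PS_USERNAME` and `PS_PASSWORD` are assumptions made for this example; check .env.template for the names the repository actually uses. `dbConfigFromEnv` and `DbConfig` are likewise hypothetical.

```typescript
type DbConfig = {
    host: string;
    database: string;
    username: string;
    password: string;
};

// Build a db_config object from environment-style key/value pairs,
// failing early if a value is missing or empty.
// NOTE: the variable names below are hypothetical, not taken from the repository.
function dbConfigFromEnv(env: Record<string, string | undefined>): DbConfig {
    const read = (name: string): string => {
        const value = env[name];
        if (value === undefined || value === '') {
            throw new Error(`Missing environment variable: ${name}`);
        }
        return value;
    };
    return {
        host: read('PS_HOST'),
        database: read('PS_DATABASE'),
        username: read('PS_USERNAME'),
        password: read('PS_PASSWORD'),
    };
}
```

In a Node.js script, `process.env` could be passed as the argument once the .env file has been loaded.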

After running pnpm install, run the examples using the scripts in package.json:

  • pnpm run messaging runs the PlanetScale Messaging example
    • See examples/messaging.ts
  • pnpm run vstream runs the PlanetScale VStream example
    • See examples/vstream.ts

See the Example sections in the documentation above for screencaps of behaviour.


Protocol buffers

Sources

The .proto files in the proto/psdb directory have been copied in from the planetscale/psdb repository. Those in proto/vitess come from Vitess.

Conversion to TypeScript

TypeScript equivalents are generated using the @bufbuild/protoc-gen-es package and the @connectrpc/protoc-gen-connect-es plugin. Code generation is configured in buf.yaml and buf.gen.yaml, and generated code is saved to src/generated.

Run pnpm run generate to regenerate src/generated.

API clients

gRPC API clients for the two PlanetScale APIs are created in the src/clients directory using @connectrpc/connect and @connectrpc/connect-node.