npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@sequinstream/sequin-js

v0.1.0

Published

A lightweight JavaScript SDK for sending, receiving, and acknowledging messages in Sequin streams

Downloads

21

Readme

sequin-js

A lightweight JavaScript SDK for sending, receiving, and acknowledging messages in Sequin streams. For easy development and testing, it also comes with helpful methods for managing the lifecycle of streams and consumers.

Installing

Install the library:

npm i @sequinstream/sequin-js --save

Initializing

You'll typically initialize a Sequin Client once in your application. Create a new file to initialize the Client in, and export it for use in other parts of your app:

// sequin.js

import { Client } from `@sequinstream/sequin-js`

const baseUrl = process.env.SEQUIN_URL || 'http://localhost:7673'

const sequin = Client.init({ baseUrl })

export default sequin

By default, the Client is initialized using Sequin's default host and port in local development: http://localhost:7673

Usage

You'll predominately use sequin-js to send, receive, and acknowledge messages in Sequin streams:

// Import the Sequin client from sequin.js
import sequin from './sequin.js';

// Define your stream and consumer
const stream = 'your-stream-name';
const consumer = 'your-consumer-name';

// Send a message
const { res, error } = await sequin.sendMessage(
  stream,
  'test.1',
  'Hello, Sequin!'
)
if (error) {
  console.error('Error sending messages:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Messages sent successfully', res);
}

// Receive a message
const { res: message, error: receiveError } = await sequin.receiveMessage(stream, consumer);
if (receiveError) {
  console.error('Error receiving message:', error.summary);
} else {
  console.log('Received message:', message);
  // Don't forget to acknowledge the message
  const { res: ack, error: ackError } = await sequin.ackMessage(stream, consumer, [message.ack_id]);
  if (ackError) {
    console.log('Error acking message:' ackError.summary)
  }
  else {
    console.log('Message acked');
  }
}

sendMessage()

Send a message to a stream:

const { res, error } = sequin.sendMessage(stream_id_or_name, key, data)

Parameters

sendMessage() accepts three arguments:

  • stream_id_or_name (string): Either the name or id of the stream.
  • key (string): The key for the message.
  • data (string): The data payload for the message. Can be either an object or a string (objects will be JSON encoded).

Returns

sendMessage() will return an object with two keys, res and error:

Success

{
  "res": {
    "published": 1
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 404,
    "summary": "stream not found"
  },
}

Example

const { res, error } = await sequin.sendMessage(
  'my_stream',
  'greeting.1',
  'Hello, Sequin!'
)
if (error) {
  console.error('Error sending messages:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Messages sent successfully', res.published );
}

sendMessages()

Send a batch of messages (max 1,000):

const { res, error } = sequin.sendMessages(stream_id_or_name, messages)

Parameters

sendMessages() accepts two arguments:

  • stream_id_or_name (string): Either the name or id of the stream.
  • messages (array): An array of message objects:
[
  {
    key: "message_key_1",
    data: "data_payload_1"
  },
  {
    key: "message_key_2",
    data: "data_payload_2"
  },
  { ... }
]

Returns

sendMessages() will return an object with a two keys, res and error:

[!IMPORTANT] sendMessages() is all or nothing. Either all the messages are successfully published, or none of the messages are published.

Success

{
  "res": {
    "published": 42
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 404,
    "summary": "Stream not found"
  },
}

Example

let messages = [
  {
    key: 'test.1',
    data: 'Hello, Sequin!'
  },
  {
    key: 'test.2',
    data: 'Greetings from Sequin!'
  }
];


const { res, error } = await sequin.sendMessages('my_stream', messages);
if (error) {
  console.error('Error sending messages:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Messages sent successfully', res.published );
}

receiveMessage()

To pull a single messages off the stream using a Sequin consumer, you'll use the receiveMessage() function:

const { res, error } = sequin.receiveMessage(stream_id_or_name, consumer_id_or_name)

Parameters

receiveMessage() accepts two arguments:

  • stream_id_or_name (string): Either the name or id of the stream.
  • consumer_id_or_name (string): Either the name or id of the stream.

Returns

receiveMessage() will return an object with two keys, res and error:

No messages available

{
  "res": null,
  "error": null
}

Message

{
  "res": {
    "message": {
        "key": "test.1",
        "stream_id": "def45b2d-ae3f-46a4-b57b-54cdc1cecc6d",
        "data": "Hello, Sequin!",
        "seq": 1,
        "inserted_at": "2024-07-23T00:31:55.668060Z",
        "updated_at": "2024-07-23T00:31:55.668060Z"
    },
    "ack_id": "07240856-96cb-4305-9b2f-620f4b1528a4"
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 404,
    "summary": "Consumer not found."
  }
}

Example

const { res, error } = await sequin.receiveMessage('my_stream', 'my_consumer');
if (error) {
  console.error('Error receiving messages:', error.summary);
  // Handle the error appropriately
}else if (res === null) {
  console.log('No messages available');
} else {
  console.log('Messages received successfully', res );
}

receiveMessages()

You can pull a batch of messages for your consumer using recieveMesages(). receiveMessages() pulls a batch of 10 messages by default:

const { res, error} = sequin.receiveMessages(stream_id_or_name, consumer_id_or_name, options?)

Parameters

receiveMessage() accepts three arguments:

  • stream_id_or_name (string): Either the name or id of the stream.
  • consumer_id_or_name (string): Either the name or id of the consumer.
  • options (object, optional): An object that defines optional parameters:
    • batch_size (integer): The number of messages to request. Default is 10, max of 1,000.

Returns

receiveMessage() will return an object with a two keys, res and error:

No messages available

{
  "res": [], //empty array
  "error": null
}

Messages

{
  "res": [
    {
      message: {
          "key": "test.1",
          "stream_id": "def45b2d-ae3f-46a4-b57b-54cdc1cecc6d",
          "data": "Hello, Sequin!",
          "seq": 1,
          "inserted_at": "2024-07-23T00:31:55.668060Z",
          "updated_at": "2024-07-23T00:31:55.668060Z"
      },
      ack_id: "07240856-96cb-4305-9b2f-620f4b1528a4"
    },
    { ... },
    { ... }
  ],
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 404,
    "summary": "Consumer not found."
  }
}

Example

const { res, error } = await sequin.receiveMessage('my_stream', 'my_consumer', { batch_size: 100 });
if (error) {
  console.error('Error receiving messages:', error.summary);
  // Handle the error appropriately
} else if (res === []) {
  console.log('No messages available');
} else {
  console.log('Messages received successfully', res );
}

ackMessage()

After processing a message, you can acknowledge it using ackMessage():

const { res, error } = sequin.ackMessage(stream_id_or_name, consumer_id_or_name, ack_id)

Parameters

ackMessage() accepts three arguments:

  • stream_id_or_name (string): Either the name or id of the stream.
  • consumer_id_or_name (string): Either the name or id of the stream.
  • ack_id (string): The ack_id for the messages you want to ack.

Returns

ackMessage() will return an object with a two keys, res and error:

Success

{
  "res": {
    "success": true
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 400,
    "summary": "Invalid ack_id."
  },
}

Example

const { res, error } = await sequin.ackMessage('my_stream', 'my_consumer', '07240856-96cb-4305-9b2f-620f4b1528a4');

if (error) {
  console.error('Error acknowledging message:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Message acknowledged successfully', res);
}

ackMessages()

You can also acknowledge a batch of messages using ackMessages():

const { res, error } = sequin.ackMessages(stream_id_or_name, consumer_id_or_name, ack_ids)

Parameters

ackMessages() accepts three arguments:

  • stream_id_or_name (string): Either the name or id of the stream.
  • consumer_id_or_name (string): Either the name or id of the stream.
  • ack_ids (array): An array of ack_id strings for the messages you want to ack.

Returns

ackMessages() will return an object with a two keys, res and error:

[!IMPORTANT] ackMessages() is all or nothing. Either all the messages are successfully acknowledged, or none of the messages are acknowledged.

Success

{
  "res": {
    "success": true
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 400,
    "summary": "Invalid ack_id."
  },
}

Example

const { res, error } = await sequin.ackMessages('my_stream', 'my_consumer', ['07240856-96cb-4305-9b2f-620f4b1528a4', '522c69a1-0bbe-49ec-9d0d-e39b40d483f8']);

if (error) {
  console.error('Error acknowledging message:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Message acknowledged successfully', res);
}

nackMessage()

Or, you can nack a message using nackMessage():

const { res, error } = sequin.nackMessage(stream_id_or_name, consumer_id_or_name, ack_id)

Parameters

nackMessage() accepts three arguments:

  • stream_id_or_name (string): Either the name or id of the stream.
  • consumer_id_or_name (string): Either the name or id of the stream.
  • ack_id (string): The ack_id for the message to not acknowledge.

Returns

nackMessage() will return an object with a two keys, res and error:

Success

{
  "res": {
    "success": true
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 400,
    "summary": "Invalid ack_id"
  },
}

Example

const { res, error } = await sequin.nackMessage('my_stream', 'my_consumer', '07240856-96cb-4305-9b2f-620f4b1528a4');

if (error) {
  console.error('Error acknowledging message:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Message acknowledged successfully', res);
}

nackMessages()

Or, you can nack a batch of messages using nackMessages():

const { res, error } = sequin.nackMessages(stream_id_or_name, consumer_id_or_name, ack_ids)

Parameters

nackMessages() accepts three arguments:

  • stream_id_or_name (string): Either the name or id of the stream.
  • consumer_id_or_name (string): Either the name or id of the stream.
  • ack_ids (array): An array of ack_id strings for the messages to not acknowledge.

Returns

nackMessages() will return an object with a two keys, res and error:

[!IMPORTANT] nackMessages() is all or nothing. Either all the messages are successfully nacked, or none of the messages are nacked.

Success

{
  "res": {
    "success": true
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 400,
    "summary": "Invalid ack_id"
  },
}

Example

const { res, error } = await sequin.nackMessages('my_stream', 'my_consumer', ['07240856-96cb-4305-9b2f-620f4b1528a4', '522c69a1-0bbe-49ec-9d0d-e39b40d483f8']);

if (error) {
  console.error('Error acknowledging message:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Message acknowledged successfully', res);
}

createStream()

Creating streams can be helpful in automated testing. You can create a new stream using createStream():

const { res, error } = sequin.createStream(stream_name, options?)

Parameters

createStream() accepts two parameters:

  • name (string): The name of the stream you want to create.
  • options (object, optional): An object of key:value pairs that define optional parameters:
    • one_message_per_key (bool)
    • process_unmodified (bool)
    • max_storage_gb (integer)
    • retain_up_to (integer)
    • retain_at_least (integer)

Returns

createStream() will return an object with a two keys, res and error:

Success

{
  "res": {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "name": "my_stream",
    "account_id": "8b930c30-2334-4339-b7ba-f250b7be223e",
    "stats": {
      "message_count": 0,
      "consumer_count": 0,
      "storage_size": 163840
    },
    "inserted_at": "2024-07-24T20:02:46Z",
    "updated_at": "2024-07-24T20:02:46Z"
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 422,
    "summary": "Validation failed: duplicate name"
  },
}

Example

const { res, error } = await sequin.createStream('my_stream')
if (error) {
  console.error('Error creating stream:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Stream created successfully', res );
}

deleteStream()

Deleting streams can be helpful in automated testing. You can delete a stream using deleteStream():

const { res, error } = sequin.deleteStream(stream_id_or_name)

Parameters

deleteStream() accepts one parameter:

  • stream_id_or_name (string): The id or name of the stream you want to delete.

Returns

deleteStream() will return an object with a two keys, res and error:

Successful delete

{
  "res": {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "deleted": true
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 404,
    "summary": "Not found: No `stream` found matching the provided ID or name"
  },
}

Example

const { res, error } = await sequin.deleteStream('my_stream')
if (error) {
  console.error('Error deleting stream:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Stream deleted successfully', res );
}

createConsumer()

Creating consumers can be helpful in automated testing. You can create a new consumer using createConsumer():

const { res, error } = sequin.createConsumer(stream_id_or_name, consumer_name, consumer_filter, options?)

Parameters

createConsumer() accepts two parameters:

  • stream_id_or_name (string): The id or name of the stream you want to attach the consumer to.
  • name (string): The name of the consumer you want to create.
  • filter (string): The filter pattern the consumer will use to pull messages off the stream.
  • options (object, optional): An object of key:value pairs that define optional parameters:
    • ack_wait_ms (integer): Acknowledgement wait time in milliseconds
    • max_ack_pending (integer): Maximum number of pending acknowledgements
    • max_deliver (integer): Maximum number of delivery attempts

Returns

createConsumer() will return an object with a two keys, res and error:

Success

{
  "res": {
    "ack_wait_ms": 30000,
    "filter_key_pattern": "test.>",
    "id": "67df6362-ba21-4ddc-8601-55d404bacaeb",
    "inserted_at": "2024-07-24T20:12:20Z",
    "kind": "pull",
    "max_ack_pending": 10000,
    "max_deliver": null,
    "max_waiting": 20,
    "name": "my_consumer",
    "stream_id": "15b1f003-3a47-4371-8331-6437cb48477e",
    "updated_at": "2024-07-24T20:12:20Z",
    "http_endpoint_id": null,
    "status": "active"
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 422,
    "summary": "Validation failed: duplicate name"
  },
}

Example

const { res, error } = await sequin.createConsumer('my_stream', 'my_consumer', 'test.>')
if (error) {
  console.error('Error creating consumer:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Consumer created successfully', res );
}

deleteConsumer()

Deleting consumers can be helpful in automated testing. You can delete a consumer using deleteConsumer():

const { res, error } = sequin.deleteConsumer(stream_id_or_name, consumer_id_or_name)

Parameters

deleteConsumer() accepts two parameters:

  • stream_id_or_name (string): The id or name of the stream associated to the consumer you want to delete.
  • consumer_id_or_name (string): The id or name of the consumer you want to delete.

Returns

deleteConsumer() will return an object with a two keys, res and error:

Successful delete

{
  "res": {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "deleted": true
  },
  "error": null
}

Error

{
  "res": null,
  "error": {
    "status": 404,
    "summary": "Not found: No `consumer` found matching the provided ID or name"
  },
}

Example

const { res, error } = await sequin.deleteConsumer('my_stream', 'my_consumer')
if (error) {
  console.error('Error deleting consumer:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Consumer deleted successfully', res );
}

Testing

To adequately test Sequin, we recommend creating temporary streams and consumers in addition to testing sending and receiving messages. Here's an example using jest:

import sequin from './sequin.js';

describe('Sequin Stream and Consumer Test', () => {
  const streamName = `test-stream-${Date.now()}`;
  const consumerName = `test-consumer-${Date.now()}`;

  test('Stream and Consumer Lifecycle', async () => {
    // Create a new stream
    const { res: stream, error: streamError } = await client.createStream(streamName);
    expect(stream.name).toBe(streamName);

    // Create a consumer
    const { res: consumer, error: consumerError } = await client.createConsumer(streamName, consumerName, 'test.>');
    expect(consumer.name).toBe(consumerName);

    // Send a message
    await sequin.sendMessage(streamName, 'test.1', 'Hello, Sequin!');
    expect(sendRes).toHaveProperty('published', 1);

    // Receive and ack a message
    const { res: receiveRes } = await sequin.receiveMessage(streamName, consumerName);
    // Add tests for your business logic
    await sequin.ackMessages(streamName, consumerName, [receiveRes.ack_id]);

    // Delete the consumer
    const { res: deleteConsumerRes, error: deleteConsumerError } = await client.deleteConsumer(streamName, consumerName);
    expect(deleteConsumerRes).toHaveProperty('deleted', true);

    // Delete the stream
    const { res: deleteStreamRes, error: deleteStreamError } = await client.deleteStream(streamName);
    expect(deleteStreamRes).toHaveProperty('deleted', true);
  });
});