@sequinstream/sequin-js
v0.1.0
sequin-js
A lightweight JavaScript SDK for sending, receiving, and acknowledging messages in Sequin streams. For easy development and testing, it also comes with helpful methods for managing the lifecycle of streams and consumers.
Installing
Install the library:
npm i @sequinstream/sequin-js --save
Initializing
You'll typically initialize a Sequin Client once in your application. Create a new file that initializes the Client and exports it for use in other parts of your app:
// sequin.js
import { Client } from '@sequinstream/sequin-js'
const baseUrl = process.env.SEQUIN_URL || 'http://localhost:7673'
const sequin = Client.init({ baseUrl })
export default sequin
By default, the Client is initialized using Sequin's default host and port in local development: http://localhost:7673
Usage
You'll predominantly use sequin-js to send, receive, and acknowledge messages in Sequin streams:
// Import the Sequin client from sequin.js
import sequin from './sequin.js';
// Define your stream and consumer
const stream = 'your-stream-name';
const consumer = 'your-consumer-name';
// Send a message
const { res, error } = await sequin.sendMessage(
  stream,
  'test.1',
  'Hello, Sequin!'
);
if (error) {
  console.error('Error sending message:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Message sent successfully', res);
}
// Receive a message
const { res: message, error: receiveError } = await sequin.receiveMessage(stream, consumer);
if (receiveError) {
  console.error('Error receiving message:', receiveError.summary);
} else if (message === null) {
  console.log('No messages available');
} else {
  console.log('Received message:', message);
  // Don't forget to acknowledge the message
  const { error: ackError } = await sequin.ackMessage(stream, consumer, message.ack_id);
  if (ackError) {
    console.error('Error acking message:', ackError.summary);
  } else {
    console.log('Message acked');
  }
}
sendMessage()
Send a message to a stream:
const { res, error } = await sequin.sendMessage(stream_id_or_name, key, data)
Parameters
sendMessage() accepts three arguments:
- stream_id_or_name (string): Either the name or id of the stream.
- key (string): The key for the message.
- data (string or object): The data payload for the message. Objects are JSON encoded.
Returns
sendMessage() returns an object with two keys, res and error:
Success
{
  "res": {
    "published": 1
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 404,
    "summary": "stream not found"
  }
}
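Because failures are reported through the error key, callers that prefer exceptions can wrap the result in a small helper. The sketch below shows one way to do that. unwrap is a hypothetical helper, not part of sequin-js, and it assumes error objects carry the status and summary fields shown above.

```javascript
// Hypothetical helper (not part of sequin-js): convert a { res, error }
// result into either the res value or a thrown Error.
function unwrap({ res, error }) {
  if (error) {
    const err = new Error(error.summary ?? 'Unknown Sequin error');
    err.status = error.status; // keep the status code for callers to inspect
    throw err;
  }
  return res;
}
```

With this in place, a call such as unwrap(await sequin.sendMessage('my_stream', 'greeting.1', 'Hello, Sequin!')) either returns the res object or throws.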
Example
const { res, error } = await sequin.sendMessage(
  'my_stream',
  'greeting.1',
  'Hello, Sequin!'
);
if (error) {
  console.error('Error sending messages:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Messages sent successfully', res.published);
}
sendMessages()
Send a batch of messages (max 1,000):
const { res, error } = await sequin.sendMessages(stream_id_or_name, messages)
Parameters
sendMessages() accepts two arguments:
- stream_id_or_name (string): Either the name or id of the stream.
- messages (array): An array of message objects:
[
  {
    key: "message_key_1",
    data: "data_payload_1"
  },
  {
    key: "message_key_2",
    data: "data_payload_2"
  },
  { ... }
]
Returns
sendMessages() returns an object with two keys, res and error:
> [!IMPORTANT]
> sendMessages() is all or nothing. Either all the messages are successfully published, or none of the messages are published.
Success
{
  "res": {
    "published": 42
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 404,
    "summary": "Stream not found"
  }
}
Example
const messages = [
  {
    key: 'test.1',
    data: 'Hello, Sequin!'
  },
  {
    key: 'test.2',
    data: 'Greetings from Sequin!'
  }
];
const { res, error } = await sequin.sendMessages('my_stream', messages);
if (error) {
  console.error('Error sending messages:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Messages sent successfully', res.published);
}
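Since a single sendMessages() call accepts at most 1,000 messages, larger sets have to be split across several calls. The following is a minimal sketch of that splitting. sendInBatches is a hypothetical helper, not part of sequin-js, and it takes the client as a parameter so it can be exercised with a stub. Note that the all-or-nothing guarantee applies to each call separately: if a later batch fails, earlier batches remain published.

```javascript
// Hypothetical helper (not part of sequin-js): send any number of messages
// in batches of at most 1,000, stopping at the first batch that fails.
const MAX_BATCH = 1000;

async function sendInBatches(sequin, stream, messages, batchSize = MAX_BATCH) {
  let published = 0;
  for (let i = 0; i < messages.length; i += batchSize) {
    const batch = messages.slice(i, i + batchSize);
    const { res, error } = await sequin.sendMessages(stream, batch);
    if (error) {
      // Earlier batches are already published; this batch is not.
      return { published, error };
    }
    published += res.published;
  }
  return { published, error: null };
}
```

The returned published count tells the caller how many messages were accepted before any failure, which is useful when deciding what to retry.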
receiveMessage()
To pull a single message off the stream using a Sequin consumer, use the receiveMessage() function:
const { res, error } = await sequin.receiveMessage(stream_id_or_name, consumer_id_or_name)
Parameters
receiveMessage() accepts two arguments:
- stream_id_or_name (string): Either the name or id of the stream.
- consumer_id_or_name (string): Either the name or id of the consumer.
Returns
receiveMessage() returns an object with two keys, res and error:
No messages available
{
  "res": null,
  "error": null
}
Message
{
  "res": {
    "message": {
      "key": "test.1",
      "stream_id": "def45b2d-ae3f-46a4-b57b-54cdc1cecc6d",
      "data": "Hello, Sequin!",
      "seq": 1,
      "inserted_at": "2024-07-23T00:31:55.668060Z",
      "updated_at": "2024-07-23T00:31:55.668060Z"
    },
    "ack_id": "07240856-96cb-4305-9b2f-620f4b1528a4"
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 404,
    "summary": "Consumer not found."
  }
}
Example
const { res, error } = await sequin.receiveMessage('my_stream', 'my_consumer');
if (error) {
  console.error('Error receiving message:', error.summary);
  // Handle the error appropriately
} else if (res === null) {
  console.log('No messages available');
} else {
  console.log('Message received successfully', res);
}
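When no message is available, res is null, so a consumer that expects a message to arrive shortly may want to retry a few times. The sketch below polls with a fixed delay. waitForMessage and its attempts and delayMs options are hypothetical and not part of sequin-js; the client is passed in as a parameter.

```javascript
// Hypothetical helper (not part of sequin-js): poll receiveMessage() until a
// message arrives, an error occurs, or the attempts are used up.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function waitForMessage(sequin, stream, consumer, { attempts = 5, delayMs = 1000 } = {}) {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    const { res, error } = await sequin.receiveMessage(stream, consumer);
    if (error) return { res: null, error };
    if (res !== null) return { res, error: null };
    // Nothing available yet: wait before the next attempt, but not after the last.
    if (attempt < attempts) await sleep(delayMs);
  }
  return { res: null, error: null };
}
```

The helper returns the same { res, error } shape as receiveMessage(), so existing handling code can stay unchanged.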
receiveMessages()
You can pull a batch of messages for your consumer using receiveMessages(). By default, it pulls a batch of 10 messages:
const { res, error } = await sequin.receiveMessages(stream_id_or_name, consumer_id_or_name, options?)
Parameters
receiveMessages() accepts three arguments:
- stream_id_or_name (string): Either the name or id of the stream.
- consumer_id_or_name (string): Either the name or id of the consumer.
- options (object, optional): An object that defines optional parameters:
  - batch_size (integer): The number of messages to request. Default is 10, max of 1,000.
Returns
receiveMessages() returns an object with two keys, res and error:
No messages available
{
  "res": [], // empty array
  "error": null
}
Messages
{
  "res": [
    {
      "message": {
        "key": "test.1",
        "stream_id": "def45b2d-ae3f-46a4-b57b-54cdc1cecc6d",
        "data": "Hello, Sequin!",
        "seq": 1,
        "inserted_at": "2024-07-23T00:31:55.668060Z",
        "updated_at": "2024-07-23T00:31:55.668060Z"
      },
      "ack_id": "07240856-96cb-4305-9b2f-620f4b1528a4"
    },
    { ... },
    { ... }
  ],
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 404,
    "summary": "Consumer not found."
  }
}
Example
const { res, error } = await sequin.receiveMessages('my_stream', 'my_consumer', { batch_size: 100 });
if (error) {
  console.error('Error receiving messages:', error.summary);
  // Handle the error appropriately
} else if (res.length === 0) {
  console.log('No messages available');
} else {
  console.log('Messages received successfully', res);
}
ackMessage()
After processing a message, you can acknowledge it using ackMessage():
const { res, error } = await sequin.ackMessage(stream_id_or_name, consumer_id_or_name, ack_id)
Parameters
ackMessage() accepts three arguments:
- stream_id_or_name (string): Either the name or id of the stream.
- consumer_id_or_name (string): Either the name or id of the consumer.
- ack_id (string): The ack_id for the message you want to ack.
Returns
ackMessage() returns an object with two keys, res and error:
Success
{
  "res": {
    "success": true
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 400,
    "summary": "Invalid ack_id."
  }
}
Example
const { res, error } = await sequin.ackMessage('my_stream', 'my_consumer', '07240856-96cb-4305-9b2f-620f4b1528a4');
if (error) {
  console.error('Error acknowledging message:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Message acknowledged successfully', res);
}
ackMessages()
You can also acknowledge a batch of messages using ackMessages():
const { res, error } = await sequin.ackMessages(stream_id_or_name, consumer_id_or_name, ack_ids)
Parameters
ackMessages() accepts three arguments:
- stream_id_or_name (string): Either the name or id of the stream.
- consumer_id_or_name (string): Either the name or id of the consumer.
- ack_ids (array): An array of ack_id strings for the messages you want to ack.
Returns
ackMessages() returns an object with two keys, res and error:
> [!IMPORTANT]
> ackMessages() is all or nothing. Either all the messages are successfully acknowledged, or none of the messages are acknowledged.
Success
{
  "res": {
    "success": true
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 400,
    "summary": "Invalid ack_id."
  }
}
Example
const { res, error } = await sequin.ackMessages('my_stream', 'my_consumer', ['07240856-96cb-4305-9b2f-620f4b1528a4', '522c69a1-0bbe-49ec-9d0d-e39b40d483f8']);
if (error) {
  console.error('Error acknowledging messages:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Messages acknowledged successfully', res);
}
nackMessage()
Alternatively, you can nack a message using nackMessage():
const { res, error } = await sequin.nackMessage(stream_id_or_name, consumer_id_or_name, ack_id)
Parameters
nackMessage() accepts three arguments:
- stream_id_or_name (string): Either the name or id of the stream.
- consumer_id_or_name (string): Either the name or id of the consumer.
- ack_id (string): The ack_id for the message you want to nack.
Returns
nackMessage() returns an object with two keys, res and error:
Success
{
  "res": {
    "success": true
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 400,
    "summary": "Invalid ack_id"
  }
}
Example
const { res, error } = await sequin.nackMessage('my_stream', 'my_consumer', '07240856-96cb-4305-9b2f-620f4b1528a4');
if (error) {
  console.error('Error nacking message:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Message nacked successfully', res);
}
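A common way to combine receiveMessage(), ackMessage(), and nackMessage() is to ack a message when your processing succeeds and nack it when processing throws. The sketch below illustrates that flow under stated assumptions: handleOne, the handler callback, and the outcome field are hypothetical names for illustration, not part of sequin-js, and the client is passed in as a parameter.

```javascript
// Hypothetical helper (not part of sequin-js): receive one message, run the
// handler on it, then ack on success or nack if the handler throws.
async function handleOne(sequin, stream, consumer, handler) {
  const { res, error } = await sequin.receiveMessage(stream, consumer);
  if (error) return { outcome: 'error', error };
  if (res === null) return { outcome: 'empty', error: null };

  try {
    await handler(res.message);
  } catch (handlerError) {
    // Processing failed, so explicitly nack the message.
    const { error: nackError } = await sequin.nackMessage(stream, consumer, res.ack_id);
    return { outcome: 'nacked', error: nackError ?? null, handlerError };
  }

  // The ack happens outside the try block so that an ack failure is not
  // mistaken for a handler failure.
  const { error: ackError } = await sequin.ackMessage(stream, consumer, res.ack_id);
  return { outcome: ackError ? 'error' : 'acked', error: ackError ?? null };
}
```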
nackMessages()
Alternatively, you can nack a batch of messages using nackMessages():
const { res, error } = await sequin.nackMessages(stream_id_or_name, consumer_id_or_name, ack_ids)
Parameters
nackMessages() accepts three arguments:
- stream_id_or_name (string): Either the name or id of the stream.
- consumer_id_or_name (string): Either the name or id of the consumer.
- ack_ids (array): An array of ack_id strings for the messages you want to nack.
Returns
nackMessages() returns an object with two keys, res and error:
> [!IMPORTANT]
> nackMessages() is all or nothing. Either all the messages are successfully nacked, or none of the messages are nacked.
Success
{
  "res": {
    "success": true
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 400,
    "summary": "Invalid ack_id"
  }
}
Example
const { res, error } = await sequin.nackMessages('my_stream', 'my_consumer', ['07240856-96cb-4305-9b2f-620f4b1528a4', '522c69a1-0bbe-49ec-9d0d-e39b40d483f8']);
if (error) {
  console.error('Error nacking messages:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Messages nacked successfully', res);
}
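The batch methods fit together naturally: pull a batch with receiveMessages(), process each message, then ack the successes and nack the failures in one call each. The sketch below shows one possible arrangement. handleBatch and its handler callback are hypothetical and not part of sequin-js; the client is passed in as a parameter, and empty ack or nack calls are skipped.

```javascript
// Hypothetical helper (not part of sequin-js): process a batch, then ack the
// messages whose handler succeeded and nack the ones whose handler threw.
async function handleBatch(sequin, stream, consumer, handler, batchSize = 10) {
  const { res, error } = await sequin.receiveMessages(stream, consumer, { batch_size: batchSize });
  if (error) return { acked: 0, nacked: 0, error };

  const ackIds = [];
  const nackIds = [];
  for (const { message, ack_id: ackId } of res) {
    try {
      await handler(message);
      ackIds.push(ackId);
    } catch {
      nackIds.push(ackId);
    }
  }

  let acked = 0;
  let nacked = 0;
  if (ackIds.length > 0) {
    const { error: ackError } = await sequin.ackMessages(stream, consumer, ackIds);
    if (ackError) return { acked, nacked, error: ackError };
    acked = ackIds.length;
  }
  if (nackIds.length > 0) {
    const { error: nackError } = await sequin.nackMessages(stream, consumer, nackIds);
    if (nackError) return { acked, nacked, error: nackError };
    nacked = nackIds.length;
  }
  return { acked, nacked, error: null };
}
```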
createStream()
Creating streams can be helpful in automated testing. You can create a new stream using createStream():
const { res, error } = await sequin.createStream(stream_name, options?)
Parameters
createStream() accepts two parameters:
- stream_name (string): The name of the stream you want to create.
- options (object, optional): An object of key:value pairs that define optional parameters:
  - one_message_per_key (bool)
  - process_unmodified (bool)
  - max_storage_gb (integer)
  - retain_up_to (integer)
  - retain_at_least (integer)
Returns
createStream() returns an object with two keys, res and error:
Success
{
  "res": {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "name": "my_stream",
    "account_id": "8b930c30-2334-4339-b7ba-f250b7be223e",
    "stats": {
      "message_count": 0,
      "consumer_count": 0,
      "storage_size": 163840
    },
    "inserted_at": "2024-07-24T20:02:46Z",
    "updated_at": "2024-07-24T20:02:46Z"
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 422,
    "summary": "Validation failed: duplicate name"
  }
}
Example
const { res, error } = await sequin.createStream('my_stream');
if (error) {
  console.error('Error creating stream:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Stream created successfully', res);
}
deleteStream()
Deleting streams can be helpful in automated testing. You can delete a stream using deleteStream():
const { res, error } = await sequin.deleteStream(stream_id_or_name)
Parameters
deleteStream() accepts one parameter:
- stream_id_or_name (string): The id or name of the stream you want to delete.
Returns
deleteStream() returns an object with two keys, res and error:
Successful delete
{
  "res": {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "deleted": true
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 404,
    "summary": "Not found: No `stream` found matching the provided ID or name"
  }
}
Example
const { res, error } = await sequin.deleteStream('my_stream');
if (error) {
  console.error('Error deleting stream:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Stream deleted successfully', res);
}
createConsumer()
Creating consumers can be helpful in automated testing. You can create a new consumer using createConsumer():
const { res, error } = await sequin.createConsumer(stream_id_or_name, consumer_name, consumer_filter, options?)
Parameters
createConsumer() accepts four parameters:
- stream_id_or_name (string): The id or name of the stream you want to attach the consumer to.
- consumer_name (string): The name of the consumer you want to create.
- consumer_filter (string): The filter pattern the consumer will use to pull messages off the stream.
- options (object, optional): An object of key:value pairs that define optional parameters:
  - ack_wait_ms (integer): Acknowledgement wait time in milliseconds.
  - max_ack_pending (integer): Maximum number of pending acknowledgements.
  - max_deliver (integer): Maximum number of delivery attempts.
Returns
createConsumer() returns an object with two keys, res and error:
Success
{
  "res": {
    "ack_wait_ms": 30000,
    "filter_key_pattern": "test.>",
    "id": "67df6362-ba21-4ddc-8601-55d404bacaeb",
    "inserted_at": "2024-07-24T20:12:20Z",
    "kind": "pull",
    "max_ack_pending": 10000,
    "max_deliver": null,
    "max_waiting": 20,
    "name": "my_consumer",
    "stream_id": "15b1f003-3a47-4371-8331-6437cb48477e",
    "updated_at": "2024-07-24T20:12:20Z",
    "http_endpoint_id": null,
    "status": "active"
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 422,
    "summary": "Validation failed: duplicate name"
  }
}
Example
const { res, error } = await sequin.createConsumer('my_stream', 'my_consumer', 'test.>');
if (error) {
  console.error('Error creating consumer:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Consumer created successfully', res);
}
deleteConsumer()
Deleting consumers can be helpful in automated testing. You can delete a consumer using deleteConsumer():
const { res, error } = await sequin.deleteConsumer(stream_id_or_name, consumer_id_or_name)
Parameters
deleteConsumer() accepts two parameters:
- stream_id_or_name (string): The id or name of the stream associated with the consumer you want to delete.
- consumer_id_or_name (string): The id or name of the consumer you want to delete.
Returns
deleteConsumer() returns an object with two keys, res and error:
Successful delete
{
  "res": {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "deleted": true
  },
  "error": null
}
Error
{
  "res": null,
  "error": {
    "status": 404,
    "summary": "Not found: No `consumer` found matching the provided ID or name"
  }
}
Example
const { res, error } = await sequin.deleteConsumer('my_stream', 'my_consumer');
if (error) {
  console.error('Error deleting consumer:', error.summary);
  // Handle the error appropriately
} else {
  console.log('Consumer deleted successfully', res);
}
Testing
To adequately test Sequin, we recommend creating temporary streams and consumers in addition to testing sending and receiving messages. Here's an example using jest:
import sequin from './sequin.js';
describe('Sequin Stream and Consumer Test', () => {
  const streamName = `test-stream-${Date.now()}`;
  const consumerName = `test-consumer-${Date.now()}`;
  test('Stream and Consumer Lifecycle', async () => {
    // Create a new stream
    const { res: stream, error: streamError } = await sequin.createStream(streamName);
    expect(streamError).toBeNull();
    expect(stream.name).toBe(streamName);
    // Create a consumer
    const { res: consumer, error: consumerError } = await sequin.createConsumer(streamName, consumerName, 'test.>');
    expect(consumerError).toBeNull();
    expect(consumer.name).toBe(consumerName);
    // Send a message
    const { res: sendRes } = await sequin.sendMessage(streamName, 'test.1', 'Hello, Sequin!');
    expect(sendRes).toHaveProperty('published', 1);
    // Receive and ack a message
    const { res: receiveRes } = await sequin.receiveMessage(streamName, consumerName);
    // Add tests for your business logic
    await sequin.ackMessages(streamName, consumerName, [receiveRes.ack_id]);
    // Delete the consumer
    const { res: deleteConsumerRes } = await sequin.deleteConsumer(streamName, consumerName);
    expect(deleteConsumerRes).toHaveProperty('deleted', true);
    // Delete the stream
    const { res: deleteStreamRes } = await sequin.deleteStream(streamName);
    expect(deleteStreamRes).toHaveProperty('deleted', true);
  });
});
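If an assertion fails partway through a test like this one, the temporary stream and consumer are never deleted. One way to guard against that is to wrap setup and teardown in a helper that cleans up in finally blocks. The sketch below is a minimal version under stated assumptions: withTempStream is a hypothetical helper, not part of sequin-js, and it relies only on the createStream(), createConsumer(), deleteConsumer(), and deleteStream() calls documented above.

```javascript
// Hypothetical helper (not part of sequin-js): create a temporary stream and
// consumer, run fn with their names, and always delete both afterwards.
async function withTempStream(sequin, filter, fn) {
  const suffix = Date.now();
  const streamName = `test-stream-${suffix}`;
  const consumerName = `test-consumer-${suffix}`;

  const { error: streamError } = await sequin.createStream(streamName);
  if (streamError) throw new Error(`createStream failed: ${streamError.summary}`);
  try {
    const { error: consumerError } = await sequin.createConsumer(streamName, consumerName, filter);
    if (consumerError) throw new Error(`createConsumer failed: ${consumerError.summary}`);
    try {
      return await fn({ streamName, consumerName });
    } finally {
      // Runs whether fn returned or threw.
      await sequin.deleteConsumer(streamName, consumerName);
    }
  } finally {
    // Runs even if the consumer could not be created.
    await sequin.deleteStream(streamName);
  }
}
```

Inside a Jest test this could be called as await withTempStream(sequin, 'test.>', async ({ streamName, consumerName }) => { ... }), with the send, receive, and ack assertions placed in the callback.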