npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

kubemq-js

v2.0.2

Published

kubemq js/ts library for KubeMQ Message Broker

Downloads

1,324

Readme

KubeMQ Node JS/TS SDK

The KubeMQ SDK for NodeJS/TS enables typescript developers to communicate with KubeMQ server.

Prerequisites

  • Node.js (Ensure you have a recent version of Node.js installed)

  • TypeScript Compiler

  • KubeMQ server running locally or accessible over the network

General SDK description

The SDK implements all communication patterns available through the KubeMQ server:

  • Events

  • EventStore

  • Command

  • Query

  • Queue

Installing

The recommended way to use the SDK for Node in your project is to consume it from Node package manager.


npm install kubemq-js

Running the examples

The examples

are standalone projects that showcase the usage of the SDK.

To run the examples, you need to have a running instance of KubeMQ.

Import the project in any IDE of choice like Visual Studio Code or IntelliJ .

You will see three directory in example project which contains files to showing

implementation.

Directories are:

cq

pubsub

queues

cq directory contains the example related to Command and Query

pubsub directory contains the example related to Event and EventStore

queues directory contains the example related to Queues

Building from source

Once you check out the code from GitHub, you can build it using Node & Typescript.


npx  path/to/example_file.ts

Example:

npx  tsc  examples/cq/CreateExample.ts

Above command will compile the .ts file and produce the .js file in same directory where .ts file is, To run the compiled JavaScript file use below command .


node  path/to/example_file.js

Example:

node  examples/cq/CreateExample.ts

Payload Details

  • Metadata: The metadata allows us to pass additional information with the event. Can be in any form that can be presented as a string, i.e., struct, JSON, XML and many more.

  • Body: The actual content of the event. Can be in any form that is serializable into a byte array, i.e., string, struct, JSON, XML, Collection, binary file and many more.

  • ClientID: Displayed in logs, tracing, and KubeMQ dashboard(When using Events Store, it must be unique).

  • Tags: Set of Key value pair that help categorize the message

KubeMQ PubSub Client Examples

Below examples demonstrating the usage of KubeMQ PubSub (Event and EventStore) client. The examples include creating, deleting, listing channels, and sending/subscribing event messages.

File Structure

Event

  • pubsub\CreateChannelExample.ts: Demonstrates creating event channels.

  • pubsub\DeleteChannelExample.ts: Demonstrates deleting event channels.

  • pubsub\ListEventsChanneExample.ts: Demonstrates listing event channels.

  • pubsub\SendEventMessageExample.ts: Demonstrates to send message to single event & event-store event channel

  • pubsub\SubscribeToEventExample.ts: Demonstrates Subscribe to event channel

  • pubsub\SubscribeToEventStoreExample.ts Demonstrates example of subscribing event-store channel.

Getting Started

Construct the PubsubClient

For executing PubSub operation we have to create the instance of PubsubClient, it's instance can created with minimum two parameter address (KubeMQ server address) & clientId . With these two parameter plainText connection are established. Below Table Describe the Parameters available for establishing connection.

PubsubClient Accepted Configuration

| Name | Type | Description | Default Value | Mandatory | |--------------------------|---------|---------------------------------------------------------|-------------------|-----------| | address | String | The address of the KubeMQ server. | None | Yes | | clientId | String | The client ID used for authentication. | None | Yes | | authToken | String | The authorization token for secure communication. | None | No | | tls | boolean | Indicates if TLS (Transport Layer Security) is enabled. | None | No | | tlsCertFile | String | The path to the TLS certificate file. | None | No (Yes if tls is true) | | tlsKeyFile | String | The path to the TLS key file. | None | No (Yes if tls is true) | | tlsCaCertFile | String | The path to the TLS CA cert file. | None | No (Yes if tls is true) | | maxReceiveSize | int | The maximum size of the messages to receive (in bytes). | 104857600 (100MB) | No | | reconnectIntervalSeconds | int | The interval in seconds between reconnection attempts. | 1 | No |

PubsubClient connection establishment example code


const  opts: Config = {
	address:  'localhost:50000',
	clientId:  Utils.uuid(),
	reconnectIntervalSeconds:  1,
};

const  pubsubClient = new  PubsubClient(opts);

Below example demonstrate to construct PubSubClient with ssl and other configurations:


const  config: Config = {

	address:  'localhost:50000', // KubeMQ gRPC endpoint address
	clientId:  'your-client-id', // Connection clientId
	authToken:  'your-jwt-auth-token', // Optional JWT authorization token
	tls:  true, // Indicates if TLS is enabled
	tlsCertFile:  'path/to/tls-cert.pem', // Path to the TLS certificate file
	tlsKeyFile:  'path/to/tls-key.pem', // Path to the TLS key file
	tlsCaCertFile:  'path/to/tls-key.pem', // Path to the TLS key file
	maxReceiveSize:  1024 * 1024 * 100, // Maximum size of the messages to receive (100MB)
	reconnectIntervalSeconds:  1 // Interval in milliseconds between reconnect attempts (1 second)
};

Ping To KubeMQ server

You can ping the server to check connection is established or not.

Request: NONE

Response: ServerInfo Interface Attributes

| Name | Type | Description | |--------------------|--------|------------------------------------------| | host | String | The host of the server. | | version | String | The version of the server. | | serverStartTime | long | The start time of the server (in seconds).| | serverUpTimeSeconds| long | The uptime of the server (in seconds). |


ServerInfo  pingResult = pubsubClient.ping();
console.log('Ping Response: ' + pingResult);

PubSub CreateEventsChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |-------------|--------|---------------------------------------------|---------------|-----------| | channelName | String | Channel name which you want to subscribe to | None | Yes |

Response:

| Name | Type | Description | |------|---------------|---------------------------------------| | void | Promise | Doesn't return a value upon completion |


async  function  createEventsChannel(channel: string) {
return  pubsubClient.createEventsChannel(channel);
}

PubSub Create Events Store Channel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |-------------|--------|---------------------------------------------|---------------|-----------| | channelName | String | Channel name to which you want to subscribe | None | Yes |

Response:

| Name | Type | Description | |------|---------------|----------------------------------------| | void | Promise | Doesn't return a value upon completion |



async  function  createEventsStoreChannel(channel: string) {
return  pubsubClient.createEventsStoreChannel(channel);
}

PubSub ListEventsChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |-------------|--------|-------------------------------------------|---------------|-----------| | channelName | String | Channel name that you want to search for | None | No |

Response: PubSubChannel[] PubSubChannel interface Attributes

| Name | Type | Description | |-------------|-------------|--------------------------------------------------------------------------------------------------| | name | String | The name of the Pub/Sub channel. | | type | String | The type of the Pub/Sub channel. | | lastActivity| long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. | | isActive | boolean | Indicates whether the channel is active or not. | | incoming | PubSubStats | The statistics related to incoming messages for this channel. | | outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |


async  function  listEventsChannel(search: string) {
	const  channels = await  pubsubClient.listEventsChannels(search);
	console.log(channels);
}

PubSub ListEventsStoreChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |-------------|--------|-------------------------------------------|---------------|-----------| | channelName | String | Channel name that you want to search for | None | No |

Response: PubSubChannel[] PubSubChannel interface Attributes

| Name | Type | Description | |--------------|-------------|----------------------------------------------------------------------------------------------| | name | String | The name of the Pub/Sub channel. | | type | String | The type of the Pub/Sub channel. | | lastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. | | isActive | boolean | Indicates whether the channel is active or not. | | incoming | PubSubStats | The statistics related to incoming messages for this channel. | | outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |


async  function  listEventsStoreChannel(search: string) {
	const  channels = await  pubsubClient.listEventsStoreChannels(search);
	console.log(channels);
}

PubSub SendEventMessage Example:

Request: EventMessage Interface Attributes

| Name | Type | Description | Default Value | Mandatory | |----------|-----------------------|----------------------------------------------------------|------------------|-----------| | id | String | Unique identifier for the event message. | None | No | | channel | String | The channel to which the event message is sent. | None | Yes | | metadata | String | Metadata associated with the event message. | None | No | | body | byte[] | Body of the event message in bytes. | Empty byte array | No | | tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |

Note:- metadata or body or tags any one is required

Response: NONE


await  pubsubClient.sendEventsMessage({
	id:  `234`,
	channel: 'events.single',
	body:  Utils.stringToBytes('event message'),
});

PubSub SendEventStoreMessage Example:

Request: EventStoreMessage Class Attributes

| Name | Type | Description | Default Value | Mandatory | |----------|-----------------------|----------------------------------------------------------|------------------|-----------| | id | String | Unique identifier for the event message. | None | No | | channel | String | The channel to which the event message is sent. | None | Yes | | metadata | String | Metadata associated with the event message. | None | No | | body | byte[] | Body of the event message in bytes. | Empty byte array | No | | tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |

Note:- metadata or body or tags any one is required

Response: NONE


await  pubsubClient.sendEventStoreMessage({
	id:  '987',
	channel: 'events_store.single',
	body:  Utils.stringToBytes('event store message'),
});

PubSub SubscribeEvents Example:

Request: EventsSubscription Class Attributes

| Name | Type | Description | Default Value | Mandatory | |-------------------------|------------------------------------|---------------------------------------------------------------------------|---------------|-----------| | channel | String | The channel to subscribe to. | None | Yes | | group | String | The group to subscribe with. | None | No | | onReceiveEventCallback | Consumer | Callback function to be called when an event message is received. | None | Yes | | onErrorCallback | Consumer | Callback function to be called when an error occurs. | None | No |

Response: NONE

Callback: EventMessageReceived class details

| Name | Type | Description | |-------------|-----------------------|--------------------------------------------------------------------| | id | String | The unique identifier of the message. | | fromClientId| String | The ID of the client that sent the message. | | timestamp | long | The timestamp when the message was received, in seconds. | | channel | String | The channel to which the message belongs. | | metadata | String | The metadata associated with the message. | | body | byte[] | The body of the message. | | sequence | long | The sequence number of the message. | | tags | Map<String, String> | The tags associated with the message. |

async function subscribeToEvent() {  
  //Subscribes to events from the specified channel and processes received events.  
  const eventsSubscriptionRequest = new EventsSubscriptionRequest('events.A', '');  
  
  // Define the callback for receiving events  
  eventsSubscriptionRequest.onReceiveEventCallback = (event: EventMessageReceived) => {  
    console.log('SubscriberA received event:', {  
      id: event.id,  
      fromClientId: event.fromClientId,  
      timestamp: event.timestamp,  
      channel: event.channel,  
      metadata: event.metadata,  
      body: event.body,  
      tags: event.tags,  
    });  
  };  
  
  // Define the callback for handling errors  
  eventsSubscriptionRequest.onErrorCallback = (error: string) => {  
    console.error('SubscriberA error:', error);  
  };  
  
  pubsubClient  
  .subscribeToEvents(eventsSubscriptionRequest)  
    .then(() => {  
      console.log('Subscription successful');  
    })  
    .catch((reason: any) => {  
      console.error('Subscription failed:', reason);  
    });  
  
}

PubSub SubscribeEventsStore Example:

Request: EventsStoreSubscription Interface Attributes

| Name | Type | Description | Default Value | Mandatory | |--------------------------|-----------------------------------------|-------------------------------------------------------------------------|---------------|-----------| | channel | String | The channel to subscribe to. | None | Yes | | group | String | The group to subscribe with. | None | No | | onReceiveEventCallback | Consumer | Callback function to be called when an event message is received. | None | Yes | | onErrorCallback | Consumer | Callback function to be called when an error occurs. | None | No |

Response: None

Callback: EventStoreMessageReceived class details

| Name | Type | Description | |-------------|-----------------------|--------------------------------------------------------------------| | id | String | The unique identifier of the message. | | fromClientId| String | The ID of the client that sent the message. | | timestamp | long | The timestamp when the message was received, in seconds. | | channel | String | The channel to which the message belongs. | | metadata | String | The metadata associated with the message. | | body | byte[] | The body of the message. | | sequence | long | The sequence number of the message. | | tags | Map<String, String> | The tags associated with the message. |

async function subscribeToEventStore() {  
  //Subscribes to events store messages from the specified channel with a specific configuration.  
  const eventsSubscriptionRequest = new EventsStoreSubscriptionRequest('events_store.A', '');  
  eventsSubscriptionRequest.eventsStoreType = EventStoreType.StartAtSequence;  
  eventsSubscriptionRequest.eventsStoreSequenceValue=1;  
  
  // Define the callback for receiving events  
  eventsSubscriptionRequest.onReceiveEventCallback = (event: EventStoreMessageReceived) => {  
    console.log('SubscriberA received event:', {  
      id: event.id,  
      fromClientId: event.fromClientId,  
      timestamp: event.timestamp,  
      channel: event.channel,  
      metadata: event.metadata,  
      body: event.body,  
      tags: event.tags,  
      sequence: event.sequence,  
    });  
  };  
  
  // Define the callback for handling errors  
  eventsSubscriptionRequest.onErrorCallback = (error: string) => {  
    console.error('SubscriberA error:', error);  
  };  
  
  pubsubClient  
  .subscribeToEvents(eventsSubscriptionRequest)  
    .then(() => {  
      console.log('Eventstore Subscription successful');  
    })  
    .catch((reason: any) => {  
      console.error('Eventstore Subscription failed:', reason);  
    });  
}

PubSub DeleteEventsChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |-------------|--------|-----------------------------------------------|---------------|-----------| | channelName | String | Channel name that you want to delete | None | Yes |

Response:

| Name | Type | Description | |------|---------------|----------------| | void | Promise | Returns nothing |



async  function  deleteChannel(channel: string) {
	return  pubsubClient.deleteEventsChannel(channel);
}

PubSub DeleteEventsStoreChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |-------------|--------|-------------------------------------------|---------------|-----------| | channelName | String | The name of the channel you want to delete | None | Yes |

Response:

| Name | Type | Description | |------------------|---------|-------------------------------| | isChannelDeleted | boolean | Indicates if the channel is deleted (true/false) |



async  function  deleteChannel(channel: string) {
	return  pubsubClient.deleteEventsStoreChannel(channel);
}

KubeMQ Queues Client Examples

Below examples demonstrating the usage of KubeMQ Queues client. The examples include creating, deleting, listing channels, and sending/receiving queues messages.

Project Structure

  • queues/CreateQueuesChannelExample.ts: Demonstrates creating queues channels.

  • queues/DeleteQueuesChannelExample.ts: Demonstrates deleting queues channels.

  • queues/ListQueuesChannelExample.ts: Demonstrates listing queues channels.

  • queues/Send_ReceiveMessageExample.ts: Demonstrates example of sending & receiving message.

  • queues/WaitingPullExample.ts: Demonstrates example of pulling message by waiting and pulling.

Getting Started

Construct the QueuesClient

For executing Queues operation we have to create the instance of QueuesClient, it's instance can created with minimum two parameter address (KubeMQ server address) & clientId . With these two parameter plainText connection are established. Below Table Describe the Parameters available for establishing connection.

QueuesClient Accepted Configuration

| Name | Type | Description | Default Value | Mandatory | |--------------------------|---------|------------------------------------------------------------|-------------------|-----------| | address | String | The address of the KubeMQ server. | None | Yes | | clientId | String | The client ID used for authentication. | None | Yes | | authToken | String | The authorization token for secure communication. | None | No | | tls | boolean | Indicates if TLS (Transport Layer Security) is enabled. | None | No | | tlsCertFile | String | The path to the TLS certificate file. | None | No (Yes if tls is true) | | tlsKeyFile | String | The path to the TLS key file. | None | No (Yes if tls is true) | | tlsCaCertFile | String | The path to the TLS CA cert file. | None | No (Yes if tls is true) | | maxReceiveSize | int | The maximum size of the messages to receive (in bytes). | 104857600 (100MB) | No | | reconnectIntervalSeconds | int | The interval in seconds between reconnection attempts. | 1 | No |

QueuesClient establishing connection example code


const  opts: Config = {
	address:  'localhost:50000',
	clientId:  Utils.uuid(),
};

const  queuesClient = new  QueuesClient(opts);

Below example demonstrate to construct PubSubClient with ssl and other configurations:

const  opts: Config = {
	address:  'localhost:50000', // KubeMQ gRPC endpoint address
	clientId:  'your-client-id', // Connection clientId
	authToken:  'your-jwt-auth-token', // Optional JWT authorization token
	tls:  true, // Indicates if TLS is enabled
	tlsCertFile:  'path/to/tls-cert.pem', // Path to the TLS certificate file
	tlsKeyFile:  'path/to/tls-key.pem', // Path to the TLS key file
	tlsCaCertFile:  'path/to/tls-ca-cert.pem', // Path to the TLS CA cert file
	maxReceiveSize:  1024 * 1024 * 100, // Maximum size of the messages to receive (100MB)
	reconnectIntervalSeconds:  1 // Interval in milliseconds between reconnect attempts (1 second)
};

const  queuesClient = new  QueuesClient(opts);

Ping To KubeMQ server

You can ping the server to check connection is established or not.

Request: NONE

Response: ServerInfo Class Attributes

| Name | Type | Description | |------------------|-------|------------------------------------------------------| | host | String| The host of the server. | | version | String| The version of the server. | | serverStartTime | long | The start time of the server (in seconds). | | serverUpTimeSeconds | long | The uptime of the server (in seconds). |


const  pingResult = queuesClient.ping();
console.log('Ping Response: ' + pingResult);

Queues CreateQueueChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |--------------|--------|------------------------------------------|---------------|-----------| | channelName | String | The name of the channel you want to create | None | Yes |

Response:

| Name | Type | Description | |------------------|---------|------------------------------------------------| | isChannelCreated | boolean | Indicates whether the channel was created (true/false) |



async  function  createQueueChannel(channel: string) {
	return  queuesClient.createQueuesChannel(channel);
}

Queues listQueueChannels Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |---------------|--------|------------------------------------------|---------------|-----------| | searchString | String | The channel name you want to search for | None | No |

Response: QueuesChannel[] QueuesChannel interface Attributes

| Name | Type | Description | |-----------------|---------------|----------------------------------------------------------| | name | String | The name of the queue channel. | | type | String | The type of the queue channel. | | lastActivity | long | The timestamp of the last activity in the queue channel.| | isActive | boolean | Indicates whether the queue channel is currently active.| | incoming | QueuesStats | The statistics for incoming messages in the queue channel. | | outgoing | QueuesStats | The statistics for outgoing messages in the queue channel. |


async  function  listQueueChannels(search: string) {
	const  channels = await  queuesClient.listQueuesChannel(search);
	console.log(channels);
}

Queues SendSingleMessage Example:

Request: QueueMessage class attributes

| Name | Type | Description | Default Value | Mandatory | |-------------------------------|----------------------|---------------------------------------------------------------------------------------------|------------------|-----------| | id | String | The unique identifier for the message. | None | No | | channel | String | The channel of the message. | None | Yes | | metadata | String | The metadata associated with the message. | None | No | | body | byte[] | The body of the message. | new byte[0] | No | | tags | Map<String, String> | The tags associated with the message. | new HashMap<>() | No | | delayInSeconds | int | The delay in seconds before the message becomes available in the queue. | None | No | | expirationInSeconds | int | The expiration time in seconds for the message. | None | No | | attemptsBeforeDeadLetterQueue | int | The number of receive attempts allowed before the message is moved to the dead letter queue. | None | No | | deadLetterQueue | String | The dead letter queue where the message will be moved after reaching max receive attempts. | None | No |

Response: QueueSendResult class attributes

| Name | Type | Description | |-----------|----------------|-----------------------------------------------------------------| | id | String | The unique identifier of the message. | | sentAt | LocalDateTime | The timestamp when the message was sent. | | expiredAt | LocalDateTime | The timestamp when the message will expire. | | delayedTo | LocalDateTime | The timestamp when the message will be delivered. | | isError | boolean | Indicates if there was an error while sending the message. | | error | String | The error message if isError is true. |


await  queuesClient.sendQueuesMessage({
	channel:  'queues.single',
	body:  Utils.stringToBytes('queue message'),
})
.then((result) =>  console.log(result))
.catch((reason) =>  console.error(reason));

Queues Pulls messages from a queue. Example:

Request: QueuesPullWaitngMessagesRequest class attributes

| Name | Type | Description | Default Value | Mandatory | |---------------------|--------|--------------------------------------------|---------------|-----------| | channel | String | The channel to poll messages from. | None | Yes | | maxNumberOfMessages | int | The maximum number of messages to poll. | 1 | No | | waitTimeoutSeconds | int | The wait timeout in seconds for polling messages. | 60 | No |

Response: QueuesPullWaitingMessagesResponse class attributes

| Name | Type | Description | |------------------|-------------------|-----------------------------------------------------| | id | String | The reference ID of the request. | | messagesReceived | number | Number of valid messages received. | | messages | QueueMessage[] | The list of received queue messages. | | error | String | The error message, if any error occurred. | | isError | boolean | Indicates if there was an error. | | isPeek | boolean | Indicates if it is a peek or pull operation. | | messagesExpired | number | Number of expired messages from the queue. |


await  queuesClient
.pull({
	channel:  'queues.peek',
	maxNumberOfMessages:  10,
	waitTimeoutSeconds:  10,
})

.then((response) => {
	response.messages.forEach((msg) => {
	console.log(msg);
});
})

.catch((reason) => {
	console.error(reason);
});

Queues Get waiting messages from a queue Example:

Request: QueuesPullWaitngMessagesRequest class attributes

| Name | Type | Description | Default Value | Mandatory | |--------------------|--------|----------------------------------------------------|---------------|-----------| | channel | String | The channel to poll messages from. | None | Yes | | maxNumberOfMessages| int | The maximum number of messages to poll. | 1 | No | | waitTimeoutSeconds | int | The wait timeout in seconds for polling messages. | 60 | No |

Response: QueuesPullWaitingMessagesResponse class attributes

| Name | Type | Description | |------------------|-------------------|--------------------------------------------------| | id | String | The reference ID of the request. | | messagesReceived | number | Number of valid messages received. | | messages | QueueMessage[] | The list of received queue messages. | | error | String | The error message, if any error occurred. | | isError | boolean | Indicates if there was an error. | | isPeek | boolean | Indicates if the operation is a peek or pull. | | messagesExpired | number | Number of expired messages from the queue. |


await  queuesClient
.waiting({
	channel:  'queues.peek',
	maxNumberOfMessages:  5,
	waitTimeoutSeconds:  20,
})

.then((response) => {
	response.messages.forEach((msg) => {
	console.log(msg);
});
})
.catch((reason) => {
	console.error(reason);
});

Receive Queue Messages

Receives messages from a Queue channel.

Request: QueuesPollRequest Class Attributes

| Name | Type | Description | Default Value | Mandatory | |--------------------------|---------|------------------------------------------------------|---------------|-----------| | channel | String | The channel to poll messages from. | None | Yes | | pollMaxMessages | int | The maximum number of messages to poll. | 1 | No | | pollWaitTimeoutInSeconds | int | The wait timeout in seconds for polling messages. | 60 | No | | autoAckMessages | boolean| Indicates if messages should be auto-acknowledged. | false | No | | visibilitySeconds | int| Add a visibility timeout feature for messages. | 0 | No |

Response: QueuesMessagesPulledResponse Class Attributes

| Name | Type | Description | |------------------------|----------------------------|---------------------------------------------------------| | id | String | The reference ID of the request. | | messages | QueueMessageReceived[] | The list of received queue messages. | | messagesReceived | number | Number of valid messages received. | | messagesExpired | number | Number of messages expired. | | isPeek | boolean | Indicates if the operation is a peek or pull. | | error | String | The error message, if any error occurred. | | isError | boolean | Indicates if there was an error. | | visibilitySeconds | int | The visibility timeout for the message in seconds. | | isAutoAcked | boolean | Indicates whether the message was auto-acknowledged. |

Response: QueueMessageReceived class attributes

Here's the requested Markdown table for the QueueMessageReceived class:

| Name | Type | Description | |-----------------------|---------------------------------------|---------------------------------------------------------| | id | String | The unique identifier for the message. | | channel | String | The channel from which the message was received. | | metadata | String | Metadata associated with the message. | | body | byte[] | The body of the message in byte array format. | | fromClientId | String | The ID of the client that sent the message. | | tags | Map<String, String> | Key-value pairs representing tags for the message. | | timestamp | Instant | The timestamp when the message was created. | | sequence | long | The sequence number of the message. | | receiveCount | int | The number of times the message has been received. | | isReRouted | boolean | Indicates whether the message was rerouted. | | reRouteFromQueue | String | The name of the queue from which the message was rerouted.| | expiredAt | Instant | The expiration time of the message, if applicable. | | delayedTo | Instant | The time the message is delayed until, if applicable. | | transactionId | String | The transaction ID associated with the message. | | isTransactionCompleted| boolean | Indicates whether the transaction for the message is completed. | | responseHandler | StreamObserver<QueuesDownstreamRequest> | The response handler for processing downstream requests. | | receiverClientId | String | The ID of the client receiving the message. | | visibilitySeconds | int | The visibility timeout for the message in seconds. | | isAutoAcked | boolean | Indicates whether the message was auto-acknowledged. |

Example

async function main() {  
  const opts: Config = {  
    address: 'localhost:50000',  
    clientId: 'kubeMQClientId-ts',  
  };  
  const queuesClient = new QueuesClient(opts);  
  
  // Receive with message visibility  
  async function receiveWithVisibility(visibilitySeconds: number) {  
    console.log("\n============================== Receive with Visibility =============================\n");  
    try {  
      const pollRequest = new QueuesPollRequest({  
        channel: 'visibility_channel',  
        pollMaxMessages: 1,  
        pollWaitTimeoutInSeconds: 10,  
        visibilitySeconds: visibilitySeconds,  
        autoAckMessages: false,  
      });  
  
      const pollResponse = await queuesClient.receiveQueuesMessages(pollRequest);  
      console.log("Received Message Response:", pollResponse);  
        
      if (pollResponse.isError) {  
        console.log("Error: " + pollResponse.error);  
      } else {  
        pollResponse.messages.forEach(async (msg) => {  
          console.log(`Message ID: ${msg.id}, Message Body: ${Utils.bytesToString(msg.body)}`);  
          try {  
            await new Promise(resolve => setTimeout(resolve, 1000));  
            await msg.ack();  
            console.log("Acknowledged message");  
          } catch (err) {  
            console.error("Error acknowledging message:", err);  
          }  
        });  
      }  
    } catch (error) {  
      console.error('Failed to receive queue messages:', error);  
    }  
  }  
  
  // Test visibility expiration  
  async function receiveWithVisibilityExpired() {  
    console.log("\n============================== Receive with Visibility Expired =============================\n");  
    await receiveWithVisibility(2);  
  }  
  
  // Test visibility extension  
  async function receiveWithVisibilityExtension() {  
    console.log("\n============================== Receive with Visibility Extension =============================\n");  
    try {  
      const pollRequest = new QueuesPollRequest({  
        channel: 'visibility_channel',  
        pollMaxMessages: 1,  
        pollWaitTimeoutInSeconds: 10,  
        visibilitySeconds: 3,  
        autoAckMessages: false,  
      });  
  
      const pollResponse = await queuesClient.receiveQueuesMessages(pollRequest);  
      console.log("Received Message Response:", pollResponse);  
  
      if (pollResponse.isError) {  
        console.log("Error: " + pollResponse.error);  
      } else {  
        pollResponse.messages.forEach(async (msg) => {  
          console.log(`Message ID: ${msg.id}, Message Body: ${Utils.bytesToString(msg.body)}`);  
          try {  
            await new Promise(resolve => setTimeout(resolve, 1000));  
            await msg.extendVisibilityTimer(3);  
            await new Promise(resolve => setTimeout(resolve, 2000));  
            await msg.ack();  
            console.log("Acknowledged message after extending visibility");  
          } catch (err) {  
            console.error("Error during visibility extension:", err);  
          }  
        });  
      }  
    } catch (error) {  
      console.error('Failed to receive queue messages:', error);  
    }  
  }  
  
  await receiveWithVisibilityExpired();  
  await receiveWithVisibilityExtension();  
}  
  
main();

This method allows you to receive messages from a specified Queue channel. You can configure the polling behavior, including the maximum number of messages to receive and the wait timeout. The response provides detailed information about the received messages and the transaction.

Message Handling Options:

  1. Acknowledge (ack): Mark the message as processed and remove it from the queue.
  2. Reject: Reject the message. It won't be requeued.
  3. Requeue: Send the message back to the queue for later processing.

Choose the appropriate handling option based on your application's logic and requirements.

KubeMQ Command & Query Client Examples

Below examples demonstrating the usage of KubeMQ CQ (Commands and Queries) Client. The examples include creating, deleting, listing channels, and sending/subscribing to command and query messages.

Project Structure

Command

  • cq\CreateExample.ts: Demonstrates creating command channels.
  • cq\DeleteExample.ts: Demonstrates deleting command channels.
  • cq\ListExample.ts: Demonstrates listing command channels.
  • cq\CommandsExample.ts: Demonstrates sending to command messages.
  • cq\SubscribeCommandsExample.ts: Demonstrates subscribing to command messages.
  • cq\QueriesExample.ts: Demonstrates sending to queries messages.
  • cq\SubscribeQueriesExample.ts: Demonstrates subscribing to queries messages.

Getting Started

Construct the CQClient

For executing command & query operation we have to create the instance of CQClient, it's instance can created with minimum two parameter address (KubeMQ server address) & clientId . With these two parameter plainText connection are established. Below Table Describe the Parameters available for establishing connection.

CQClient Accepted Configuration

| Name | Type | Description | Default Value | Mandatory | |--------------------------|---------|------------------------------------------------------------|-------------------|-----------| | address | String | The address of the KubeMQ server. | None | Yes | | clientId | String | The client ID used for authentication. | None | Yes | | authToken | String | The authorization token for secure communication. | None | No | | tls | boolean | Indicates if TLS (Transport Layer Security) is enabled. | None | No | | tlsCertFile | String | The path to the TLS certificate file. | None | No (Yes if tls is true) | | tlsKeyFile | String | The path to the TLS key file. | None | No (Yes if tls is true) | | tlsCaCertFile | String | The path to the TLS CA cert file. | None | No (Yes if tls is true) | | maxReceiveSize | int | The maximum size of the messages to receive (in bytes). | 104857600 (100MB) | No | | reconnectIntervalSeconds | int | The interval in seconds between reconnection attempts. | 1 | No |

CQClient establishing connection example code


const  opts: Config = {

    address:  'localhost:50000',
    clientId:  Utils.uuid(),
    reconnectIntervalSeconds:  1,
};

const  cqClient = new  CQClient(opts);

Below example demonstrate to construct CQClient with ssl and other configurations:


const  config: Config = {

    address:  'localhost:50000', // KubeMQ gRPC endpoint address
    clientId:  'your-client-id', // Connection clientId
    authToken:  'your-jwt-auth-token', // Optional JWT authorization token
    tls:  true, // Indicates if TLS is enabled
    tlsCertFile:  'path/to/tls-cert.pem', // Path to the TLS certificate file
    tlsKeyFile:  'path/to/tls-key.pem', // Path to the TLS key file
    tlsCaCertFile:  'path/to/tls-ca-cert.pem', // Path to the TLS CA cert file
    maxReceiveSize:  1024 * 1024 * 100, // Maximum size of the messages to receive (100MB)
    reconnectIntervalSeconds:  1, // Interval in milliseconds between reconnect attempts (1 second)
};
const  cqClient = new  CQClient(opts);

Ping To KubeMQ server

You can ping the server to check connection is established or not.

Request: NONE

Response: ServerInfo interface Attributes

| Name | Type | Description | |-------------------|--------|----------------------------------------------| | host | String | The host of the server. | | version | String | The version of the server. | | serverStartTime | long | The start time of the server (in seconds). | | serverUpTimeSeconds | long | The uptime of the server (in seconds). |


const  pingResult = cqClient.ping();
console.log('Ping Response: ' + pingResult);

Command CreateCommandsChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |-------------|--------|-------------------------------------|---------------|-----------| | channelName | String | Channel name which you want to create | None | Yes |

Response:

| Name | Type | Description | |-------------------|---------|------------------------------------| | isChannelCreated | boolean | Indicates if the channel was created (true/false) |



async  function  createCommandsChannel(channel: string) {
    return  cqClient.createCommandsChannel(channel);
}

Queries CreateQueriesChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |-------------|--------|---------------------------------------|---------------|-----------| | channelName | String | The name of the channel to create. | None | Yes |

Response:

| Name | Type | Description | |-------------------|---------|------------------------------------------| | isChannelCreated | boolean | Indicates whether the channel was created (true/false) |



async  function  createQueriesChannel(channel: string) {
    return  cqClient.createQueriesChannel(channel);
}

Command ListCommandsChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |---------------|--------|----------------------------------------|---------------|-----------| | searchString | String | The name of the channel to search for. | None | No |

Response: CQChannel[] CQChannel interface attributes

| Name | Type | Description | |----------------|--------|-----------------------------------------------------------| | name | String | The name of the channel. | | type | String | The type of the channel. | | lastActivity | long | The timestamp of the last activity on the channel. | | isActive | boolean| Indicates whether the channel is currently active. | | incoming | CQStats| Statistics about incoming messages to the channel. | | outgoing | CQStats| Statistics about outgoing messages from the channel. |


async  function  listCommandsChannels(search: string) {
    const  channels = await  cqClient.listCommandsChannels(search);
    console.log(channels);
}

Queries ListQueriesChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |--------------|--------|-----------------------------------------------|---------------|-----------| | searchString | String | Channel name which you want to search | None | No |

Response: List<CQChannel> CQChannel class attributes

| Name | Type | Description | |--------------|--------|-------------------------------------------------------| | name | String | The name of the channel. | | type | String | The type of the channel. | | lastActivity | long | The timestamp of the last activity on the channel. | | isActive | boolean| Indicates whether the channel is currently active. | | incoming | CQStats| Statistics about incoming messages to the channel. | | outgoing | CQStats| Statistics about outgoing messages from the channel.|


async  function  listQueriesChannels(search: string) {
    const  channels = await  cqClient.listQueriesChannels(search);
    console.log(channels);
}

Command SubscribeToCommandsChannel Example:

Request: CommandsSubscription Class Attributes

| Name | Type | Description | Default Value | Mandatory | |--------------------------|-------------------------|--------------------------------------------------|---------------|-----------| | channel | String | The channel for the subscription. | None | Yes | | group | String | The group associated with the subscription. | None | No | | onReceiveCommandCallback | CommandsReceiveMessage| Callback function for receiving commands. | None | Yes |

Response: None

Callback: CommandsReceiveMessage interface attributes

| Name | Type | Description | |---------------|-----------------------|--------------------------------------------| | commandReceived | CommandsReceiveMessage | The command message that was received. | | clientId | String | The ID of the client that sent the command. | | requestId | String | The ID of the request. | | isExecuted | boolean | Indicates whether the command was executed. | | timestamp | LocalDateTime | The timestamp of the response. | | error | String | The error message if an error occurred. |

async function subscribeToCommands(channelName: string) {
    //Subscribes to commands from the specified channel with a specific configuration.  
    const commandSubscriptionRequest = new CommandsSubscriptionRequest(channelName, 'group1');

    // Define the callback for receiving commandMessage  
    commandSubscriptionRequest.onReceiveEventCallback = (commandMessage: CommandMessageReceived) => {
        console.log('SubscriberA received commandMessage:', {
            id: commandMessage.id,
            fromClientId: commandMessage.fromClientId,
            timestamp: commandMessage.timestamp,
            channel: commandMessage.channel,
            metadata: commandMessage.metadata,
            body: commandMessage.body,
            tags: commandMessage.tags,
        });
    };

    // Define the callback for handling errors  
    commandSubscriptionRequest.onErrorCallback = (error: string) => {
        console.error('SubscriberA error:', error);
    };

    cqClient.subscribeToCommands(commandSubscriptionRequest)
        .then(() => {
            console.log('Command Subscription successful');
        })
        .catch((reason: any) => {
            console.error('Command Subscription failed:', reason);
        });
}

Queries SubscribeToQueriesChannel Example:

Request: QueriesSubscriptionRequest Class Attributes

| Name | Type | Description | Default Value | Mandatory | |-------------------------|----------------------------|--------------------------------------------|---------------|-----------| | channel | String | The channel for the subscription. | None | Yes | | group | String | The group associated with the subscription. | None | No | | onReceiveQueriesCallback | QueriesReceiveMessage | Callback function for receiving queries. | None | Yes |

Response: None

Callback: QueriesReceiveMessage interface attributes

| Name | Type | Description | |---------------|--------------------------|------------------------------------------------| | id | String | The ID of the request. | | channel | String | Channel name from which the message was received. | | metadata | String | Metadata of the message. | | body | Uint8Array | The body of the response. | | tags | Map<String, String> | Tags associated with the query message. | | replyChannel | String | The reply channel for this message. |

async function subscribeToQueries(channelName: string) {

    //Subscribes to queries from the specified channel with a specific configuration.  
    const commandSubscriptionRequest = new CommandsSubscriptionRequest(channelName, 'group1');

    // Define the callback for receiving queriesMessage  
    commandSubscriptionRequest.onReceiveEventCallback = (commandMessage: CommandMessageReceived) => {
        console.log('SubscriberA received event:', {
            id: commandMessage.id,
            fromClientId: commandMessage.fromClientId,
            timestamp: commandMessage.timestamp,
            channel: commandMessage.channel,
            metadata: commandMessage.metadata,
            body: commandMessage.body,
            tags: commandMessage.tags,
        });
    };

    // Define the callback for handling errors  
    commandSubscriptionRequest.onErrorCallback = (error: string) => {
        console.error('SubscriberA error:', error);
    };

    cqClient.subscribeToQueries(commandSubscriptionRequest)
        .then(() => {
            console.log('Queries Subscription successful');
        })
        .catch((reason: any) => {
            console.error('Queries Subscription failed:', reason);
        });
}

Command DeleteCommandsChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |--------------|--------|--------------------------------------------|---------------|-----------| | channelName | String | The name of the channel you want to delete. | None | Yes |

Response:

| Name | Type | Description | |-------|--------------|---------------------------------------| | void | Promise | Indicates no result is returned after deletion. |



async  function  deleteCommandsChannel(channel: string) {
    return  cqClient.deleteCommandsChannel(channel);
}

Queries DeleteQueriesChannel Example:

Request:

| Name | Type | Description | Default Value | Mandatory | |--------------|--------|--------------------------------------|---------------|-----------| | channelName | String | Channel name which you want to delete | None | Yes |

Response:

| Name | Type | Description | |------|--------------|----------------------------------| | void | Promise | Channel deletion returns no result |



async  function  deleteQueriesChannel(channel: string) {
    return  cqClient.deleteQueriesChannel(channel);
}

Support

if you encounter any issues, please open an issue here,

In addition, you can reach us for support by: