npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

message-subscriber

v1.0.55

Published

Subscribe to messages at any Cloud Service. Receive messages only by registering a EventEmitter Listener. (AWS SQS, Azure ESQ, Google Tasks)

Downloads

475

Readme

message-subscriber

NPM version Build status TypeScript types Maintainability Coverage Status Known Vulnerabilities FOSSA Status

Async message subscriber for receiving messages from queueing services available in cloud services. Gives the option to implement your own MessageAdapter to plug new services and use the core message-subscriber. Can handle hundreds of parallel processing using Node async nature, the messages are delivered by subscribing to an Event dispatched from Node EventEmitter.

Installation

npm install --save message-subscriber

MessageSubscriber

The MessageSubscriber is the main interface to manage and receive the messages from the queueing service.

const messageSubscriber = new MessageSubscriber({
    messageAdapter: sqsAdapter,
    parallelism: 100,
    refreshInterval: 10
});
  • messageAdapter - required, message adapter that will be called to send de commands to the cloud queue service (E.g.: SQSAdapter)
  • parallelism - required the number of parallel messages that you will receive to process
  • refreshInterval - optional, when you receive one message from the queue service it becomes unavailable for a period of time, using the refresh interval the MessageSubscriber will delay the message using the interval passed (in seconds) (E.g.: Using aws sqs when you receive a message it becomes invisible for 30 seconds, if your processing takes more than 30 seconds, the message will become available and you can have duplicity, with refreshInterval the code will call delay on the message from time to time and the message will not become available when processing).

Events

The MessageSubscriber emits the following events:

  • message: Comes with the message received

    • Message:
      • id: string - The id of the message
      • receipt?: string - The receipt of the message (Generally used to delete)
      • payload: any - The payload of the message
      • attributes?: any - The attributes of the message
      • receivedTimestamp: number - The timestamp that the message was received
      • delete: async function - The function to delete message
      • delay: async function - The function to delay message
  • empty: When the queue is empty this event is emitted

  • error: If any operation errors this event will be dispatched with the error

  • drained: When gracefulShutdown() is called the code will wait all the messages that are queued to be processed, when all are processed this event is called

  • paused: When pause() is called this event is emitted

  • resumed: When resume() is called this event is emitted

  • stoped: `When stop() is called this event is emitter. (Note: when the queue is stoped it CAN NOT start again, this DO NOT wait for the queued messages to be processed)

MessageAdapters

The message adapters are interfaces to communicate with the queueing services at the cloud. You can use the MessageAdapter Interface to create your own adapter and plug at the MessageSubscriber to use the queueing service that you need.

SQSAdapter

To use this package with AWS, you need to have at least one SQS queue created in your account. You'll need the queue URL to pass as a parameter to the SQSAdapter. E.g.: https://sqs.us-east-1.amazonaws.com/000000000000/your-queue.

Authentication to AWS can be done using any methods of setting credentials in the AWS Javascript SDK. Your credential must have the following permissions in the queues that you are going to use in this package:

  • sqs:ReceiveMessage
  • sqs:DeleteMessage
  • sqs:ChangeMessageVisibility
const sqsAdapter = new SQSAdapter({
    queueURL: 'QUEUE_URL',
    maxNumberOfMessages: 10,
    sqs: {
        region: 'us-east-1'
    }
})
  • queueURL - required, the url of the AWS SQS Queue
  • maxNumberOfMessages - optional the max number of messages to receive at one SQS receiveMessage call, default: 10
  • sqs - required, the params to configure the aws sqs queue (these are the params that are passed to aws-sdk sqs client).

Usage

This example uses que SQSAdapter to subscribe for messages using de AWS SQS.

import { Message, MessageSubscriber, SQSAdapter } from 'message-subscriber';

const sqsAdapter = new SQSAdapter({
    queueURL: 'QUEUE_URL',
    maxNumberOfMessages: 10,
    sqs: {
        region: 'us-east-1'
    }
})

const messageSubscriber = new MessageSubscriber({
    messageAdapter: sqsAdapter,
    parallelism: 100,
    refreshInterval: 10 // This will automatically refresh the delay of the message visibility at the queue.
});

// Registering event listeners

messageSubscriber.on('message', (message: Message) => {
  console.log('messageReceived', message);
  console.log('queue length', messageSubscriber.length); // You can see the length of the queue that the messages are being dispatched
  await message.delete(); // You can call delete directly from message
});

messageSubscriber.on('empty', () => {
  console.log('queue is empty');
});

messageSubscriber.on('error', (err: Error) => {
  console.log('messageSubscriber error', err);
});

// Handling process termination

const handleShutdown = async () => {
  try {
    await messageSubscriber.gracefulShutdown(); // This will wait for queue to process all pending messages.
  } catch (err) {
    console.log(err);
    process.exit(1);
  }

  process.exit(0);
};

process.on('SIGINT', handleShutdown);
process.on('SIGTERM', handleShutdown);

// Starting message-subscriber
messageSubscriber.start();

License

MIT

FOSSA Status