npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

sqs-batch

v0.0.2

Published

A wrapper on top of AWS SQS (Amazon Simple Queue Service) including a memory buffer for batch message processing

Downloads

3

Readme

sqs-batch

An AWS.SQS wrapper that let's you build queue-based applications easier. It has a built-in memory buffer to extend the 10 batch limit.

Installation

npm install sqs-buffer

Usage

const Receiver = require('sqs-buffer')

const worker = new Receiver({
  queueUrl: '...',
  messageReceiver: (messages, done) => {
    // process message(s)
    done(messages)
  }
})

worker.start()
  • By default the queue will be polled one message at a time using sqs long polling.
  • In order to process multiple messages either specify batchSize and/or bufferSize options explained below
  • The message parameter in the messageReceiver callback contains an array of messages even if there's only one message polled.
  • If no bufferSize specified then the next polling will be made after the current one is process and acknowledged by calling done(messages).
  • Calling done(err) with an error will leave the message in the queue and the processing:error event will be fired. See events below.
  • In order to delete a message form the queue after it has been processed, call done(messages) where messages can be either one message or an array of messages.

Credentials

By default it uses the Environment Variables credentials as specified here. In order to specify them manually you can do:

const AWS = require('aws-sdk')
const Receiver = require('sqs-buffer')

const SQS = new AWS.SQS({
  region: 'eu-west-1',
  accessKeyId: '...',
  secretAccessKey: '...'
})

const worker = new Receiver({
  queueUrl: '...',
  sqs: SQS,
  messageReceiver: (messages, done) => {
    // ...
    done(messages)
  }
})

worker.start()

API

new Receiver(options)

Creates a new SQS receiver instance

options

  • queueUrl - String - REQUIRED - SQS queue URL
  • messageReceiver - Function - REQUIRED - A callback function to be called when a message is received or the buffer is flushed. (messageReceiver(messages, done)).
  • batchSize - Number - (default 1) - The number of messages to poll from SQS. Max. 10.
  • waitTimeSeconds - Number - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning reference.
  • visibilityTimeout - Number - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request reference.
  • attributeNames - Array - A list of attributes that need to be returned along with each message reference.
  • messageAttributeNames - Array - A list of message attributes to include in the response reference.
  • authenticationErrorTimeout - Number - (default 10000) - The duration (in milliseconds) to wait before making another request after an authentication error.
  • bufferSize - Number - The number of messages to be placed in buffer before processing. Should be greater than batchSize.
  • bufferTimeout - Number - The duration (in milliseconds) to wait before the buffer is flushed.
  • sqs - Object - An AWS.SQS instance in case you want to configure it's options manually.
  • buffer - Object - (default memory) - A class instance used to store the buffer into. More info bellow

receiver.start()

Start message polling

receiver.stop()

Stop message polling

Events

|Event|Params|Description| |-----|------|-----------| |error|err|Fired when a request error occurs| |message:received|message|Fired when new message(s) received| |processing:error|err|Fired when you call don(error)| |message:processed|message|Fired when the message(s) is/are processed| |stopped|None|Fired when receiver is stopped|

Buffer

The buffer is used to temporarily store SQS messages in order to be later processed in batches larger than the Amazon's 10 messages per batch limit. The buffer fires a flush event wich is then captured by the Receiver and then proxied to the messageReceiver(messages, done) callback. The flush event is fired either on bufferTimeout or bufferSize reached.

API

new Buffer(options)

Creates a new buffer instance

options

  • bufferSize - Number - Buffer size.
  • bufferTimeout - Number - The duration (in milliseconds) after which the buffer will flush.

Buffer.add(messages)

  • messages - |[] - The data type of each item is constrained by Amazon SQS. It can be a single item or an array of items.

Events

|Event|Params|Description| |-----|------|-----------| |flush|messages|Fired with either bufferSize is full or bufferTimeout reached.|

Custom Buffer

In order to use your own custom buffer (redis, mongo etc.) you would need to implement two things:

  1. Have a Buffer.add(messages) method that stores messages into the buffer
  2. Fire a flush event when the buffer is full or timer completed.

Your custom buffer

class MyBuffer extends EventEmitter {
    constructor (options) {
        super()
        // constructor
        
        this.buffer = []
    }
    
    add (messages) {
        // add messages into this.buffer
        
        // on bufferSize reached
        this.emit('flush', this.buffer)
        
        // on bufferTimeout
        this.emit('flush', this.buffer)
    }
}
const worker = new Receiver({
  queueUrl: '...',
  buffer: new MyBuffer(options),
  messageReceiver: (messages, done) => {
    // will be called once buffer is filled or timed out.
    done(messages)
  }
})

worker.start()

Licence

MIT Licence © Copyright 2016 C8 MANAGEMENT LIMITED