sns-sqs-big-payload
v0.1.4
Published
![Build](https://github.com/aspecto-io/sns-sqs-big-payload/workflows/Build/badge.svg) [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg?style=flat-square)](http://makeapullrequest.com) [![TypeScript](https://badgen.net/npm/types/env
Downloads
7,401
Readme
sns-sqs-big-payload
SQS/SNS producer/consumer library. Provides an ability to pass payloads though s3.
Motivation
Aspecto helps modern development teams solve production issues before they evolve. We collect real production data and perform deep API analysis over it to autogenerate tests and monitor services stability. As a result, we often need to handle large payloads which can't be used with SQS & SNS due to the hard size limit. This library was developed to overcome this challenge - it enables you to manage Amazon SNS & SQS message payloads with Amazon S3 when dealing with payloads larger than 256KB. Key functionality includes:
- Controlling whether message payloads are always stored in Amazon S3 or only when a message's size exceeds 256KB.
- Send a message that references a single message object stored in an Amazon S3 bucket.
- Get the corresponding message object from an Amazon S3 bucket.
- Handle the interface for large messages between SNS to SQS via S3 bucket in the middle
Installation
npm install sns-sqs-big-payload
Important:
Make sure you also have
aws-sdk
installed, because it's listed as a peer dependency, so won't be installed automatically.
Usage
The library exports 3 clients:
- SNS producer
- SQS producer
- SQS consumer
All 3 clients are under the same repository since they share a similar contract when sending payloads via S3.
SNS Producer
import { SnsProducer } from 'sns-sqs-big-payload';
const snsProducer = SnsProducer.create({
topicArn: '<topic-arn>',
region: 'us-east-1',
// to enable sending large payloads (>256KiB) though S3
largePayloadThoughS3: true,
// Opt-in to enable compatibility with
// Amazon SQS Extended Client Java Library (and other compatible libraries).
// see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
extendedLibraryCompatibility: boolean,
s3EndpointUrl: '...',
});
await snsProducer.sendJSON({
// ...
});
SQS Producer
import { SqsProducer } from 'sns-sqs-big-payload';
const sqsProducer = SqsProducer.create({
queueUrl: '...',
region: 'us-east-1',
// to enable sending large payloads (>256KiB) though S3
largePayloadThoughS3: true,
// Opt-in to enable compatibility with
// Amazon SQS Extended Client Java Library (and other compatible libraries).
// see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
extendedLibraryCompatibility: boolean,
s3Bucket: '...',
});
await sqsProducer.sendJSON({
// ...
});
SQS Consumer
import { SqsConsumer, SqsConsumerEvents } from 'sns-sqs-big-payload';
const sqsConsumer = SqsConsumer.create({
queueUrl: '...',
region: 'us-east-1',
// to enable loading payloads from S3 automatically
getPayloadFromS3: true,
s3Bucket: '...',
// if the queue is subscribed to SNS
// the message will arrive wrapped in sns envelope
// so we need to unwrap it first
transformMessageBody: (body) => {
const snsMessage = JSON.parse(body);
return snsMessage.Message;
},
// if you expect json payload - use `parsePayload` hook to parse it
parsePayload: (raw) => JSON.parse(raw),
// message handler, payload already parsed at this point
handleMessage: async ({ payload }) => {
// ...
},
// Opt-in to enable compatibility with
// Amazon SQS Extended Client Java Library (and other compatible libraries).
// see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
extendedLibraryCompatibility: boolean,
});
// to subscribe for events
sqsConsumer.on(SqsConsumerEvents.messageProcessed, () => {
// ...
});
sqsConsumer.start();
// to stop processing
sqsConsumer.stop();
- The queue is polled continuously for messages using long polling.
- Messages are deleted from the queue once the handler function has completed successfully.
- Throwing an error (or returning a rejected promise) from the handler function will cause the message to be left on the queue. An SQS redrive policy can be used to move messages that cannot be processed to a dead letter queue.
- By default messages are processed by 10 at a time – a new batch won't be received until the previous one is processed. To adjust number of messages that is being processed in parallel, use the
batchSize
option detailed below.
Usage in lambda
If you have a lambda function subscribed to sqs queue, you can use SqsConsumer in this case too. This is a short guide.
Compatibility with libraries in other languages
If you turn on extendedLibraryCompatibility
, then the library will be compatible with:
- Amazon SQS Extended Client Library for Java (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html)
- Boto3 Sqs Extended Client Lib for python (https://github.com/timothymugayi/boto3-sqs-extended-client-lib)
- other libraries that are compatible to the above
Please be careful: This mode is not compatible with the standard mode due to differences in s3 payload.
Credentials
By default the consumer will look for AWS credentials in the places specified by the AWS SDK. The simplest option is to export your credentials as environment variables:
export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...
If you need to specify your credentials manually, you can use a pre-configured instance of the AWS SQS client:
import { SqsConsumer } from 'sns-sqs-big-payload';
import * as aws from 'aws-sdk';
aws.config.update({
region: 'us-east-1',
accessKeyId: '...',
secretAccessKey: '...',
});
const consumer = SqsConsumer.create({
queueUrl: 'https://sqs.us-east-1.amazonaws.com/account-id/queue-name',
handleMessage: async (message) => {
// ...
},
sqs: new aws.SQS(),
});
consumer.start();
Events and logging
SqsConsumer has an internal EventEmitter, you can subscribe for events like this:
sqsConsumer.on(SqsConsumerEvents.messageProcessed, () => {
// ...
});
It sends the following events:
| Event | Params | Description |
| ------------------------- | ---------------------- | ----------------------------------------------------------------------------------- |
| started | None | Fires when the polling is started |
| message-received | message
| Fires when a message is received (one per each message, not per batch) |
| message-processed | message
| Fires after the message is successfully processed and removed from the queue |
| batch-processed | None | Fires after the current batch of messages is processed. |
| poll-ended | None | Fires after the polling cycle is ended. Useful for graceful shutdown. |
| stopped | None | Fires when the polling stops |
| error | {err, message}
| Fires in case of processing error |
| s3-payload-error | {err, message}
| Fires when an error occurs during downloading payload from s3 |
| s3-extended-payload-error | {err, message}
| Fires when a payload from s3 using extended compatibility is not in expected format |
| processing-error | {err, message}
| Fires when an error occurs during processing (only inside handleMessage
function) |
| connection-error | err
| Fires when a connection error occurs during polling (retriable) |
| payload-parse-error | err
| Fires when a connection error occurs during parsing |
You can also use this enum if you're using TypeScript
enum SqsConsumerEvents {
started = 'started',
messageReceived = 'message-received',
messageProcessed = 'message-processed',
batchProcessed = 'batch-processed',
pollEnded = 'poll-ended',
stopped = 'stopped',
error = 'error',
s3PayloadError = 's3-payload-error',
s3ExtendedPayloadError = 's3-extended-payload-error',
processingError = 'processing-error',
connectionError = 'connection-error',
payloadParseError = 'payload-parse-error',
}
You may subscribe to those events to add logging for example.
Testing
Since this library heavily relies on AWS APIs, it is less relevant to run an isolated test using mocks. As a result, we recommend testing it using a localstack or by using real SQS queues and SNS topics.
To run localstack on mac:
TMPDIR=/private$TMPDIR docker-compose up
To run unit tests:
npm test