npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

dynamodb-stream-consumer

v0.0.0-alpha.9

Published

Utilities for building robust AWS Lambda consumers of stream events from Amazon Web Services (AWS) DynamoDB streams

Downloads

1

Readme

dynamodb-stream-consumer v0.0.0-alpha.9

Utilities for building robust AWS Lambda consumers of stream events from Amazon Web Services (AWS) DynamoDB streams.

Modules:

  • dynamo-consumer.js module
    • Utilities and functions to be used to configure and robustly consume messages from an AWS DynamoDB stream
  • dynamodb-processing.js module
    • Utilities for configuring DynamoDB stream processing, which configures and determines the processing behaviour of a DynamoDB stream consumer

Dependencies

  • aws-stream-consumer module - Common AWS stream consumer libraries used by this DynamoDB stream consumer module

Purpose

The goal of the AWS DynamoDB stream consumer functions is to make the process of consuming records from an AWS DynamoDB stream more robust for an AWS Lambda stream consumer by providing solutions to and workarounds for common AWS stream consumption issues.

Installation

This module is exported as a Node.js module.

Using npm:

$ {sudo -H} npm i -g npm
$ npm i --save dynamodb-stream-consumer

Usage

To use the dynamodb-stream-consumer module:

  • Define the tasks that you want to execute on individual messages and/or on the entire batch of messages
// Assuming the following example functions are meant to be used during processing:
function saveMessageToDynamoDB(message, context) { /* ... */ }
function sendPushNotification(notification, recipients, context) { /* ... */ }
function sendEmail(from, to, email, context) { /* ... */ }
function logMessagesToS3(messages, context) { /* ... */ }

// Import TaskDef
const TaskDef = require('task-utils/task-defs');

// Example of creating a task definition to be used to process each message one at a time
const saveMessageTaskDef = TaskDef.defineTask(saveMessageToDynamoDB.name, saveMessageToDynamoDB);

// Example of adding optional sub-task definition(s) to your task definitions as needed
saveMessageTaskDef.defineSubTask(sendPushNotification.name, sendPushNotification);
saveMessageTaskDef.defineSubTask(sendEmail.name, sendEmail);

// Example of creating a task definition to be used to process the entire batch of messages all at once 
const logMessagesToS3TaskDef = TaskDef.defineTask(logMessagesToS3.name, logMessagesToS3); // ... with any sub-task definitions needed

const processOneTaskDefs = [saveMessageTaskDef]; // ... and/or more "process one" task definitions
const processAllTaskDefs = [logMessagesToS3TaskDef]; // ... and/or more "process all" task definitions
  • Generate an AWS Lambda handler function that will configure and process stream events according to the given settings & options
const streamConsumer = require('dynamodb-stream-consumer');

const logging = require('logging-utils');
const LogLevel = logging.LogLevel;
const Settings = require('aws-stream-consumer/settings');

// Create a context object
const context = {}; // ... or your own pre-configured context object

// Use undefined to use the default settings, if the default behaviour is adequate
const settings = undefined; 
// OR - Use your own settings for custom configuration of any or all logging, stage handling and/or stream processing settings & functions
const settings2 = {/* ... */};

// Use your own options json file for custom configuration of any or all logging, stage handling and/or stream processing options
// - For sequenced Kinesis stream consumption, copy `default-kinesis-seq-options.json` as a starting point for your options file
// - For unsequenced Kinesis stream consumption, copy `default-kinesis-unseq-options.json` as a starting point for your options file
// - For DynamoDB stream consumption, copy `default-dynamodb-options.json` as a starting point for your options file 
const options = require('./your-custom-options.json');

// Generate an AWS Lambda handler function that will configure and process stream events 
// according to the given settings & options (and use defaults for optional arguments)
module.exports.handler = streamConsumer.generateHandlerFunction(context, settings, options, processOneTaskDefs, processAllTaskDefs);

// OR ... with optional arguments included
module.exports.handler = streamConsumer.generateHandlerFunction(context, settings, options, processOneTaskDefs, processAllTaskDefs, 
  LogLevel.DEBUG, 'Failed to do Xyz', 'Finished doing Xyz');
  • ALTERNATIVELY, configure your own AWS Lambda handler function using the following functions: (See dynamo-consumer.js generateHandlerFunction for an example handler function)

    • Configure the stream consumer
// Configure the stream consumer's dependencies and runtime settings
streamConsumer.configureStreamConsumer(context, settings, options, awsEvent, awsContext);
  • Process the AWS Kinesis or DynamoDB stream event
const promise = streamConsumer.processStreamEvent(awsEvent, processOneTaskDefs, processAllTaskDefs, context);
  • Within your custom task execute function(s), update the message's (or messages') tasks' and/or sub-tasks' states
  • Example custom "process one at a time" task execute function for processing one message at a time
function saveMessageToDynamoDB(message, context) {
  // Note that 'this' will be the currently executing task witin your custom task execute function
  const task = this; 
  const sendPushNotificationTask = task.getSubTask(sendPushNotification.name);
  const sendEmailTask = task.getSubTask(sendEmail.name);
  
  // ... or alternatively from anywhere in the flow of your custom execute code
  const task1 = streamConsumer.getProcessOneTask(message, saveMessageToDynamoDB.name, context);
  const subTaskA = task1.getSubTask(sendPushNotification.name);
  const subTaskB = task1.getSubTask(sendEmail.name);
  
  // ... execute your actual logic (e.g. save the message to DynamoDB) 
  
  // If your logic succeeds, then start executing your task's sub-tasks, e.g.
  
  sendPushNotificationTask.execute(notification, recipients, context);
  
  sendEmailTask.execute(from, to, email, context);
  
  // If necessary, change the task's and/or sub-tasks' states based on outcomes, e.g.
  task.fail(new Error('Task failed'));
  
  // ...
}

function sendPushNotification(notification, recipients, context) {
  const task = this;
  
  // ... execute your actual send push notification logic 
  
  // If necessary, change the task's state based on the outcome, e.g.
  task.succeed(result);

  // ...
}

function sendEmail(from, to, email, context) {
  const task = this;
  
  // ... execute your actual send email logic 
  
  // If necessary, change the task's state based on the outcome, e.g.
  task.reject('Invalid email address', new Error('Invalid email address'), true);

  // ...
}
  • Example custom "process all at once" task execute function for processing the entire batch of messages
function logMessagesToS3(messages, context) {
  // Note that 'this' will be the currently executing master task witin your custom task execute function
  // NB: Master tasks and sub-tasks will apply any state changes made to them to every message in the batch
  const masterTask = this; 
  const masterSubTask = masterTask.getSubTask('doX');
  
  // ... or alternatively from anywhere in the flow of your custom execute code
  const masterTask1 = streamConsumer.getProcessAllTask(messages, logMessagesToS3.name, context);
  const masterSubTask1 = masterTask1.getSubTask('doX');
  
  const masterSubTask2 = masterTask1.getSubTask('doY');
  
  // ...
  
  // Change the master task's and/or sub-tasks' states based on outcomes, e.g.
  masterSubTask1.succeed(subTask1Result);
  
  // ...
  
  masterSubTask2.reject('Cannot do X', new Error('X is un-doable'), true);
  
  // ...
  
  masterTask.fail(new Error('Task failed'));
    
  // ...
  
  // ALTERNATIVELY (or in addition) change the task state of individual messages
  const firstMessage = messages[0]; // e.g. working with the first message in the batch
  const messageTask1 = streamConsumer.getProcessAllTask(firstMessage, logMessagesToS3.name, context);
  const messageSubTask1 = messageTask1.getSubTask('doX');
  messageSubTask1.reject('Cannot do X on first message', new Error('X is un-doable on first message'), true);
  messageTask1.fail(new Error('Task failed on first message'));
  
  // ...
}

Unit tests

This module's unit tests were developed with and must be run with tape. The unit tests have been tested on Node.js v6.10.3.

Install tape globally if you want to run multiple tests at once:

$ npm install tape -g

Run all unit tests with:

$ npm test

or with tape:

$ tape test/*.js

See the package source for more details.

Changes

See CHANGES.md