aws-stream-consumer v1.1.1
Utilities for building robust AWS Lambda consumers of stream events from Amazon Web Services (AWS) Kinesis or DynamoDB streams.
Modules:
- `stream-consumer.js` module - Utilities and functions to configure and robustly consume messages from an AWS Kinesis or DynamoDB stream event
- `stream-processing.js` module - Utilities for configuring stream processing, which determines the processing behaviour of a stream consumer
Purpose
The goal of the AWS stream consumer functions is to make the process of consuming records from an AWS Kinesis or DynamoDB stream more robust for an AWS Lambda stream consumer by providing solutions to and workarounds for common AWS stream consumption issues.
Common AWS stream consumption issues
The fundamental issue is that either all of a stream event's records must be processed successfully or an error must be thrown back to AWS Lambda to trigger a replay of all of the event's records (assuming that you don't want to lose any of the records). This coarse-grained error handling makes no distinction between persistent and transient errors and does not provide a way to reprocess only the unsuccessful records.
The fact that AWS stream event records should always be processed in batches from the AWS stream (to increase throughput and reduce the risk of slow consumption ultimately leading to message loss) increases both the complexity and the chance of failure. For example, while processing a batch of 100 messages, if processing fails on only 1 message with a transient error, then ideally we would want to replay only that 1 failed message, but the only replay option is to throw an error that will trigger a replay of the entire batch of messages.
Any unhandled persistent error is fatal, because any record that cannot be processed due to such an error will block the shard from which it came (and all the records behind it), since the stream will continuously redeliver the record until it expires 24 hours to 7 days later (depending on your stream retention configuration). At expiry, the record will be lost and the records behind it with similar ages are also at risk of being lost.
A "poisonous" record that always causes an error to be thrown back to AWS Lambda when an attempt is made to parse it into a message, will block the shard from which it came until it expires.
A successfully parsed, but still invalid message that can NEVER be successfully processed also blocks its shard until it expires.
Tasks/functions that are executed on a message or batch of messages and that fail "indefinitely" will similarly block the shard from which the message(s) originated.
Each AWS Lambda invocation has a configurable, but limited, number of seconds that it is allowed to run. If a batch of messages cannot be fully processed within that time, then the invocation will be timed out and an error will be thrown back to AWS Lambda, which will cause the same batch of messages to be replayed again and, in the worst-case scenario, to continue timing out and being replayed indefinitely until the batch of messages expires.
Solutions to and workarounds for the above issues provided by aws-stream-consumer:
Any and all errors encountered during processing of a record or its extracted message are caught, logged and handled either by "discarding" the unusable record or by tracking them as failed task states on each message. A task tracking object is attached to each message to keep track of the state of each and every task (i.e. custom execute/processing function) applied to a message. The stream consumer attempts to persist this task tracking information by resubmitting incomplete messages with this information back to their shard instead of throwing these errors back to AWS Lambda. This enables more fine-grained error handling and reprocessing of only incomplete messages.
Each message has its own task tracking information, so whether or not a single message or a batch of messages is being consumed makes no difference. The information enables the stream consumer to determine which messages are completely processed and which messages are still incomplete and then only replay incomplete messages by resubmitting them back to their shard.
Persistent errors can be dealt with preferably by explicitly rejecting a failing task (which marks it as 'Rejected') within the task's custom execute function, which is the responsibility of the developer of that function, or, along with transient errors, by "discarding" a message when all of its failing tasks have reached the maximum number of allowed attempts.
Any error thrown during the extraction of a message from an event record will be caught and logged, and the record will then be treated as an "unusable" record. Any such unusable record will be "discarded" by passing it to the configurable `discardUnusableRecords` function to be dealt with. The default `discardUnusableRecordsToDRQ` function routes these unusable records to a Kinesis "Dead Record Queue (DRQ)" stream.
Invalid messages that can never be successfully processed should ideally be identified and their failing task(s) should be rejected (which marks them as 'Rejected') within the custom task execute function. If this is not done, then invalid messages will be indistinguishable from valid messages that could not be successfully processed within the allowed number of attempts.
Task tracking includes tracking the number of attempts at each task on each message, which enables the stream consumer to "discard" a message when all of its failing tasks have reached the maximum number of allowed attempts, by discarding these tasks (which marks them as 'Discarded') and then passing the message to the configurable `discardRejectedMessages` function to be dealt with. The default `discardRejectedMessagesToDMQ` function routes these rejected messages to a Kinesis "Dead Message Queue (DMQ)" stream.
The stream consumer attempts to deal with the issue of AWS Lambda timeouts by setting up its own timeout at a configurable percentage of the remaining time that the AWS Lambda invocation has to execute. This timeout races against the completion of all processing tasks on all of the messages in the batch. If the timeout triggers before processing has completed, the stream consumer finalises message processing prematurely with the current state of the messages' tasks, on the basis that it is better to preserve at least some of the task tracking information on each message than none. In both the timeout case and the successful completion case, the stream consumer finalises message processing by: freezing all of the messages' tasks, which prevents subsequent updates by any still in-progress tasks in the timeout case; ensuring that the discarding of any unusable records has completed; resubmitting any incomplete messages back to their shard; and discarding any finalised message that contains a task that was rejected (explicitly by custom task execute functions), discarded (due to exceeded attempts) or abandoned (if code changes make previous task definitions obsolete). If the stream consumer is unable to finalise message processing due to an error, then it is unfortunately left with no choice but to throw the error back to AWS Lambda to trigger a replay of the entire batch of records to prevent message loss. These errors need to be monitored.
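As a rough, illustrative sketch only (not the library's actual implementation), the timeout race described above can be pictured as follows. Here `executeAllProcessingTasks` and `finaliseMessageProcessing` are hypothetical placeholders, while `awsContext.getRemainingTimeInMillis()` is the standard AWS Lambda API:
// Illustrative sketch of the timeout race - NOT the library's actual code
function raceProcessingAgainstTimeout(messages, awsContext, timeoutAtPercentage, context) {
  // e.g. a timeoutAtPercentage of 0.9 triggers our own timeout at 90% of the remaining invocation time
  const timeoutMs = awsContext.getRemainingTimeInMillis() * timeoutAtPercentage;

  const timeoutPromise = new Promise(resolve =>
    setTimeout(() => resolve({timedOut: true}), timeoutMs)
  );

  // Hypothetical placeholder that resolves when every task on every message has completed
  const completionPromise = executeAllProcessingTasks(messages, context)
    .then(results => ({timedOut: false, results}));

  // Whichever settles first decides whether finalisation happens prematurely (with partial
  // task tracking state) or after normal completion
  return Promise.race([timeoutPromise, completionPromise])
    .then(outcome => finaliseMessageProcessing(messages, outcome, context)); // hypothetical placeholder
}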
Current limitations
- The default configuration currently supports consuming AWS Kinesis stream events.
- While the current stream consumer code allows for customisation of stream processing behaviour to support AWS DynamoDB stream events, there is currently no out-of-the-box default configuration for supporting AWS DynamoDB stream events.
- The AWS stream consumer functions focus on ensuring "at least once" message delivery semantics, so currently there is no support planned for "at most once" message delivery semantics.
- The message resubmission strategy attempts to preserve some semblance of the original sequence by resubmitting messages using the Kinesis SequenceNumberForOrdering parameter set to the source record's sequence number. However, this does not guarantee that the original sequence will be preserved, so if message sequence is vital you will need to cater for this separately (see the illustrative sketch below).
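For illustration only, a resubmission along these lines might use the AWS SDK for JavaScript's Kinesis putRecord with SequenceNumberForOrdering set to the source record's sequence number. The stream name, partition key and message serialisation below are assumptions, not the library's actual resubmission code:
// Illustrative sketch only - NOT the library's actual resubmission code
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

function resubmitIncompleteMessage(message, sourceRecord, streamName) {
  const params = {
    StreamName: streamName, // assumed to be derived from the source record's eventSourceARN
    PartitionKey: sourceRecord.kinesis.partitionKey,
    Data: JSON.stringify(message), // assumed serialisation
    // Best-effort attempt to preserve some semblance of the original ordering
    SequenceNumberForOrdering: sourceRecord.kinesis.sequenceNumber
  };
  return kinesis.putRecord(params).promise();
}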
Installation
This module is exported as a Node.js module.
Using npm:
$ {sudo -H} npm i -g npm
$ npm i --save aws-stream-consumer
Usage
To use the `aws-stream-consumer` module:
// ---------------------------------------------------------------------------------------------------------------------
// Define the tasks that you want to execute on individual messages and/or on the entire batch of messages
// ---------------------------------------------------------------------------------------------------------------------
// Import TaskDef
const taskDefs = require('task-utils/task-defs');
const TaskDef = taskDefs.TaskDef;
// Define a function that will generate any new process "one at a time" task definition(s) needed
function generateProcessOneTaskDefs() {
// Example of creating a task definition to be used to process each message one at a time
const saveMessageTaskDef = TaskDef.defineTask(saveMessageToDynamoDB.name, saveMessageToDynamoDB);
// Example of adding optional sub-task definition(s) to your task definitions as needed
saveMessageTaskDef.defineSubTask(sendPushNotification.name, sendPushNotification);
saveMessageTaskDef.defineSubTask(sendEmail.name, sendEmail);
return [saveMessageTaskDef]; // ... and/or more task definitions
}
// Define a function that will generate any new process "all at once" task definition(s) needed
function generateProcessAllTaskDefs() {
// Example of creating a task definition to be used to process the entire batch of messages all at once
const logMessagesToS3TaskDef = TaskDef.defineTask(logMessagesToS3.name, logMessagesToS3);
// ... with any sub-task definitions needed
return [logMessagesToS3TaskDef]; // ... and/or more task definitions
}
// ---------------------------------------------------------------------------------------------------------------------
// Generate an AWS Lambda handler function that will configure and process stream events according to the given settings
// & options
// ---------------------------------------------------------------------------------------------------------------------
const streamConsumer = require('aws-stream-consumer/stream-consumer');
const logging = require('logging-utils');
// Create a context object
const context = {}; // ... or your own pre-configured context object
const settings = undefined; // ... or your own settings for custom configuration of any or all logging, stage handling and/or stream processing settings
const options = require('aws-stream-consumer/default-kinesis-options.json'); // ... or your own options for custom configuration of any or all logging, stage handling, kinesis and/or stream processing options
// Generate an AWS Lambda handler function that will configure and process stream events
// according to the given settings & options (and use defaults for optional arguments)
module.exports.handler = streamConsumer.generateHandlerFunction(context, settings, options, generateProcessOneTaskDefs,
generateProcessAllTaskDefs);
// OR ... with optional arguments included
module.exports.handler = streamConsumer.generateHandlerFunction(context, settings, options, generateProcessOneTaskDefs,
generateProcessAllTaskDefs, logging.DEBUG, 'Failed to ...', 'Finished ...');
// ---------------------------------------------------------------------------------------------------------------------
// ALTERNATIVELY, configure your own AWS Lambda handler function using the following functions:
// (See `stream-consumer.js` `generateHandlerFunction` for an example handler function)
// ---------------------------------------------------------------------------------------------------------------------
// Configure the stream consumer's dependencies and runtime settings
streamConsumer.configureStreamConsumer(context, settings, options, awsEvent, awsContext);
// Process the AWS Kinesis (or DynamoDB) stream event
// ---------------------------------------------------------------------------------------------------------------------
const promise = streamConsumer.processStreamEvent(awsEvent, generateProcessOneTaskDefs, generateProcessAllTaskDefs, context);
// ---------------------------------------------------------------------------------------------------------------------
// Within your custom task execute function(s), update the message's (or messages') tasks' and/or sub-tasks' states
// Example custom "process one" task execute function for processing a single, individual message at a time
// ---------------------------------------------------------------------------------------------------------------------
//noinspection JSUnusedLocalSymbols
function saveMessageToDynamoDB(message, context) {
// Note that 'this' will be the currently executing task within your custom task execute function
const task = this;
const sendPushNotificationTask = task.getSubTask(sendPushNotification.name);
const sendEmailTask = task.getSubTask(sendEmail.name);
// ... OR ALTERNATIVELY from anywhere in the flow of your custom execute code
const task1 = streamConsumer.getProcessOneTask(message, saveMessageToDynamoDB.name, context);
//noinspection JSUnusedLocalSymbols
const subTaskA = task1.getSubTask(sendPushNotification.name);
//noinspection JSUnusedLocalSymbols
const subTaskB = task1.getSubTask(sendEmail.name);
// ... execute your actual logic (e.g. save the message to DynamoDB)
// If your logic succeeds, then start executing your task's sub-tasks, e.g.
sendPushNotificationTask.execute('Welcome back', ['+27835551234'], context);
sendEmailTask.execute('[email protected]', '[email protected]', 'Welcome back', context);
// If necessary, change the task's and/or sub-tasks' states based on outcomes, e.g.
task.fail(new Error('Task failed'));
// ...
}
//noinspection JSUnusedLocalSymbols
function sendPushNotification(notification, recipients, context) {
const task = this;
// ... execute your actual send push notification logic
// If necessary, change the task's state based on the outcome, e.g.
task.succeed(result);
// ...
}
//noinspection JSUnusedLocalSymbols
function sendEmail(from, to, email, context) {
const task = this;
// ... execute your actual send email logic
// If necessary, change the task's state based on the outcome, e.g.
task.reject('Invalid email address', new Error('Invalid email address'), true);
// ...
}
// ---------------------------------------------------------------------------------------------------------------------
// Example custom "process all at once" task execute function for processing the entire batch of messages
// ---------------------------------------------------------------------------------------------------------------------
//noinspection JSUnusedLocalSymbols
function logMessagesToS3(messages, context) {
// Note that 'this' will be the currently executing master task within your custom task execute function
// NB: Master tasks and sub-tasks will apply any state changes made to them to every message in the batch
const masterTask = this;
//noinspection JSUnusedLocalSymbols
const masterSubTask = masterTask.getSubTask('doX');
// ... or alternatively from anywhere in the flow of your custom execute code
const masterTask1 = streamConsumer.getProcessAllTask(messages, logMessagesToS3.name, context);
const masterSubTask1 = masterTask1.getSubTask('doX');
const masterSubTask2 = masterTask1.getSubTask('doY');
// ...
// Change the master task's and/or sub-tasks' states based on outcomes, e.g.
masterSubTask1.succeed('subTask1Result');
// ...
masterSubTask2.reject('Cannot do X', new Error('X is un-doable'), true);
// ...
masterTask.fail(new Error('Task failed'));
// ...
// ALTERNATIVELY (or in addition) change the task state of individual messages
const firstMessage = messages[0]; // e.g. working with the first message in the batch
const messageTask1 = streamConsumer.getProcessAllTask(firstMessage, logMessagesToS3.name, context);
const messageSubTask1 = messageTask1.getSubTask('doX');
messageSubTask1.reject('Cannot do X on first message', new Error('X is un-doable on first message'), true);
messageTask1.fail(new Error('Task failed on first message'));
// ...
}
- Advanced customisation of your stream consumer's dependencies & stream processing behaviour (if needed):
- Advanced customisation of the logging dependency:
// Configure logging
const logging = require('logging-utils');
const forceConfiguration = false;
// EITHER - configure with your own custom logging settings and/or logging options
logging.configureLogging(context, loggingSettings, loggingOptions, undefined, forceConfiguration);
// ... OR - simply use overriding loggingOptions with the default logging configuration
logging.configureDefaultLogging(context, loggingOptions, undefined, forceConfiguration);
- Advanced customisation of the stage handling dependency:
// Configure stage-handling, which determines the behaviour of the stage handling functions
const stages = require('aws-core-utils/stages');
// EITHER - configure with your own custom stage handling settings and/or stage handling options
stages.configureStageHandling(context, stageHandlingSettings, stageHandlingOptions, otherSettings, otherOptions, forceConfiguration);
// ... OR - start with the default settings and override with your own custom stage-handling configuration
const stageHandlingSettings = stages.getDefaultStageHandlingSettings(stageHandlingOptions);
// Optionally override the default stage handling functions with your own custom functions
// stageHandlingSettings.customToStage = undefined;
// stageHandlingSettings.convertAliasToStage = stages.DEFAULTS.convertAliasToStage;
// stageHandlingSettings.injectStageIntoStreamName = stages.DEFAULTS.toStageSuffixedStreamName;
// stageHandlingSettings.extractStageFromStreamName = stages.DEFAULTS.extractStageFromSuffixedStreamName;
// stageHandlingSettings.injectStageIntoResourceName = stages.DEFAULTS.toStageSuffixedResourceName;
// stageHandlingSettings.extractStageFromResourceName = stages.DEFAULTS.extractStageFromSuffixedResourceName;
stages.configureStageHandling(context, stageHandlingSettings, undefined, otherSettings, otherOptions, forceConfiguration);
// ... OR - simply override the default stage handling options with your custom stageHandlingOptions
stages.configureDefaultStageHandling(context, stageHandlingOptions, otherSettings, otherOptions, forceConfiguration);
// Note that this last approach does NOT give you the option of overriding the default stage handling functions, which
// can only be configured via stage handling settings (i.e. not via stage handling options)
- Advanced customisation and caching of an AWS Kinesis instance (if needed)
// Configure and cache a default Kinesis instance (if you are using the default stream processing configuration or you are using Kinesis)
const kinesisCache = require('aws-core-utils/kinesis-cache');
// NB: Only specify a region in the kinesisOptions if you do NOT want to use your AWS Lambda's current region
kinesisCache.configureKinesis(context, kinesisOptions);
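// For illustration only (assumed shape): the kinesisOptions are typically just standard AWS SDK
// client (AWS.Kinesis constructor) options, e.g.
// const kinesisOptions = { maxRetries: 0 }; // omit `region` to use your AWS Lambda's current region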
- Advanced customisation of your stream consumer's stream processing behaviour:
// Configure stream processing
const streamProcessing = require('aws-stream-consumer/stream-processing');
// EITHER - configure with your own custom stream processing settings and/or stream processing options
streamProcessing.configureStreamProcessing(context, streamProcessingSettings, streamProcessingOptions, settings, options,
awsEvent, awsContext, forceConfiguration);
// ... OR - start with the default settings and override with your own custom stream processing settings
const streamProcessingSettings = streamProcessing.getDefaultKinesisStreamProcessingSettings(streamProcessingOptions);
// Optionally override the default stream processing functions with your own custom functions
// streamProcessingSettings.extractMessageFromRecord = streamProcessing.DEFAULTS.extractJsonMessageFromKinesisRecord;
// streamProcessingSettings.loadTaskTrackingState = streamProcessing.DEFAULTS.skipLoadTaskTrackingState;
// streamProcessingSettings.saveTaskTrackingState = streamProcessing.DEFAULTS.skipSaveTaskTrackingState;
// streamProcessingSettings.handleIncompleteMessages = streamProcessing.DEFAULTS.resubmitIncompleteMessagesToKinesis;
// streamProcessingSettings.discardUnusableRecords = streamProcessing.DEFAULTS.discardUnusableRecordsToDRQ;
// streamProcessingSettings.discardRejectedMessages = streamProcessing.DEFAULTS.discardRejectedMessagesToDMQ;
streamProcessing.configureStreamProcessing(context, streamProcessingSettings, undefined, settings, options,
awsEvent, awsContext, forceConfiguration);
// ... OR - simply override the default stream processing options with your custom streamProcessingOptions
streamProcessing.configureDefaultKinesisStreamProcessing(context, streamProcessingOptions, settings, options,
awsEvent, awsContext, forceConfiguration);
// Note that this last approach does NOT give you the option of overriding the default stream processing functions,
// which can only be configured via stream processing settings (i.e. not via stream processing options)
Unit tests
This module's unit tests were developed with and must be run with tape. The unit tests have been tested on Node.js v4.3.2.
Install tape globally if you want to run multiple tests at once:
$ npm install tape -g
Run all unit tests with:
$ npm test
or with tape:
$ tape test/*.js
See the package source for more details.
Changes
1.1.1
- Minor changes to remove logging of potentially large objects
1.1.0
- Changes to `stream-processing` module:
  - Major refactoring of `discardUnusableRecordsToDRQ`, `toDRQPutRequestFromUnusableRecord`, `toDRQPutRequestFromKinesisUnusableRecord` & `toDRQPutRequestFromDynamoDBUnusableRecord` functions (including addition of new `batchKey` arguments)
  - Major refactoring of `discardRejectedMessagesToDMQ`, `toDMQPutRequestFromRejectedMessage`, `toDMQPutRequestFromKinesisRejectedMessage` & `toDMQPutRequestFromDynamoDBRejectedMessage` functions (including addition of new `batchKey` arguments)
  - Added default `batchKeyedOnEventID` option
  - Added back-ports of `resolveBatchKey`, `getFunctionNameVersionAndAlias`, `resolveConsumerId`, `getTaskTrackingName`, `getTaskTracking`, `deleteTaskTracking`, `getKinesisShardId`, `getKinesisShardIdFromEventID` & `isBatchKeyedOnEventID` functions from various [email protected] modules & from the latest uncommitted `aws-stream-consumer`, `kinesis-stream-consumer` & `dynamodb-stream-consumer` modules
- Changes to `stream-consumer` module:
  - Added `getTaskTrackingName` & `deleteTaskTracking` functions
- Changes to `type-defs` module:
  - Added backport of `BatchKey` & `BatchKeyComponents` type definitions
- Updated `aws-core-utils` dependency to version 5.1.1
- Updated `core-functions` dependency to version 2.0.18
- Updated `logging-utils` dependency to version 3.0.18
- Updated `task-utils` dependency to version 4.0.14
- Updated `aws-sdk` dev dependency to version 2.92.0
1.0.15
- Upgraded `aws-core-utils` dependency to 5.1.0
- Updated `core-functions` dependency to version 2.0.17
- Updated `logging-utils` dependency to version 3.0.17
- Updated `task-utils` dependency to version 4.0.13
1.0.14
- Moved test devDependencies to package.json & removed test/package.json
- Upgraded `aws-core-utils` dependency to 5.0.26
- Updated `core-functions` dependency to version 2.0.16
- Updated `logging-utils` dependency to version 3.0.16
- Updated `task-utils` dependency to version 4.0.12
1.0.13
- Upgraded `aws-core-utils` dependency to 5.0.25
- Updated `core-functions` dependency to version 2.0.15
- Updated `logging-utils` dependency to version 3.0.13
- Upgraded `task-utils` dependency to 4.0.11
1.0.12
- Updated `aws-core-utils` dependency to version 5.0.24
- Upgraded `aws-sdk` dev dependency to 2.54.0
1.0.11
- Updated `README.md`
1.0.10
- Updated `aws-core-utils` dependency to version 5.0.23
1.0.9
- Updated `aws-core-utils` dependency to version 5.0.22
- Updated `uuid` test dependency to version 3.1.0
1.0.8
- Changes to inner `handler` function of `generateHandlerFunction` function of `stream-consumer` module:
  - Restored UNSAFE fallback to use of `generateProcessOneTaskDefs` & `generateProcessAllTaskDefs` parameters as arrays for BACKWARD COMPATIBILITY ONLY! Do NOT pass as arrays in NEW code & refactor OLD code to use functions ASAP!
- Updated `task-utils` dependency to version 4.0.10
1.0.7
- Updated `task-utils` dependency to version 4.0.9
1.0.6
- Critical fixes to `aws-stream-consumer` module to avoid shared state bugs with `processOneTaskDefsOrNone` & `processAllTaskDefsOrNone` parameters:
  - Replaced problematic, shared state `processOneTaskDefsOrNone` & `processAllTaskDefsOrNone` parameters of the `generateHandlerFunction` function with new optional `generateProcessOneTaskDefs` and `generateProcessAllTaskDefs` function parameters, which are used to generate clean, unshared lists of "process one" and "process all" task definitions for each new run
1.0.5
- Updated `task-utils` dependency to version 4.0.8
- Changes to `aws-stream-consumer` module:
  - Changed `discardIncompleteTasksIfMaxAttemptsExceeded` function to abandon any "dead" unusable & unstarted tasks and sub-tasks that would otherwise block their fully finalised and/or unusable root tasks from completing
1.0.4
- Changed `executeUpdateStateAndReturnPromise` function returned by `taskExecutePromiseFactory` function of the `stream-consumer` module:
  - To look for a `context` in the last argument position - instead of only in 2nd argument position
  - To allow the default recursive starting of a task's sub-tasks to be disabled via `context.streamProcessing.startTasksNonRecursively`
1.0.3
- Updated `aws-core-utils` dependency to version 5.0.21
1.0.2
- Updated `aws-core-utils` dependency to version 5.0.20
1.0.1
- Updated `aws-core-utils` dependency to version 5.0.19
1.0.0
- Back-ported patch for finalising timeout issue
- Locked down versions in `package.json`
1.0.0-beta.18
- Fixed critical module-scope defects in `generateHandlerFunction` function in `stream-consumer` module
- Updated `core-functions` dependency to version 2.0.14
- Updated `logging-utils` dependency to version 3.0.12
- Updated `task-utils` dependency to version 4.0.7
- Updated `aws-core-utils` dependency to version 5.0.17
1.0.0-beta.17
- Added new `generateHandlerFunction` function to `stream-consumer.js` module
- More improvements to typedefs in `type-defs.js`, `stream-consumer.js` & `stream-processing.js` modules
- Updated `core-functions` dependency to version 2.0.12
- Updated `logging-utils` dependency to version 3.0.10
- Updated `aws-core-utils` dependency to version 5.0.16
1.0.0-beta.16
- Changes to unit tests to ensure reset of environment variables
1.0.0-beta.15
- Changes to `type-defs.js` module:
  - Renamed `StreamConsuming` typedef to `StreamConsumerContext` & changed it to extend from `StandardContext`
  - Changed `StreamConsumerSettings` typedef to extend from `StandardSettings`
  - Changed `StreamConsumerOptions` typedef to extend from `StandardOptions`
  - Changed `StreamProcessing` typedef to extend from `StandardContext`
  - Removed `SPOtherSettings` & `SPOtherOptions` typedefs (replaced by `StandardSettings` & `StandardOptions`)
  - Removed optional `kinesisOptions` & `dynamoDBDocClientOptions` from `StreamProcessingOptions` typedef
- Changes to `stream-processing.js` module:
  - Added optional AWS event and AWS context arguments to `configureStreamProcessing`, `configureStreamProcessingWithSettings`, `configureDefaultKinesisStreamProcessing` and `configureDefaultDynamoDBStreamProcessing` functions to enable full or partial stream consumer configuration
  - Changed `configureStreamProcessingWithSettings` function to use the new `aws-core-utils/contexts` module's `configureStandardContext` function
  - Removed unnecessary `configureDependencies` function
  - Improved JsDoc type definitions of all configuration functions
- Changes to `stream-consumer.js` module:
  - Changed JsDoc comments of `configureStreamConsumer` function to allow its AWS event and AWS context arguments to be optional, to enable partial configuration that must be completed before invoking `processStreamEvent` by invoking the `configureRegionStageAndAwsContext` function of the `aws-core-utils/stages` module
  - Changed `configureStreamConsumer` function to pass its now optional AWS event and AWS context through to the `stream-processing` module's modified `configureStreamProcessing` function
  - Removed `configureRegionStageAndAwsContext` function, which was moved to the `aws-core-utils/stages` module
  - Improved JsDoc type definitions of all configuration functions
- Renamed `kinesis-options.json` file to `default-kinesis-options.json`
- Renamed `dynamodb-options.json` file to `default-dynamodb-options.json`
- Updated `logging-utils` dependency to version 3.0.9
- Updated `aws-core-utils` dependency to version 5.0.12
1.0.0-beta.14
- Fixed broken unit tests by changing incorrect imports of `node-uuid` to `uuid`
- Updated `aws-core-utils` dependency to version 5.0.10
1.0.0-beta.13
- Fixed missing return type in `configureStreamConsumer` function in `stream-consumer.js` module
- Fixed missing return type in `configureDependencies` function in `stream-processing.js` module
- Moved all typedefs from `stream-consumer.js` & `stream-processing.js` modules to new `type-defs.js` module
- Added new `StreamConsuming` and `StreamProcessing` typedefs to new `type-defs.js` module
- Changed the argument and return types on many of the `stream-consumer.js` & `stream-processing.js` functions to use the existing and new typedefs
- Updated `logging-utils` dependency to version 3.0.8
- Updated `aws-core-utils` dependency to version 5.0.9
1.0.0-beta.12
- Updated `core-functions` dependency to version 2.0.11
- Updated `logging-utils` dependency to version 3.0.6
- Updated `task-utils` dependency to version 4.0.5
- Updated `aws-core-utils` dependency to version 5.0.6
- Replaced `node-uuid` dependency with `uuid` dependency in `test/package.json`
- Removed `context.streamConsumer` property that is no longer used
1.0.0-beta.11
- Changes to `stream-consumer` module:
  - Fixed logging defect in `awaitAndLogStreamProcessingPartialResults` function
  - Renamed `StreamProcessingResults` typedef to `StreamConsumerResults`
  - Changes to `StreamConsumerResults` typedef:
    - Removed `processingCompleted`, `processingFailed` & `processingTimedOut` properties
    - Added `processing` & `finalising` task properties
    - Added `savedMessagesTaskTrackingState` & `saveMessagesTaskTrackingStateError` properties
    - Added `partial`, `saveMessagesTaskTrackingStatePromise`, `handleIncompleteMessagesPromise`, `discardUnusableRecordsPromise` & `discardRejectedMessagesPromise` properties
  - Renamed `StreamProcessingError` typedef to `StreamConsumerError`
  - Changes to `StreamConsumerError` typedef:
    - Renamed `streamProcessingPartialResults` property to `streamConsumerResults` and made it optional
  - Added `SummarizedStreamComsumerResults` typedef
  - Added `summarizeStreamConsumerResults` function to create a summary from `StreamConsumerResults`
  - Removed internal `isProcessingCompleted`, `isProcessingFailed` & `isProcessingTimedOut` functions
  - Renamed `awaitStreamProcessingPartialResults` function to `awaitStreamConsumerResults`
  - Renamed `awaitAndLogStreamProcessingPartialResults` function to `awaitAndLogStreamConsumerResults`
  - Changed `processStreamEvents` to track the processing phase state via a processing task
  - Refactored `createTimeoutPromise` & `createCompletedPromise` functions to accept and update the state of the current phase task and to enable them to also be used during the finalising phase
  - Changed `finaliseMessageProcessing` to track the finalising phase state via a finalising task, to wait for all finalising promises to resolve/reject and to set up a timeout race with the finalising promises
  - Added internal `timeoutMessagesProcessOneAndAllTasks`, `completeStreamConsumerResults`, `logStreamConsumerResults`, `addPartialStreamConsumerResultsToError`, `logPartialStreamConsumerResults`, `getPhaseTasksByName` & `getPhaseTask` functions
  - Changed `awaitStreamConsumerResults` & `awaitAndLogStreamConsumerResults` functions to accept `StreamConsumerResults` instead of errors
  - Added calls to new internal `completeStreamConsumerResults`, `logStreamConsumerResults`, `addPartialStreamConsumerResultsToError` & `logPartialStreamConsumerResults` functions to `finaliseMessageProcessing` function to update and log stream consumer results & summaries
  - Added asynchronous calls to `awaitAndLogStreamConsumerResults` function to `finaliseMessageProcessing` via the new `logPartialStreamConsumerResults` function
- Updated `core-functions` dependency to version 2.0.10
- Updated `logging-utils` dependency to version 3.0.5
- Updated `task-utils` dependency to version 4.0.3
- Updated `aws-core-utils` dependency to version 5.0.5
1.0.0-beta.10
- Changes to `stream-consumer` module:
  - Added `awaitStreamProcessingPartialResults` function to enable waiting for partial stream processing results
  - Added `awaitAndLogStreamProcessingPartialResults` function to enable waiting for and logging of partial stream processing results
  - Changed comments and logging still referring to resubmitting of incomplete messages to use the term handling instead
  - Changed `StreamProcessingResults` typedef to also cater for finalised partial stream processing results:
    - Renamed `resubmittedIncompleteMessages` property to `handledIncompleteMessages`
    - Changed `handledIncompleteMessages`, `discardedUnusableRecords` & `discardedRejectedMessages` properties to optional
    - Added optional `handleIncompleteMessagesError`, `discardUnusableRecordsError` & `discardRejectedMessagesError` properties
- Changes to `stream-processing` module:
  - Changed comments and logging still referring to resubmitting of incomplete messages to use the term handling instead
1.0.0-beta.9
- Changes to `stream-processing` module:
  - Removed `FOR_TESTING_ONLY` exports
  - Exposed `configureStreamProcessingWithSettings` function as a normal (non-testing) export, since it's useful externally
- JSDoc changes to `stream-consumer` module:
  - Renamed `Settings` typedef to `StreamConsumerSettings`
  - Renamed `Options` typedef to `StreamConsumerOptions`
1.0.0-beta.8
- Changes to `stream-processing.js` module:
  - Changed `configureStreamProcessing` function to use `core-functions/objects` module's `copy` and `merge` functions to ensure that any and all given custom settings and options are not lost
  - Changed `getDefaultKinesisStreamProcessingSettings`, `getDefaultDynamoDBStreamProcessingSettings`, `loadDefaultKinesisStreamProcessingOptions` and `loadDefaultDynamoDBStreamProcessingOptions` functions to use `core-functions/objects` module's `copy` and `merge` functions to ensure that any and all given custom options are not lost
- Updated `core-functions` dependency to version 2.0.7
- Updated `logging-utils` dependency to version 3.0.1
- Updated `aws-core-utils` dependency to version 5.0.2
- Updated `task-utils` dependency to version 3.0.3
- Updated `tape` dependency to 4.6.3
1.0.0-beta.7
- Deleted `stream-consumer-config` module:
  - Deleted some of its logic and moved the remainder into the `stream-consumer` module
  - Removed configuration of `context.streamConsumer.resubmitStreamName` property in favour of getting the resubmit stream name from each message's record's eventSourceARN
- Changes to `stream-processing` module:
  - Simplified stream processing configuration to enable full customisation of settings and/or options and to synchronize with changes made to logging configuration and stage handling configuration
  - Moved `kinesisOptions` and new `dynamoDBDocClientOptions` under `streamProcessingOptions`
  - Changed stream processing configuration to also configure its dependencies (i.e. logging & stage handling)
  - Changed stream processing configuration to manage configuration and caching of a Kinesis instance and/or DynamoDB.DocumentClient instance
  - Updated the README.md document to reflect the changes made to stream processing configuration
  - Added configuration support for default DynamoDB stream event consumers
  - Started implementing some of the default functions to be used by DynamoDB stream event consumers
  - Renamed old `configureStreamProcessing` function to `configureStreamProcessingWithSettings`
  - Renamed `configureStreamProcessingAndDependencies` function to new `configureStreamProcessing`
  - Deleted `configureDependenciesIfNotConfigured` function
  - Deleted `configureDefaultStreamProcessingIfNotConfigured` function
  - Deleted `configureStreamProcessingIfNotConfigured` function
  - Added a `taskTrackingTableName` configuration setting and option
  - Renamed `getResubmitIncompleteMessagesFunction` function to `getHandleIncompleteMessagesFunction`
  - Added new `getDefaultDynamoDBStreamProcessingSettings` function
  - Added new `configureDefaultDynamoDBStreamProcessing` function
  - Added new `getLoadTaskTrackingStateFunction` and `getSaveTaskTrackingStateFunction` functions
  - Added initial, but incomplete skeleton implementations of default DynamoDB stream event processing functions
  - Added `useStreamEventRecordAsMessage` function as a configurable extractMessageFromRecord implementation
  - Added a configurable loadTaskTrackingState function with a default `skipLoadTaskTrackingState` implementation for default Kinesis stream event processing
  - Added a configurable saveTaskTrackingState function with a default `skipSaveTaskTrackingState` implementation for default Kinesis stream event processing
  - Added `replayAllMessagesIfIncomplete` function as a configurable handleIncompleteMessages implementation
  - Changes and fixes to `discardUnusableRecordsToDRQ` and `discardRejectedMessagesToDMQ` to also support DynamoDB stream event records
- Changes to `stream-consumer` module:
  - Added `isStreamConsumerConfigured`, `configureStreamConsumer` & `configureRegionStageAndAwsContext` functions from the deleted `stream-consumer-config.js` module
  - Added update of tasks' new last executed at property to `taskExecutePromiseFactory` function
  - Started implementing some of the functions needed by default DynamoDB stream event consumers
  - Added `saveAllMessagesTaskTrackingState` function, which delegates to a configurable saveTaskTrackingState function
  - Renamed `resubmitAnyIncompleteMessages` function to `handleAnyIncompleteMessages`
- Added `dynamodb-options.json` file, which contains default DynamoDB stream processing options
- Renamed `config-kinesis.json` file to `kinesis-options.json`
- Updated `aws-core-utils` dependency to version 5.0.0
- Updated `core-functions` dependency to version 2.0.5
- Updated `logging-utils` dependency to version 3.0.0
- Updated `task-utils` dependency to version 3.0.2
1.0.0-beta.6
- JSDoc changes to `stream-consumer-config` module:
  - Added `Settings` typedef to clarify & simplify argument types
  - Added `Options` typedef to clarify & simplify argument types
- JSDoc changes to `stream-processing` module:
  - Added `OtherSettings` typedef to clarify & simplify argument types
  - Added `OtherOptions` typedef to clarify & simplify argument types
1.0.0-beta.5
- Changes to `stream-consumer` module:
  - Changed `processStreamEvent` function to throw an error if the stream consumer is not configured on the context rather than set up incorrect default configuration
  - Removed unused `awsContext` argument from `processStreamEvent` function
1.0.0-beta.4
- Changes to `stream-consumer-config` module:
  - Changed `configureStreamConsumer` function to accept new `settings` and `options` arguments to enable complete configuration of the stream consumer via the arguments
  - Removed `configureLoggingIfNotConfigured` function, which was migrated to `logging-utils/logging.js`
  - Removed `configureDefaultStreamProcessingIfNotConfigured` function, which was migrated to `aws-core-utils/stages.js`
  - Removed `configureDefaultKinesisStreamProcessingIfNotConfigured` function, which was migrated to `stream-processing.js`
- Changes to `stream-processing` module:
  - Removed module-scope default variables
  - Added a typedef for `StreamProcessingOptions` to be used in JsDoc for parameters & return values
  - Added new `configureDependenciesIfNotConfigured` function to configure stream processing dependencies (i.e. logging, stage handling & kinesis for now)
  - Added new `configureStreamProcessingIfNotConfigured` function to replace the `stream-consumer-config` module's `configureDefaultKinesisStreamProcessingIfNotConfigured` function and to first invoke the new `configureDependenciesIfNotConfigured` function
  - Changed `configureStreamProcessing` function to accept `otherSettings` and `otherOptions` as 3rd & 4th arguments to enable configuration of dependencies and to first invoke the new `configureDependenciesIfNotConfigured` function
  - Changed `configureDefaultKinesisStreamProcessing` function to accept `options`, `otherSettings` and `otherOptions` as 2nd, 3rd & 4th arguments to enable customisation of default options and configuration of dependencies, and to always invoke `configureStreamProcessing`
  - Changed `configureKinesisIfNotConfigured` to use local default options from `config-kinesis.json` if no kinesisOptions are provided and context.kinesis is not already configured
  - Changed `getDefaultKinesisStreamProcessingSettings` function to accept an explicit stream processing `options` object of type `StreamProcessingOptions` as its sole argument instead of an arbitrary `config` object to enable customization of default options
  - Added new `loadDefaultKinesisStreamProcessingOptions` function to load default stream processing options from the local `config-kinesis.json` file
  - Changed `getDefaultKinesisStreamProcessingSettings` function to use the new `loadDefaultKinesisStreamProcessingOptions` function
  - Changed `getKinesis` function to use `configureKinesisIfNotConfigured` instead of directly calling `aws-core-utils/kinesis-utils#configureKinesis` to enable use of local default kinesis options
1.0.0-beta.3
- Changes to `stream-consumer` module:
  - Removed unused module-scope region constant.
  - Changed validation of stream event records to do specific validation based on stream type.
- Changes to `stream-processing` module:
  - Renamed `configureDefaultStreamProcessing` function to `configureDefaultKinesisStreamProcessing`.
  - Renamed `getDefaultStreamProcessingSettings` function to `getDefaultKinesisStreamProcessingSettings`.
- Changes to `stream-consumer-config` module:
  - Renamed `configureDefaultStreamProcessingIfNotConfigured` function to `configureDefaultKinesisStreamProcessingIfNotConfigured`.
- Removed unused `computeChecksums` setting from `config.json`.
- Updated `aws-core-utils` dependency to version 2.1.4.
- Updated `README.md` usage and limitations documentation.
1.0.0-beta.2
- Changes to `stream-processing` module:
  - Changed `discardRejectedMessagesToDMQ` function to wrap the original message in a rejected message "envelope" with metadata
1.0.0-beta.1
- First beta release - unit tested, but not battle tested