aws-kinesis-agg-temp

v4.0.3

Node.js module to simplify working with Amazon Kinesis Records using Protocol Buffers encoding

Node.js Kinesis Aggregation & Deaggregation Modules

These Kinesis Aggregation and Deaggregation modules provide a simple interface for creating and working with data using the Kinesis Aggregated Record Format from any type of application. If you are generating Kinesis records using the Kinesis Producer Library (KPL), you can easily deaggregate and process that data from Node.js. Alternatively, if you want to create Kinesis records that are tightly packed to reach the maximum record size, the aggregation module makes this straightforward. Using record aggregation improves throughput and reduces costs when writing applications that publish data to and consume data from Amazon Kinesis.

Installation

The Node.js record aggregation/deaggregation modules are available on npm as aws-kinesis-agg. To get started, require the aws-kinesis-agg module in your new or existing Node.js application:

var agg = require('aws-kinesis-agg');

Record Deaggregation

When using deaggregation, you provide a single aggregated Kinesis Record and get back multiple Kinesis User Records. If a Kinesis record that is provided is not using the Kinesis Aggregated Record Format, that's perfectly fine - you'll just get a single record output from the single record input. A Kinesis User Record that is returned from the deaggregation module looks like:

{
	partitionKey : String - The Partition Key provided when the record was submitted
	explicitPartitionKey : String - The Partition Key provided for the purposes of Shard Allocation
	sequenceNumber : BigInt - The sequence number assigned to the record on submission to Kinesis
	subSequenceNumber : Int - The sub-sequence number for the User Record in the aggregated record, if aggregation was in use by the producer
	data : Buffer - The original data transmitted by the producer (base64 encoded)
}
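As a minimal sketch of consuming a User Record of this shape, the helper below turns the data field into a UTF-8 string. The helper name decodeUserRecordData and the sample record are hypothetical, and the sketch assumes data arrives either as a base64 encoded string or as a Buffer of raw bytes; check which one your version of the module returns.

```javascript
// Sketch: turn the `data` field of a User Record into a UTF-8 string.
// Assumption: `data` is either a base64-encoded string or a Buffer of raw bytes.
function decodeUserRecordData(userRecord) {
  const bytes = Buffer.isBuffer(userRecord.data)
    ? userRecord.data
    : Buffer.from(userRecord.data, 'base64');
  return bytes.toString('utf8');
}

// Hypothetical User Record, shaped like the structure described above.
const sample = {
  partitionKey: 'device-42',
  subSequenceNumber: 0,
  data: Buffer.from('{"temp":21.5}').toString('base64')
};

console.log(decodeUserRecordData(sample));
```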

When you receive a Kinesis Record in your consumer application, you extract the User Records using the deaggregation methods in the aws-kinesis-agg module. The module provides both synchronous and asynchronous methods of deaggregating records.

Synchronous

The synchronous model of deaggregation extracts all the Kinesis User Records from the received Kinesis Record, and accumulates them into an array. The method then makes a callback with any errors encountered, and the array of User Records that were deaggregated:

deaggregateSync(kinesisRecord, computeChecksums, afterRecordCallback(err, UserRecord[]));
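The call pattern above could be used roughly as follows to flatten a batch of Kinesis Records into one array of User Records. This is only a sketch: deaggregateBatch is a hypothetical helper, and deaggregateSync is passed in as an argument (for example require('aws-kinesis-agg').deaggregateSync) so that the helper does not depend on how the module is loaded.

```javascript
// Sketch: deaggregate every Kinesis Record in a batch, one after another,
// and collect all resulting User Records into a single array.
// `deaggregateSync` is assumed to follow the signature shown above:
//   deaggregateSync(kinesisRecord, computeChecksums, callback(err, userRecords))
function deaggregateBatch(deaggregateSync, kinesisRecords, done) {
  const allUserRecords = [];
  let index = 0;

  function next() {
    if (index === kinesisRecords.length) {
      return done(null, allUserRecords);
    }
    const computeChecksums = true; // validate the checksum of each record
    deaggregateSync(kinesisRecords[index++], computeChecksums, (err, userRecords) => {
      if (err) {
        // stop at the first failure and report what was collected so far
        return done(err, allUserRecords);
      }
      allUserRecords.push(...userRecords);
      next();
    });
  }

  next();
}
```

A caller would pass the records it received from the stream and a done(err, userRecords) callback, then process the flattened array in one place.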

Asynchronous

The asynchronous model of deaggregation allows you to provide a callback which is invoked for each User Record that is extracted from the Kinesis Record. When all User Records have been extracted from the Kinesis Record, an afterRecordCallback is invoked which allows you to continue processing additional Kinesis Records that your consumer receives:

deaggregate(kinesisRecord, computeChecksums, perRecordCallback(err, UserRecord), afterRecordCallback(err, errorKinesisRecord));

If any errors are encountered during processing of the perRecordCallback, then the afterRecordCallback is called with the err plus an error Record which contains the failed subSequenceNumber from the aggregated data with details about the enclosing Kinesis Record:

{
	partitionKey : String - The Partition Key of the enclosing Kinesis Aggregated Record
	explicitPartitionKey : String - The Partition Key provided for the purposes of Shard Allocation
	sequenceNumber : BigInt - The sequence number assigned to the record on submission of the Aggregated Record by the encoder
	subSequenceNumber : Int - The sub-sequence number for the failing User Record in the aggregated record, if aggregation was in use by the producer
	data : Buffer - The original protobuf message transmitted by the producer (base64 encoded)
}

The computeChecksums parameter accepts a Boolean that indicates whether the checksum in the Kinesis Record should be validated. If the checksum is incorrect, an error is returned via the afterRecordCallback.
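The asynchronous flow described above might be wired up along these lines. This is a sketch under assumptions: processKinesisRecord and handleUserRecord are hypothetical names, and deaggregate is passed in as an argument (for example require('aws-kinesis-agg').deaggregate) and assumed to follow the signature shown above.

```javascript
// Sketch: process each User Record as it is extracted, then report a summary.
// `deaggregate` is assumed to follow the signature shown above:
//   deaggregate(kinesisRecord, computeChecksums,
//               perRecordCallback(err, userRecord),
//               afterRecordCallback(err, errorKinesisRecord))
function processKinesisRecord(deaggregate, kinesisRecord, handleUserRecord, done) {
  let processed = 0;

  deaggregate(
    kinesisRecord,
    true, // computeChecksums: validate the checksum in the Kinesis Record
    (err, userRecord) => {
      if (err) {
        return; // the failure is reported through the after-record callback
      }
      handleUserRecord(userRecord);
      processed += 1;
    },
    (err, errorKinesisRecord) => {
      if (err) {
        // the error record identifies the User Record that failed
        const failedSubSequenceNumber = errorKinesisRecord
          ? errorKinesisRecord.subSequenceNumber
          : undefined;
        return done(err, { processed, failedSubSequenceNumber });
      }
      done(null, { processed });
    }
  );
}
```

Keeping the per-record work in handleUserRecord and the summary in done mirrors the two callbacks of the module, so a consumer can move on to the next Kinesis Record from inside done.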

Record Aggregation

Applications implemented in AWS Lambda often emit new events based on the events that were received in the function. They may be doing enrichment, parsing, or filtering, and the ability to take advantage of Kinesis record-based aggregation will result in more efficient systems.

To aggregate records, call the aggregate function:

aggregate(records, encodedRecordHandler, afterPutAggregatedRecords, errorCallback)

With:

  • encodedRecordHandler: a callback function (record, (err, data) => ...) invoked when the records supplied exceed the Kinesis maximum record size (1MB as of March 2016). The record passed to it has the fields {data, partitionKey, explicitHashKey}
  • afterPutAggregatedRecords: a callback invoked once all calls to encodedRecordHandler have finished
  • errorCallback: a function (err, data) called each time an error occurs

The following example demonstrates an AWS Lambda function that does per-record processing and then transmits those records to Amazon Kinesis (pseudo code):

// load the aggregation module
const aggregator = require('aws-kinesis-agg');

// create the function which sends each encoded record to Kinesis,
// using the partition key carried by the encoded record
const onReady = function(encodedRecord, callback) {
	// build putRecord params
	const params = {
		Data: encodedRecord.data,
		PartitionKey: encodedRecord.partitionKey,
		StreamName: 'my-stream'
	};
	if (encodedRecord.explicitHashKey) {
		params.ExplicitHashKey = encodedRecord.explicitHashKey;
	}
	// send to Kinesis; myKinesisConnection stands in for your own Kinesis client
	myKinesisConnection.putRecord(params, callback);
};

aggregator.aggregate(event.Records, onReady, () => {
	// lambda is done close context
	...
}, (err, data) => {
	console.log(`${err}, ${data}`);
});

Examples

This module includes an example AWS Lambda function in the example folder, which you can use as a starting point for building new functions to process Kinesis aggregated message data. Both examples use async.js to process the received Kinesis Records.


Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at

    http://aws.amazon.com/asl/

or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and limitations under the License.