kinesiserapy
v0.0.4
AWS Kinesis stream consumer and producer
Kinesiserapy: an AWS Kinesis event stream interface
This module provides AWS Kinesis event stream consumer and emitter implementations.
Configuration
AWS credentials and a stream name must be provided to connect to Kinesis. The credentials are passed in an object that must have the following keys:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_REGION
The stream name is simply a string.
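One way to build the credentials object without hard-coding secrets is to read the three keys from the environment. The following is a minimal sketch under that assumption; `authFromEnv` and `REQUIRED_KEYS` are hypothetical names used for illustration and are not part of kinesiserapy.

```javascript
// Hypothetical helper, not part of kinesiserapy: copies the three required
// keys from an environment-like object and fails fast if one is missing.
var REQUIRED_KEYS = ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_REGION'];

function authFromEnv(env) {
    var auth = {};
    REQUIRED_KEYS.forEach(function (key) {
        if (!env[key]) {
            throw new Error('Missing required credential: ' + key);
        }
        auth[key] = env[key];
    });
    return auth;
}

// Usage: var kinesisAuth = authFromEnv(process.env);
```

Only the three listed keys are copied, so unrelated environment variables are not passed along with the credentials.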
Emit an event
var kinesiserapy = require('kinesiserapy');
var kinesisAuth = {
    AWS_ACCESS_KEY_ID: "the key id",
    AWS_SECRET_ACCESS_KEY: "the access key",
    AWS_REGION: "the region"
};
var stream = "my-stream";
var emitter = new kinesiserapy.KEmitter(stream, kinesisAuth);
emitter.emit(
    {
        "key": "value"
    },
    function(err) {
        if (err) {
            console.error(err);
        } else {
            console.log('event emitted');
        }
    });
Consuming from a stream
The easiest way to consume from a stream is to list the shards and spawn a new process to consume from each shard.
var cluster = require('cluster');
var kinesiserapy = require('kinesiserapy');
var kinesisAuth = {
    AWS_ACCESS_KEY_ID: "the key id",
    AWS_SECRET_ACCESS_KEY: "the access key",
    AWS_REGION: "the region"
};
var stream = "my-stream";
if (cluster.isMaster) {
    // Master process: list the shards and fork one worker per shard.
    var kinfo = new kinesiserapy.KInfo(stream, kinesisAuth);
    kinfo.listShards(function(err, shardIds) {
        if (err) {
            console.error(err);
        } else {
            shardIds.forEach(function (shardId) {
                // The object passed to fork() is added to the worker's process.env.
                cluster.fork({stream: stream, shardId: shardId});
            });
        }
    });
} else if (cluster.isWorker) {
    // Worker process: consume from the shard assigned by the master.
    var kConsumer = new kinesiserapy.KConsumer(
        process.env.stream,
        process.env.shardId,
        lambda,
        function(err) {
            console.error(err);
        },
        kinesisAuth);
    kConsumer.consume();
}
// Handler called with the data consumed from the shard.
function lambda(data) {
    console.log(data);
}
API
kinesiserapy exposes three objects:
- KInfo to obtain information about the stream
  - listShards(callback), where callback is a fn(err, shardIds). On success, the callback is called with err set to null and shardIds set to an array of strings. If an error occurred, err is not null.
- KConsumer to consume from a shard
  - consume() starts an asynchronous consumer loop. As in the example above, the constructor takes the stream name, the shard id, a function called with the consumed data, an error callback, and the credentials object.
- KEmitter to emit an event
  - emit(obj, cb) emits the object obj and calls the callback cb after the event is emitted. The callback has the signature cb(err, data).
    - err is null unless an error occurred.
    - data contains two keys:
      - SequenceNumber: the sequence number of the emitted event
      - ShardId: the shard on which the event was emitted
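Because emit follows the cb(err, data) convention described above, it can be wrapped in a Promise when that style is preferred. The following is a minimal sketch; `emitAsync` is a hypothetical helper, not something kinesiserapy provides, and it assumes only the emit(obj, cb) signature listed in this section.

```javascript
// Hypothetical helper, not part of kinesiserapy: wraps the callback-style
// emit(obj, cb) in a Promise that resolves with data or rejects with err.
function emitAsync(emitter, obj) {
    return new Promise(function (resolve, reject) {
        emitter.emit(obj, function (err, data) {
            if (err) {
                reject(err);
            } else {
                resolve(data);
            }
        });
    });
}

// Usage, assuming `emitter` is a KEmitter constructed as in "Emit an event":
// emitAsync(emitter, {key: 'value'}).then(function (data) {
//     console.log(data.SequenceNumber, data.ShardId);
// }).catch(function (err) {
//     console.error(err);
// });
```

The wrapper accepts any object with a compatible emit method, so it can be exercised with a stub emitter without connecting to Kinesis.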