xor-ack-dynamodb

v0.3.1

XOR-based ack module for tracking distributed message processing using AWS DynamoDB

Stability: 1 - Experimental

Contributors

@tristanls

Overview

xor-ack-dynamodb is a tracker mechanism inspired by XOR tracking in Storm. It can track multitudes of messages/events in ack chains and report whether or not all of them have been processed.

How it works

xor-ack-dynamodb uses properties of the bitwise XOR operation to implement its tracking functionality. Here is a quick overview of the relevant aspects of XOR. The XOR (^) operation has the following properties:

A ^ A = 0

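and, because XOR is commutative and associative, operands can be reordered freely and any value that appears twice cancels out:

A ^ B ^ C = B ^ A ^ C

A ^ D ^ B ^ A ^ B ^ D = 0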
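The properties above can be checked directly in Node.js. This is a minimal sketch that uses single-byte integers as stand-ins for stamps; the variable names are illustrative and not part of the module.

```javascript
// Illustration only: single-byte values standing in for stamps.
const A = 0x29;
const B = 0x25;
const D = 0xa9;

// Self-inverse: XOR-ing a value with itself yields zero.
const selfInverse = A ^ A;

// Order does not matter (XOR is commutative and associative).
const sameRegardlessOfOrder = (A ^ B ^ D) === (B ^ A ^ D);

// Every value appears exactly twice, so everything cancels out.
const allCancelled = A ^ D ^ B ^ A ^ B ^ D;

console.log(selfInverse, sameRegardlessOfOrder, allCancelled); // 0 true 0
```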

Ack chain

Let's say that you want to do a word count, and you have a database containing text files that contain words.

database -> files -> words

Our approach will be event driven, so we will iterate through the database and emit a file event for each file we encounter.

          +-> 'file'
         /
database ---> 'file'
         \
          +-> 'file'

Another part of our computation will accept those file events and emit word events for each word encountered.

          +-> 'word'
         /
... file ---> 'word'
         \
          +-> 'word'

How do we track that a particular file has been fully processed, given that the processing of any one of the words in that file could fail at any point?

For every file, you can create a unique tag and a random xorStamp. We will then initialize an ack chain. The example below uses specially crafted buffers for illustration purposes. In real use, you want a random Buffer, perhaps generated via:

const xorStamp = crypto.randomBytes(64);

In our example, we generate a tag and an xorStamp (named fileStamp in the code below).

const AWS = require("aws-sdk");
const XorAckDynamoDB = require("xor-ack-dynamodb");

const dynamoDB = new AWS.DynamoDB.DocumentClient(
    {
        region: "us-east-1"
    }
);

const ack = new XorAckDynamoDB(
    {
        dynamoDB,
        partitionKey: "tagID",
        table: "my-table-name"
    }
);

const tag = "database/file13";
const fileStamp = Buffer.from("29", "hex"); // illustration only (use random Buffer)

await ack.create(tag, fileStamp);

Next, each file is broken up into words. Here comes the tricky part. We are going to do a lot of things at once.

First, we already registered the start of file processing via ack.create(...). Now we will acknowledge finishing the processing of that file. To acknowledge, we send the fileStamp again (remember A ^ A = 0).

Second, at the same time, we will acknowledge starting the processing of each word. Let's say we have word1, word2, word3. We will generate a stamp for each word, so word1Stamp, word2Stamp and word3Stamp. To acknowledge the starting of the processing we will send those word stamps to the acker.

Now, remember that A ^ A ^ B ^ C ^ D = 0 ^ B ^ C ^ D = B ^ C ^ D. More precisely:

const fileStamp = Buffer.from("29", "hex"); // we mark completing file
// 00101001
const word1Stamp = Buffer.from("25", "hex"); // we mark start of computing word1
// 00100101
const word2Stamp = Buffer.from("a9", "hex"); // we mark start of computing word2
// 10101001
const word3Stamp = Buffer.from("e9", "hex"); // we mark start of computing word3
// 11101001

const xorOfAll = Buffer.from("4c", "hex"); // fileStamp XOR word1Stamp XOR word2Stamp XOR word3Stamp
// 01001100

await ack.stamp(tag, xorOfAll); // we stamp with just one stamp for all ops above
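As a quick check of the arithmetic, the combined stamp can be recomputed from the four illustration values. This sketch works on plain integers because each example stamp is a single byte; real stamps are multi-byte Buffers.

```javascript
// The single-byte illustration stamps from the example, as integers.
const fileStampByte = 0x29;  // 00101001
const word1StampByte = 0x25; // 00100101
const word2StampByte = 0xa9; // 10101001
const word3StampByte = 0xe9; // 11101001

// One combined stamp: "file finished" plus "three words started".
const combinedByte = fileStampByte ^ word1StampByte ^ word2StampByte ^ word3StampByte;

const combinedHex = Buffer.from([combinedByte]).toString("hex");
const combinedBits = combinedByte.toString(2).padStart(8, "0");

console.log(combinedHex, combinedBits); // 4c 01001100
```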

At this point, ack.stamp(...) has XOR'ed the previously stored state with the newly supplied stamp.

const previousStamp = Buffer.from("29", "hex"); // original fileStamp
// 00101001
const inboundStamp = Buffer.from("4c", "hex"); // xorOfAll from above
// 01001100

let currentState = Buffer.from("65", "hex"); // previousStamp XOR inboundStamp
// 01100101

So, we've managed to acknowledge multiple operations all at once, and we are still storing only the currentState.

Next, notice what happens as we successfully process each word.

let currentState = Buffer.from("65", "hex"); // currentState from above
// 01100101

const word1Stamp = Buffer.from("25", "hex"); // we mark finishing of word1
// 00100101
await ack.stamp(tag, word1Stamp);

currentState = Buffer.from("40", "hex"); // currentState XOR word1Stamp
// 01000000

const word2Stamp = Buffer.from("a9", "hex"); // we mark finishing of word2
// 10101001
await ack.stamp(tag, word2Stamp);

currentState = Buffer.from("e9", "hex"); // currentState XOR word2Stamp
// 11101001

const word3Stamp = Buffer.from("e9", "hex"); // we mark finishing of word3
// 11101001
await ack.stamp(tag, word3Stamp);

currentState = Buffer.from("00", "hex"); // currentState XOR word3Stamp
// 00000000

That's it. The XOR math works out really well for tracking these types of computation where one event generates multiple child events. This can keep going further down the chain as long as we acknowledge completing our parent processing together with initiation of any child processing. Despite all that activity, the amount of information we store is always one state per entire ack chain.
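The whole chain can also be simulated in memory, without DynamoDB, to see that the order in which child events finish does not matter. This is a sketch under assumptions: xorBuffers and isAcked are hypothetical local helpers, not the module's API, and the state variable stands in for what the module stores per tag.

```javascript
const crypto = require("node:crypto");

// Hypothetical helper: XOR equal-length buffers byte by byte.
function xorBuffers(...buffers) {
    const result = Buffer.alloc(buffers[0].length);
    for (const buffer of buffers) {
        for (let i = 0; i < result.length; i++) {
            result[i] ^= buffer[i];
        }
    }
    return result;
}

// Hypothetical helper: a chain is fully acked when its state is all zeros.
const isAcked = state => state.every(byte => byte === 0);

// Start of the chain: the state is just the file stamp.
const fileStamp = crypto.randomBytes(8);
let state = Buffer.from(fileStamp);

// Fan-out: finish the file and start three words with one combined stamp.
const wordStamps = [crypto.randomBytes(8), crypto.randomBytes(8), crypto.randomBytes(8)];
state = xorBuffers(state, fileStamp, ...wordStamps);
const ackedAfterFanOut = isAcked(state); // false unless the random stamps happen to cancel

// Words finish in an arbitrary order; each one stamps itself out of the state.
for (const stamp of [wordStamps[2], wordStamps[0], wordStamps[1]]) {
    state = xorBuffers(state, stamp);
}
const ackedAtEnd = isAcked(state);

console.log(ackedAfterFanOut, ackedAtEnd);
```

Only one fixed-size state is kept throughout, no matter how many child events are in flight.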

The above example used specifically crafted buffers for illustrative purposes. The real implementation uses random buffers. Additionally, the stamps need to be sufficiently large and random to prevent erroneous acked events. The Storm implementation found a 64-bit random integer to be sufficient in practice.
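To give a feel for stamp size, the sketch below generates a 64-bit stamp and computes a rough false-ack chance. It assumes stamps are independent and uniformly random, in which case the XOR of the pending stamps is itself uniform and is all zeros with probability 2^-n for n-bit stamps. The variable names are illustrative.

```javascript
const crypto = require("node:crypto");

// 8 random bytes give a 64-bit stamp.
const stampBytes = 8;
const randomStamp = crypto.randomBytes(stampBytes);
const stampBits = randomStamp.length * 8;

// Assumption: with uniformly random stamps, a still-pending chain reads as
// all zeros (a false ack) with probability 2^-stampBits at any given check.
const falseAckChance = 2 ** -stampBits;

console.log(stampBits, falseAckChance);
```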

Installation

npm install xor-ack-dynamodb

Tests

npm test

Usage

const assert = require("assert").strict;
const AWS = require("aws-sdk");
const crypto = require("crypto");
const XorAckDynamoDB = require("xor-ack-dynamodb");

const dynamoDB = new AWS.DynamoDB.DocumentClient(
    {
        region: "us-east-1"
    }
);

const ack = new XorAckDynamoDB(
    {
        dynamoDB,
        partitionKey: "tagID",
        table: "my-table-name"
    }
);

const tag = "database/file13";
const fileStamp = crypto.randomBytes(64);

let stamp = await ack.create(tag, fileStamp);
assert.ok(!stamp.acked);

const word1Stamp = crypto.randomBytes(64);
const word2Stamp = crypto.randomBytes(64);
const word3Stamp = crypto.randomBytes(64);

let newStamp = XorAckDynamoDB.xor(fileStamp, word1Stamp, word2Stamp, word3Stamp);
stamp = await ack.stamp(tag, newStamp);
assert.ok(!stamp.acked);

newStamp = XorAckDynamoDB.xor(word1Stamp, word2Stamp, word3Stamp);
stamp = await ack.stamp(tag, newStamp);
assert.ok(stamp.acked);

Errors

const XorAckDynamoDB = require("xor-ack-dynamodb");
const errors = require("xor-ack-dynamodb/errors");

try
{
    XorAckDynamoDB.xor(); // called synchronously, as in Usage; no buffers given, so it throws
}
catch (error)
{
    if (error instanceof errors.LessThanTwoBuffers)
    {
        console.log(error.message);
    }
    else
    {
        throw error;
    }
}

Documentation

XorAckDynamoDB

Public API

XorAckDynamoDB.xor(...stamps)

Performs a binary XOR operation across all stamps. Throws if buffer lengths are unequal or if fewer than two buffers are specified.
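The documented behavior can be pictured with a small stand-in. This is a sketch only, not the module's source: xorSketch is a hypothetical name, and it throws plain Error objects where the module uses its own LessThanTwoBuffers and BufferLengthsUnequal errors.

```javascript
// Hypothetical stand-in for a variadic XOR over Buffers with input validation.
function xorSketch(...stamps) {
    if (stamps.length < 2) {
        throw new Error("need at least two buffers");
    }
    const length = stamps[0].length;
    if (stamps.some(stamp => stamp.length !== length)) {
        throw new Error("buffer lengths are unequal");
    }
    const result = Buffer.alloc(length);
    for (const stamp of stamps) {
        for (let i = 0; i < length; i++) {
            result[i] ^= stamp[i];
        }
    }
    return result;
}

const combined = xorSketch(
    Buffer.from("29", "hex"),
    Buffer.from("25", "hex"),
    Buffer.from("a9", "hex"),
    Buffer.from("e9", "hex")
);
console.log(combined.toString("hex")); // 4c
```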

new XorAckDynamoDB(config)

  • config: Object Configuration.
    • dynamoDB: AWS.DynamoDB.DocumentClient Instance of AWS DynamoDB DocumentClient.
    • partitionKey: String Name of table partition key to use.
    • table: String Name of table to use to store ack chains.

Creates a new XorAckDynamoDB instance.

xorAckDynamoDB.create(tag, stamp)

  • tag: String A unique identifier to track this ack chain.
  • stamp: Buffer Initial buffer stamp to create ack chain with.
  • Return: Buffer stamp.
    • acked: Boolean false.
  • Throws:

Creates a new ack chain for specified tag.

xorAckDynamoDB.delete(tag)

  • tag: String Identifier of the ack chain to delete.
  • Throws:

Deletes ack chain specified by tag.

xorAckDynamoDB.stamp(tag, stamp)

  • tag: String Identifier to update.
  • stamp: Buffer Stamp to XOR with existing stamp.
  • Return: Buffer XOR result of stamp and existing stamp.
    • acked: Boolean Boolean true if XOR result is all zeros, false otherwise.
  • Throws:

Updates ack chain for specified tag by XOR'ing existing stamp with provided stamp.

Errors

BufferLengthsUnequal

Attempted to XOR buffers of different lengths. This is probably a programming bug.

LessThanTwoBuffers

Attempted to XOR fewer than two buffers. This is probably a programming bug.

StaleLocalData

When attempting stamp(), it means that the stamp stored in the datastore changed since it was retrieved by the module to perform the stamp() operation. This is a transient condition and part of normal operation. Retries should be attempted until success.
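One way to handle this is a small retry wrapper around stamp(). The sketch below is runnable without DynamoDB because it uses a fake ack object and a local StaleLocalData class as stand-ins; in real use you would presumably pass the error class exported from "xor-ack-dynamodb/errors". The stampWithRetry name, the attempt cap, and the backoff delay are all assumptions.

```javascript
// Stand-in for the module's StaleLocalData error so the sketch runs on its own.
class StaleLocalData extends Error {}

// Retries ack.stamp(tag, stamp) while it fails with StaleError, up to maxAttempts.
async function stampWithRetry(ack, tag, stamp, StaleError, maxAttempts = 5) {
    for (let attempt = 1; ; attempt++) {
        try {
            return await ack.stamp(tag, stamp);
        } catch (error) {
            if (!(error instanceof StaleError) || attempt >= maxAttempts) {
                throw error;
            }
            // Short, linearly growing pause before the next attempt.
            await new Promise(resolve => setTimeout(resolve, 10 * attempt));
        }
    }
}

// Fake ack object: fails twice with StaleLocalData, then succeeds.
let calls = 0;
const fakeAck = {
    async stamp(tag, stamp) {
        calls++;
        if (calls < 3) {
            throw new StaleLocalData("stored stamp changed");
        }
        // Mimic the documented return shape: a Buffer with an acked flag.
        return Object.assign(Buffer.from(stamp), { acked: false });
    }
};

const pending = stampWithRetry(fakeAck, "database/file13", Buffer.from("25", "hex"), StaleLocalData);
pending.then(result => console.log(calls, result.acked)); // 3 false
```

The cap on attempts is a design choice for the sketch; the guidance above is to retry until success, so a real caller may prefer a larger cap or none.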

TagExists

When attempting create(), it means that the specified tag already exists. This is probably a programming bug.

TagNotFound

When attempting delete(), it means that the specified tag does not exist in the datastore.

ZeroBufferNoOp

Attempted to create or stamp an existing tag with a buffer that contains all zeros. This is a no-op and probably a programming bug.

Releases

Current releases.

Policy

We follow the semantic versioning policy (semver.org) with a caveat:

Given a version number MAJOR.MINOR.PATCH, increment the:

  • MAJOR version when you make incompatible API changes,
  • MINOR version when you add functionality in a backwards-compatible manner, and
  • PATCH version when you make backwards-compatible bug fixes.

Caveat: major version zero is a special case indicating a development version that may make incompatible API changes without incrementing the MAJOR version.

References