memphis-functions-beta
v1.0.4
Published
Utility for writing Memphis functions. Memphis is a powerful messaging platform for modern developers
Downloads
3
Readme
Cloud - Docs - X - YouTube
Memphis.dev is more than a broker. It's a new streaming stack. Memphis.dev is a highly scalable event streaming and processing engine.
About
Before Memphis came along, handling ingestion and processing of events on a large scale took months to adopt and was a capability reserved for the top 20% of mega-companies. Now, Memphis opens the door for the other 80% to unleash their event and data streaming superpowers quickly, easily, and with great cost-effectiveness.
This repository is responsible for the Memphis Functions Javascript SDK
Installation
$ npm install memphis-functions
Importing
For Javascript, you can choose to use the import or required keyword. This library exports a singleton instance of memphis
with which you can consume and produce messages.
const { memphis } = require('memphis-functions');
Creating a Memphis function
Memphis provides a createFunction
utility for more easily creating Memphis Functions.
The user-created eventHandler
will be called for every message in the given batch of events. The user's eventHandler
will take in a payload
as a Uint8Array, headers
as an object, and inputs
as an object, and should return an object with keys { processedMessage, processedHeaders }
. The returned processedMessage
should be a Uint8Array, and processedHeaders
should be an object.
The user function should throw an exception if the message processing has failed. If any exception is thrown (deliberately or by a failed operation), the message will be sent to the dead letter station.
If the returned processedMessage
and processedHeaders
are null
, then the message will be skipped and will not be sent to the station or dead letter station.
Make sure to encode the
processedMessage
Uint8Array object with utf-8 encoding!
This example function takes the Uint8Array object payload
and decodes it from base64 encoding so that it may be processed.
const { memphis } = require("memphis-functions");
exports.handler = async (event) => {
return await memphis.createFunction(event, eventHandler);
};
function eventHandler(payload, headers, inputs) {
const decodedPayload = payload.toString('utf-8');
const asJson = JSON.parse(decodedPayload);
asJson.modified = true;
return {
processedMessage: Buffer.from(JSON.stringify(asJson), 'utf-8'),
processedHeaders: headers
};
}
A user created eventHandler
may also be async:
const { memphis } = require("memphis-functions");
exports.handler = async (event) => {
return await memphis.createFunction(event, eventHandler);
};
async function eventHandler(payload, headers, inputs) {
const decodedPayload = payload.toString('utf-8');
const asJson = JSON.parse(decodedPayload);
asJson.modified = true;
return {
processedMessage: Buffer.from(JSON.stringify(asJson), 'utf-8'),
processedHeaders: headers
};
}
The asJson
can be used to get a json payload instead of a Uint8Array
:
const { memphis } = require("memphis-functions");
exports.handler = async (event) => {
return await memphis.createFunction(event, eventHandler, asJson = true);
};
async function eventHandler(payload, headers, inputs) {
payload["modified"] = true;
return {
processedMessage: payload,
processedHeaders: headers
};
}
If the user wants to have a message that they would like to validate and send to the dead letter station if the validation fails, then the user can throw an exception. In the following example, the field check
is a boolean. The following function will send any messages that fail the check
to the dead letter station.
const { memphis } = require("memphis-functions");
exports.handler = async (event) => {
return await memphis.createFunction(event, eventHandler);
};
async function eventHandler(payload, headers, inputs) {
const decodedPayload = payload.toString('utf-8');
const asJson = JSON.parse(decodedPayload);
if (!asJson.check) {
throw new Error("Validation Failed!");
}
return {
processedMessage: Buffer.from(JSON.stringify(asJson), 'utf-8'),
processedHeaders: headers
};
}
If a user would rather just skip the message and not have it be sent to the station or dead letter station, the user could instead return { processedMessage: null, processedHeaders: null }
.
const { memphis } = require("memphis-functions");
exports.handler = async (event) => {
return await memphis.createFunction(event, eventHandler);
};
function eventHandler(payload, headers, inputs) {
const decodedPayload = payload.toString('utf-8');
const asJson = JSON.parse(decodedPayload);
if (!asJson.check) {
return { processedMessage: null, processedHeaders: null };
}
return {
processedMessage: Buffer.from(JSON.stringify(asJson), 'utf-8'),
processedHeaders: headers
};
}
LLastly, if the user is using another data format like Protocol Buffers, the user may simply decode the payload
into that format instead of JSON. The following example will be using the protobufjs package. Assuming we have a .proto definition like this:
syntax = "proto3";
package protobuf_example;
message Message{
string data_field = 1;
}
We can decode this and get the data_field out like this:
const { memphis } = require("memphis-functions");
exports.handler = async (event) => {
return await memphis.createFunction(event, eventHandler);
};
function eventHandler(payload, headers, inputs) {
const root = await protobuf.load("./message.proto");
const Message = root.lookupType('protobuf_example.Message');
const my_message = Message.decode(payload)
// data_field gets translated to dataField by the library...
// Arbitrarily changing the data field
my_message.dataField = "My new data"
// .finish() returns a Uint8Array so it may just be returned as the processedMessage
return {
"processedMessage" : Message.encode(my_message).finish(),
"processedHeaders" : headers
}
}