@flowcore/sdk-transformer-client
v1.0.1
An SDK that can handle the transport logic of chainable flowcore transformers
sdk-transformer-client
Installation
Install with npm:
npm install @flowcore/sdk-transformer-client
or yarn:
yarn add @flowcore/sdk-transformer-client
Usage
Express Middleware
This SDK comes with a middleware that you can use to handle the transport logic of chainable flowcore transformers.
First install express and body-parser with npm:
npm install express body-parser
or with yarn:
yarn add express body-parser
Then use the middleware like this:
import express, { Request } from 'express';
import bodyParser from 'body-parser';
import {
  EventData,
  transformerMiddleware,
  transformerHandlerFactory,
} from '@flowcore/sdk-transformer-client';
import { z } from 'zod'; // optional, only needed for schema validation

// InMemoryCache is the example SimpleCache implementation from the Cache section below
const transformerId = 'transformer-id';
const app = express();

app.use(bodyParser.json());
app.use(
  transformerMiddleware("scenario name", transformerId, "strand name", {
    transformerCache: new InMemoryCache(), // or any other that implements SimpleCache interface
    // optional, use it to transfer headers to the next transformers
    transferHeaders: (req: Request) => {
      return {
        'x-custom-header': req.headers['x-custom-header'],
      };
    },
  }),
);

app.post(
  "/transform",
  transformerHandlerFactory(transformerId, async (req) => {
    console.log(`Transforming data for ${transformerId}`, req.body);
    // parse and validate the event payload
    const event = EventData<{
      hello: string;
    }>(
      req.body.event,
      z.object({ hello: z.string() }) // optional zod schema
    );
    return {
      something: "else",
    };
  }),
);

app.listen(3000, () => {
  console.log('Server is running on port 3000');
});
Note: by default the middleware sends the result to the next transformers without waiting for them (send and forget). If you want to wait for the next transformers to finish before sending a response to the requester, use the waitForNext option:
app.post(
  "/transform",
  transformerHandlerFactory(
    transformerId,
    async (req) => {
      // logic
    },
    // options are passed to transformerHandlerFactory, not to app.post
    {
      waitForNext: true,
    },
  ),
);
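The difference between the two modes can be sketched without the SDK. The helper below is a hypothetical illustration, not the middleware's actual code: `dispatch` and `Send` are made-up names, and `send` stands in for the HTTP POST to a next transformer.

```typescript
// Hypothetical helper, not part of the SDK.
// `send` stands in for the HTTP POST to one of the next transformers.
type Send = (url: string) => Promise<void>;

async function dispatch(
  urls: string[],
  send: Send,
  waitForNext: boolean,
): Promise<"dispatched" | "completed"> {
  // Start every call immediately. The catch keeps a failed call from
  // surfacing as an unhandled rejection in send-and-forget mode.
  const calls = urls.map((url) =>
    send(url).catch((error) => {
      console.error(`Call to ${url} failed: ${error}`);
    }),
  );

  if (!waitForNext) {
    // Send and forget: return right away, the calls finish in the background.
    return "dispatched";
  }

  // waitForNext: return only after every call has settled.
  await Promise.all(calls);
  return "completed";
}
```

In this sketch failures are only logged in both modes. A real handler might prefer to report them to the requester when waiting.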
Custom Implementation
You can also use the SDK to create your own custom implementation:
import {
  Transformer,
  transformerDataDto,
} from '@flowcore/sdk-transformer-client';
import {TRANSFORMER_VERSION_HEADER} from "./constants";

const transformerId = "transformer-id";

async function handleRequest(req, res) {
  // validate that the request has the correct information
  const validRequest = transformerDataDto.parse(req.body);
  // initialize the transformer
  const transformer = new Transformer(
    "scenario name",
    "strand name",
    transformerId,
    validRequest.instructions,
    new InMemoryCache(), // or any other that implements SimpleCache interface
  );
  // set transformer version for response header
  res.setHeader(TRANSFORMER_VERSION_HEADER, transformer.getVersion());
  try {
    // register the event id for dependency tracking
    if (validRequest.sender) {
      await transformer.registerDependentCall(
        validRequest.sender,
        validRequest.event.eventId,
        validRequest.result,
      );
    }
    // check if this transformer's dependencies are met
    const combinedResult = await transformer.dependenciesMet(
      validRequest.event.eventId,
    );
    if (combinedResult === false) {
      res.send({
        message: "Dependencies not met, waiting for next event",
        success: true,
      });
      return;
    }
    // process the logic (processLogic is your own function)
    const result = await processLogic(req, res);
    // get the urls of the next transformers in the chain
    const nextUrls = transformer.getNextTransformerUrls();
    // send the result to the next transformers without waiting for them
    nextUrls.forEach((url) => {
      fetch(url, {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify({
          sender: transformerId,
          instructions: transformer.getRawInstructions(),
          result: {
            ...validRequest.result,
            ...combinedResult,
            ...result,
          },
          event: validRequest.event,
        }),
      }).catch((sendError) => {
        console.error(`Failed to call next transformer ${url}: ${sendError}`);
      });
    });
    // send an OK response to requester
    res.send({
      data: result,
      success: true,
    });
  } catch (error) {
    if (transformer.hasErrorTransformer()) {
      if (error instanceof Error) {
        // report the failure to the error transformer
        await fetch(`${transformer.getErrorTransformerUrl()}/failed`, {
          method: "POST",
          headers: {
            "Content-Type": "application/json",
          },
          body: JSON.stringify({
            ...req.body,
            error: error.message,
          }),
        });
        res.send({
          message: error.message,
          success: false,
        });
        return;
      }
    }
    console.error(`Error in transformerHandler: ${error}`);
    throw error;
  }
}
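The dependency steps in the handler above (register the sender's result, then check whether all dependencies are met) can be pictured with a small in-memory model. This is a hypothetical illustration and not the SDK's Transformer class: `DependencyTracker` and `dependsOn` are made-up names, and the real SDK derives its dependencies from the instructions and stores state in the cache.

```typescript
// Hypothetical illustration only: this is NOT the SDK's Transformer class.
type Result = Record<string, unknown>;

class DependencyTracker {
  // eventId -> (sender id -> result reported by that sender)
  private received = new Map<string, Map<string, Result>>();
  // ids of the transformers this one has to wait for (assumed to be known)
  private dependsOn: string[];

  constructor(dependsOn: string[]) {
    this.dependsOn = dependsOn;
  }

  // Record that `sender` has delivered its result for `eventId`.
  registerDependentCall(sender: string, eventId: string, result: Result): void {
    const bySender = this.received.get(eventId) ?? new Map<string, Result>();
    bySender.set(sender, result);
    this.received.set(eventId, bySender);
  }

  // Return the merged results once every dependency has reported, else false.
  dependenciesMet(eventId: string): Result | false {
    const bySender = this.received.get(eventId) ?? new Map<string, Result>();
    if (!this.dependsOn.every((id) => bySender.has(id))) {
      return false;
    }
    return Object.assign({}, ...this.dependsOn.map((id) => bySender.get(id)));
  }
}

// Usage: a transformer that waits for two upstream transformers.
const tracker = new DependencyTracker(["transformer-a", "transformer-b"]);
tracker.registerDependentCall("transformer-a", "event-1", { a: 1 });
const pending = tracker.dependenciesMet("event-1"); // still waiting for transformer-b
tracker.registerDependentCall("transformer-b", "event-1", { b: 2 });
const combined = tracker.dependenciesMet("event-1"); // merged result of both senders
```

Returning `false` until the last dependency arrives is what lets the handler answer "Dependencies not met" and exit early for every call except the final one.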
Utilities
The SDK also comes with some utilities that you can use when working with SourceEvents:
import { EventData } from '@flowcore/sdk-transformer-client';
import { z } from 'zod';
// get the parsed and validated event payload
const event = EventData<{
  hello: string;
}>(req.body.event, z.object({ hello: z.string() }));
Cache
The SDK uses an interface for the cache. You can provide your own cache by implementing the SimpleCache interface:
import { SimpleCache } from '@flowcore/sdk-transformer-client';

class InMemoryCache implements SimpleCache {
  private cache: Record<string, any> = {};

  async get(key: string): Promise<any> {
    return this.cache[key];
  }

  async set(key: string, value: any): Promise<void> {
    this.cache[key] = value;
  }

  async delete(key: string): Promise<void> {
    delete this.cache[key];
  }
}
Note: This interface supports the ioredis client out of the box. You can use it by passing the client to the transformer constructor.
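As a further illustration of a custom cache, here is a minimal sketch of an in-memory cache whose entries expire. It is self-contained, so it declares a local `SimpleCacheLike` interface that is assumed to mirror the three methods of the InMemoryCache example above. `ExpiringCache`, `ttlMs` and the injectable `now` clock are hypothetical names, not part of the SDK.

```typescript
// Local stand-in for the SDK's SimpleCache interface. Its shape is an
// assumption based on the InMemoryCache example in this README.
interface SimpleCacheLike {
  get(key: string): Promise<unknown>;
  set(key: string, value: unknown): Promise<void>;
  delete(key: string): Promise<void>;
}

// Hypothetical cache that forgets entries after `ttlMs` milliseconds.
class ExpiringCache implements SimpleCacheLike {
  private entries = new Map<string, { value: unknown; expiresAt: number }>();
  private ttlMs: number;
  private now: () => number;

  // `now` is injectable so expiry can be tested without real waiting.
  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  async get(key: string): Promise<unknown> {
    const entry = this.entries.get(key);
    if (!entry) {
      return undefined;
    }
    if (entry.expiresAt <= this.now()) {
      // Expired: drop the entry and behave as if it was never stored.
      this.entries.delete(key);
      return undefined;
    }
    return entry.value;
  }

  async set(key: string, value: unknown): Promise<void> {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }

  async delete(key: string): Promise<void> {
    this.entries.delete(key);
  }
}
```

Expiry keeps dependency state for events that never complete from growing without bound, which a plain object cache would not do.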
Development
yarn install
or with npm:
npm install