Flowcore SDK Module - Data Pump Client
A Flowcore SDK module that provides a lightweight client for fetching events from the Flowcore platform.
Installation
Install with npm:
npm install @flowcore/sdk-data-pump-client @flowcore/sdk-oidc-client
or yarn:
yarn add @flowcore/sdk-data-pump-client @flowcore/sdk-oidc-client
Usage
Create a new instance of the Data Pump client:
import {DataPump} from "@flowcore/sdk-data-pump-client";
import {OidcClient} from "@flowcore/sdk-oidc-client";
const client = new OidcClient("your client id", "your client secret", "well known endpoint");
const dataPump = new DataPump("https://graph.api.flowcore.io/graphql", client);
You can configure the page size with the last argument of the constructor; the default is 1000.
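For example, to use a smaller page size (a sketch assuming the last constructor argument takes the page size directly as a number; check the package typings for the exact signature):
const dataPump = new DataPump("https://graph.api.flowcore.io/graphql", client, 500); // 500 events per page instead of the default 1000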
Then create an RxJS observable to listen to the events:
import {Subject} from "rxjs";
import {SourceEvent} from "@flowcore/sdk-data-pump-client";
const subject = new Subject<SourceEvent>();
subject.subscribe({
next: (event) => {
console.log(event);
},
complete: () => {
console.log("completed");
},
});
Then you can fetch all events with the fetchAllEvents method:
await dataPump.fetchAllEvents(subject, {
dataCoreId: "your data core id",
aggregator: "your aggregator",
eventTypes: ["your event type"],
});
This will loop through all the events for the specified event types and push them to the observable.
You can specify how many time buckets should be run in parallel with the parallel argument; the default is 1.
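For example (a sketch only; whether parallel goes on the options object or elsewhere is an assumption, check the package typings):
await dataPump.fetchAllEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
  parallel: 4, // assumption: process four time buckets concurrently
});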
To fetch events for a specific time bucket you can use the fetchEvents method:
await dataPump.fetchEvents(subject, {
dataCoreId: "your data core id",
aggregator: "your aggregator",
eventTypes: ["your event type"],
timeBucket: "your time bucket",
});
This will fetch all events for the specified time bucket and push them to the observable.
To close the observable, set the last argument of the fetchEvents method to true.
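For example, to complete the subject once the time bucket has been fetched (assuming the flag comes directly after the options object):
await dataPump.fetchEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
  timeBucket: "your time bucket",
}, true); // true completes the observable when the fetch finishes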
Limits
You can specify the from and to event id to fetch events within a specific range:
await dataPump.fetchEvents(subject, {
dataCoreId: "your data core id",
aggregator: "your aggregator",
eventTypes: ["your event type"],
timeBucket: "your time bucket",
afterEventId: "your from event id",
beforeEventId: "your to event id",
});
These are both exclusive, meaning that the events with the specified ids will not be included in the result. Either or both can be omitted.
Indexes
You can fetch time buckets for a specific event type with the fetchIndexes method:
await dataPump.fetchIndexes({
dataCoreId: "your data core id",
aggregator: "your aggregator",
eventType: "your event type",
});
This will return a list of time buckets for the specified event type.
You can also specify the from and to time bucket with the from and to arguments, and it will return the time buckets within the specified range.
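A minimal sketch, assuming from and to are extra fields on the same options object (check the package typings for the exact shape):
await dataPump.fetchIndexes({
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventType: "your event type",
  from: "your from time bucket", // assumption: lower bound of the range
  to: "your to time bucket", // assumption: upper bound of the range
});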
Pumping Events
You can also pump events to a destination with the pumpEvents method:
const abortController = new AbortController();
await dataPump.pumpEvents(
"cache-key",
"observable",
{
dataCoreId: "your data core id",
aggregator: "your aggregator",
eventTypes: ["your event type"],
},
abortController
);
This will pump all events using backfilling, then switch to live mode and pump all new events. The cache-key is used to store the last event id in the cache, so that the client can resume from the last event id if it is restarted. The abort controller can be used to stop the pumping.
Note: the default cache is in memory; you can implement your own cache by extending the SimpleCache class and passing it to the DataPump constructor via the options object.
Note: you can also specify the from time bucket with the from argument and control the backfilling parallelism with the parallel argument.
Note: When not passing the abort controller, the data pump will run once until it has fetched all events currently present in the data container.
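For example, to run a single backfill pass that stops once everything currently stored has been pumped, the abort controller can simply be left out:
await dataPump.pumpEvents("cache-key", subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
}); // no abort controller: runs once and stops when all current events are fetched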
Resetting the data pump
You can reset the data pump with the reset method:
await dataPump.reset("cache-key");
Note: it is only possible to reset the data pump if it is not currently running. You can check if the data pump is running with the isRunning method. To stop the data pump you can use the abort controller.
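As a rough sketch (whether isRunning takes the cache key as an argument is an assumption here; check the package typings):
if (!dataPump.isRunning("cache-key")) { // assumption: isRunning accepts the cache key
  await dataPump.reset("cache-key");
}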
Creating your own pump
You can create your own pump by manually calling the pumpPage method:
let cursor: string | undefined = undefined;
do {
const result = await dataPump.pumpPage({
dataCoreId: "your data core id",
aggregator: "your aggregator",
eventTypes: ["your event type"],
timeBucket: "your time bucket",
afterEventId: "your from event id", // optional
beforeEventId: "your to event id", // optional
}, cursor);
for (const event of result.events) {
console.log(event);
}
cursor = result.cursor;
} while (cursor);
This will allow you to control the flow of events and prevent the pump from pumping too many events at once.
Development
Install dependencies with yarn:
yarn install
or with npm:
npm install