@castore/dam
v1.25.3
Published
Data maintenance & migration tooling for the Castore library.
Downloads
14
Readme
Dam
Data maintenance & migration tooling for the Castore library.
📥 Installation
# npm
npm install --save-dev @castore/dam
# yarn
yarn add --dev @castore/dam
This package has @castore/core
as peer dependency, so you will have to install it as well:
# npm
npm install @castore/core
# yarn
yarn add @castore/core
👩💻 Usage
@castore/dam
exposes a series of utils that scan past events and re-publish them in message channels – or "pour them" as in "pouring water from a container to another" 🫗.
Those utils are typically very useful for data maintenance and migration. They publish messages with the replay
option enabled and can be rate limited to limit impact on production traffic. They are the following:
pourEventStoreAggregateIds
: Pour all the aggregate ids of an event store in anAggregateExistsMessageChannel
.pourAggregateEvents
: Pour all the events of a specific aggregate in a providedNotificationMessageChannel
.pourEventStoreEvents
: Pour all the events of an event store in a providedNotificationMessageChannel
.pourEventStoreCollectionEvents
: Pour all the events of a collection of event stores in a providedNotificationMessageChannel
.
pourEventStoreAggregateIds
Pour all the aggregate ids of an event store in a provided AggregateExistsMessageChannel
. Aggregate ids are published in the order in which they are retrieved (by default, ordered by their initial timestamps).
import { pourEventStoreAggregateIds } from '@castore/dam';
// 👇 ...or AggregateExistsMessageBus
const maintenanceMessageQueue = new AggregateExistsMessageQueue({
sourceEventStores: [pokemonEventStore],
...
});
const results = await pourEventStoreAggregateIds({
eventStore: pokemonEventStore,
messageChannel: maintenanceMessageQueue,
// 👇 Optional `listAggregateIds` options (except "pageToken")
options: {
limit: 100,
initialEventAfter: '2020-01-01T00:00:00.000Z',
initialEventBefore: '2023-01-01T00:00:00.000Z',
reverse: false,
},
// 👇 Optional rate limit (messages/second)
rateLimit: 100,
});
const {
// 👇 Count of poured aggregate ids
pouredAggregateIdCount,
// 👇 Infos about first/last scanned aggregates (potentially undefined)
firstScannedAggregate,
lastScannedAggregate,
} = results;
pourAggregateEvents
Pour all the events of a specific aggregate in a provided NotificationMessageChannel
. Events are published in the order in which they are retrieved (by default, ordered by their timestamps).
import { pourAggregateEvents } from '@castore/dam';
// 👇 ...or NotificationMessageBus
const maintenanceMessageQueue = new NotificationMessageQueue({
sourceEventStores: [pokemonEventStore],
...
});
const results = await pourAggregateEvents({
eventStore: pokemonEventStore,
messageChannel: maintenanceMessageQueue,
aggregateId: 'pikachu1',
// 👇 Optional `getEvents` options
options: {
minVersion: 1,
maxVersion: 10,
limit: 5,
reverse: false,
},
// 👇 Optional `timestamp` filters
filters: {
from: '2020-01-01T00:00:00.000Z',
to: '2023-01-01T00:00:00.000Z',
},
// 👇 Optional rate limit (messages/second)
rateLimit: 100,
});
const {
// 👇 Count of poured events
pouredEventCount,
// 👇 Infos about first/last scanned events (potentially undefined)
firstPouredEvent,
lastPouredEvent,
} = results;
pourEventStoreEvents
Pour all the events of an event store in a provided NotificationMessageChannel
. Events are published in the order of their timestamps (independently of their aggregate).
import { pourEventStoreEvents } from '@castore/dam';
// 👇 ...or NotificationMessageBus
const maintenanceMessageQueue = new NotificationMessageQueue({
sourceEventStores: [pokemonEventStore],
...
});
const results = await pourEventStoreEvents({
eventStore: pokemonEventStore,
messageChannel: maintenanceMessageQueue,
// 👇 Optional `timestamp` filters
filters: {
from: '2020-01-01T00:00:00.000Z',
to: '2023-01-01T00:00:00.000Z',
},
// 👇 Optional rate limit (messages/second)
rateLimit: 100,
});
const {
// 👇 Count of poured events
pouredEventCount,
// 👇 Infos about first/last scanned aggregates (potentially undefined)
firstScannedAggregate,
lastScannedAggregate,
} = results;
pourEventStoreCollectionEvents
Pour all the events of a collection of event stores in a provided NotificationMessageChannel
. Events are published in the order of their timestamps (independently of their aggregate and event store).
import { pourEventStoreEvents } from '@castore/dam';
// 👇 ...or NotificationMessageBus
const maintenanceMessageQueue = new NotificationMessageQueue({
sourceEventStores: [pokemonEventStore, trainerEventStore],
// ...
});
const results = await pourEventStoreCollectionEvents({
eventStores: [pokemonEventStore, trainerEventStore],
messageChannel: maintenanceMessageQueue,
// 👇 Optional `timestamp` filters
filters: {
from: '2020-01-01T00:00:00.000Z',
to: '2023-01-01T00:00:00.000Z',
},
// 👇 Optional rate limit (messages/second)
rateLimit: 100,
});
const {
// 👇 Count of poured events
pouredEventCount,
// 👇 Infos about first/last scanned aggregates (potentially undefined)
scans: {
// 👇 By event store id
POKEMONS: { firstScannedAggregate, lastScannedAggregate },
TRAINERS: { firstScannedAggregate, lastScannedAggregate },
},
} = results;