@avanzu/eventstore
An eventsourcing backend supporting multiple storage engines.
Introduction
This is a heavily modified version of the original node-eventstore by Adriano Raiano.
The project goal is to provide an eventstore implementation for node.js:
- load and store events via EventStream object
- event dispatching to your publisher (optional)
- supported Dbs (inmemory, mongodb, ~~redis, tingodb, elasticsearch, azuretable, dynamodb~~)
- snapshot support
- query your events
Upgrade instructions: see the section below for migrating from 1.x to 2.x.
Installation
npm install @avanzu/eventstore
Usage
Require the module and init the eventstore:
var eventstore = require('@avanzu/eventstore')
var es = eventstore()
By default the eventstore will use an inmemory Storage.
Logging
For logging and debugging you can use debug by TJ Holowaychuk
simply run your process with
DEBUG=@avanzu/eventstore/* node app.js
Provide implementation for storage
example with mongodb:
var es = require('@avanzu/eventstore')({
type: 'mongodb',
host: 'localhost', // optional
port: 27017, // optional
dbName: 'eventstore', // optional
eventsCollectionName: 'events', // optional
snapshotsCollectionName: 'snapshots', // optional
transactionsCollectionName: 'transactions', // optional
timeout: 10000, // optional
// emitStoreEvents: true // optional, by default no store events are emitted
// maxSnapshotsCount: 3 // optional, by default all snapshots are kept
// authSource: 'authenticationDatabase' // optional
// username: 'technicalDbUser' // optional
// password: 'secret' // optional
// url: 'mongodb://user:pass@host:port/db?opts', // optional
// positionsCollectionName: 'positions' // optional, by default positions are not tracked
})
catch connect and disconnect events
es.on('connect', function () {
console.log('storage connected')
})
es.on('disconnect', function () {
console.log('connection to storage is gone')
})
define event mappings [optional]
Define which values should be mapped/copied to the payload event.
es.defineEventMappings({
id: 'id',
commitId: 'commitId',
commitSequence: 'commitSequence',
commitStamp: 'commitStamp',
streamRevision: 'streamRevision',
})
initialize
await es.init()
working with the eventstore
get the event history (of an aggregate)
const { events } = await es.getEventStream({ query: 'streamId' })
or
const { events } = await es.getEventStream({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr', // optional
},
})
'streamId' and 'aggregateId' are the same. In DDD terms, aggregate and context are just there to make the language more precise. For example, you can have a 'person' aggregate in the context 'human resources' and a 'person' aggregate in the context 'business contracts'. So you can have two completely different aggregate instances of two completely different aggregates (perhaps sharing the same name) in two completely different contexts.
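As a rough sketch (the context names 'hr' and 'contracts' are just illustrative), the same aggregateId can be loaded from two different contexts as two independent streams:
const hrPerson = await es.getEventStream({
    query: { aggregateId: 'myAggregateId', aggregate: 'person', context: 'hr' },
})
const contractPerson = await es.getEventStream({
    query: { aggregateId: 'myAggregateId', aggregate: 'person', context: 'contracts' },
})
// both streams are completely independent, even though aggregate and aggregateId match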
you can also limit the eventstream query with a 'minimum revision number' and a 'maximum revision number'
const { events } = await es.getEventStream({
query:
'streamId' ||
{
/* query */
},
revMin: 5,
revMax: 8,
})
store a new event and commit it to store
const stream = await es.getEventStream({ query: 'streamId' })
stream.addEvent({ my: 'event' })
stream.addEvents([{ my: 'event2' }])
await stream.commit()
console.log(stream.eventsToDispatch)
If you defined an event publisher function, the committed event will be dispatched to the provided publisher.
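A minimal sketch of wiring up such a publisher, assuming the useEventPublisher hook of the original node-eventstore is still available in this fork (the exact signature is an assumption, as is the myBus helper):
// assumption: useEventPublisher carries over from the original node-eventstore API
es.useEventPublisher(function (evt) {
    // forward the committed event to your own message bus / subscribers
    myBus.emit('event', evt)
})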
If you just want to load the last event as a stream, you can call getLastEventAsStream instead of getEventStream.
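For example, assuming getLastEventAsStream accepts the same params object as getEventStream in 2.x:
// assumption: same { query } params object as getEventStream
const lastEventStream = await es.getLastEventAsStream({ query: 'streamId' })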
working with snapshotting
get snapshot and event history from the snapshot point
const [snapshot, stream] = await es.getFromSnapshot({ query: 'streamId' })
or
const [snapshot, stream] = await es.getFromSnapshot({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr', // optional
},
})
you can also limit the snapshot and eventstream query with a 'maximum revision number'
const [snapshot, stream] = await es.getFromSnapshot({
query:
'streamId' ||
{
/* query */
},
revMax: 8, // if you omit revMax or you define it as -1 it will retrieve until the end
})
create a snapshot point
const [snapshot, stream] = await es.getFromSnapshot({ query: 'streamId' })
const snap = snapshot.data
const history = stream.events
// create a new snapshot depending on your rules
if (history.length > myLimit) {
await es.createSnapshot({
streamId: 'streamId',
data: myAggregate.getSnap(),
revision: stream.lastRevision,
version: 1 // optional
});
// or
await es.createSnapshot({
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr', // optional
data: myAggregate.getSnap(),
revision: stream.lastRevision,
version: 1 // optional
});
}
// go on: store new event and commit it
// stream.addEvents...
You can automatically clean older snapshots by configuring the number of snapshots to keep with the maxSnapshotsCount option in the eventstore options.
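For example (a sketch with mongodb; maxSnapshotsCount is one of the store options listed above):
const es = require('@avanzu/eventstore')({
    type: 'mongodb',
    maxSnapshotsCount: 3, // keep only the three newest snapshots
})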
own event dispatching (no event publisher function defined)
const evts = await es.getUndispatchedEvents()
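A sketch of a manual dispatch loop; marking events as dispatched via setEventToDispatched is assumed to carry over from the original node-eventstore, and publishToMyBus is a hypothetical helper:
const evts = await es.getUndispatchedEvents()
for (const evt of evts) {
    await publishToMyBus(evt) // your own dispatching logic (hypothetical helper)
    // assumption: setEventToDispatched exists as in the original node-eventstore
    await es.setEventToDispatched(evt)
}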
Deleting aggregates
currently supported by:
- mongodb
You can delete an aggregate including the event history, snapshots and transactions by calling deleteStream.
const deletedStream = await es.deleteStream('myStreamId')
The return value is the EventStream that has just been deleted. This stream will contain an undispatched TombstoneEvent ready to be processed. The payload attribute of that event contains the complete event history.
const [tombstoneEvent] = deletedStream.eventsToDispatch
query your events
for replaying your events or for rebuilding a viewmodel or just for fun...
skip, limit always optional
var skip = 0,
limit = 100 // if you omit limit or you define it as -1 it will retrieve until the end
const events = await es.getEvents({ skip, limit })
// or
const events = await es.getEvents({ query: 'streamId', skip, limit })
// or
const events = await es.getEvents({
query: {
// free choice (all, only context, only aggregate, only aggregateId...)
context: 'hr',
aggregate: 'person',
aggregateId: 'uuid',
},
skip,
limit,
})
by revision
revMin, revMax always optional
const events = await es.getEventsByRevision({
query: 'streamId',
revMin: 5,
revMax: 8, // if you omit revMax or you define it as -1 it will retrieve until the end
})
// or
const events = await es.getEventsByRevision({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr', // optional
},
revMin: 5,
revMax: 8, // if you omit revMax or you define it as -1 it will retrieve until the end
})
by commitStamp
skip, limit always optional
const events = await es.getEventsSince({
commitStamp: new Date(2015, 5, 23),
skip: 10,
limit: 100, // if you omit limit or you define it as -1 it will retrieve until the end
})
// or
const events = await es.getEventsSince({
commitStamp: new Date(2015, 5, 23),
limit: 50,
})
// or
const events = await es.getEventsSince({
commitStamp: new Date(2015, 5, 23),
})
streaming your events
Some databases support streaming your events; the API is similar to the query API.
skip, limit always optional
var skip = 0,
limit = 100 // if you omit limit or you define it as -1 it will retrieve until the end
var stream = es.streamEvents({ skip, limit })
// or
var stream = es.streamEvents({ query: 'streamId', skip, limit })
// or by commitstamp
var stream = es.streamEventsSince({ commitStamp: new Date(2015, 5, 23), skip, limit })
// or by revision
var stream = es.streamEventsByRevision({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person',
context: 'hr',
},
})
stream.on('data', function (e) {
doSomethingWithEvent(e)
})
stream.on('end', function () {
console.log('no more events')
})
// or even better
stream.pipe(myWritableStream)
currently supported by:
- mongodb (driver version <= 4.0.0)
get the last event
for example to obtain the last revision number
const event = await es.getLastEvent('streamId')
// or
const event = await es.getLastEvent({
// free choice (all, only context, only aggregate, only aggregateId...)
context: 'hr',
aggregate: 'person',
aggregateId: 'uuid',
})
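The last revision can then be read from the returned event; streamRevision is the field listed in the event mappings above (assuming it is present on your stored events):
// assumption: the stored event carries the streamRevision field from the mappings above
const lastRevision = event ? event.streamRevision : null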
obtain a new id
const id = await es.getNewId()
position of event in store
some db implementations support writing the position of the event in the whole store in addition to the streamRevision.
currently those implementations support this:
- inmemory (by setting the trackPosition option)
- mongodb (by setting the positionsCollectionName option)
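For example (both option names are taken from the list above; treat the exact value types as an assumption):
// inmemory: enable position tracking
const esInMemory = require('@avanzu/eventstore')({ trackPosition: true })

// mongodb: keep positions in a dedicated collection
const esMongo = require('@avanzu/eventstore')({
    type: 'mongodb',
    positionsCollectionName: 'positions',
})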
special scaling handling with mongodb
Inserting multiple events (documents) into mongodb is not atomic. Therefore the eventstore tries to repair itself when calling getEventsByRevision.
But if you want, you can trigger this from outside:
const [firstTransaction] = await es.store.getPendingTransactions()
const lastEvent = await es.store.getLastEvent({
aggregateId: firstTransaction.aggregateId,
aggregate: firstTransaction.aggregate, // optional
context: firstTransaction.context, // optional
})
await es.store.repairFailedTransaction(lastEvent)
Upgrade instructions
From 1.x.x to 2.x.x
Starting from version 2.0.0 the eventstore no longer supports multiple positional arguments. Instead, you have to pass in a params object. The general idea remains that you only have to specify the arguments that deviate from the defaults.
Please refer to the following table to see how the signatures have changed
| 1.x.x                                          | 2.x.x                                            |
| ---------------------------------------------- | ------------------------------------------------ |
| streamEvents(query, skip, limit)               | streamEvents({query, skip, limit})               |
| streamEventsSince(commitStamp, skip, limit)    | streamEventsSince({commitStamp, skip, limit})    |
| streamEventsByRevision(query, revMin, revMax)  | streamEventsByRevision({query, revMin, revMax})  |
| getEvents(query, skip, limit)                  | getEvents({query, skip, limit})                  |
| getEventsSince(commitStamp, skip, limit)       | getEventsSince({commitStamp, skip, limit})       |
| getEventsByRevision(query, revMin, revMax)     | getEventsByRevision({query, revMin, revMax})     |
| getEventStream(query, revMin, revMax)          | getEventStream({query, revMin, revMax})          |
| getFromSnapshot(query, revMax)                 | getFromSnapshot({query, revMax})                 |
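A before/after sketch of the migration, using getEventStream as an example:
// 1.x.x style (positional arguments):
//   es.getEventStream('streamId', 5, 8, ...)
// 2.x.x style (params object, only deviating arguments need to be specified):
const stream = await es.getEventStream({ query: 'streamId', revMin: 5, revMax: 8 })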
Inspiration
- Jonathan Oliver's EventStore for .net.
Database Support
Currently these databases are supported:
- inmemory
- mongodb (node-mongodb-native)
- ~~redis (redis)~~
- ~~tingodb (tingodb)~~
- ~~azuretable (azure-storage)~~
- ~~dynamodb (aws-sdk)~~
own db implementation
You can use your own db implementation by extending this...
const { Store } = require('@avanzu/eventstore')

class MyDB extends Store {
    constructor(options) {
        super(options)
        // set up your driver and connection options here
    }

    // implement the storage methods of Store here (connecting, loading and saving events, snapshots, ...)
}

module.exports = MyDB
and you can use it in this way
var es = require('@avanzu/eventstore')({
type: MyDB,
})
// es.init...