@frando/kappa-core
v6.0.0
Published
Minimal peer-to-peer database, based on kappa architecture.
Downloads
6
Readme
kappa-core
kappa-core WIP rewrite
kappa-core is a database abstraction for append-only logs and add-only sets. A kappa-core is a container for pairs of sources and views, called flows. In each flow, data flows from a source into a view. Sources have a pull function that fetches new messages since the last pull. Views have a map function which is called for each batch of messages from the source.
kappa-core itself is dependencyless, but this module also contains stateful source handlers for hypercores, multifeed, and corestore.
API
const { Kappa } = require('kappa-core')
const kappa = new Kappa()
Create a new kappa core.
kappa.use(name, source, view)
Register a flow.
name
(string) the name of the flow, has to be unique per kappa coresource
object with properties:pull: function (next)
: (required) Handler to pull new messages from the source. Should callnext(error, messages, finished, onindexed)
:error
: optional errormessages
: array of messagesfinished: true
: if set to false, signal that more messages are pendingonindexed: function (cb)
: Function that will be called when the view finished indexing. Use this to update the source state and callcb(err, stateContext)
when finished.stateContext
is an optional state object with, by convention, the following keys:{ totalBlocks: Number, indexedBlocks: Number, prevIndexedBlocks: Number }
open: function (flow, cb)
(optional) Handler to call on open.flow
is the current flow object (see below for docs). Callcb
when done with opening.close: function (cb)
: (optional) Handler to call on close. Has to callcb
.ready: function (cb)
: (optional): Handler to wait for running operations to finish. Callcb
after currently pending operations that might callupdate
on the flow have finished.reset: function (cb)
: (required) Handler to reset internal state. This is called when a full reindex is necessary. This means that the next pull ought to start at the beginning.storeVersion: function (version, cb)
: (required) Handler to store the flow version number.fetchVersion: function (cb)
: (required) Handler to fetch the version stored withstoreVersion
.- See the
SimpleState
docs below how to easily implement thereset
,storeVersion
andfetchVersion
methods.
view
object with properties:open: function (flow, cb)
(optional) Handler to call on open.flow
is the current flow object (see below for docs). Callcb
when done with opening.close: function (cb)
: (optional) Handler to call on close. Has to callcb
.map: function (messages, next)
(required) Handler for each batch of messages. Callnext
when done indexing this batch of messages. Can also return a promise which will be awaited (do not callnext
if returning a promise).reset: function (cb)
: (required) Handler to delete all indexed data. This is called by the Kappa core when a complete reindex is necessary. Themap
function will receive messages from the start on afterwards.version: int
The view version. If the version is increased, the Kappa core will clear and restart the indexing for this view after the next reopening of the core. Defaults to1
.
Both source
and view
can have an api
property with an object of functions. The functions are exposed on kappa.view[name]
/ kappa.source[name]
. Their this
object refers to the flow they are part of.
The source has to track its state, so that subsequent calls to pull()
do not return the same messages. Use the onindexed
callback to update state. How to track its state is up to the source implementation. kappa-core provides a SimpleState
helper to simplify this, see its documentation below.
There are several source handlers included in kappa-core (TODO: document sources). See the tests and sources directories.
kappa.reset(name, cb)
Reset a specific flow, to restart indexing. This is equal to reopening the kappa-core with a changed view version for this flow.
kappa.ready(names, cb)
Call cb
exactly once, after all flows with a name in the names
array have finished processing. If names
is empty, all flows will be awaited. This names
is a string, the flow of this name will be awaited. If the requested flows are already ready, cb
is called immediately.
kappa.pause()
Pause processing of all flows
kappa.resume()
Resume processing of all flows
Flow
When calling kappa.use()
a new Flow is created. A Flow is the combination of a source and a view - where the data flows from the source into the view. The Flow
object is passed to sources and views in their open
handler. It has this public API:
flow.name
: (string) A name that uniquely identifies this flow within the Kappa core.flow.update()
: Signal to the flow that the source has new data available. You want to call this from a source when the source has new data. If the Kappa core is not paused, this will cause thepull
handler to be called.flow.ready(cb)
: Callscb
(with no arguments) when this flow has finished processing all messages.cb
is called immediately if the flow is already finished.flow.getState()
: Get the current indexing state. Returns an object:{ state: 'idle' | 'running' | 'paused' | 'error', error: null | Error, // ... other keys as returned by the source // by convention this should include the following keys: totalBlocks: Number, indexedBlocks: Number, prevIndexedBlocks: Number }
flow.view
: Object with the view's API functionsflow.source
: Object with the source's API functions
SimpleState
kappa-core
exports a SimpleState
class that can be used by sources for a simple state handling. It persists state either in-memory, and supports a LevelDB (or compatible) option for persistence.
Example:
const { Kappa, SimpleState } = require('kappa-core')
function createSource (opts) {
const state = new SimpleState({ db: opts.db })
return {
pull (next) {
// get your current state
state.get((err, state) => {
if (err) return next()
// fetch messages from your data source
fetchMessages(state, ({ messages, finished, nextState }) => {
// call next with an onindexed handler
next(
null,
messages,
finished,
function onindexed (cb) {
// store the new state
state.put(nextState, cb)
}
)
})
})
},
fetchVersion: state.fetchVersion,
storeVersion: state.storeVersion,
reset (cb) {
state.put('', cb)
}
}
}
Sources
hypercore
const createHypercoreSource = require('kappa-core/sources/hypercore')
const source = createHypercoreSource({ feed, db })
where feed
is a hypercore instance and db
is a levelup instance (for persisting state)
multifeed
const createMultifeedSource = require('kappa-core/sources/multifeed')
const source = createMultifeedSource({ feeds, db })
where feeds
is a multifeed instance and db
is a levelup instance (for persisting state)
This source exposes an API method feed (key)
that return a feed by key from the underlying multifeed.
corestore
const createCorestoreSource = require('kappa-core/sources/corestore')
const source = createCorestoreSource({ store, db })
where store
is a corestore instance and db
is a levelup instance (for persisting state)
This source exposes an API method feed (key)
that return a feed by key from the underlying corestore.
kappa-core is a minimal peer-to-peer database, based on append-only logs and materialized views.
Introduction
kappa-core is built on an abstraction called a kappa architecture, or "event sourcing". This differs from the traditional approach to databases, which is centered on storing the latest value for each key in the database. You might have a table like this:
|id|key|value| |--|--|--| |51387|soup|cold| |82303|sandwich|warm| |23092|berries|room temp|
If you wanted to change the value of soup
to warm
, you would modify the
entry with id=51387
so that the table was now
|id|key|value| |--|--|--| |51387|soup|warm| |82303|sandwich|warm| |23092|berries|room temp|
This table now, once again, represents the current state of the data.
There are some consequences to this style of data representation:
- historic data is lost
- there is exactly one global truth for any datum
- no verifiable authorship information
- data is represented in a fixed way (changing this requires "table migrations")
In contrast, kappa architecture centers on a primitive called the "append-only log" as its single source of truth.
An append-only log is a data structure that can only be added to. Each entry in a log is addressable by its "sequence number" (starting at 0, then 1, 2, 3, ...). In the case of kappa-core, which uses hypercore underneath, each log is also identified by a cryptographic public key, which allows each log entry to be digitally signed with that log's private key, certifying that each entry in the log was indeed authored by the same person or device. A single kappa-core database can have one, ten, or hundreds of append-only logs comprising it.
kappa-core still uses tables like the above, though. However, instead of being the source of truth, these tables are generated (or materialized) from the log data, providing a view of the log data in a new or optimized context. These are called materialized views.
The twin concepts of append-only logs and materialized views are the key concepts of kappa-core. Any kappa-core database does only a few things:
- define various materialized views that it finds useful
- write data to append-only logs
- query those views to retrieve useful information
Let's look at an example of how the traditional table from the beginning of this section could be represented as a kappa architecture. The three initial rows would begin as log entries first:
[
{
id: 51387,
key: 'soup',
value: 'cold'
},
{
id: 82303,
key: 'sandwich',
value: 'warm'
},
{
id: 23092,
key: 'berries',
value: 'room temp'
}
]
These might be written to one log, or perhaps spread across several. They all get fed into materialized views in a nondeterministic order anyway, so it doesn't matter.
To produce a look-up table like before, a view might be defined like this:
when new log entry E:
table.put(E.key, E.value)
This would map each key
from the full set of log entries to its value
,
producing this table:
|key|value| |--|--| |soup|cold| |sandwich|warm| |berries|room temp|
Notice id
isn't present. We didn't need it, so we didn't bother writing it to
the view. It's still stored in each log entry it came from though.
Now let's say an entry like { id: 51387, key: 'soup', value: 'warm' }
is
written to a log. The view logic above the table dictates that the key
is
mapped to the value
for this view, so the a table would be produced:
|key|value| |--|--| |soup|warm| |sandwich|warm| |berries|room temp|
Like the traditional database, the table is mutated in-place to produce the new current state. The difference is that this table was derived from immutable log data, instead of being the truth source itself.
This is all very useful:
- log entries are way easier to replicate over a network or USB keys than tables
- the log entries are immutable, so they can be cached indefinitely
- the log entries are digitally signed, so their authenticity can be trusted
- views are derived, so they can be regenerated
#4 is really powerful and worth examination: views can be regenerated. In kappa-core, views are versioned: the view we just generated was version 1, and was defined by the logic
when new log entry E:
table.put(E.key, E.value)
What if we wanted to change this view at some point, to instead map the entry's
id
to its value
? Maybe like this:
when new log entry E:
table.put(E.id, E.value)
With kappa-core, this would mean bumping the view's version to 2
.
kappa-core will purge the existing table, and regenerate it from scratch by
processing all of the entries in all of the logs all over again. This makes
views cheap, and also means no table migrations! Your data structures can
evolve as you program evolves, and peers won't need to worry about migrating to
new formats.
Lastly, a kappa-core database is able to replicate itself to another
kappa-core database. The replicate
API (below) returns a Node Duplex
stream. This stream can operate over any stream-compatible transport medium,
such as TCP, UTP, Bluetooth, a Unix pipe, or even audio waves sent over the
air! When two kappa-core databases replicate, they exchange the logs and the
entries in the logs, so that both sides end up with the same full set of log
entries. This will trigger your database's materialized views to process these
new entries to update themselves and reflect the latest state.
Because this is all built on hypercore, replication can be done over an encrypted channel.
Thanks for reading! You can also try the kappa-core workshop to use kappa-core yourself, or get support and/or chat about development on
- IRC: #kappa-db on Freenode
Example
This example sets up an on-disk log store and an in-memory view store. The view tallies the sum of all of the numbers in the logs, and provides an API for getting that sum.
var kappa = require('kappa-core')
var view = require('kappa-view')
var memdb = require('memdb')
// Store logs in a directory called "log". Store views in memory.
var core = kappa('./log', { valueEncoding: 'json' })
var store = memdb()
// View definition
var sumview = view(store, function (db) {
// Called with a batch of log entries to be processed by the view.
// No further entries are processed by this view until 'next()' is called.
map: function (entries, next) {
db.get('sum', function (err, value) {
var sum
if (err && err.notFound) sum = 0
else if (err) return next(err)
else sum = value
})
entries.forEach(function (entry) {
if (typeof entry.value === 'number') sum += entry.value
})
db.put('sum', sum, next)
}
// Whatever is defined in the "api" object is publicly accessible
api: {
get: function (core, cb) {
this.ready(function () { // wait for all views to catch up
cb(null, sum)
})
}
},
})
// the api will be mounted at core.api.sum
core.use('sum', 1, sumview) // name the view 'sum' and consider the 'sumview' logic as version 1
core.writer('default', function (err, writer) {
writer.append(1, function (err) {
core.api.sum.get(function (err, value) {
console.log(value) // 1
})
})
})
API
var kappa = require('kappa-core')
var core = kappa(storage, opts)
Create a new kappa-core database.
storage
is an instance of random-access-storage. If a string is given, random-access-file is used with the string as the filename.- Valid
opts
include:valueEncoding
: a string describing how the data will be encoded.multifeed
: A preconfigured instance of multifeed
core.writer(name, cb)
Get or create a local writable log called name
. If it already exists, it is
returned, otherwise it is created. A writer is an instance of
hypercore.
var feed = multi.feed(key)
Fetch a log / feed by its public key (a Buffer
or hex string).
var feeds = core.feeds()
An array of all hypercores in the kappa-core. Check a feed's key
to find the
one you want, or check its writable
/ readable
properties.
Only populated once core.ready(fn)
is fired.
core.use(name[, version], view)
Install a view called name
to the kappa-core instance. A view is an object of
the form
// All are optional except "map"
{
// Process each batch of entries
map: function (entries, next) {
entries.forEach(function (entry) {
// ...
})
next()
},
// Your useful functions for users of this view to call
api: {
someSyncFunction: function (core) { return ... },
someAsyncFunction: function (core, cb) { process.nextTick(cb, ...) }
},
// Save progress state so processing can resume on later runs of the program.
// Not required if you're using the "kappa-view" module, which handles this for you.
fetchState: function (cb) { ... },
storeState: function (state, cb) { ... },
clearState: function (cb) { ... }
// Runs after each batch of entries is done processing and progress is persisted
indexed: function (entries) { ... },
// Number of entries to process in a batch
maxBatch: 100,
}
NOTE: The kappa-core instance core
is always passed as the fist parameter
in all of the api
functions you define.
version
is an integer that represents what version you want to consider the
view logic as. Whenever you change it (generally by incrementing it by 1), the
underlying data generated by the view will be wiped, and the view will be
regenerated again from scratch. This provides a means to change the logic or
data structure of a view over time in a way that is future-compatible.
The fetchState
, storeState
, and clearState
functions are optional: they
tell the view where to store its state information about what log entries have
been indexed thus far. If not passed in, they will be stored in memory (i.e.
reprocessed on each fresh run of the program). You can use any backend you want
(like leveldb) to store the Buffer
object state
. If you use a module like
kappa-view, it will handle state
management on your behalf.
indexed
is an optional function to run whenever a new batch of entries have
been indexed and written to storage. Receives an array of entries.
core.ready(viewNames, cb)
Wait until all views named by viewNames
are caught up. e.g.
// one
core.ready('sum', function () { ... })
// or several
core.ready(['kv', 'refs', 'spatial'], function () { ... })
If viewNames is []
or not included, all views will be waited on.
core.pause([viewNames], [cb])
Pause some or all of the views' indexing process. If no viewNames
are given,
they will all be paused. cb
is called once the views finish up any entries
they're in the middle of processing and are fully stopped.
core.resume([viewNames])
Resume some or all paused views. If no viewNames
is given, all views are
resumed.
core.replicate([opts])
Create a duplex replication stream. opts
are passed in to
multifeed's API of the same name.
core.on('error', function (err) {})
Event emitted when an error within kappa-core has occurred. This is very important to listen on, lest things suddenly seem to break and it's not immediately clear why.
Install
With npm installed, run
$ npm install kappa-core
Useful helper modules
Here are some useful modules that play well with kappa-core for building materialized views:
- unordered-materialized-bkd: spatial index
- unordered-materialized-kv: key/value store
- unordered-materialized-backrefs: back-references
Why?
kappa-core is built atop two major building blocks:
- hypercore, which is used for (append-only) log storage
- materialized views, which are built by traversing logs in potentially out-of-order sequence
hypercore provides some very useful superpowers:
- all data is cryptographically associated with a writer's public key
- partial replication: parts of logs can be selectively sync'd between peers, instead of all-or-nothing, without loss of cryptographic integrity
Building views in arbitrary sequence is more challenging than when order is known to be topographic or sorted in some way, but confers some benefits:
- most programs are only interested in the latest values of data; the long tail of history can be traversed asynchronously at leisure after the tips of the logs are processed
- the views are tolerant of partially available data. Many of the modules listed in the section below depend on topographic completeness: all entries referenced by an entry must be present for indexes to function. This makes things like the equivalent to a shallow clone (think [git][git-shallow]), where a small subset of the full dataset can be used and built on without breaking anything.
Acknowledgments
kappa-core is built atop ideas from a huge body of others' work:
- flumedb
- secure scuttlebutt
- hypercore
- hyperdb
- forkdb
- hyperlog
- a harmonious meshing of ideas with @substack in the south of spain
Further Reading
License
ISC
[git-shallow]: https://www.git-scm.com/docs/gitconsole.log(one#gitconsole.log(one---depthltdepthgt) [kappa]: http://kappa-architecture.com