@ms-cloudpack/data-bus
v0.4.2
A data bus implementation.
Provides a general API for subscribing to and unsubscribing from data, and for defining providers of that data.
Examples of usage:
Create a registry of NPM package metadata - a provider monitors needs and fetches data on demand. Many consumers can consume the metadata for any package; providers will automatically debounce many requests, centralize the cache expiration strategy, polling vs push notifications, etc. Consumers have a simple approach for subscribing to data and get push notifications when the data changes.
Create a unified logging system - Publishers can publish data from many distributed sources. Logging is published in a hierarchy:
/logging/${app}/${severity}/${area}
Consumers can subscribe to any part of this tree. For example, if a consumer subscribes to /logging/my-app/error, they will receive change events for all errors in my-app.
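The branch matching described above can be pictured as a prefix test on path segments. The following is a minimal sketch, not the package's implementation: `isUnderBranch` and the sample paths are hypothetical names chosen for illustration.

```typescript
// Hypothetical helper (not part of @ms-cloudpack/data-bus): returns true when
// `published` is the subscribed node itself or any node beneath it.
function isUnderBranch(subscribed: string[], published: string[]): boolean {
  if (subscribed.length > published.length) {
    return false;
  }
  return subscribed.every((segment, index) => segment === published[index]);
}

// A subscriber to /logging/my-app/error ...
const subscribedPath = ['logging', 'my-app', 'error'];

// ... matches errors from any area of my-app,
const matchesAreaError = isUnderBranch(subscribedPath, ['logging', 'my-app', 'error', 'auth']);
// ... but not warnings from my-app or errors from other apps.
const matchesWarning = isUnderBranch(subscribedPath, ['logging', 'my-app', 'warning', 'auth']);
const matchesOtherApp = isUnderBranch(subscribedPath, ['logging', 'other-app', 'error', 'auth']);
```

Under this model, subscribing higher in the tree (for example to /logging) simply shortens the prefix and widens the set of matching nodes.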
Usage
Initialization
A data bus can be created using the createDataBus API:
import { createDataBus } from '@ms-cloudpack/data-bus';
export const bus = createDataBus();
Creating data paths
Data in the data bus is represented as a tree. Each node in the tree has a parent and children, and can contain data. To represent the expectations of what data is in each node, we define a data path object and reference it when publishing and subscribing to it. This ensures proper type safety.
The purpose of the path parameter is to define the path in the tree where the data lives. This is useful because you can subscribe to any branch of the tree and receive change notifications (via subscriptions or provider activations) for all nodes under that branch.
Example of defining a data path:
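import { z } from 'zod';
import { createDataPath } from '@ms-cloudpack/data-bus';
// Exported so that publishers and subscribers share one definition.
export const itemCountPath = createDataPath({
  path: ['items', 'itemCount'],
  type: z.number(),
});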
Subscribing to data
The data bus provides a subscribe method for consumers to subscribe to different types of data. As that data changes, consumers are notified. Any number of consumers can subscribe to the same data. Data is hierarchical, so you can subscribe to a full branch or to a leaf node within the tree. Paths are represented by string arrays.
import { itemCountPath } from './dataPaths';
const dispose = bus.subscribe(itemCountPath, (count) => {
  // `count` is the latest value published to itemCountPath.
  console.log(`Item count: ${count}`);
});
Callers can unsubscribe from data by calling the dispose function returned from the subscribe call.
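To illustrate the subscribe/dispose contract in isolation, here is a small self-contained sketch. It does not use the package: `Topic` is a hypothetical stand-in for a single node in the bus, written only to show how a returned dispose function ends delivery.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical stand-in for one node of the data bus (not the package's API).
class Topic<T> {
  private listeners = new Set<Listener<T>>();

  // Registers a listener and returns a dispose function that removes it.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  // Delivers the value to every listener that is still subscribed.
  publish(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

const itemCount = new Topic<number>();
const received: number[] = [];
const dispose = itemCount.subscribe((count) => {
  received.push(count);
});

itemCount.publish(1); // delivered to the subscriber
dispose();
itemCount.publish(2); // not delivered: the subscriber was disposed
```

Returning the dispose function from subscribe means callers do not need to keep a reference to the original callback in order to unsubscribe.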
Publishing data
Data can be published using the publish API:
import { itemCountPath } from './dataPaths';
bus.subscribe(itemCountPath, (count) => console.log(count));
bus.publish(itemCountPath, 42);
// The console will read: 42
Using providers to produce data on demand
Providers can also be registered with the bus to provide data on demand, so that data does not need to be gathered unless it is being observed:
import { itemCountPath } from './dataPaths';
// Declared with `let` so it can be assigned on activation and cleared on deactivation.
let intervalId: ReturnType<typeof setInterval> | undefined;
bus.addProvider({
  path: itemCountPath,
  // Called when activated.
  onActivate: (itemCountPath) => {
    let counter = 0;
    intervalId = setInterval(() => {
      bus.publish(itemCountPath, counter++);
    }, 1000);
  },
  // Called when deactivated.
  onDeactivate: (itemCountPath) => {
    bus.publish(itemCountPath, 0);
    clearInterval(intervalId);
  },
});
In this case, the counter starts counting only when at least one subscriber is listening on the path, and stops when the last subscriber unsubscribes. Providers are activated when the first subscriber becomes active for a path, and deactivated when all subscribers have unsubscribed from that path. This ensures that providers don't do extra work to provide the same data redundantly.
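The activation rule above amounts to reference counting on subscribers. The sketch below models only that rule and is not the package's source: `ActivationTracker` and `Provider` are hypothetical names, and the real bus may track activation differently.

```typescript
// Hypothetical model of provider activation (not the package's implementation).
interface Provider {
  onActivate: () => void;
  onDeactivate: () => void;
}

class ActivationTracker {
  private subscriberCount = 0;
  private readonly provider: Provider;

  constructor(provider: Provider) {
    this.provider = provider;
  }

  // Registers a subscriber and returns a dispose function.
  subscribe(): () => void {
    this.subscriberCount++;
    if (this.subscriberCount === 1) {
      this.provider.onActivate(); // first subscriber: start producing data
    }
    let disposed = false;
    return () => {
      if (disposed) {
        return; // disposing twice must not be counted twice
      }
      disposed = true;
      this.subscriberCount--;
      if (this.subscriberCount === 0) {
        this.provider.onDeactivate(); // last subscriber left: stop producing
      }
    };
  }
}

const events: string[] = [];
const tracker = new ActivationTracker({
  onActivate: () => {
    events.push('activate');
  },
  onDeactivate: () => {
    events.push('deactivate');
  },
});

const disposeFirst = tracker.subscribe(); // activates the provider
const disposeSecond = tracker.subscribe(); // provider is already active
disposeFirst(); // one subscriber remains, provider stays active
disposeSecond(); // last subscriber gone, provider deactivates
```

With two overlapping subscribers, the provider is activated once and deactivated once, which is the behavior the paragraph above describes.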
Reading data
In some cases, we may need to simply read the data without subscribing. In this case, the getData API can be used:
import { itemCountPath } from './dataPaths';
const currentValue = bus.getData(itemCountPath);
Motivation
Cloudpack runs a bundle service which is accessed to retrieve esm package bundles on the fly. As dependencies are requested, the service produces assets into a cache folder.
Many consumers may request the same bundle at the same time. This implies two things: we need to dedupe redundant requests, and we need to reuse the output to fulfill future requests.
In the original implementation, requests for the same resource would first check the cache for results, find them missing, and start bundling by calling an imperative API like async bundle(options: Options): Promise<Result>. This would write to a cache location, which would then fulfill future requests. But this left race conditions, because we weren't deduping in-flight requests, nor were we re-populating bundles whose inputs had changed slightly. This led to the question: what kind of architecture would we need to dedupe redundant requests and provide a general approach to caching content?
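The in-flight deduping problem can be shown with a small sketch. This is a generic illustration rather than Cloudpack's code: `bundleOnce`, `inFlight`, and the stand-in `bundle` function below are hypothetical, and the sketch covers only deduping of concurrent requests, not the cache.

```typescript
// Hypothetical sketch: concurrent requests for the same key share one promise.
const inFlight = new Map<string, Promise<string>>();
let bundleCalls = 0;

// Stand-in for an expensive bundling operation.
function bundle(packageName: string): Promise<string> {
  bundleCalls++;
  return new Promise<string>((resolve) => {
    setTimeout(() => resolve(`bundle for ${packageName}`), 10);
  });
}

function bundleOnce(packageName: string): Promise<string> {
  const existing = inFlight.get(packageName);
  if (existing) {
    return existing; // a request is already running: reuse it
  }
  // Remove the entry once the work settles, whether it succeeded or failed.
  const pending = bundle(packageName).then(
    (result) => {
      inFlight.delete(packageName);
      return result;
    },
    (error) => {
      inFlight.delete(packageName);
      throw error;
    },
  );
  inFlight.set(packageName, pending);
  return pending;
}

// Two simultaneous requests trigger a single bundling run.
const firstRequest = bundleOnce('some-package');
const secondRequest = bundleOnce('some-package');
```

Both callers receive the same promise, so the expensive work runs once. The data bus generalizes this idea by letting a provider publish one result to every subscriber.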
We ran into other similar scenarios:
- One or more Dashboard pages need a list of installed packages. When the dependencies change, the page should be notified so that it can update the UI.
- One or more Dashboard instances need metadata for a package. Metadata access is expensive, so we should dedupe requests and ensure the results go to all subscribers.
- Dashboards need to show the current build state for a session: green for pass, red for fail, with errors rendered when clicked. This also requires live updates, and may have multiple subscribers.
This led to the idea of a data bus: a general mediator for managing subscriptions and informing providers when data is needed and subscribers when results are available or have changed.
Future considerations
Is there existing tech to accomplish the goals? What about Fluid? What about lage as a service?
Should data be represented hierarchically, by tags, or as a mix? Hierarchies are better than a flat namespace, but not as robust as tags. Consider the logging scenario: I want logging for a specific app at several severity levels, but only for a specific area. Maybe this means subscribers and producers should have matchers, and values should just have tags. Maybe tags could have hierarchies (app areas, logging levels). Notifications would be more expensive to resolve, since we'd have to filter subscribers linearly based on their matchers; the same applies to producers for pre-publishing transforms. But it would enable more fine-tuned resolution.
How do subscribers subscribe to collections rather than individual items? In some cases, UX needs to list what data is available as it becomes available. For example, I might care about a list of sessions. One approach is to maintain the list separately as an individual item - then get the items one at a time. A better approach would be for the data-bus to allow subscribers to subscribe to all children in a node. This would return as a flat array of items. We'd want to consider large lists and how they become paginated to avoid perf problems downstream.