Real Value Library
The real-value library is an expression language for composing flows of data, control and presentation events.
The motivations for this are to:
- allow end-to-end data flows to be expressed in a form that can be rendered, so the end-to-end logic is easy to understand
- express end-to-end data flows such that value chains can be tested end to end inside unit tests
- separate infrastructure complexity out from end-to-end data flows so that applying infrastructure is a separate concern
- allow processing of infinite streams of data within finite computational resources
- allow data flow processing logic to be expressed as functional composition
The value in this is expected to be:
- faster and higher quality solution development
- contributions to development from a larger pool of developers
Value
The value in this language comes from being able to apply it across multiple problem domains, reducing the time needed to understand, pick up and modify a solution developed by others.
Model
A real-value flow model is a graph of data stream nodes intended to have data/events flowing between them.
A flow model should be renderable into a graphical expression language as a way to view the data flow at a macro level.
There will be different flow model nodes. The set of nodes aspires to provide functions similar to the EAI Integration Patterns nodes.
The flow model is a factory used to set up data streams and the subsequent flow processing.
let model = Model()
Data Streams
A data stream can be a set of values. The simplest example is a comma separated set of values.
//logical stream of values
,1,,,,2,,3
It would be convenient to have a from stream node that can reify a stream.
model.from(',1,,,,2,,3')
Stream data can come from CSV content. The following would produce a stream of a single value with AssetId=1 and SerialNo=A.
from(<reference to CSV file>)
Streams should be constructable from a CSV string, arrays, SQL tables, iterators and generators.
from([{key:1, value:1},{key: 2, value: 2}])
from(someGeneratorFunction)
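For example, someGeneratorFunction above could be any ordinary JavaScript generator; a minimal sketch (the name and yielded values are illustrative only):
// a generator that yields a few values into the stream
function* someGeneratorFunction() {
  yield 1
  yield 2
  yield 3
}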
Outputting streams
Data streams can be written to files using to stream nodes.
from('1,2,3').to('test.log')
from([{key:1,value:1},{key:2,value:2}]).toCSV('test.csv')
It is likely we will want to output a stream to other data sinks including queues or databases.
syntax to be decided
Stream Propagation
A data stream likely represents a data set arriving over time.
For the purpose of this readme we can think of data transformation through a flow by considering an input and an output stream, represented as below.
,1,,,,2,,3 -=> ,1,,,,2,,3
Stream Inspection
A log operator can be used to view a stream.
from(',1,,,,2,,3').log()
Motivation: To interrogate what the stream looks like at nodes in a data flow model.
Stream Propagation Delay
The values may be delayed through a stream
,1,,,,2,,3,, -=> ,,,1,,,,2,,3
A delay operator would achieve the above translation.
from(',1,,,,2,,3').delay(2)
Motivation: We could use this to represent the activity of a truck moving a payload from a mine face to a processing conveyor belt.
from(',1,,,,2,,3').delay(transportTime)
Stream Filtering
It will be necessary to select content from a stream.
from('1,2,3,4').filter(x=>x%2)
Streams May Diverge/Split
Streams may need to diverge
b:1,a:1,a:2:b:2,,,b:3,a:3 => ,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3,
Streams can be piped into any number of downstream stream operators
let stream = from('1,2,3,4,5')
stream.filter(selectOdd).to('odd.log')
stream.filter(selectEven).to('even.log')
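The selectOdd and selectEven predicates here are assumed to be ordinary functions; a minimal sketch:
const selectOdd = x => x % 2 === 1
const selectEven = x => x % 2 === 0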
Motivation: In a mine a digger may split material into waste and product stockpiles.
let stream = from('waste:1,product:1,product:2,,,waste:3,product:3')
stream.filter(filterProduct).tap(...)
stream.filter(filterWaste).tap(...)
Streams May Merge
Streams may need to flow together.
,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3, -=> b:1,a:1,a:2:b:2,,,b:3,a:3
Note that the a:1 syntax indicates a key with a value.
A merge operator could merge two streams.
from(',a:1,a:2,,,,,a:3,').merge(from('b:1,,b:2,,b:3,'))
Motivation: In an IoT solution there are streams of asset data from different channels, but the source channel is irrelevant to the UI.
from(',Asset111:on,Asset222:off,,').merge(from('Asset333:on,,Asset333:off,'))
As an alternative, a from operation applied to a stream should create a merged stream.
from('1,3',{column: 'odd'}).from('2,4',{column: 'even'}) //{odd:1,even:2},{odd:3,even:4},
Join operations on streams
Streams can be joined using inner, outer, left, right and xor semantics.
{key:1,b:1},{key:2,b:2} : {key:1,c:1},{key:3,c:3} => {key:1,b:1,c:1},{key:2,b:2},{key:3,c:3}
Implemented as
let s1 = model.from([{ 'key': 1, 'b': 1 }, { 'key': 2, 'b': 2 }])
let s2 = model.from([{ 'key': 1, 'c': 1 }, { 'key': 3, 'c': 3 }])
s1.join(s2,{type:'outer'})
Batch split stream to process in groups
This functionality is useful when there is a slow process for handling individual events but a relatively fast one for a set of events (such as DB connections). It lets you take a bunch of separate events and group them into batches of a maximum batch size.
from('data.xlsx').batch({
typeFn: x=>x.type,
addToBatchFn: x=>x,
delay: 1000/* batch up to 1 second*/,
maxbatch: 100/*max batch size*/
}).tap(somefunction)
Motivation: In a mine a truck can accumulate some load from a digger before moving it to a processing stockpile.
Stream Reduction
Reduce the events in a stream.
The stream values may accumulate before propagating
1,1,1 -=> ,,3
from(',1,,,,2,,3').reducer(truckCapacityReducer)
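truckCapacityReducer is referenced but not defined, and the reducer contract is not specified in this readme; a minimal sketch, assuming the reducer receives the accumulated load plus the next value and indicates when the load should propagate:
const truckCapacity = 3 // hypothetical capacity
const truckCapacityReducer = (load = 0, value = 0) => {
  const total = load + Number(value)
  // signal that the accumulated load should propagate once the truck is full, otherwise keep accumulating
  return total >= truckCapacity ? { load: 0, emit: total } : { load: total, emit: undefined }
}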
Stream Propagation Limiting
TODO
The values may need to be limited before propagating
,1,,,,2,,3,, -=> ,1,,,,1,1,1,1,1
A limitTo operator could facilitate splitting values into defined chunks.
from(',1,,,,2,,3').limitTo(1).log()
Motivation: A mine conveyor system has a defined throughput.
from(stockpile).limitTo(1)
Stream Processing Composition
It should be possible to compose stream operations such that specific processing logic can be built up from smaller testable units.
Motivation: In a mine it would take a truck some time to return from the stockpile before it can be reloaded.
from(',1,,,,2,,3').compose(accumulateTo(truckCapacity),delay(truckReturnTrip))
Streams May Be Accumulated into Categories
Streams may need to accumulate into categories
,a:1,a:2,,c:2,,b:3 => { a: 3, b: 3, c: 2 }
A table operator would achieve this.
from(',a:1,a:2,,c:2,,,b:2,').table(accumulate)
For a production solution, utilization was accumulated per asset.
from(',E16:10,D10:20,,E15:10,,,E16:10,').table(accumulate)
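accumulate here is assumed to be a simple per-key reducer applied by the table operator; a minimal sketch (the exact signature the table operator expects is not specified in this readme):
// sum the incoming value into the current value held in the table for that key
const accumulate = (current = 0, value = 0) => Number(current) + Number(value)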
The aggregation into a table should be capable of updating statistics
,a:1,a:3,,b:2,,b:4 => average:{ a: 2, b: 3 }
For an IoT solution we may want to capture statistics across the type/class of asset.
from(',Asset1:10,Asset11:20,,Asset12:10,,,Asset10:20,').map(mapToAssetType).table(movingAverage,'averageUtilizationByType')
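mapToAssetType and movingAverage are illustrative names; a minimal sketch of the mapping step, assuming each event is a {key, value} pair and that a hypothetical lookup maps asset ids to a type/class:
// hypothetical lookup from asset id to asset type/class
const assetTypes = { Asset1: 'Digger', Asset10: 'Truck', Asset11: 'Digger', Asset12: 'Truck' }
// re-key the event by asset type so statistics accumulate per type
const mapToAssetType = ({ key, value }) => ({ key: assetTypes[key], value })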
Note that table should accumulate but also emit changes to the table.
from('A:1,A:10,,B:2,B:2,C:3,').table(accumulate).log() //should generate A:1,A:10,B:2,/*no change*/,C:3
Enrichment
One reason to accumulate into categories is to be able to enrich event streams with metadata.
let latest = from([{id: 'a',value: 1},{id: 'b',value: 1},{id: 'a',value: 2}]).table()
from('a,a,a,a,').join(latest).log()
Motivation: For a set of timesheet entries it may be necessary to augment records with the employee details
let employeeData = from([{id: '123',name: 'Jim Smith'},{id: '234',name: 'Sally Smith'}]).table()
from([{id: '234', time: 6}]).join(employeeData).log()
Change Propagation
When there is a change to a stream it may be desirable to reprocess the content of a table stream. For instance getting new information about depreciation rates could require reprocessing a table stream representing the value of some assets.
from(somestream).table({propagate:change}) //only table changes are propagated.
from(somestream).table({propagate:all}) //any table change generates a stream of the entire table.
Streams May Need to Be Accumulated Between Values
Streams may need to accumulate between values
,a:1,b:1,,b:0,,a:0 => { a: 5, b: 3 }
A delta operation might allow this to be expressed.
from(',a:1,b:1,,b:0,,a:0').table(compose(delta,accumulate))
Motivation: In an IoT solution there is utilization related to the time periods between on/off events.
from(',Asset111:on,Asset222:on,,Asset222:off,,Asset:on').table(compose(delta,accumulate))
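The delta operation is only named here; one way to picture it is a per-key stateful function that remembers when a key switched on and emits the elapsed ticks when it switches off. This is a sketch of the idea only, assuming events carry a key, a value and a tick; it is not the library's implementation:
const delta = () => {
  const onSince = new Map() // key -> tick at which the key last switched on
  return ({ key, value, tick }) => {
    if (value === 'on') { onSince.set(key, tick); return undefined }
    if (!onSince.has(key)) return undefined
    const elapsed = tick - onSince.get(key) // periods the key was on
    onSince.delete(key)
    return { key, value: elapsed }
  }
}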
Streams May Be Compared Against Targets
Streams may need to be compared against targets
Motivation: In a mine there was a need to show up-to-date production versus target production during the course of an interval.
let targets = from('targets.csv')
let actuals = from(productionGenerator).join(targets,{joinFn: (actual,target)=>{
actual.percentage = actual.value/target.value
}})
Streams as components
It should be possible to define a stream with a source to be re-used multiple times.
let identityStream = map(x=>x) //The identity processing stream
from(testStream).thru(identityStream) // Used to test the stream definition
from(productionStream2).thru(identityStream) //Used to process production
Streams that update functions
Streams should be usable to update functions used in other streams. A stream of market data is used to update a function which can be used to value an individual item.
let valuer = (() => {
  let valuationModel = {} //the model
  return {
    trainValue: (historicalSaleData) => { /* update the model */ },
    value: (equipment) => valuationModel.value(equipment)
  }
})()
from(historicalSaleData).map(x=>valuer.trainValue(x)) //We need to update a valuation model
from(onSaleData).map(x=>valuer.value(x)).table() //then we want to use the model
Buffer stream to process aggregate statistics
It will be necessary to apply aggregate statistics to the values in a stream. For example, if we have a set of asset values we may want to normalize those values. Aggregation is a difficult concept for stream processing as you never expect to have the full set of data. To support this, a buffer node is used to process a set of events (so that aggregates can be computed) before emitting all those events, which can then use those aggregates.
let statsProcessing = () => {
  let stats = {}
  let calcStats = (x) => {...}
  let useStats = (x) => {...}
  return { calcStats, useStats }
}
let { calcStats, useStats } = statsProcessing()
from('data.csv').map(calcStats).buffer(1000).map(useStats)
Cognitive Load
Directly Address Streams
When developing a flow model there will be a need to introspect the stream of data from any one node. Specifically, when unit testing a model provided by some library it is useful to be able to directly address a stream node.
There are a couple of ways to achieve this.
All stream nodes have names which default to the type of the node.
let filterNode = model.from('1,2,3').filter(x=>x<2)
filterNode.getName()
Architectural Concerns
Mapping streams to different processes
While we should be able to test an entire stream process in a single, simple runtime, we might want to deploy that stream distributed across multiple processes for scale or to satisfy other architectural constraints.
A pub node is used to allow streams to support both outcomes.
let streamUrl = from('data1.csv').map(tx).pub(options) //deploy to one node
from(streamUrl).map(tx2) //deploy to another node
Examples
Scheduling
There can be solutions for scheduling resources to load and empty shipping containers (a rough sketch follows the list).
- There is a stream of containers to be unloaded from(port)
- Containers accumulate goods .limitTo()
- Containers are transported delay()
- Containers are unloaded splitTo()
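A rough sketch of how the operators named above could compose for the container example; port, craneRate and transportTime are hypothetical placeholders and the chain is illustrative only:
from(port) // the stream of containers arriving to be unloaded
  .limitTo(craneRate) // hypothetical: unloading throughput limited by crane capacity
  .delay(transportTime) // hypothetical: time to move containers from ship to yard
  .to('unloaded.log')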
Mining operations
- There are streams of mine faces from(mineplan)
- Faces need to be cleared splitTo(diggerLoad).accumulateTo(truckCapacity)
- Trucks transport waste or mineral delay(travelTime)
Asset Whole of Life Model
Purchase = from('100,,,,,')
PlannedMaintenance = from(',-10,-10,-10,-10')
UnplannedMaintenance = from(',,-5,-7,')
PlannedMaintenance.depreciate(depreciationRate).table(accumulate)
UnplannedMaintenance.depreciate(depreciationRate).table(accumulate)
merge(Purchase,PlannedMaintenance,UnplannedMaintenance).depreciate(depreciationRate).table(accumulate)
Asset Valuation
Assets need to be valued by composing asset information with depreciation information related to age, utilization, location and market supply.
function ageValuer({assetType, age}) {} // build a linear interpolation from asset type and age to a value
from('...',{columns:['assetType','assetAge','value']}).map(ageValuer)
function smuValuer({assetType, smu}) {} // calculate variance of value based on utilization at age, then use the deviation from mean utilization to adjust the value
from('...',{columns:['assetType','assetSMU','value']}).map(smuValuer)
function locationValuer({assetType, location}) {} //
from('...',{columns:['assetType','location','value']}).map(deriveLocationDepreciation)
from('assets.csv').
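The comment on ageValuer above suggests a linear interpolation from age to value per asset type; a minimal sketch under that assumption (the table of interpolation points is illustrative only):
// hypothetical value-by-age points per asset type: [age, value] pairs
const agePoints = { excavator: [[0, 100], [5, 60], [10, 20]] }
function ageValuer({ assetType, age }) {
  const points = agePoints[assetType]
  if (!points) return undefined
  // find the segment containing the age and interpolate linearly between its end points
  for (let i = 1; i < points.length; i++) {
    const [a0, v0] = points[i - 1], [a1, v1] = points[i]
    if (age <= a1) return v0 + (v1 - v0) * (age - a0) / (a1 - a0)
  }
  return points[points.length - 1][1] // older than the last point: use the last value
}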
Install
npm install real-value-lang
yarn add real-value-lang
How to use
see the unit tests
References
Kafka Streams
Most.js reactive programming library
It might be appropriate to consider how to use real-value-lang to perform simulations such as those possible with SLX