real-value-lang

v0.49.1

A library providing an expression language to compose and evaluate stream processing

Real Value Library

The real-value library is an expression language for composing flows of data, control, and presentation events.

The motivations for this are to:

  1. allow expression of end-to-end data flows in a form that can be rendered, so that the end-to-end logic is easy to understand
  2. express end-to-end data flows such that value chains can be tested end to end inside unit tests
  3. separate infrastructure complexity from end-to-end data flows, so that applying infrastructure is a separate concern
  4. allow processing of infinite streams of data within finite computational resources
  5. allow expression of data flow processing logic as functional composition

The value in this is expected to be:

  1. faster and higher-quality solution development
  2. contribution to development by a larger pool of contributors

Value

The value of this language comes from its ability to be applied across multiple problem domains, so that the time needed to understand, pick up, and modify a solution developed by others is reduced.

Model

A real-value flow model is a graph of data stream nodes intended to have data/events flowing between them.

A flow model should be renderable into a graphical expression language as a way to view the data flow at a macro level.

There will be different flow model nodes. The set of nodes aspires to provide functions similar to EAI Integration Patterns nodes.

The flow model is a factory used to set up data streams and the subsequent flow processing.

let model = Model() 

Data Streams

A data stream can be a set of values. The simplest example is a comma-separated set of values.

//logical stream of values
,1,,,,2,,3

It would be convenient to have a from stream node that can reify a stream.

model.from(',1,,,,2,,3')
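To make the notation concrete, here is a minimal sketch of how such a string could be read as time slots. The `parseSlots` helper is hypothetical and is not part of real-value-lang; it only assumes that each comma-separated position is one time slot and that an empty position means no value arrived.

```javascript
// Hypothetical helper (not the library's API): read ',1,,,,2,,3' as time slots.
function parseSlots(text) {
  // Each comma-separated position is one slot; an empty slot becomes null.
  return text.split(',').map(slot => (slot === '' ? null : Number(slot)));
}

const slots = parseSlots(',1,,,,2,,3');
console.log(slots); // [ null, 1, null, null, null, 2, null, 3 ]
```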

Stream data can come from CSV content. The following would produce a stream of a single value with AssetId=1 and SerialNo=A.

from(<reference to CSV file>)

Streams should be constructable from CSV strings, arrays, SQL tables, iterators, and generators.

from([{key:1, value:1},{key: 2, value: 2}])

from(someGeneratorFunction)

Outputting streams

Data streams can be written to files using to stream nodes.

from('1,2,3').to('test.log')

from([{key:1,value:1},{key:2,value:2}]).toCSV('test.csv')

We will likely want to output a stream to other data sinks, including queues or databases.

syntax to be decided

A Stream Propagation

A data stream likely represents a data set arriving over time.

For the purpose of this readme we can think about data transformation through a flow by considering an input and an output stream, represented as below.

,1,,,,2,,3 -=> ,1,,,,2,,3

A Stream Inspection

A log operator can be used to view a stream.

from(',1,,,,2,,3').log()

Motivation: To interrogate what the stream looks like at nodes in a data flow model.

Stream Propagation Delay

The values may be delayed through a stream

,1,,,,2,,3,, -=> ,,,1,,,,2,,3

A delay operator would achieve the above translation.

from(',1,,,,2,,3').delay(2)

Motivation: A delay could represent the activity of a truck moving a payload from the mine face to the processing conveyor belt.

from(',1,,,,2,,3').delay(transportTime)
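The delay translation shown above can be sketched in plain JavaScript. This is an illustration of the idea only, not the library's delay operator; `delaySlots` is a hypothetical name, and streams are modelled as arrays where `null` marks an empty time slot.

```javascript
// Illustrative only: shift every value `steps` slots later by
// prepending that many empty slots.
function delaySlots(slots, steps) {
  return [...Array(steps).fill(null), ...slots];
}

const input = [null, 1, null, null, null, 2, null, 3]; // ,1,,,,2,,3
const delayed = delaySlots(input, 2);
console.log(delayed);
// [ null, null, null, 1, null, null, null, 2, null, 3 ]  i.e. ,,,1,,,,2,,3
```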

Stream Filtering

It will be necessary to select content from a stream.

from('1,2,3,4').filter(x=>x%2)

Streams May Diverge/Split

Streams may need to diverge

b:1,a:1,a:2:b:2,,,b:3,a:3 => ,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3,

Streams can be piped into any number of downstream stream operators

let selectOdd = x => x % 2 === 1
let selectEven = x => x % 2 === 0
let stream = from('1,2,3,4,5')
stream.filter(selectOdd).to('odd.log')
stream.filter(selectEven).to('even.log')

Motivation: In a mine a digger may split material into waste and product stockpiles.

let stream = from('waste:1,product:1,product:2,,,waste:3,product:3')
stream.filter(filterProduct).tap(...)
stream.filter(filterWaste).tap(...)
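As a rough sketch of the split, the same keyed stream can be parsed and filtered into two streams that keep their original time slots. `parseKeyed` and `splitByKey` are hypothetical helpers written for this example; they are not taken from real-value-lang.

```javascript
// Hypothetical helpers: parse 'key:value' slots, then keep only one key per output stream.
function parseKeyed(text) {
  return text.split(',').map(slot => {
    if (slot === '') return null; // empty time slot
    const [key, value] = slot.split(':');
    return { key, value: Number(value) };
  });
}

function splitByKey(slots, key) {
  // Slots belonging to other keys become empty, so timing is preserved.
  return slots.map(slot => (slot && slot.key === key ? slot : null));
}

const stream = parseKeyed('waste:1,product:1,product:2,,,waste:3,product:3');
const product = splitByKey(stream, 'product');
const waste = splitByKey(stream, 'waste');
console.log(product.map(s => (s ? s.value : '')).join(',')); // ,1,2,,,,3
console.log(waste.map(s => (s ? s.value : '')).join(','));   // 1,,,,,3,
```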

Streams May Merge

Streams may need to flow together.

,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3, -=> b:1,a:1,a:2:b:2,,,b:3,a:3 

Note that the a:1 syntax indicates a key with a value.

A merge operator could merge 2 streams.

from(',a:1,a:2,,,,,a:3,').merge(from('b:1,,b:2,,b:3,'))

Motivation: In an IoT solution there are streams of asset data from different channels, but the source channel is irrelevant to the UI.

from(',Asset111:on,Asset222:off,,').merge(from('Asset333:on,,Asset333:off,'))

As an alternative a from operation applied to a stream should create a merged stream.

from('1,3',{column: 'odd'}).from('2,4',{column: 'even'})  //{odd:1,even:2},{odd:3,even:4},
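One way to picture a merge is slot by slot: at each time slot, the merged stream carries whatever events either input carried. The sketch below is an assumption about the semantics, not the library's merge operator, and `mergeSlots` is a hypothetical name.

```javascript
// Illustrative slot-wise merge: each output slot lists the events
// present in either input stream at that time.
function mergeSlots(a, b) {
  const length = Math.max(a.length, b.length);
  const merged = [];
  for (let i = 0; i < length; i++) {
    // A slot may hold zero, one, or two events after merging.
    merged.push([a[i], b[i]].filter(v => v !== undefined && v !== null));
  }
  return merged;
}

const left = [null, 'a:1'];
const right = ['b:1', null, 'b:2'];
console.log(mergeSlots(left, right)); // [ [ 'b:1' ], [ 'a:1' ], [ 'b:2' ] ]
```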

Join operations on streams

Streams can be joined using inner, outer, left, right, or xor semantics.

{key:1,b:1},{key:2,b:2}  : {key:1,c:1},{key:3,b:3}  => {key:1,b:1,c:1},{key:2,b:2},{key:3,b:3}

Implemented as

let s1 = model.from([{ 'key': 1, 'b': 1 }, { 'key': 2, 'b': 2 }])
let s2 = model.from([{ 'key': 1, 'c': 1 }, { 'key': 3, 'b': 3 }])
s1.join(s2,{type:'outer'})
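For reference, an outer join over the two record sets from the diagram above could look like the following in plain JavaScript. This is a sketch of the join semantics over finite arrays, assuming records are matched on a `key` property; `outerJoin` is a hypothetical function, not the library's join implementation.

```javascript
// Illustrative outer join: every key from either side appears once,
// with the properties of matching records combined.
function outerJoin(left, right, keyName = 'key') {
  const byKey = new Map();
  for (const row of [...left, ...right]) {
    const existing = byKey.get(row[keyName]) || {};
    byKey.set(row[keyName], { ...existing, ...row });
  }
  return [...byKey.values()];
}

const s1 = [{ key: 1, b: 1 }, { key: 2, b: 2 }];
const s2 = [{ key: 1, c: 1 }, { key: 3, b: 3 }];
console.log(outerJoin(s1, s2));
// [ { key: 1, b: 1, c: 1 }, { key: 2, b: 2 }, { key: 3, b: 3 } ]
```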

Batch split stream to process in groups

This functionality is useful when handling individual events is slow but handling a set of events is relatively fast (such as work done over DB connections). It lets you take a bunch of separate events and group them into batches of a maximum batch size.

from('data.xlsx').batch({
    typeFn: x=>x.type,
    addToBatchFb: x=>x,
    delay: 1000/* batch up to 1 second*/,
    maxbatch: 100/*max batch size*/
}).tap(somefunction)

Motivation: In a mine a truck can accumulate some load from a digger before moving it to a processing stockpile.
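The size-limiting half of batching can be sketched as below. This deliberately ignores the time-based `delay` option and the per-type grouping, and `batchBySize` is a hypothetical helper rather than the library's batch operator.

```javascript
// Illustrative only: group events into consecutive batches of at most maxBatch items.
function batchBySize(events, maxBatch) {
  const batches = [];
  for (let i = 0; i < events.length; i += maxBatch) {
    batches.push(events.slice(i, i + maxBatch));
  }
  return batches;
}

console.log(batchBySize([1, 2, 3, 4, 5], 2)); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```

A downstream handler would then be called once per batch instead of once per event, which is where the saving comes from.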

Stream Reduction

Reduce the events in a stream.

The stream values may accumulate before propagating.

1,1,1 -=> ,,3

from(',1,,,,2,,3').reducer(truckCapacityReducer)

Stream Propagation Limiting

TODO

The values may need to be limited before propagating

,1,,,,2,,3,, -=> ,1,,,,1,1,1,1,1 

A limitTo operator could split values into chunks of a defined size.

from(',1,,,,2,,3').limitTo(1).log()

Motivation: A mine conveyor system has a defined throughput.

from(stockpile).limitTo(1)
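The limiting diagram above can be reproduced with a small backlog-based sketch. It assumes that anything over the limit is carried forward to later slots; since the section is still marked TODO, this is one possible reading rather than the library's behaviour, and this standalone `limitTo` function is hypothetical.

```javascript
// Illustrative rate limiter: emit at most `limit` per slot and carry the rest forward.
function limitTo(slots, limit) {
  const output = [];
  let backlog = 0;
  // Keep emitting after the input ends until the backlog is drained.
  for (let i = 0; i < slots.length || backlog > 0; i++) {
    backlog += slots[i] || 0;
    const emitted = Math.min(backlog, limit);
    output.push(emitted > 0 ? emitted : null);
    backlog -= emitted;
  }
  return output;
}

const limited = limitTo([null, 1, null, null, null, 2, null, 3], 1);
console.log(limited.map(v => (v === null ? '' : v)).join(',')); // ,1,,,,1,1,1,1,1
```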

Stream Processing Composition

It should be possible to compose stream operations such that specific processing logic can be built up from smaller testable units.

Motivation: In a mine it would take a truck some time to return from the stockpile before it can be reloaded.

from(',1,,,,2,,3').compose(accumulateTo(truckCapacity),delay(truckReturnTrip))
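A sketch of what such composition could look like with plain functions over slot arrays follows. The `compose`, `accumulateTo`, and `delay` functions here are stand-ins written for this example, and the left-to-right application order is an assumption.

```javascript
// Illustrative composition: each step is a function from slots to slots.
const compose = (...steps) => input => steps.reduce((value, step) => step(value), input);

// Hold values until the running load reaches capacity, then emit the load.
const accumulateTo = capacity => slots => {
  let load = 0;
  return slots.map(value => {
    load += value || 0;
    if (load < capacity) return null;
    const emitted = load;
    load = 0;
    return emitted;
  });
};

// Shift every slot later by the given number of steps.
const delay = steps => slots => [...Array(steps).fill(null), ...slots];

const truckRun = compose(accumulateTo(3), delay(2));
const result = truckRun([null, 1, null, null, null, 2, null, 3]);
console.log(result.map(v => (v === null ? '' : v)).join(',')); // ,,,,,,,3,,3
```

Because each step is an ordinary function, `accumulateTo(3)` and `delay(2)` can be unit tested separately before being composed.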

Streams May Be Accumulated into Categories

Streams may need to accumulate into categories

,a:1,a:2,,c:2,,b:3 => { a: 3, b: 3, c: 2 }

A table operator would achieve this.

from(',a:1,a:2,,c:2,,,b:2,').table(accumulate)

In one production scenario, utilization was accumulated per asset.

from(',E16:10,D10:20,,E15:10,,,E16:10,').table(accumulate)

The aggregation into a table should be capable of updating statistics

,a:1,a:3,,b:2,,b:4 => average:{ a: 2, b: 3 } 
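The idea of a table driven by a reducer can be sketched as follows. `tableOf`, `accumulate`, and `movingAverage` are hypothetical stand-ins for this example and make no claim about how the library's table operator is implemented; the data is the a/b stream from the averaging diagram above.

```javascript
// Illustrative table: fold keyed events into one entry per key using a reducer.
function tableOf(events, reducer) {
  const table = {};
  for (const { key, value } of events) {
    table[key] = reducer(table[key], value);
  }
  return table;
}

// Running total per key.
const accumulate = (total = 0, value) => total + value;

// Incremental mean per key, so no history needs to be stored.
const movingAverage = (state = { count: 0, mean: 0 }, value) => {
  const count = state.count + 1;
  return { count, mean: state.mean + (value - state.mean) / count };
};

const events = [
  { key: 'a', value: 1 }, { key: 'a', value: 3 },
  { key: 'b', value: 2 }, { key: 'b', value: 4 },
];
const totals = tableOf(events, accumulate);
const averages = tableOf(events, movingAverage);
console.log(totals);                            // { a: 4, b: 6 }
console.log(averages.a.mean, averages.b.mean);  // 2 3
```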

For an IoT solution we may want to capture statistics across the type/class of asset.

from(',Asset1:10,Asset11:20,,Asset12:10,,,Asset10:20,').map(mapToAssetType).table(movingAverage,'averageUtilizationByType')

Note that a table should accumulate but also emit changes to the table.

from('A:1,A:10,,B:2,B:2,C:3,').table(accumulate).log() //should generate A:1,A:10,B:2,/*no change*/,C:3

Enrichment

One reason to accumulate into categories is to be able to enrich event streams with metadata.

let latest = from([{id: 'a',value: 1},{id: 'b',value: 1},{id: 'a',value: 2}]).table()
from('a,a,a,a,').join(latest).log() 

Motivation: For a set of timesheet entries it may be necessary to augment records with the employee details

let employeeData = from([{id: '123',name: 'Jim Smith'},{id: '234',name: 'Sally Smith'}]).table()
from([{id: '234', time: 6}]).join(employeeData).log()
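Enrichment of this kind amounts to a lookup join. The sketch below shows the idea over plain arrays; `enrich` is a hypothetical helper, and matching on an `id` property is an assumption drawn from the example data.

```javascript
// Illustrative enrichment: add lookup details to each record that has a matching id.
function enrich(records, lookupRows, keyName = 'id') {
  const lookup = new Map(lookupRows.map(row => [row[keyName], row]));
  // Records with no match pass through unchanged.
  return records.map(record => ({ ...(lookup.get(record[keyName]) || {}), ...record }));
}

const employeeData = [{ id: '123', name: 'Jim Smith' }, { id: '234', name: 'Sally Smith' }];
const timesheets = [{ id: '234', time: 6 }];
console.log(enrich(timesheets, employeeData));
// [ { id: '234', name: 'Sally Smith', time: 6 } ]
```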

Change Propagation

When there is a change to a stream it may be desirable to reprocess the content of a table stream. For instance getting new information about depreciation rates could require reprocessing a table stream representing the value of some assets.

from(somestream).table({propagate:'change'}) //only table changes are propagated.
from(somestream).table({propagate:'all'}) //any table change generates a stream of the entire table.

Streams May Need to Accumulate Between Values

Streams may need to accumulate between values

,a:1,b:1,,b:0,,a:0 => { a: 5, b: 2 }

A delta operation might allow this to be expressed

from(',a:1,b:1,,b:0,,a:0').table(compose(delta,accumulate))

Motivation: In an IoT solution there is utilization related to the time periods between on/off events.

from(',Asset111:on,Asset222:on,,Asset222:off,,Asset111:off').table(compose(delta,accumulate))
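A minimal sketch of accumulating time between on and off events is shown below. It assumes that a duration is the difference between the slot index of the off event and that of the matching on event; `onDurations` and the `{ time, key, state }` event shape are hypothetical and chosen only for this example.

```javascript
// Illustrative delta-then-accumulate: total the time each key spends "on".
function onDurations(events) {
  const startedAt = new Map();
  const totals = {};
  for (const { time, key, state } of events) {
    if (state === 'on' && !startedAt.has(key)) {
      startedAt.set(key, time); // remember when this key switched on
    } else if (state === 'off' && startedAt.has(key)) {
      totals[key] = (totals[key] || 0) + (time - startedAt.get(key));
      startedAt.delete(key);
    }
  }
  return totals;
}

const events = [
  { time: 1, key: 'Asset111', state: 'on' },
  { time: 2, key: 'Asset222', state: 'on' },
  { time: 4, key: 'Asset222', state: 'off' },
  { time: 6, key: 'Asset111', state: 'off' },
];
console.log(onDurations(events)); // { Asset222: 2, Asset111: 5 }
```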

Streams may need to be compared against targets

Streams may need to be compared against targets

Motivation: In a mine there was a need to show up to date production versus target production during the course of an interval.

let targets = from('targets.csv')
let actuals = from(productionGenerator).join(targets,{joinFn: (actual,target)=>{
    actual.percentage = actual.value/target.value
}})

Streams as components

It should be possible to define a stream without a source so that it can be re-used multiple times.

let identityStream = map(x=>x) //The identity processing stream
from(testStream).thru(identityStream) // Used to test the stream definition
from(productionStream2).thru(identityStream) //Used to process production

Streams that update functions

Streams should be usable to update functions used in other streams. A stream of market data is used to update a function which can be used to value an individual item.

let valuer = (() => {
    let valuationModel // = the model
    return {
        train: (historicalSaleData) => { /* update model */ },
        value: (equipment) => valuationModel.value(equipment)
    }
})()

from(historicalSaleData).map(x=>valuer.train(x)) //We need to update a valuation model

from(onSaleData).map(x=>valuer.value(x)).table() //then we want to use the model

Buffer stream to process aggregate statistics

It will be necessary to apply aggregate statistics to values in a stream. For example, if we have a set of asset values we may want to normalize those values. Aggregation is a difficult concept for stream processing, as you never expect to have the full set of data. To support this, a buffer node is used to process a set of events (so that aggregates can be computed) before emitting all those events, which can then use those aggregates.

let statsProcessing = ()=>{
    let stats = {}
    let calcStats = () => {...}
    let useStats = () => {...}
    return {calcStats, useStats}
}
let {calcStats, useStats} = statsProcessing()
from('data.csv').map(calcStats).buffer(1000).map(useStats)
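The buffering idea can be illustrated with plain arrays: hold a fixed number of values, compute an aggregate over them, then emit each value adjusted by that aggregate. `normalizeInBuffers` is a hypothetical helper, and centring on the buffer mean is just one example of using an aggregate.

```javascript
// Illustrative buffering: compute the mean of each buffer, then emit
// every buffered value relative to that mean.
function normalizeInBuffers(values, bufferSize) {
  const output = [];
  for (let i = 0; i < values.length; i += bufferSize) {
    const buffered = values.slice(i, i + bufferSize);
    const mean = buffered.reduce((sum, v) => sum + v, 0) / buffered.length;
    for (const v of buffered) output.push(v - mean);
  }
  return output;
}

console.log(normalizeInBuffers([1, 2, 3, 10, 20], 3)); // [ -1, 0, 1, -5, 5 ]
```

Only one buffer of events is held in memory at a time, which keeps the approach compatible with unbounded streams.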

Cognitive Load

Directly Address Streams

When developing a flow model there will be a need to introspect the stream of data from any one node. Specifically, when unit testing a model provided by some library it is useful to be able to directly address a stream node.

There are a couple of ways to achieve this.

All stream nodes have names, which default to the type of the node.

let filterNode = model.from('1,2,3').filter(x=>x<2)
filterNode.getName()

Architectural Concerns

Mapping streams to different processes

While we should be able to test an entire stream process in a single simple runtime, we might want to deploy that stream distributed across multiple processes for scale or to solve other architectural constraints.
A pub node is used to allow streams to support both outcomes.

let streamUrl = from('data1.csv').map(tx).pub(options) //deploy to one node
from(streamUrl).map(tx2) //deploy to another node

Examples

Scheduling

There can be solutions scheduling resources to empty and load shipping containers.

  • There is a stream of containers to be unloaded from(port)
  • Containers accumulate stuff .limitTo()
  • Containers are transported delay()
  • Containers are unloaded splitTo()

Mining operations

  • There are streams of mine faces from(mineplan)
  • Faces need to be cleared splitTo(diggerLoad).accumulateTo(truckCapacity)
  • Trucks transport waste or mineral delay(travelTime)

Asset Whole of Life Model

  • Purchase = from('100,,,,,')

  • PlannedMaintenance = from(',-10,-10,-10,-10')

  • UnplannedMaintenance = from(',,-5,-7,')

  • PlannedMaintenance.depreciate(depreciationrate).table(accumulate)

  • UnplannedMaintenance.depreciate(depreciationrate).table(accumulate)

  • merge(Purchase,PlannedMaintenance,UnplannedMaintenance).depreciate(depreciationrate).table(accumulate)

Asset Valuation

Assets need to be valued by composing asset information with depreciation information related to age, utilization, location, and market supply.

function ageValuer({assetType, assetAge}) {
    // build a linear interpolation from asset type and age to a value
}
from('...',{columns:['assetType','assetAge','value']}).map(ageValuer)

function smuValuer({assetType, assetSMU}) {
    // calculate the variance of value based on utilization at age, then use the deviation from mean utilization to adjust the value
}
from('...',{columns:['assetType','assetSMU','value']}).map(smuValuer)

function locationValue({assetType, location}) {
    //
}
from('...',{columns:['assetType','location','value']}).map(deriveLocationDepreciation)

from('assets.csv').

Install

npm install real-value-lang
yarn add real-value-lang

How to use

See the unit tests.
