piperunner
v0.2.3
Published
Run functions against data
Downloads
5
Maintainers
Readme
Piperunner
Manage complex application logic using in memory pipelines.
Composed by three objects:
- Pipeline: define a series of functions in order accomplish a job
- Runner: run Pipeline against a series of data
- Scheduler: run Runner[s] with start/execution/sharing policies
## Install
npm install --save piperunner
Usage
You can use any of the objects independently from the others. See the examples folder for complex examples.
Complete example
let PipeRunner = require('piperunner')
let scheduler = new PipeRunner.Scheduler()
/**
* Define a scheduler pipeline named
* 'processing_video', and a pipeline
* *step*.
*/
scheduler.pipeline('processing_video')
.step('first step', (pipeline, job) => {
console.log('acquiring video ->', job)
pipeline.next()
})
/**
* Add another step to the pipeline
*/
scheduler.pipeline('processing_video')
.step('second step', (pipeline, job) => {
console.log('> publish video ->', job, pipe.env.videoFrameRate)
pipeline.next()
})
/**
* Configure the scheduler in order
* to run the pipeline at the *start.processing.video*
* event and every 1000 milliseconds
*/
scheduler.run({
name: 'processing_video',
run: {
onEvent: 'start.processing.video',
everyMs: 1000
}
})
/**
* Assign data to the pipelines runner (internal
* to the scheduler)
*/
scheduler.feed({
name: 'processing_video',
data: [{name: 'video1'}, {name: 'video2'}, {name: 'video3'}]
})
scheduler.log(true)
scheduler.emit('start.processing.video')
/** Output
*
* Emitting start.processing.video
* Running pipeline processing_video with 1 running process
* acquiring video -> { name: 'video1' }
* > publish video -> { name: 'video1' }
* acquiring video -> { name: 'video2' }
* > publish video -> { name: 'video2' }
* acquiring video -> { name: 'video3' }
* > publish video -> { name: 'video3' }
*
*/
Scheduler example options
scheduler.run({
name: 'processing_video',
run: {
// will run when this event is emitted
onEvent: 'start.processing.video',
// will run when these events are emitted
onEvents: ['event1', 'event2'],
// run every milliseconds
everyMs: 1000
},
on: {
end: {
// emit this series of event when the pipeline ends
emit: ['end.event'],
// exec these functions then the pipeline ends
exec: [
// Pass to another pipeline the data
(scheduler, pipeline) => {
scheduler.assignData('processing_audio', 'audio_frames', pipeline.data().processedAudio)
}
]
}
}
})
How To
Stop a pipeline
scheduler.stop({name: 'processing_video'})
Ovverride the internal EventEmitter
scheduler.emitter(new EventEmitter)
End a job before reaching the pipe end
let pipe = scheduler.pipeline('processing_video')
pipe.step('maybe', (pipe, job) => {
if (job.name == 'api1-step1') {
pipe.end() // Ending
} else {
pipe.next()
}
})
End the entire runner before time [no processing the other jobs]
let pipe = scheduler.pipeline('processing_video')
pipe.step('maybe', (pipe, job) => {
if (job.name == 'api1-step1') {
pipe.endRunner() // Exit the runner
} else {
pipe.next()
}
})
Pass data between pipe functions
let pipe = scheduler.pipeline('processing_video')
pipe.step('compute data', (pipe, job) => {
pipe.next('Pass data')
})
pipe.step('need data', (pipe, job, incomingData) => {
pipe.next('Received data ->', incomingData)
})
Store internal data
let pipe = scheduler.pipeline('processing_video')
pipe.step('compute data', (pipe, job) => {
pipe.data['customdata'] = 'Yeah'
pipe.next()
})
pipe.step('need data', (pipe, job,) => {
pipe.next('Received data ->', pipe.data['customdata'])
})
Set the end callback
let runner = new Runner(jobs, pipe, () => {
console.log('All Jobs finished')
})