A class-based pipeline that supports rollback and cancellation on executing
A class-based pipeline that supports rollback and cancellation on executing
What is it?
Piipeline is an asynchronous, flexible but steady implementation of the pipeline pattern. It allows you to create a pipeline that acts as a strongly controllable object, which means it only does one thing at a time and only does another or redo it when the previous was done. Thereby, you can stop it at the right time at any stage it runs to, and immediately rollback it from that moment (instead of waiting for it to finish).
Install with npm:
npm install piipeline
Install with yarn:
yarn add piipeline
Basic Usage
const pipeline = new Pipeline()
const result = await pipeline.execute(input)
Construct a stateful Pipeline
object. This one passes a value with type I
as an input to the first stage and returns a value with type O
which received from the final stage.
new Pipeline<I, O>()
Append a stage to the end of this pipeline. An acceptable stage must be an instance class which implemented from PipelineStage
abstract, or a JavaScript object with the same structure as PipelineStageLike
Pipeline.pipe(stage: AcceptableStage): this
- Piping class instances
import { Pipeline, PipelineStage } from 'piipeline'
class PlusTen extends PipelineStage<number, number> {
public process(input: number): number {
return input + 10
class ConvertNumberToString extends PipelineStage<number, string> {
public process(input: number): string {
return input.toString()
class ConcatWithDollarSign extends PipelineStage<string, string> {
public process(input: string): string {
return input.concat('$')
// number is the input type of PlusTen, string is the output type of ConcatWithDollarSign
const pipeline = new Pipeline<number, string>()
.pipe(new PlusTen())
.pipe(new ConvertNumberToString())
.pipe(new ConcatWithDollarSign())
const output1 = await pipeline.execute(10); // Return '20$'
const output2 = await pipeline.execute(5); // Return '15$'
- Piping objects
const pipeline = new Pipeline<number, string>()
process: (input: number) => input + 10
process: (input: number) => input.toString()
process: (input: string) => input.concat('$')
const output1 = await pipeline.execute(10); // Return '20$'
const output2 = await pipeline.execute(5); // Return '15$'
Run the pipeline.
Pipeline.execute(input?: I): Promise<O>
Returns the result of the final stage. If it were canceled before, it would return the result of the stage which got canceled before it really stop.
This method will keep returning the same Promise
instance from the time it gets called until the time it is fulfilled or rejected.
const pipeline = new Pipeline()
.pipe(/** ... **/)
.pipe(/** ... **/)
const a = pipeline.execute() // Promise { <pending> }
const b = pipeline.execute() // Promise { <pending> }
console.log(a === b) // true
console.log(Object.is(a, b)) // true
So this will happen if you get their results this way instead of using await
for two different calls:
const pipeline = new Pipeline()
process: (input) => input + 7,
process: (input) => input * 2,
const promise1 = pipeline.execute(3)
const promise2 = pipeline.execute(11)
promise1.then((result) => console.log(result)) // 20
promise2.then((result) => console.log(result)) // 20 😲❗
- Solution 1: Use
Using the same pipeline, solve the same problem twice for two different inputs.
const result1 = await pipeline.execute(3)
const result2 = await pipeline.execute(11)
console.log(result1) // 20
console.log(result2) // 36
- Solution 2: Use
See .clone() for more details.
const pipeline2 = pipeline.clone()
console.log(Object.is(pipeline, pipeline2)) // false
const promise1 = pipeline.execute(3)
const promise2 = pipeline2.execute(11)
promise1.then((result) => console.log(result)) // 20
promise2.then((result) => console.log(result)) // 36
Stop the pipeline. If the pipeline is running, it will prevent the next stage from running. If it has been done, it won't be able to execute anymore.
Pipeline.cancel(): Promise<any>
Returns the result of the last stage which the pipeline reached
- If you cancel a pipeline when it is already running:
const pipeline = new Pipeline()
process: () => {
return new Promise((resolve) => setTimeout({
}, 5000))
process: (input) => input * 2
const executeResult = pipeline.execute() // It was supposed to return 246, but...
const cancelResult = await pipeline.cancel() // It got canceled, so it returns 123
executeResult.then((result) => {
console.log(executeResult) // 123
console.log(cancelResult) // 123
- If you cancel a pipeline before the next run:
const pipeline = new Pipeline()
process: () => {
return new Promise((resolve) => setTimeout({
}, 5000))
process: (input) => input * 2
await pipeline.cancel()
const result = await pipeline.execute()
// It just returns undefined, better clone it if you want to use it again
Undo the stages that have been run because of .execute()
but in reversed order.
It acts differently depending on the case:
- If the pipeline execution has finished, it will immediately rollback from the end of it.
- Otherwise, it will wait till the current running stage completes, then do.
Pipeline.rollback(): Promise<I>
const pipeline = new Pipeline<number, string>()
process: (input: number) => input + 10,
rollback: (output: number) => output - 10
process: (input: number) => input.toString(),
rollback: (output: string) => Number(output)
process: (input: string) => input.concat('$'),
rollback: (output: string) => output.replace(/\$$/, '')
const output = await pipeline.execute(10); // Return '20$'
const input = await pipeline.rollback(); // Return 10
Clone this pipeline. Use it if you want to reuse a cancelled pipeline or run two identical pipelines in parallel.
Pipeline.clone(): Pipeline<I, O>
It returns a new Pipeline instance containing a cloned stage collection.
An abstract class representing a stage in the pipeline. A concrete class can concretize this abstract class by inheriting and overriding its methods.
// Just override the method you need, it's ok
class Baz extends PipelineStage<string, string> {
public override process(input: string): string {
return input + 'Baz'
// A pipeline stage can also define its own properties and methods
class FooWhat extends PipelineStage<undefined, string> {
private what: string
public constructor(what: string) {
this.what = what
public override process(): string {
return 'Foo' + this.what
const pipeline = new Pipeline()
.pipe(new FooWhat('Bar'))
.pipe(new Baz())
console.log(await pipeline.execute()); // FooBarBaz
Pipeline also supports piping PipelineStage as an object
process: function (input) {
return output
rollback: function (output) {
return input
An asynchronous implementation of PipelineStage that helps to run other pipeline stages in parallel
new ParallelStage<I, O>(stages: AcceptableStage[])
: An array of types, where each element of it corresponds to I
of each stage in stages
: An array of types, where each element of it corresponds to O
of each stage in stages
type InputSequence = [number, number]
type OutputSequence = [number, string]
const parallel = new ParallelStage<InputSequence, OutputSequence> ([
{ process: (input: number): number => input * 2 },
{ process: (input: number): string => input.toString() }
const pipeline = new Pipeline<number, OutputSequence>()
process: (input: number): number[] => Array(2).fill(input)
const result = await pipeline.execute(5) // [10, '5']