npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

piipeline

v0.3.3

Published

A class-based pipeline that supports rollback and cancellation on executing

Downloads

1

Readme

piipeline

A class-based pipeline that supports rollback and cancellation on executing


What is it?

Piipeline is an asynchronous, flexible but steady implementation of the pipeline pattern. It allows you to create a pipeline that acts as a strongly controllable object, which means it only does one thing at a time and only does another or redo it when the previous was done. Thereby, you can stop it at the right time at any stage it runs to, and immediately rollback it from that moment (instead of waiting for it to finish).

Installation

Install with npm:

npm install piipeline

Install with yarn:

yarn add piipeline

Basic Usage

const pipeline = new Pipeline()
    .pipe(stage1)
    .pipe(stage2)
    .pipe(stage3)
const result = await pipeline.execute(input)

Documentation

Pipeline

Construct a stateful Pipeline object. This one passes a value with type I as an input to the first stage and returns a value with type O which received from the final stage.

new Pipeline<I, O>()

Pipeline.pipe()

Append a stage to the end of this pipeline. An acceptable stage must be an instance class which implemented from PipelineStage abstract, or a JavaScript object with the same structure as PipelineStageLike.

Pipeline.pipe(stage: AcceptableStage): this

Examples

  • Piping class instances
import { Pipeline, PipelineStage } from 'piipeline'

class PlusTen extends PipelineStage<number, number> {
  public process(input: number): number {
    return input + 10
  }
}
class ConvertNumberToString extends PipelineStage<number, string> {
  public process(input: number): string {
    return input.toString()
  }
}
class ConcatWithDollarSign extends PipelineStage<string, string> {
  public process(input: string): string {
    return input.concat('$')
  }
}

// number is the input type of PlusTen, string is the output type of ConcatWithDollarSign
const pipeline = new Pipeline<number, string>()
  .pipe(new PlusTen())
  .pipe(new ConvertNumberToString())
  .pipe(new ConcatWithDollarSign())

const output1 = await pipeline.execute(10); // Return '20$'
const output2 = await pipeline.execute(5); // Return '15$'
  • Piping objects
const pipeline = new Pipeline<number, string>()
  .pipe({
    process: (input: number) => input + 10
  })
  .pipe({
    process: (input: number) => input.toString()
  })
  .pipe({
    process: (input: string) => input.concat('$')
  })

const output1 = await pipeline.execute(10); // Return '20$'
const output2 = await pipeline.execute(5); // Return '15$'

Pipeline.execute()

Run the pipeline.

Pipeline.execute(input?: I): Promise<O>

Returns the result of the final stage. If it were canceled before, it would return the result of the stage which got canceled before it really stop.

Notice: This method will keep returning the same Promise instance from the time it gets called until the time it is fulfilled or rejected.

const pipeline = new Pipeline()
  .pipe(/** ... **/)
  .pipe(/** ... **/)

const a = pipeline.execute() // Promise { <pending> }
const b = pipeline.execute() // Promise { <pending> }

console.log(a === b) // true
console.log(Object.is(a, b)) // true

So this will happen if you get their results this way instead of using await for two different calls:

const pipeline = new Pipeline()
  .pipe({
    process: (input) => input + 7,
  })
  .pipe({
    process: (input) => input * 2,
  })
const promise1 = pipeline.execute(3)
const promise2 = pipeline.execute(11)

promise1.then((result) => console.log(result)) // 20
promise2.then((result) => console.log(result)) // 20 😲❗
  • Solution 1: Use await Using the same pipeline, solve the same problem twice for two different inputs.
const result1 = await pipeline.execute(3)
const result2 = await pipeline.execute(11)

console.log(result1) // 20
console.log(result2) // 36
  • Solution 2: Use .clone() See .clone() for more details.
const pipeline2 = pipeline.clone()
console.log(Object.is(pipeline, pipeline2)) // false

const promise1 = pipeline.execute(3)
const promise2 = pipeline2.execute(11)

promise1.then((result) => console.log(result)) // 20
promise2.then((result) => console.log(result)) // 36

Pipeline.cancel()

Stop the pipeline. If the pipeline is running, it will prevent the next stage from running. If it has been done, it won't be able to execute anymore.

Pipeline.cancel(): Promise<any>

Returns the result of the last stage which the pipeline reached

Examples

  • If you cancel a pipeline when it is already running:
const pipeline = new Pipeline()
  .pipe({
    process: () => {
      return new Promise((resolve) => setTimeout({
        resolve(123)
      }, 5000))
    }
  })
  .pipe({
    process: (input) => input * 2
  })

const executeResult = pipeline.execute() // It was supposed to return 246, but...
const cancelResult = await pipeline.cancel() // It got canceled, so it returns 123

executeResult.then((result) => {
  console.log(executeResult) // 123
  console.log(cancelResult) // 123
})
  • If you cancel a pipeline before the next run:
const pipeline = new Pipeline()
  .pipe({
    process: () => {
      return new Promise((resolve) => setTimeout({
        resolve(123)
      }, 5000))
    }
  })
  .pipe({
    process: (input) => input * 2
  })

await pipeline.cancel()
const result = await pipeline.execute()
// It just returns undefined, better clone it if you want to use it again

Pipeline.rollback()

Undo the stages that have been run because of .execute() but in reversed order. It acts differently depending on the case:

  • If the pipeline execution has finished, it will immediately rollback from the end of it.
  • Otherwise, it will wait till the current running stage completes, then do.
Pipeline.rollback(): Promise<I>

Examples

const pipeline = new Pipeline<number, string>()
  .pipe({
    process: (input: number) => input + 10,
    rollback: (output: number) => output - 10
  })
  .pipe({
    process: (input: number) => input.toString(),
    rollback: (output: string) => Number(output)
  })
  .pipe({
    process: (input: string) => input.concat('$'),
    rollback: (output: string) => output.replace(/\$$/, '')
  })

const output = await pipeline.execute(10); // Return '20$'
const input = await pipeline.rollback(); // Return 10

Pipeline.clone()

Clone this pipeline. Use it if you want to reuse a cancelled pipeline or run two identical pipelines in parallel.

Pipeline.clone(): Pipeline<I, O>

It returns a new Pipeline instance containing a cloned stage collection.

PipelineStage

An abstract class representing a stage in the pipeline. A concrete class can concretize this abstract class by inheriting and overriding its methods.

// Just override the method you need, it's ok
class Baz extends PipelineStage<string, string> {
  public override process(input: string): string {
    return input + 'Baz'
  }
}

// A pipeline stage can also define its own properties and methods
class FooWhat extends PipelineStage<undefined, string> {
  private what: string
  public constructor(what: string) {
    super()
    this.what = what
  }
  public override process(): string {
    return 'Foo' + this.what
  }
}

const pipeline = new Pipeline()
  .pipe(new FooWhat('Bar'))
  .pipe(new Baz())

console.log(await pipeline.execute()); // FooBarBaz

PipelineStageLike

Pipeline also supports piping PipelineStage as an object

{
    process: function (input) {
        //...
        return output
    }
    rollback: function (output) {
        //...
        return input
    }
}

ParallelStage

An asynchronous implementation of PipelineStage that helps to run other pipeline stages in parallel

new ParallelStage<I, O>(stages: AcceptableStage[])

I: An array of types, where each element of it corresponds to I of each stage in stages O: An array of types, where each element of it corresponds to O of each stage in stages

Examples

type InputSequence = [number, number]
type OutputSequence = [number, string]

const parallel = new ParallelStage<InputSequence, OutputSequence> ([
  { process: (input: number): number => input * 2 },
  { process: (input: number): string => input.toString() }
])

const pipeline = new Pipeline<number, OutputSequence>()
  .pipe({
    process: (input: number): number[] => Array(2).fill(input)
  })
  .pipe(parallel)

const result = await pipeline.execute(5) // [10, '5']

License

MIT