
@sha1n/pipelines v0.0.12

A mini-framework for multi-stage pipeline execution

Pipelines

A mini-framework for building state-driven pipelines. A pipeline can be a simple chain of responsibility running in-process, but it gets really interesting when the process is distributed.

What's a Pipeline?

A pipeline is an abstract, stateful process that advances through execution steps until it reaches a terminal state. It defines several abstractions:

  1. a StatefulPipelineEntity - carries the state of the process represented by the pipeline.
  2. a StateRepository - used to persist the state of the pipeline after every state transition.
  3. a Handler, or several of them - a handler is a function that transitions the process from one state to another.
  4. a HandlerContext - any object that carries volatile contextual information throughout the process.
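To make the four abstractions concrete, here is a minimal sketch of how they might fit together. The interface shapes, the `Order` entity, and the in-memory repository are assumptions for illustration, not the library's actual declarations:

```typescript
// Hypothetical shapes for the four abstractions described above
interface StatefulPipelineEntity<S> {
  state: S;
}

interface StateRepository<T> {
  // persists the entity after every state transition
  save(entity: T): Promise<T>;
}

// a handler moves the entity from one state to another
type Handler<T, C> = (entity: T, ctx: C) => Promise<T>;

interface HandlerContext {
  // volatile, per-run information, e.g. a correlation id or logger
  correlationId: string;
}

// A toy entity and repository to show the pieces in action
enum OrderState { Created, Paid, Shipped }

class Order implements StatefulPipelineEntity<OrderState> {
  constructor(public id: string, public state = OrderState.Created) {}
}

class InMemoryOrderRepository implements StateRepository<Order> {
  private store = new Map<string, Order>();

  async save(order: Order): Promise<Order> {
    this.store.set(order.id, order);
    return order;
  }

  get(id: string): Order | undefined {
    return this.store.get(id);
  }
}

// a handler that transitions Created -> Paid
const payHandler: Handler<Order, HandlerContext> = async (order, _ctx) => {
  order.state = OrderState.Paid;
  return order;
};
```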

Why use Pipelines?

Pipelines break a complex algorithm into smaller steps that are individually flexible but share an identical interface. This has many benefits:

  • Smaller consistent pieces are easy to test
  • Easy to add/remove/replace steps
  • Easy to understand the state graph (see example)
  • With consistent contextual information and data in all handlers, it is easier to monitor and create effective logs
  • A pipeline can be composed of steps that run local and remote actions. A pipeline can be driven by a mixture of HTTP/RPC requests, MQ messages and in-process triggers, and still look and behave like one consistent flow.

Use Cases

In Memory

If the entire pipeline starts and ends in one call in your process, you need something to drive the entity through the pipeline. You can create a pump function using createPump, use a PipelineDriver, or create your own version. See the build pipeline example here and demo here.
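Conceptually, an in-memory driver is just a loop that keeps handling the entity until it reaches a terminal state. Here is a simplified sketch; it assumes a pipeline exposing `handle()` and a terminal-state check, which is not necessarily the library's actual `createPump`/`PipelineDriver` API:

```typescript
interface Entity<S> {
  state: S;
}

// Assumed minimal pipeline surface for this sketch
interface Pipeline<T extends Entity<S>, S, C> {
  handle(entity: T, ctx: C): Promise<T>;
  isTerminal(state: S): boolean;
}

// Drive the entity through the pipeline until a terminal state is reached
async function pump<T extends Entity<S>, S, C>(
  pipeline: Pipeline<T, S, C>,
  entity: T,
  ctx: C
): Promise<T> {
  let current = entity;
  while (!pipeline.isTerminal(current.state)) {
    current = await pipeline.handle(current, ctx);
  }
  return current;
}
```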

Distributed

If at least one state transition depends on asynchronous execution (usually by an external system), an in-memory driver is not what you need. In such cases, at least parts of the pipeline will have to be driven by external calls such as HTTP callbacks, MQ consumers, etc.
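An externally driven step boils down to a callback (HTTP handler, MQ consumer) that loads the entity and advances the pipeline by exactly one `handle()` call. The sketch below illustrates the idea; `loadTask`, `TaskState` and the pipeline shape are hypothetical names, not the library's API:

```typescript
enum TaskState { Executed, Completed }

interface Task {
  id: string;
  state: TaskState;
}

// Assumed minimal pipeline surface for this sketch
interface DistributedPipeline<T, C> {
  handle(entity: T, ctx: C): Promise<T>;
}

// Invoked by an external notification (HTTP callback, MQ message, ...).
// A single handle() call triggers the transition handler registered for the
// entity's current state.
async function onExternalCompletion(
  taskId: string,
  loadTask: (id: string) => Promise<Task>,
  pipeline: DistributedPipeline<Task, { correlationId: string }>
): Promise<Task> {
  const task = await loadTask(taskId);
  return pipeline.handle(task, { correlationId: taskId });
}
```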

Example

Let's say you have a pipeline that runs a Kubernetes job. Since Kubernetes jobs can take time to schedule resources and execute, we don't want to run them synchronously. In this case, we would normally pause our pipeline and resume execution when the job completes.

Here is what your pipeline state-machine might look like:

enum JobState {
  Initiated,
  Configured,
  Executed,
  Completed,
  Failed,
  Cancelled
}

// Here is what your pipeline definition might look like
const pipeline = createPipelineBuilder<Job, JobState, JobContext>()
  .withStateRepository(new YourPersistentRepository())
  // optional: the state the pipeline falls back to when a NonRecoverablePipelineError is caught
  .withFailedState(JobState.Failed)
  .withTransitionResolver(
    createTransitionResolverBuilder<Job, JobState, JobContext>()
      .withTerminalStates(JobState.Completed, JobState.Failed, JobState.Cancelled)
      .withTransition(JobState.Initiated, JobState.Configured, configHandler)
      .withTransition(JobState.Configured, JobState.Executed, executionHandler)
      .withTransition(JobState.Executed, JobState.Completed, completionHandler)
      .build()
  )
  .build()

The part we want to focus on here is the transition from Executed to Completed, which is realized by the completionHandler. But first, let's understand how we get to the Executed state and what it represents in our pipeline. Executed here means that we have successfully asked Kubernetes to execute a job. Since the job now runs outside our process, we cannot make any state transitions until it completes, either successfully or with an error. So what now? We have several options, all equally valid from the pipeline's perspective.

One option is to schedule a recurring polling job that monitors the Kubernetes job's state. Once the polling job detects that the job has completed, it pushes the pipeline to completion by calling Pipeline.handle(job, ctx). If the polling job is not part of the process that runs the pipeline, it can instead send a message or call an API that interacts with the pipeline. Either way, the job is in state Executed at this point, so calling Pipeline.handle(job, ctx) triggers the transition handler associated with that state, in this case completionHandler.

Alternatively, if we control the job's behavior, we can make it call an API or send an MQ message before it exits. The API controller or MQ message handler then does the same thing.
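The polling option can be sketched as follows. `checkExternalStatus`, the `Status` type, and the pipeline shape are assumptions for illustration; the real monitoring code would query the Kubernetes API:

```typescript
type Status = 'running' | 'succeeded' | 'failed';

// Assumed minimal pipeline surface for this sketch
interface PollingPipeline<T, C> {
  handle(entity: T, ctx: C): Promise<T>;
}

// Poll the external system until it reports a terminal status, then push
// the pipeline forward with a single handle() call. The transition handler
// registered for the entity's current state (e.g. completionHandler for
// Executed) runs at that point.
async function pollUntilDone<T, C>(
  checkExternalStatus: () => Promise<Status>,
  pipeline: PollingPipeline<T, C>,
  entity: T,
  ctx: C,
  intervalMs = 1000
): Promise<T> {
  for (;;) {
    const status = await checkExternalStatus();
    if (status !== 'running') {
      return pipeline.handle(entity, ctx);
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```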

Simple Build Pipeline Example

See full example code here

// Building a pipeline for a task
const pipeline = createPipelineBuilder<BuildTask, BuildState, BuildContext>()
  .withStateRepository(new BuildTasksRepository())
  .withFailedState(BuildState.Failed)
  .withOnBeforeHandler(async (task, ctx) => {
    ctx.logger.info(`[elapsed: ${ctx.elapsed(TimeUnit.Seconds)}]: ${task.state}`);
    return task;
  })
  .withOnAfterHandler(async (task, ctx) => {
    ctx.logger.info(`[elapsed: ${ctx.elapsed(TimeUnit.Seconds)}]: State is now ${task.state}`);
  })
  .withTransitionResolver(
    createTransitionResolverBuilder<BuildTask, BuildState, BuildContext>()
      .withTerminalStates(BuildState.Completed, BuildState.Failed, BuildState.Cancelled)
      .withTransition(
        BuildState.Initiated,
        BuildState.WorkspaceSetup,
        async (task: BuildTask, ctx: BuildContext) => {
          await execute('mkdir', ['-p', ctx.workspaceDir]);
          await execute('git', ['clone', '--depth=1', task.repositoryUrl, ctx.workspaceDir]);
          return task;
        }
      )
      .withTransition(
        BuildState.WorkspaceSetup,
        BuildState.InstallCompleted,
        async (task: BuildTask, ctx: BuildContext) => {
          await execute('yarn', ['install'], ctx.workspaceDir);
          await execute('yarn', ['build'], ctx.workspaceDir);
          return task;
        }
      )
      .withTransition(
        BuildState.InstallCompleted,
        BuildState.TestCompleted,
        async (task: BuildTask, ctx: BuildContext) => {
          await execute('yarn', ['test'], ctx.workspaceDir);
          return task;
        }
      )
      .withTransition(BuildState.TestCompleted, BuildState.Completed, async (task: BuildTask, ctx: BuildContext) => {
        await execute('echo', ['🥳 🎉 Build pipeline finished successfully!']);
        await cleanup(ctx);
        return task;
      })
      .build()
  )
  .build();


// Creating an in-memory pipeline pump
const pump = createPump(pipeline);

// Running a task through the pump
const task = new BuildTask('[email protected]:sha1n/fungus.git');
const wsBasePath = path.join(os.tmpdir(), 'build-pipelines');
const ctx = <BuildContext>{
  workspaceDir: path.join(wsBasePath, task.id),
  elapsed: stopwatch(),
  logger: createLogger(`build:${task.id}`)
};

pump(task, ctx).finally(() => {
  return retryAround(() => execute('rm', ['-rf', wsBasePath]), exponentialBackoffRetryPolicy(2));
});

Build Pipeline Demo

The demo code can be found here. Run it with:

yarn install && yarn run demo

or using npm

npm i && npm run demo

Install

yarn add @sha1n/pipelines
npm i @sha1n/pipelines