@neuralcontext/pivex

v0.3.0

TypeScript-native library for managing asynchronous tasks in a pipeline.

Downloads

362

pivex

pivex helps you orchestrate pipelines and workflows that consist of complex asynchronous interactions. This is especially helpful in multi-agent AI pipelines, RAG, ETL, or anything else where you need to orchestrate asynchronous (and synchronous) calls.

What does pivex do?

Makes it easier to:

1. Create complicated pipelines in code
2. Handle coordination of asynchronous calls
3. Use steps across pipelines
4. Debug complicated pipelines
    - NOTE: Use 'inline breakpoints' for lambdas

Provides:

1. Declarative syntax for creating pipelines
2. Native async interface that also supports synchronous actions
3. Some type safety for pipelines and steps when using TypeScript

Examples

The syntax for creating pipelines leverages your IDE to help you understand how your pipeline works.

All pipelines expose an asynchronous start function, but they handle both synchronous and asynchronous steps correctly.

See the Important considerations section for more information.

Example 1 (sequential asynchronous flow)

const testPipeline = pipeline<number, number>()
    .sequential(
        run((x: number) => x + 1),
        run((x: number) => x + 2)
    )
const result = await testPipeline.start(1)
expect(result).eq(4)

flowchart TB
    start(("Input=1"))
    subgraph step1["Step 1 (Sequential actions)"]
        add1["AsyncAction<hr>Add 1"]
        add2["AsyncAction<hr>Add 2"]
    end
    stop(("Output=4"))
    start-->add1-->add2-->stop

Example 2 (parallel asynchronous flow)

const testPipeline = pipeline<number, number[]>()
    .parallel(
        defer((x: number) => Promise.resolve(x * 3)),
        defer((x: number) => Promise.resolve(x * 5))
    )
const result = await testPipeline.start(2)
expect(result[0]).eq(6)
expect(result[1]).eq(10)

flowchart TB
    start(("Input=2"))
    subgraph step1["Step 1 (Actions in Parallel)"]
        mult1["AsyncAction<hr>Multiply by 3"]
        mult2["AsyncAction<hr>Multiply by 5"]
    end
    step1Output["--pivex coalesces output--"]
    stop(("Output=[6, 10]"))
    start-->step1
    mult1-->step1Output
    mult2-->step1Output
    step1Output-->stop
    style step1Output fill:#FFFFFF

Example 3 (parallel and sequential flow)

const testPipeline = pipeline<number, number>()
    .sequential(
        parallel(
            defer((x: number) => Promise.resolve(x * 2)),
            defer((x: number) => Promise.resolve(x * 3))
        ),
        sequential(
            run((nums: number[]) => nums.reduce((p, c) => p + c)),
            run((x: number) => x / 2)
        )
    )
const result = await testPipeline.start(2)
expect(result).eq(5)

flowchart TB
    start(("Input=2"))
    subgraph step1["Step 1 (Actions in Parallel)"]
        mult1["AsyncAction<hr>Multiply by 2"]
        mult2["AsyncAction<hr>Multiply by 3"]
    end
    subgraph step2["Step 2 (Sequential actions)"]
        sum["Sum the numbers"]
        div["Divide the sum by 2"]
    end
    step1Output["--pivex coalesces output--"]
    stop(("Output=5"))
    start-->step1
    mult1-->step1Output
    mult2-->step1Output
    step1Output-->sum
    sum-->div
    div-->stop
    style step1Output fill:#FFFFFF

Naming and logging

You can provide names and enable logging for any step in the pipeline:

const testPipeline = pipeline<number, number>()
    .sequential(
        name('Step 1')
            .log(true)
            .run((x: number) => x + 1),
        name('Step 2')
            .log(true)
            .run((x: number) => x + 2)
    )
const result = await testPipeline.start(1)
expect(result).eq(4)
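
To make the idea of a named, logged step concrete, here is a minimal sketch in plain TypeScript. It does not use pivex and is not how pivex implements `name(...).log(true)`; `LoggedAction` and `withLog` are hypothetical names, and the log format is invented for illustration.

```typescript
// Hypothetical helper for illustration only; not part of the pivex API.
type LoggedAction = (x: number) => number | Promise<number>;

// Wrap an action so that its input and output are reported under a step name.
function withLog(
    stepName: string,
    action: LoggedAction,
    log: (message: string) => void = console.log
): LoggedAction {
    return async (x) => {
        log(`${stepName}: input=${x}`);
        const output = await action(x); // works for sync and async actions
        log(`${stepName}: output=${output}`);
        return output;
    };
}
```

Passing the logger as a parameter keeps the wrapper easy to test; by default it writes to `console.log`.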

Design overview

Definitions

pipeline

  • Takes initial input
  • Contains steps
  • Will execute child steps in sequential order
  • Output from the first step is passed to the second, whose output is passed to the third, etc.
  • Returns output from the last step
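
The threading behaviour in this list can be illustrated without pivex at all. The following is a minimal sketch in plain TypeScript, not pivex's implementation; `ChainStep` and `runPipeline` are hypothetical names introduced for illustration.

```typescript
// Hypothetical type and helper for illustration only; not part of pivex.
type ChainStep = (input: number) => number | Promise<number>;

// Feed the initial input to the first step, then pass each output to the next step.
async function runPipeline(initial: number, steps: ChainStep[]): Promise<number> {
    let value = initial;
    for (const step of steps) {
        // Each step finishes before the next one starts.
        value = await step(value);
    }
    return value; // output of the last step
}

// Same arithmetic as Example 1: 1 -> (+1) -> (+2) -> 4
const addOne: ChainStep = (x) => x + 1;
const addTwo: ChainStep = (x) => Promise.resolve(x + 2);
```

Calling `runPipeline(1, [addOne, addTwo])` resolves to `4`, matching Example 1.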

step

  • Contains a single action or an array of child steps
  • An action can be either synchronous or asynchronous. Either way, the step does not complete until the action has completed.
  • Child steps can be executed in parallel or sequentially (in order)
  • If parallel, the step starts all of its children, awaits them all, and returns their results coalesced into a single array
  • If sequential, the step executes its children in order, awaiting each child's result and passing it as input to the next child. The parent step returns the value returned by its last child.
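
The parallel case can be sketched with `Promise.all`, which starts every child before awaiting any of them and returns results in child order rather than completion order. This is an illustration in plain TypeScript under that assumption, not pivex's internals; `NumAction`, `delay`, `runParallel`, and `example3` are hypothetical names.

```typescript
// Hypothetical helpers for illustration only; not part of pivex.
type NumAction = (x: number) => number | Promise<number>;

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Start every child with the same input, await them all, return an array of results.
async function runParallel(input: number, children: NumAction[]): Promise<number[]> {
    return Promise.all(children.map((child) => child(input)));
}

// Same arithmetic as Example 3: 2 -> [4, 6] -> 10 -> 5
async function example3(): Promise<number> {
    const products = await runParallel(2, [
        async (x) => { await delay(20); return x * 2; }, // slower, but still first in the array
        async (x) => x * 3,
    ]);
    const sum = products.reduce((p, c) => p + c); // sequential part: sum, then halve
    return sum / 2;
}
```

Because the results keep the order of the children, a later sequential step can rely on positions in the array even when the children finish at different times.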

action

  • Contains a single function
  • Can be synchronous or asynchronous

Important considerations

WARNING: pivex does not check for infinite loops. We recommend you use the constructor-builder syntax (see code examples above) to ensure no infinite loops are created.
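
If you wire steps together by hand, you could guard against loops yourself. The sketch below assumes, purely for illustration, that steps form a graph of objects with a `children` array; `StepNode` and `hasCycle` are hypothetical names and do not reflect pivex's actual data structures.

```typescript
// Hypothetical step shape for illustration only; pivex's real types may differ.
interface StepNode {
    name: string;
    children: StepNode[];
}

// Depth-first search that reports whether any step can reach itself again.
function hasCycle(root: StepNode): boolean {
    const inProgress = new Set<StepNode>(); // steps on the current DFS path
    const finished = new Set<StepNode>();   // steps already fully explored

    const visit = (node: StepNode): boolean => {
        if (inProgress.has(node)) return true; // back-edge: this step is its own descendant
        if (finished.has(node)) return false;  // shared child, already known to be safe
        inProgress.add(node);
        for (const child of node.children) {
            if (visit(child)) return true;
        }
        inProgress.delete(node);
        finished.add(node);
        return false;
    };

    return visit(root);
}
```

A step that is shared by two parents is not reported as a cycle; only a step that appears among its own descendants is.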

  • The pipeline start function is natively async but handles tasks with synchronous or asynchronous actions. If you have pipelines that are entirely synchronous, this imposes a very small performance penalty (~65 ms for 1M steps).
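
The following plain-TypeScript sketch shows the general JavaScript behaviour behind this point: `await` accepts a plain value as well as a promise, but it always resumes in a later microtask. It is not pivex code; `MaybeAsync`, `callAction`, and `demo` are hypothetical names.

```typescript
// Hypothetical helpers for illustration only; not part of pivex.
type MaybeAsync = (x: number) => number | Promise<number>;

async function callAction(action: MaybeAsync, x: number): Promise<number> {
    // The action runs immediately; the result is delivered after a microtask hop.
    return await action(x);
}

async function demo(): Promise<string[]> {
    const order: string[] = [];
    const pending = callAction((x) => {
        order.push('action ran');
        return x + 1; // purely synchronous action
    }, 1).then((result) => {
        order.push(`result delivered: ${result}`);
    });
    order.push('caller continues');
    await pending;
    return order;
}
```

`demo()` resolves to `['action ran', 'caller continues', 'result delivered: 2']`. That extra hop per awaited value is a plausible source of the small overhead for fully synchronous pipelines, although the README does not state where the cost comes from.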

TODO

  1. Improve typings between steps
  2. Update typings for step.sync so they don't allow passing asynchronous calls
  3. Add a stop function to cancel the pipeline