@neuralcontext/pivex
v0.3.0
TypeScript-native library for managing asynchronous tasks in a pipeline.
pivex
pivex helps you orchestrate pipelines and workflows built from complex asynchronous interactions. This is especially helpful in multi-agent AI pipelines, RAG, ETL, or anywhere else you need to orchestrate asynchronous (and synchronous) calls.
What does pivex do?
Makes it easier to
1. Create complicated pipelines in code
1. Handle coordination of asynchronous calls
1. Use steps across pipelines
1. Debug complicated pipelines
- NOTE: Use 'inline breakpoints' for lambdas
Provides:
1. Declarative syntax for creating pipelines
1. Native async interface that also supports synchronous actions
1. Some type safety for pipelines and steps when using TypeScript
Examples
The syntax for creating pipelines leverages your IDE to help you understand how your pipeline works.
Every pipeline exposes an asynchronous start function, but pipelines handle both synchronous and asynchronous steps correctly.
See the Important considerations section for more information.
Example 1 (sequential asynchronous flow)
const testPipeline = pipeline<number, number>()
  .sequential(
    run((x: number) => x + 1),
    run((x: number) => x + 2)
  )
const result = await testPipeline.start(1)
expect(result).eq(4)
flowchart TB
start(("Input=1"))
subgraph Step 1
add1["AsyncAction<hr>Add 1"]
add2["AsyncAction<hr>Add 2"]
end
stop(("Output=4"))
start-->add1-->add2-->stop
Example 2 (parallel asynchronous flow)
const testPipeline = pipeline<number, number[]>()
  .parallel(
    defer((x: number) => Promise.resolve(x * 3)),
    defer((x: number) => Promise.resolve(x * 5))
  )
const result = await testPipeline.start(2)
expect(result[0]).eq(6)
expect(result[1]).eq(10)
flowchart TB
start(("Input=2"))
subgraph step1["Step 1 (Actions in Parallel)"]
mult1["AsyncAction<hr>Multiply by 3"]
mult2["AsyncAction<hr>Multiply by 5"]
end
step1Output["--pivex coalesces output--"]
stop(("Output=[6, 10]"))
start-->step1
mult1-->step1Output
mult2-->step1Output
step1Output-->stop
style step1Output fill:#FFFFFF
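The coalescing step in the diagram can be pictured with plain promises. The following is a minimal sketch, assuming pivex's parallel output behaves like `Promise.all`, where results are ordered by position rather than by completion time. The `delay` and `fanOut` helpers are hypothetical and not part of pivex.

```typescript
// Hypothetical helper: resolve with `value` after `ms` milliseconds.
const delay = <T>(ms: number, value: T): Promise<T> =>
  new Promise<T>((resolve) => setTimeout(() => resolve(value), ms));

// Start both actions with the same input and wait for both to finish.
// Promise.all keeps results in argument order, even though the second
// action completes before the first.
async function fanOut(x: number): Promise<number[]> {
  return Promise.all([
    delay(20, x * 3), // slower action, still lands at index 0
    delay(5, x * 5),  // faster action, still lands at index 1
  ]);
}

fanOut(2).then((result) => console.log(result)); // [ 6, 10 ]
```

This mirrors the assertions in Example 2: index 0 holds the result of the first action and index 1 the result of the second.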
Example 3 (parallel and sequential flow)
const testPipeline = pipeline<number, number>()
  .sequential(
    parallel(
      defer((x: number) => Promise.resolve(x * 2)),
      defer((x: number) => Promise.resolve(x * 3))
    ),
    sequential(
      run((nums: number[]) => nums.reduce((p, c) => p + c)),
      run((x: number) => x / 2)
    )
  )
const result = await testPipeline.start(2)
expect(result).eq(5)
flowchart TB
start(("Input=2"))
subgraph step1["Step 1 (Actions in Parallel)"]
mult1["AsyncAction<hr>Multiply by 2"]
mult2["AsyncAction<hr>Multiply by 3"]
end
subgraph step2["Step 2 (Sequential actions)"]
sum["Sum the numbers"]
div["Divide the sum by 2"]
end
step1Output["--pivex coalesces output--"]
stop(("Output=5"))
start-->step1
mult1-->step1Output
mult2-->step1Output
step1Output-->sum
sum-->div
div-->stop
style step1Output fill:#FFFFFF
Naming and logging
You can provide a name and enable logging for any step in the pipeline:
const testPipeline = pipeline<number, number>()
  .sequential(
    name('Step 1')
      .log(true)
      .run((x: number) => x + 1),
    name('Step 2')
      .log(true)
      .run((x: number) => x + 2)
  )
const result = await testPipeline.start(1)
expect(result).eq(4)
Design overview
Definitions
pipeline
- Takes initial input
- Contains steps
- Will execute child steps in sequential order
- Output from the first step is passed to the second, whose output is passed to the third, etc.
- Returns output from the last step
step
- Contains a single action or an array of child steps
- An action can be either synchronous or asynchronous. Either way, the step does not complete until the action has completed
- Child steps can be executed in parallel or sequentially (in order)
- If parallel, the step starts all its children, awaits all of them, then coalesces the results into an array and returns it
- If sequential, the step executes its child steps in order, awaiting each child's result before passing it to the next child. The parent step returns the value returned by the last child step.
action
- Contains a single function
- Can be synchronous or asynchronous
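The definitions above can be modeled in a few lines of plain TypeScript. This is a rough sketch of the described semantics, not pivex's implementation: `toStep`, `sequentialOf`, and `parallelOf` are hypothetical names chosen for illustration, and the types are deliberately loose.

```typescript
// Hypothetical model of the definitions above; pivex's real types may differ.
type Action<I, O> = (input: I) => O | Promise<O>;
type Step = (input: unknown) => Promise<unknown>;

// An action becomes a step that completes only once the action has,
// whether the action is synchronous or asynchronous.
function toStep<I, O>(action: Action<I, O>): Step {
  return async (input) => action(input as I);
}

// Sequential: each child's output becomes the next child's input;
// the parent returns the last child's output.
function sequentialOf(...children: Step[]): Step {
  return async (input) => {
    let value = input;
    for (const child of children) {
      value = await child(value);
    }
    return value;
  };
}

// Parallel: every child receives the same input; the parent returns
// an array of all results once every child has finished.
function parallelOf(...children: Step[]): Step {
  return (input) => Promise.all(children.map((child) => child(input)));
}

// Same shape as Example 3: fan out, then sum, then halve.
const model = sequentialOf(
  parallelOf(
    toStep((x: number) => Promise.resolve(x * 2)),
    toStep((x: number) => x * 3), // synchronous action in the same step
  ),
  toStep((nums: number[]) => nums.reduce((p, c) => p + c)),
  toStep((x: number) => x / 2),
);

model(2).then((result) => console.log(result)); // 5
```

With input 2, the parallel step yields [4, 6], the sum is 10, and halving gives 5, which matches the result asserted in Example 3.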
Important considerations
WARNING: pivex does not check for infinite loops. We recommend you use the constructor-builder syntax (see the code examples above) to ensure no infinite loops are created.
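One way an infinite loop could arise is a step that ends up, directly or indirectly, among its own children. If you assemble steps by hand, a check along the following lines could catch that before the pipeline starts. It is only a sketch: the `StepNode` shape and the `hasCycle` function are hypothetical and are not provided by pivex.

```typescript
// Hypothetical step shape for illustration; pivex's internal representation may differ.
interface StepNode {
  name: string;
  children: StepNode[];
}

// Depth-first walk that reports true if any step is its own descendant.
// `ancestors` holds only the steps on the current path, so a step that is
// merely shared between two branches is not reported as a cycle.
function hasCycle(node: StepNode, ancestors: Set<StepNode> = new Set()): boolean {
  if (ancestors.has(node)) return true;
  ancestors.add(node);
  const found = node.children.some((child) => hasCycle(child, ancestors));
  ancestors.delete(node);
  return found;
}

const a: StepNode = { name: "a", children: [] };
const b: StepNode = { name: "b", children: [a] };
const root: StepNode = { name: "root", children: [a, b] };

const beforeLoop = hasCycle(root); // false: "a" is reused, but nothing loops
a.children.push(b);                // now a -> b -> a
const afterLoop = hasCycle(root);  // true
```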
- The pipeline start function is natively async but handles steps with synchronous or asynchronous actions. If your pipelines are entirely synchronous, this imposes a very small performance penalty (~65ms for 1M steps).
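To get a feel for where that penalty comes from, you can compare a plain synchronous loop with one that awaits each call. This is a rough sketch that does not use pivex at all: it only measures the cost of `await` on a non-promise value, and the function names are hypothetical. Timings vary by machine and runtime, so no expected figures are given.

```typescript
const addOne = (x: number): number => x + 1;

// Baseline: call the synchronous action directly.
function runSync(steps: number): number {
  let value = 0;
  for (let i = 0; i < steps; i++) value = addOne(value);
  return value;
}

// Same work, but each result is awaited, as an async-first interface would do.
// Awaiting a non-promise value still yields to the microtask queue once per step.
async function runAwaited(steps: number): Promise<number> {
  let value = 0;
  for (let i = 0; i < steps; i++) value = await addOne(value);
  return value;
}

// Time both variants and confirm they compute the same result.
async function compare(steps: number) {
  const t0 = Date.now();
  const syncResult = runSync(steps);
  const t1 = Date.now();
  const awaitedResult = await runAwaited(steps);
  const t2 = Date.now();
  return {
    syncMs: t1 - t0,
    awaitedMs: t2 - t1,
    sameResult: syncResult === awaitedResult,
  };
}

compare(1000000).then((timings) => console.log(timings));
```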
TODO
- Improve typings between steps
- Update typings for step.sync so they don't allow passing asynchronous calls
- Add a stop function to cancel the pipeline