strict-stream
v1.0.26
Published
strict-stream - strictly / strongly typed stream
Downloads
494
Maintainers
Readme
strict-stream
strict-stream
is a tiny and lightweight library that helps manage strictly/strongly typed streams
using AsyncIterable<Type>
as the core principle to enable strict data pipelines with useful behavior.
It ensures that the data flowing through a stream conforms to a specific data type or structure, which helps catch errors early on, reduce bugs, and make code more reliable and easier to maintain.
Why Iterable<T>
and AsyncIterable<T>
Matter
In JavaScript
and TypeScript
, Iterable<T>
and AsyncIterable<T>
are two important interfaces that allow you to work with sequences of values.
An Iterable<T>
is an object that can be iterated over using a for...of
loop or the Spread
operator, while an AsyncIterable<T>
represents a sequence of values that are produced asynchronously, such as through a network request or database query.
Using these interfaces has several advantages:
Type safety: By using
Iterable<T>
andAsyncIterable<T>
, you can ensure that the data you're working with is strongly typed and conforms to a specific schema. This helps catch errors early in the development process and makes your code more robust and reliable.Composability: Because
Iterable<T>
andAsyncIterable<T>
are composable, you can easily create complex data pipelines that process, transform, and filter data in a modular way. This makes it easier to reason about your code and maintain it over time.Performance:
Iterable<T>
andAsyncIterable<T>
are highly optimized for performance, allowing you to process data streams with high throughput up to millions of records per second. This makes them an ideal choice for working with large datasets or real-time data streams.
For more information on Iterable<T>
and AsyncIterable<T>
, check out the following links:
Installation
To install strict-stream
, you can use your preferred package manager:
npm install strict-stream
or
yarn add strict-stream
Usage
Here's a simple example that demonstrates how to use strict-stream
:
import {of} from 'strict-stream';
import {filter} from 'strict-stream/filter';
async function* generateData() {
yield {name: 'Alice', age: 30};
yield {name: 'Bob', age: 40};
yield {name: 'Charlie', age: 50};
}
async function example() {
// AsyncIterable<{name: string, age: number}>
const stream = of(generateData())
.pipe(
filter(({age}) => age > 30)
);
for await (const data of stream) {
console.log(`Name: ${data.name}, Age: ${data.age}`);
}
// Name: Bob, Age: 40
// Name: Charlie, Age: 50
}
await example();
- This code demonstrates how to use the
of
andfilter
functions from the library to create a typed stream and filter the data. - First, the
generateData
function is anasync generator
function that yields objects with aname
andage
property. - Next, the of function is used to create a
typed stream
from the generator functiongenerateData
. The resulting stream is anAsyncIterable
of objects with aname
andage
property. - The
pipe
method is then used to apply afilter
to thestream
, keeping only the objects where theage
property isgreater
than30
. - Finally, the resulting stream is iterated over using a
for-await-of
loop. The output shows only the objects whereage
isgreater
than30
.
IDE hints
It gives you these IDE hints with strong types and guides you to follow types.
There is a function getUsers()
that gives you AsyncIterable<{name: string, age: number}>
and all pipeline operations are type-safe.
A quick look at transformations
import {sequence} from "strict-stream/sequence";
import {map} from "strict-stream/map";
import {from} from "strict-stream/from";
import {filter} from "strict-stream/filter";
async function example() {
const usersStream =
from(
// gives AsyncIterable<number>
// sequence 0,1,2,3,4;
sequence(5)
)
.pipe(
// takes only 0, 2, 4
filter((id) => id % 2 === 0)
)
.pipe(
// maps to {type: string, id: number, name: string}
map((id) => ({
type: 'User',
id,
name: `User ${id}`
}))
)
// inferred type
// AsyncIterable<{type: string, id: number, name: string}>
for await (const user of usersStream) {
console.log(user)
}
// { type: 'User', id: 0, name: 'User 0' }
// { type: 'User', id: 2, name: 'User 2' }
// { type: 'User', id: 4, name: 'User 4' }
}
- There is a
sequence
function that generates a sequence of numbers 0,1,2,3,4. - This sequence is filtered using the
filter
function to include only the even numbers (i.e., 0, 2, and 4). - And the resulting sequence is mapped using the
map
function to convert each number into auser object
with atype
,name
andid
. - Finally
userStreams
is a strictly typedAsyncIterable<{type: string, id: number, name: string}>
How to make a stream with AsyncGenerator?
An example of how to create a stream / AsyncIterable
with a generator:
async function* generateData() {
yield {name: 'Alice', age: 30};
yield {name: 'Bob', age: 40};
yield {name: 'Charlie', age: 50};
}
async function example() {
const stream = generateData();
for await (const data of stream) {
console.log(`Name: ${data.name}, Age: ${data.age}`);
}
// Name: Alice, Age: 30
// Name: Bob, Age: 40
// Name: Charlie, Age: 50
}
await example();
- In this example,
generateData
is a generator function that yields three objects withname
andage
properties. - The
example
function creates a stream from the generator by simply calling it and assigns it to thestream
variable. - Then, it iterates over the stream using a
for await...of
loop and logs thename
andage
properties of each object.
How to make a stream with reader?
reader
function which creates an async iterable stream from a reader function.
The reader function is called every time a new value is requested from the stream and should return the value or DONE if there are no more values.
import { reader } from 'strict-stream/reader';
async function example() {
const array = [1, 2, 3];
const stream = reader<number>(() => {
const value = array.shift();
return value === undefined ? reader.DONE : value;
});
for await (const number of stream) {
console.log(number);
}
// Output: 1
// Output: 2
// Output: 3
}
await example();
- In this example, the reader function is called with a generator function that pops a value from the array on each call.
- When there are no more values, it returns
reader.DONE
, which signals to the stream that there are no more values to yield. - Finally, the for
await...of
loop is used to consume the values from thestream
.
How to transform a stream?
import {reader} from 'strict-stream/reader';
import {map} from 'strict-stream/map';
async function example() {
const array = [1, 2, 3];
const stream = reader<number>(async () => {
const value = array.shift();
return value !== undefined ? value : reader.DONE;
});
const transformedStream = of(stream)
.pipe(
map((value) => value * 2)
);
for await (const value of transformedStream) {
console.log(value);
}
// Output: 2, 4, 6
}
await example();
- In this example, the function passed to reader returns the next value in the array each time it is called, using
array.shift()
. - If there are no more values in the array, it returns the special
reader.DONE
value to indicate that the stream is complete. - The
of
function is then used to create acomposable
stream from theAsyncIterable
returned byreader
. - This stream has a
pipe
method that can be used to apply aseries of transformations
to thestream
. - The
map
operator is used totransform
the stream by multiplying each value by2
. - The
map
operator takes a function that is applied toeach value in the stream
, and returns anew stream with the transformed values
. - Finally, the transformed stream is iterated over using a for
await...of
loop. - In this case, the output will be
2, 4, 6
which are the values of the original array multiplied by2
.
API
of<Type>(inputStream: StrictStream<Type>): StrictStreamOf<Type>
of
is a factory function that creates a new instance of a composable stream by wrapping anAsyncIterable
- The resulting stream can be composed with other stream functions using the
pipe
method.
An example:
import {of} from "strict-stream";
import {map} from "strict-stream/map";
async function* generateIds() {
yield 1
yield 2
yield 3
}
async function example() {
const stream = of(generateIds())
.pipe(
map(async (id) => ({id, name: `User ${id}`}))
);
for await (const data of stream) {
console.log(`Id: ${data.id}, Name: ${data.name}`);
}
// Id: 1, Name: User 1
// Id: 2, Name: User 2
// Id: 3, Name: User 3
}
await example();
- The code above is a simple to create and
transform
streams of data. - The
generateIds
function is a generator that yields three numbers (1, 2, and 3) in sequence. - The
of
function is used to create a stream from the generator by passinggenerateIds
as its argument. - The
pipe
method is used to apply a transformation to the stream. - In this case, the
map
function is used to transform each item in the stream. - The
map
function takes a callback that is called with each item in the stream, and returns a new value for that item. - In this case, the callback takes an
id
value and returns an object with two properties:id
andname
. - Finally, the transformed stream is consumed with a
for-await-of
loop, which iterates through each item in the stream and logs its id and name properties to the console. - The output will be
Id: 1, Name: User 1
,Id: 2, Name: User 2
, andId: 3, Name: User 3
.
An example (advanced, custom mapper):
import {of, StrictStreamMapper} from "strict-stream";
async function* generateIds() {
yield 1
yield 2
yield 3
}
async function example() {
// my first stream mapper; maps inputStream to mappedStream;
function myMap<Input, Output>(mapper: (input: Input) => Promise<Output>): StrictStreamMapper<Input, Output> {
// receives inputStream
return (inputStream) => {
return (
async function* () {
// reads input stream
for await (const record of inputStream) {
// map values
yield await mapper(record)
}
}
)()
};
}
const stream = of(generateIds())
.pipe(
myMap(async (id) => ({id, name: `User ${id}`}))
);
for await (const data of stream) {
console.log(`Id: ${data.id}, Name: ${data.name}`);
}
// Id: 1, Name: User 1
// Id: 2, Name: User 2
// Id: 3, Name: User 3
}
await example();
of
creates a new stream instance from thegenerateIds
async generator.- the resulting stream is composed with the
myMap
function that transforms eachid
into an object withid
andname
properties. - finally, the transformed stream is iterated using a
for await...of
loop.
from<Input>(streamLike: StrictStreamLike<Input>): StrictStreamOf<Input>
The from
function is used to convert any iterable
object, whether synchronous
or asynchronous
, to a StrictStream
.
It takes a single argument of type StrictStreamLike<Input>
, which can be either an Iterable
or an AsyncIterable
;
The from
function returns a StrictStream
object of type StrictStreamOf<Input>
, which has a pipe
method that can be used to transform the stream
.
StrictStreamLike<Type>
type means AsyncIterable<Type> | Iterable<Type> | Type[]
An example
import {from} from "strict-stream/from";
import {map} from "strict-stream/map";
async function* generateIds() {
yield 1
yield 2
yield 3
}
async function example() {
const streamLike1: Iterable<number> = [1, 2, 3];
const streamLike2: AsyncIterable<number> = generateIds(); // is equivalent
// could consume `streamLike1` or `streamLike2`
const stream = from(streamLike1)
.pipe(
map(async (id) => ({id, name: `User ${id}`}))
);
for await (const data of stream) {
console.log(`Id: ${data.id}, Name: ${data.name}`);
}
// Id: 1, Name: User 1
// Id: 2, Name: User 2
// Id: 3, Name: User 3
}
await example();
- The example demonstrates how to use the
from
function to turn an iterable into acomposable stream
. - An asynchronous generator function called
generateIds
is defined that yields the numbers1, 2, and 3.
streamLike1
is defined as an array containing the numbers1, 2, and 3
.streamLike2
is defined as anasync iterable
that is equivalent togenerateIds
.- The
from
function is then used to create a stream fromstreamLike1
. - This stream is then piped through a
map
function that maps each number to an object containing anid
and aname
field. - Finally, the resulting stream is consumed using a
for await
loop
tap<Input>(fn: (input: Input) => Promised<any>): StrictStreamMapper<Input, Input>
tap
is a utility function that allows you to perform side-effects
on each element of a stream without modifying the stream itself.
It works by taking a callback function as an argument, which is invoked for each element of the stream,
but then simply returns the original element, so that it can be passed on to the next step in the pipeline unchanged.
An example:
import {of} from "strict-stream";
import {tap} from "strict-stream/tap";
async function example() {
async function* generateIds() {
yield 1
yield 2
yield 3
}
const transformedStream = of(generateIds())
.pipe(
tap((value) => console.log(value))
);
for await (const value of transformedStream) {
/* empty */
}
// 1
// 2
// 3
}
await example();
- In this example, we start with an asynchronous generator that yields three numbers: 1, 2, and 3.
- We then use the
of
function to wrap this generator in a composable stream, and then use thepipe
method to apply thetap
function to the stream. - The
tap
function simply logs each element of the stream to the console. - Finally, we iterate over the transformed stream using a
for-await-of
loop, which triggers the evaluation of the stream and executes theside-effects
of thetap
function. - However, since tap returns each element unchanged, the loop does not actually output anything to the console.
- The output of the example, therefore, is simply the values 1, 2, and 3, printed to the console by the tap function.
run<Type, Default = undefined>(stream: StrictStream<Type>, defaultValue?: Default): Promise<Type | Default>
- Consumes the given
AsyncIterable
, iterating over its values, and returns a Promise that resolves to thelast value
of thestream
. - If the
stream
is empty, the function returns adefault value
, which is optional and defaults toundefined
.
An example
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
async function example() {
async function* generateIds() {
yield 1
yield 2
yield 3
}
const stream = of(generateIds())
.pipe(
tap((value) => console.log(value))
);
await run(stream)
// Output
// 1
// 2
// 3
}
await example();
- It then creates a stream by calling of with
generateIds
as its argument. It then pipes this stream through a tap operation which logs each value emitted by the stream to the console. - Finally, it calls the run function to execute the stream. The run function returns a Promise that resolves when the stream has completed.
- In this case, it logs the numbers 1, 2, and 3 to the console.
sequence(length: number): StrictStream<number>
An example
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
import {sequence} from "strict-stream/sequence";
async function example() {
const sequenceStream = of(sequence(3))
.pipe(
tap((value) => console.log(value))
);
await run(sequenceStream)
// 0
// 1
// 2
}
await example();
- The code is an example of how to use the
sequence
function to generate a stream of numbers with agiven length
- And then use the of and
pipe
functions totransform
thestream
by appending atap
function thatlogs each value
in thestream
to theconsole
. - Specifically, the
sequence
function generates astream of numbers
from0 up to the given length
. - The of function is then used to create a new stream from the output of the
sequence
function, and thepipe
method is called to add thetap
function as a transform to the stream. - Finally, the
run
function is called to consume thestream
andlog each value
to the console.
When the example function is called
- It creates a new stream using
of(sequence(3))
, which generates a stream of numbers from0 to 2
. - The
pipe
method is then used to append atap
function thatlogs each value
in thestream
to the console. - Finally, the
run
function is called to consume the stream and log each value to the console. - The output is: 1, 2, 3
map<Input, Output>(mapper: (input: Input) => Promised<Output>): StrictStreamMapper<Input, Output>
The map
function is a higher-order function that takes a function mapper
as input and returns another function that applies the mapper function to every element in a stream.
The mapper function transforms
each element of the stream and returns a new output element.
An example
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
import {sequence} from "strict-stream/sequence";
import {map} from "strict-stream/map";
async function example() {
const sequenceStream = of(sequence(3))
.pipe(
map((id) => id * 2)
)
.pipe(
tap((value) => console.log(value))
);
await run(sequenceStream)
// 0
// 2
// 4
}
await example();
- In the example function, the
of
function is used to create a new stream from thesequence generator
function that generates a sequence of numbers from0 to 2
. - This stream is then
piped
through themap
function, which multiplies each number in the stream by 2. - The resulting stream is then piped through the
tap
function, whichlogs each element
in the stream to the console. - Finally, the
run
function is called to consume the stream and output its elements. - The output of the example function will log the numbers
0, 2, and 4
to the console, which are the result of multiplying the original numbers generated by sequence by2
.
filter<Input>(condition: (input: Input) => Promised<boolean | undefined | null>): StrictStreamMapper<Input, Input>
The filter
function is a higher-order function that takes a condition
function as its input and returns a function that can be used as a stream mapper
.
The condition function is applied to each item in the stream, and only those items for which the condition function returns a truthy value are included in the output stream.
An example
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
import {sequence} from "strict-stream/sequence";
import {filter} from "strict-stream/filter";
async function example() {
const stream = of(sequence(3))
.pipe(
filter((id) => id > 0)
)
.pipe(
tap((value) => console.log(value))
);
await run(stream)
// 1
// 2
}
await example();
- In the example function, the
of
function is used to create a stream from thesequence
generator that yields three numbers (0, 1, and 2
). - This stream is then piped to a
filter
mapper that only allows numbersgreater than 0
to pass through. - The resulting stream is then piped to a
tap
mapper thatlogs each item
to the console. - Finally, the
run
function is used to execute the stream and log the output to the console. - The output of this code will be the numbers
1 and 2
, since those are the only numbers in the original sequence that meet the filter conditiongreater than 0
.
reduce<Input, Accumulator>(reducer: (accumulator: Accumulator, input: Input) => Promised<Accumulator>, initial: Accumulator): StrictStreamMapper<Input, Accumulator>
The reduce
function is a higher-order function that takes a reducer
function and an initial value
as input, and returns a new function that can be used to transform
a stream
of values.
The reducer function takes an accumulator
value and an input
value, and returns a new accumulator
value.
The reduce function applies the reducer function to each value in the stream, accumulating
the results into a final value that is emitted by the resulting stream
.
An example
import {of, run} from "strict-stream";
import {sequence} from "strict-stream/sequence";
import {reduce} from "strict-stream/reduce";
async function example() {
const stream = of(sequence(5))
.pipe(
reduce(({counter}) => ({counter: counter + 1}), {counter: 0})
);
const result = await run(stream);
console.log(result)
// { counter: 5 }
}
await example();
- In the example, the reduce function is used to count the number of values in a stream.
- The stream is created using the
sequence
function, which generates a stream of numbers from0 to 4
. - The
reduce
function takes an object with acounter
property as theinitial value
, and a reducer function that increments the counter property for each input value. - The
resulting stream
emits asingle object
with the final value of the counter property, which is5
in this case. - The
run
function is used to execute the stream and log the final result.
batch<Input>(size: number): StrictStreamMapper<Input, Input[]>
batch
is a function that returns amapper
function that takes aninput stream
and emits an array of inputs that are processed inbatches of a given size
.- And when the batch reaches the desired size it emits the batch downstream.
An example
import {of, run} from "strict-stream";
import {sequence} from "strict-stream/sequence";
import {batch} from "strict-stream/batch";
async function example() {
const stream = of(sequence(3))
.pipe(
batch(2)
)
.pipe(
tap((value) => console.log(value))
);
await run(stream)
// Output
// [ 0, 1 ]
// [ 2 ]
}
await example();
- The example code creates a
sequence
stream of 3 numbers - And pipes it through the
batch
function with abatch size of 2
. - The resulting stream emits two arrays,
- The first with the values
[0, 1]
and the second with the value[2]
. - The
tap
function is used to log each emitted value to the console.
flat<Type>(): StrictStreamMapper<Type | StrictStreamLike<Type>, Type>
- The
flat
function is astream transformer
that flattens the first level of stream or an array (Iterable
). - If the input stream contains arrays or nested streams
- the
flat
function will iterate over each element in the array or nested stream and emit it as a separate item in the output stream.
An example
import {run} from "strict-stream";
import {from} from "strict-stream/from";
import {flat} from "strict-stream/flat";
import {tap} from "strict-stream/tap";
async function example() {
const stream = from(
[
[1, 2],
[3, 4],
5
]
)
.pipe(
flat()
)
.pipe(
tap((value) => console.log(value))
);
await run(stream)
// 1
// 2
// 3
// 4
// 5
}
await example();
- In the example code, the
from
function is used to create a stream from an array that contains nested arrays and a single value. - The
flat
function is then used toflatten
the first level of stream so that each element in the nested arrays is emitted as a separate item in the output stream. - Finally, the
tap
function is used to log each item. - When the
example
function is run, the output stream contains each element in the nested arrays and the single value, emitted as separate items in the stream.
flatMap<Input, Output>(mapper: (input: Input) => Promised<Output | StrictStreamLike<Output>>): StrictStreamMapper<Input, Output>
flatMap
is a function thatmaps each element
of a stream to another stream andthen flattens the first level of resulting stream
of streams into a single stream.- It takes a
mapper
function thatmaps the input element
. - The resulting
stream
is thenflat
mapped, meaning that it is flattened so that all elements are emitted in a single stream.
An example
import {run} from "strict-stream";
import {from} from "strict-stream/from";
import {flatMap} from "strict-stream/flatMap";
async function example() {
type User = {
id: number;
name: string;
orders: Order[];
};
type Order = {
id: number;
product: string;
price: number;
};
const users: User[] = [
{
id: 1,
name: "Alice",
orders: [
{id: 101, product: "Widget A", price: 10.0},
{id: 102, product: "Widget B", price: 20.0},
],
},
{
id: 2,
name: "Bob",
orders: [
{id: 201, product: "Widget C", price: 30.0},
{id: 202, product: "Widget D", price: 40.0},
{id: 203, product: "Widget E", price: 50.0},
],
},
];
async function fetchStreamOfUsers(): Promise<StrictStreamOf<User>> {
return from(users);
}
// StrictStreamOf<{userId: number, orderId: number}
const stream = (await fetchStreamOfUsers())
.pipe(
flatMap(async (user) => {
return from(user.orders)
.pipe(
map(
async (order) => {
return {
userId: user.id,
orderId: order.id,
price: order.price
}
})
)
})
)
.pipe(
tap((value) => console.log(value))
);
await run(stream)
// { userId: 1, orderId: 101, price: 10 }
// { userId: 1, orderId: 102, price: 20 }
// { userId: 2, orderId: 201, price: 30 }
// { userId: 2, orderId: 202, price: 40 }
// { userId: 2, orderId: 203, price: 50 }
}
await example();
- In the provided example,
flatMap
is used toflatten
the orders of theusers
. - A stream of
users
is created using thefrom
function. - The
flatMap
function is then called on this stream, mapping each user to a stream of orders using thefrom
function again. - The resulting
stream of orders
is thenmapped to an object
with theuserId
,orderId
, andprice
using themap
function. - Finally, the resulting stream of objects is logged using the
tap
function. - When the stream is run using the
run
function, it logs each object in the stream, which contains theuserId
,orderId
, andprice
for each order.
pipe<In, Out>(mapper: StrictStreamMapper<In, Out>): StrictStreamPlumber<In, Out>
- The
pipe
function is used to createcomposable behavior
forStrictStream
s. - It takes a
StrictStreamMapper
as an input, which is a function that transforms aStrictStream
of one type to aStrictStream
of another type. pipe
then returns aStrictStreamPlumber
, which is a function that takes aStrictStream
of the original input type and returns aStrictStream
of the final output type.pipe
also has apipe
method on the returned function, which allows for easycomposition of multiple
StrictStreamMapper
s.
An example
import {run, pipe} from "strict-stream";
import {from} from "strict-stream/from";
import {map} from "strict-stream/map";
async function example() {
// composable behavior
const addFive = pipe(
map((input: number) => input + 4)
)
.pipe(
map(async (input) => input + 1)
)
// High order function to manage / compose part of the pipe
function multiple(x: number) {
return pipe(
map(async (value: number) => value * x)
);
}
const stream = from([1, 2, 3])
.pipe(
addFive
)
.pipe(multiple(2))
.pipe(tap((value) => console.log(value)))
await run(stream)
// 12
// 14
// 16
}
await example();
- In the
example
function, we create two separateStrictStreamMapper
s usingpipe
. - We then use the
multiple
function to create anotherStrictStreamMapper
thatmultiplie
s the input value by agiven number
. - We then
compose
these three mappers usingpipe
and use the resultingStrictStreamPlumber
to create astream of numbers
. - Finally, we
run
thestream
and log each value as it is processed. - The output will be
12, 14, 16
.
An example of flatMap
implementation
There is a composition of map
and flat
functions.
export function flatMap<Input, Output>(mapper: (input: Input) => Promised<Output | StrictStreamLike<Output>>): StrictStreamMapper<Input, Output> {
return pipe(
map(mapper)
).pipe(
flat()
);
}
- The
flatMap
function is implemented using thepipe
function, whichcomposes
a set ofStrictStreamMapper
functions into a singleStrictStreamMapper
. - In the implementation of
flatMap
, themap
function is first applied to themapper
argument - Resulting in a new
StrictStreamMapper
that transforms the input values using themapper
function. - This transformation may result in an output value or a
StrictStreamLike
object that contains a set of output values. - The resulting
StrictStreamMapper
is then piped into theflat
function, which flattens anyStrictStreamLike
objects into a stream of individual output values.
scaleSync<Input, Output>(size: number, mapper: (input: Input) => Promised<Output>): StrictStreamMapper<Input, Output>
- Basically the
map
function with desiredconcurrency
to process records. That keeps the ordering of output stream unchanged. - The
scaleSync
function takes two arguments, the first one is anumber
which represents the concurrency, and the second one is a mapper function that maps theinput
to theoutput
.
An example
import {run, of} from "strict-stream";
import {scaleSync} from "strict-stream/scaleSync";
async function fetchUserById(id: number) {
// some logic to fetch the use
return {
id,
userName: `User ${id}`
};
}
async function getUserIds() {
return sequence(3);
}
async function example() {
const usersStream = of(await getUserIds())
.pipe(
// run's the async queries concurrently, keeps the ordering of output stream unchanged
scaleSync(5, async (id) => fetchUserById(id))
)
.pipe(
tap((value) => console.log(value))
);
await run(usersStream)
// { id: 0, userName: 'User 0' }
// { id: 1, userName: 'User 1' }
// { id: 2, userName: 'User 2' }
}
await example();
- In the example, the
scaleSync
function is used tofetch user
details for a given set of user ids. - The
fetchUserById
function fetches the user detailsasynchronously
for a givenuser id
, and thegetUserIds
functiongenerates a stream
of user ids. - The
usersStream
is created with concurrency of 5, and executing thefetchUserById
function for each id. - The resulting
user details
are logged to the console using thetap
function.
concatenate<T>(...streams: StrictStream<any>[]): StrictStream<T>
concatenate
is a function that concatenates multiple streams into a single stream- ensuring that the records are read sequentially one by one, and maintains the ordering of the output stream unchanged.
- The implementation of the function is done using rest parameters to allow for an
arbitrary number of streams to be concatenated
An example
import {run, of} from "strict-stream";
import {concatenate} from "strict-stream/concatenate";
import {from} from "strict-stream/from";
import {tap} from "strict-stream/tap";
async function* generateIds() {
yield 10
yield 20
yield 30
}
async function example() {
const streamLike1: Iterable<number> = [1, 2, 3];
const streamLike2: AsyncIterable<number> = generateIds(); // is equivalent
const stream = from(
concatenate(
from(streamLike1),
from(streamLike2),
)
).pipe(
tap((value) => console.log(value))
);
await run(stream)
// 1
// 2
// 3
// 10
// 20
// 30
}
await example();
- In the provided example, two stream-likes, one iterable and one async iterable, are concatenated using
concatenate
. - The resulting stream is then converted into a strict stream using the
from
function - And a
tap
operation is performed on it to log each record. - Finally, the stream is run using the
run
function, which is a utility function to consume and execute the stream. - The output shows that the resulting stream contains all the records from both input streams in the correct order.
interval(ms: number, startImmediate = false): IInterval
interval
is a function that creates astream
that emits a sequence of integers at regular intervals.- It takes two parameters: the
duration
of the interval inmilliseconds
, and aboolean flag
indicating whether the stream should start emittingimmediately
or after one interval has elapsed. - The function returns a
StrictStream
object with an additional methodstop
that can be used to stop the interval stream.
An example
import {run, of} from "strict-stream";
import {tap} from "strict-stream/tap";
import {map} from "strict-stream/map";
import {interval} from "strict-stream/interval";
async function example() {
// every 300ms
const source = interval(300);
let counter = 0;
const stream = of(source)
.pipe(
map(() => {
counter++
if (counter > 3) {
// stops the interval stream
source.stop()
}
return counter;
})
)
.pipe(
tap((value) => console.log(value))
)
await run(stream)
// 1
// 2
// 3
// 4
}
await example();
- This example creates an
interval stream
that emits every300ms
- And uses the
map
operator to increment a counter and stop the stream after4 emissions
. - The
tap
operator is used to log the emitted values to the console.
Node.JS integration
nodeReadable<Output>(readable: Readable): StrictStreamOf<Output>
Turns readable to StrictStreamOf
import {nodeReadable} from "strict-stream/nodeReadable";
import {Readable} from "stream";
const readable = Readable.from('Hello Stream');
const stream = nodeReadable<string>(readable)
.pipe(map((chunk) => `${chunk} + OK`))
nodeWritable<Type>(writable: Writable, encoding: BufferEncoding = 'utf-8'): StrictStreamMapper<Type, Type>
Integrates writable stream
import {from} from "strict-stream/from";
import {nodeWritable} from "strict-stream/nodeWritable";
import {Readable, Writable} from "stream";
const written: { chunk: any }[] = []
const myWritable = new Writable({
write(chunk, encoding: BufferEncoding, callback) {
written.push({chunk})
callback()
},
});
const buffer = Buffer.from([100, 101, 102]);
const stream = from([buffer])
.pipe(nodeWritable(myWritable));
nodeTransform<Input, Output>(transform: Transform, options: ReadableOptions = {}): StrictStreamMapper<Input, Output>
Integrates transform stream
import {from} from "strict-stream/from";
import {nodeTransform} from "strict-stream/nodeTransform";
import {Readable, Transform} from "stream";
const myTransform = new Transform({
transform(chunk: any, encoding, callback) {
callback(null, `${chunk} + OK`)
},
});
const stream = from(Readable.from('Hello'))
.pipe(nodeTransform(myTransform));
Beta API
Beta Transformations
scale<Input, Output>(max: number, mapper: (input: Input) => Promised<Output>): StrictStreamMapper<Input, Output>
Maps the stream with max
concurrently. Does not guarantee the ordering of stream items for sure. See scaleSync
for the ordered stream.
const out = of(sequence(4))
.pipe(
scale(10, async (value) => {
return value
})
);
batchTimed<Input>(size: number, maxTimeout: Milliseconds): StrictStreamMapper<Input, Input[]>
Emit batches by size
or maxTimeout
; Useful in the infinity
streams to handle batches.
// batch by timeout
const stream = of(sequence(5))
.pipe(tap(() => {
return delay(100)
}))
.pipe(batchTimed(2, 10));
// batch by timeout
const stream = of(sequence(5))
.pipe(tap(() => {
return delay(100)
}))
.pipe(batchTimed(2, 10));
// batch by size
const stream = of(sequence(5))
.pipe(tap(() => {
return delay(10)
}))
.pipe(batchTimed(2, 500));
buffer<Input>(size: number): StrictStreamMapper<Input, Input>
Simply adds a bit of buffer to have more room for reader / upstream.
const out = of(sequence(4))
.pipe(
buffer(3)
);
Beta Source Operations
merge<Type>(...streams: StrictStream<any>[]): StrictStream<Type>
Merge streams concurrently. Does not guarantee the ordering. See concatenate
for ordered streams.
const usersV1Stream = from([{type: 'userV1', name: 'User Name'}])
.pipe(tap(() => delay(100)));
const usersV2Stream = from([{type: 'userV2', firstName: 'User', lastName: 'Name'}]);
const usersStream = merge(usersV1Stream, usersV2Stream);
objectReader<T extends object | object[]>(read: () => Promised<T | null | undefined | boolean | number>): StrictStream<T>
Simplifies reading source of objects;
const array = [{id: 1}, {id: 2}]
const stream = objectReader(() => array.shift());
Utilities
toArray<T>(input: StrictStream<T>): Promise<T[]>
Not recommended for production usage. Could lead to RAM consumption.
const stream = from([1, 2, 3]);
const outputs = await toArray(stream);
expect(outputs).toEqual([1, 2, 3])
License
strict-stream
is licensed under the MIT License.