loop-stream
Simple function for looping your Node.js stream with C-like for loop features: break and continue
Why use this package?
If all you want to do is consume a Node.js stream in its entirety, the easiest way is probably async iteration.
However, if you only want to consume part of it, there are some limitations: using break will close the readable stream.
Imagine the user is writing to the stdin of your program and you want to reply to their 'Hello' with 'Hi!' and then leave the stream to be consumed by another part of your application.
for await (const chunk of process.stdin) {
  if (chunk.toString() === 'Hello\n') {
    console.log('Hi!');
    break;
  }
}

process.stdin.on('data', (chunk) => {
  // This will never run :(
});
Instead, you could just use loopStream:
await loopStream(process.stdin, (chunk) => {
  if (chunk.toString() === 'Hello\n') {
    console.log('Hi!');
    return { action: 'break' };
  }
  return { action: 'continue' };
});

process.stdin.on('data', (chunk) => {
  // This will work as expected!
});
More examples
Parsing HTTP headers
There are some more use cases for this little function. Let's suppose you want to parse incoming HTTP headers and then read the body using different listeners.
import { Readable } from 'stream';

// Resolves with the raw headers block (everything before the \r\n\r\n separator)
async function readHttpHeaders(stream: Readable): Promise<string> {
  /* Assuming the structure of the request is
     Header1: Value1\r\n
     Header2: Value2\r\n
     \r\n
     [Body]
  */
  return loopStream(stream, '', (chunk, acc) => {
    acc += chunk;
    const headEndSeqIndex = acc.indexOf("\r\n\r\n");
    // If we haven't reached the end of the headers section yet, just continue reading
    if (headEndSeqIndex === -1) {
      return { action: "continue", acc };
    }
    const rawHeaders = acc.slice(0, headEndSeqIndex);
    // Once we know we reached the end of the headers, we have to make sure we didn't also read part of the HTTP body.
    const bodyBeginning = acc.slice(headEndSeqIndex + "\r\n\r\n".length);
    // If we did, we return it as 'unconsumedData' so it can be unshifted into the original stream.
    return {
      action: "break",
      acc: rawHeaders,
      unconsumedData: bodyBeginning
    };
  });
}
// Consume just the headers part of the HTTP request
const headers = await readHttpHeaders(request);
// and then just read the remaining stream to get the body
let body = '';
for await (const bodyChunk of request)
  body += bodyChunk;
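The readHttpHeaders sketch above resolves with the raw header block as a single string. If you want the headers as a key/value map instead, here is a minimal follow-up sketch; parseRawHeaders is a hypothetical helper doing plain string splitting, not a spec-compliant HTTP header parser:

// Minimal sketch: turn the raw header block into a key/value map.
function parseRawHeaders(rawHeaders: string): Record<string, string> {
  const parsed: Record<string, string> = {};
  for (const line of rawHeaders.split('\r\n')) {
    const separatorIndex = line.indexOf(':');
    if (separatorIndex === -1) continue;
    parsed[line.slice(0, separatorIndex).trim()] = line.slice(separatorIndex + 1).trim();
  }
  return parsed;
}

const parsedHeaders = parseRawHeaders(headers);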
Install
npm install loop-stream
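The examples in this README call loopStream directly without showing the import. A minimal usage sketch, assuming the package provides loopStream as a named export (the exact export shape is an assumption):

import { loopStream } from 'loop-stream';

// Log chunk sizes until the stream ends; this sketch never breaks early.
await loopStream(process.stdin, (chunk) => {
  console.log(`received ${chunk.length} bytes`);
  return { action: 'continue' };
});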
Reference
loopStream(stream, iter)
- stream - Readable stream
- iter - IterateWithoutState
- Returns - Promise that resolves either when { action: 'break' } is returned or when the stream has ended.
IterateWithoutState
- chunk - Chunk of stream data obtained using stream.read()
- Returns - { action: 'continue'; } or { action: 'break'; unconsumedData?: any; }

Consumes a chunk of data.
Example:
await loopStream(process.stdin, (chunk) => {
  if (chunk.toString() === 'Hi\n') {
    console.log('Hello');
    return { action: 'break' };
  }
  return { action: 'continue' };
});
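None of the examples above return unconsumedData from this stateless form, so here is a small sketch of that option. The stream name socket is hypothetical, and the chunks are assumed to arrive as strings or Buffers:

await loopStream(socket, (chunk) => {
  const text = chunk.toString();
  const newlineIndex = text.indexOf('\n');
  if (newlineIndex === -1) {
    return { action: 'continue' };
  }
  console.log('First line:', text.slice(0, newlineIndex));
  // Hand the leftover part of the chunk back via unconsumedData
  // so the rest of the application can still read it from the stream.
  return { action: 'break', unconsumedData: text.slice(newlineIndex + 1) };
});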
loopStream(stream, initialAcc, iter)
- stream - Readable stream
- initialAcc - initial value of the accumulator
- iter - IterateWithState
- Returns - Promise that resolves either when { action: 'break' } is returned or when the stream has ended.
It's a bit like arr.reduce() for streams: you can accumulate chunks from the stream into some value that will later be returned from loopStream.
IterateWithState
- chunk - Chunk of stream data obtained using stream.read()
- acc - Accumulator returned from the previous iteration. In the first iteration it's initialAcc.
- Returns - { action: 'continue'; acc: Accumulator; } or { action: 'break'; unconsumedData?: any; acc: Accumulator; }

Consumes a chunk of data and accumulates the processed stream.
Example:
import { PassThrough } from 'stream';

const stream = new PassThrough({ objectMode: true });
[2, 1, 3, -1, 3].forEach(num => stream.write(num));

const sum = await loopStream(stream, 0, (chunk, acc: number) => {
  if (chunk === -1) {
    return { action: 'break', acc };
  }
  return { action: 'continue', acc: acc + chunk };
});
// sum: 6