stream-editor

v1.11.0

Published

Utility for executing RegEx replacement on streams as well as transcoding/teeing/confluencing them.

Downloads

1,587

Readme

Stream-Editor

Features

Partial replacement

A partial replacement replaces only the substring matched by the 1st parenthesized capture group with the specified replacement, which allows a simpler syntax and a minimal modification.

Take the following snippet converting something like import x from "../src/x.mjs" into import x from "../build/x.mjs" as an example:

import { sed as updateFileContent } from "stream-editor";

updateFileContent({
  file: "index.mjs",
  search: matchParentFolderImport(/(src\/(.+?))/),
  replacement: "build/$2",
  maxTimes: 2,
  required: true
});

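function matchParentFolderImport (additionalPattern) {
  // split the import pattern around its placeholder group,
  // then splice the caller's pattern in between
  const parts = /import\s+.+\s+from\s*['"]\.\.\/(.+?)['"];?/.source.split("(.+?)");

  return new RegExp([
    parts[0],
    additionalPattern.source,
    parts[1]
  ].join(""));
}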

Special replacement patterns (parenthesized capture group placeholders) are well supported in a partial replacement, for both function and string replacements. Everything else follows the behavior of the vanilla String.prototype.replace method, with one exception: $& (also the 1st argument passed to a replacement function) and $1 (the 2nd argument) always have the same value, namely the substring matched by the 1st parenthesized capture group (PCG).

You can specify a truthy isFullReplacement to perform a full replacement instead.
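To make the idea concrete, here is a minimal sketch of a partial replacement in plain JavaScript. It is not stream-editor's implementation: partialReplace is a hypothetical helper, it only handles literal replacement strings (no placeholders), and it assumes Node.js 16+ for the RegExp `d` (match indices) flag.

```javascript
// Hypothetical helper, not part of stream-editor: replace only the substring
// matched by the 1st capture group and keep the rest of each match untouched.
function partialReplace(input, pattern, replacement) {
  // Force the global and indices flags without duplicating existing ones.
  const flags = [...new Set(pattern.flags + "gd")].join("");
  const re = new RegExp(pattern.source, flags);

  let result = "";
  let cursor = 0;
  for (const match of input.matchAll(re)) {
    const span = match.indices[1];
    if (!span) continue; // group 1 did not participate in this match
    const [start, end] = span;
    result += input.slice(cursor, start) + replacement;
    cursor = end;
  }
  return result + input.slice(cursor);
}

const rewritten = partialReplace(
  'import x from "../src/x.mjs";',
  /from\s*["']\.\.\/(src)\//,
  "build"
);
console.log(rewritten);
```

Only the text captured by the 1st group changes; the surrounding import statement is matched for context but left as it is.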

Asynchronous replacement

Yes, asynchronous function replacements just work like a charm.

import fs from "fs";
import path from "path";
import { streamEdit } from "stream-editor";

const filepath = "./index.js";
const dest = "./build/index.js";

streamEdit({
  from: fs.createReadStream(filepath),
  to: fs.createWriteStream(dest),
  replace: [
    {
      // match ${{ import("module.mjs") }} | ${{ expr( 1 + 1 ) }}
      match: /\$\{\{\s*([A-Z_-]+?)\s*\(\s*(.+?)\s*\)\s*\}\}/i,
      replacement: async (whole, method, input) => {
        switch (method.toUpperCase()) {
          case "IMPORT":
            input = input.replace(/^["']|["']$/g, "");
            const importFilePath = path.join(path.dirname(filepath), input);
            return fs.promises.readFile(importFilePath, "utf-8");
          case "EXPR":
            return (async () => String(await eval(input)))();
          default:
            throw new Error(`unknown method ${method} in ${whole}`);
        }
      }
    }
  ],
  defaultOptions: {
    isFullReplacement: true
  }
});
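String.prototype.replace itself does not await replacement functions, so some extra machinery is needed for this to work. The sketch below shows one common way to do it on a plain string: collect the matches, await all replacements, then stitch the result together. replaceAsync is a hypothetical helper for illustration, not the code stream-editor uses.

```javascript
// Hypothetical helper: like String.prototype.replace with a global RegExp,
// but the replacer may return a promise.
async function replaceAsync(input, pattern, replacer) {
  const flags = pattern.flags.includes("g") ? pattern.flags : pattern.flags + "g";
  const matches = [...input.matchAll(new RegExp(pattern.source, flags))];

  // Call the replacer with the usual arguments: whole match, groups, index, input.
  const replacements = await Promise.all(
    matches.map(match => replacer(...match, match.index, input))
  );

  let result = "";
  let cursor = 0;
  matches.forEach((match, i) => {
    result += input.slice(cursor, match.index) + replacements[i];
    cursor = match.index + match[0].length;
  });
  return result + input.slice(cursor);
}

const doubled = replaceAsync("1 + 2", /\d/, async digit => String(Number(digit) * 2));
doubled.then(text => console.log(text));
```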

Substituting texts within files in streaming fashion

This package will create readable and writable streams connected to a single file at the same time, while disallowing any write operations to advance further than the current reading index. This feature is based on rw-stream's great work.
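The constraint that writes never overtake reads can be illustrated with a synchronous sketch on a temporary file. This is only an illustration of the principle under simplifying assumptions (a per-byte safe transform, tiny chunks, blocking I/O); editInPlace is a hypothetical helper and does not reflect rw-stream's actual code.

```javascript
const fs = require("node:fs");
const os = require("node:os");
const path = require("node:path");

// Hypothetical helper: rewrite a file in place, chunk by chunk.
function editInPlace(file, transform, chunkSize = 4) {
  const fd = fs.openSync(file, "r+");
  const chunk = Buffer.alloc(chunkSize);
  let readPos = 0;
  let writePos = 0;
  let pending = Buffer.alloc(0); // transformed bytes waiting to be written
  try {
    for (;;) {
      const bytesRead = fs.readSync(fd, chunk, 0, chunkSize, readPos);
      const atEnd = bytesRead === 0;
      readPos += bytesRead;
      if (!atEnd) {
        pending = Buffer.concat([pending, transform(chunk.subarray(0, bytesRead))]);
      }
      // Never write past the read position, unless the input is exhausted.
      const writable = atEnd
        ? pending.length
        : Math.min(pending.length, readPos - writePos);
      if (writable > 0) {
        fs.writeSync(fd, pending, 0, writable, writePos);
        writePos += writable;
        pending = pending.subarray(writable);
      }
      if (atEnd) break;
    }
    fs.ftruncateSync(fd, writePos); // drop leftovers if the output got shorter
  } finally {
    fs.closeSync(fd);
  }
}

const tmpFile = path.join(os.tmpdir(), `rw-sketch-${process.pid}.txt`);
fs.writeFileSync(tmpFile, "banana bread");
editInPlace(tmpFile, buf =>
  Buffer.from(buf.toString("latin1").replace(/a/g, "AA"), "latin1")
);
const edited = fs.readFileSync(tmpFile, "utf8");
fs.unlinkSync(tmpFile);
console.log(edited);
```

Because the output here is longer than the input, transformed bytes pile up in `pending` and are flushed only as the read position moves forward, so unread content is never overwritten.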

To accommodate RegEx replacement (which requires intact strings rather than chunks that may begin or end at any position) with streams, we bring the separator (default: /(?<=\r?\n)/) and join (default: '') options into use. You should NOT specify separators that may divide text structures targeted by your RegEx searches, which would result in undefined behavior.
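The following sketch shows why a separator is needed: chunks arrive cut at arbitrary positions, so the last, possibly incomplete part of each chunk is held back until more data arrives. splitParts is a hypothetical generator used for illustration only; it assumes string chunks and the default separator described above.

```javascript
// Hypothetical generator: re-split arbitrary chunks into intact parts.
function* splitParts(chunks, separator = /(?<=\r?\n)/) {
  let tail = ""; // the last, possibly incomplete part of the previous chunk
  for (const chunk of chunks) {
    const parts = (tail + chunk).split(separator);
    tail = parts.pop(); // hold it back until the next chunk arrives
    yield* parts;
  }
  if (tail !== "") yield tail; // flush whatever is left at the end
}

// The second line is cut in half by the chunk boundary.
const lines = [...splitParts(["foo\nba", "r\r\nbaz"])];
console.log(lines);
```

Each yielded part is a complete line including its line ending, so a RegEx search for line-level structures never sees a half-delivered line.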

Moreover, as the RegEx replacement part in options is actually optional, stream-editor can also be used to break up streams and reassemble them like split2 does:

// named export sed is an alias for streamEdit
const { streamEdit } = require("stream-editor");
const { createReadStream } = require("fs");
const { Writable } = require("stream");
const { join } = require("path");

const filepath = join(__dirname, "./file.ndjson");

// CommonJS has no top-level await, so wrap the calls in an async function
(async () => {
  /* replace CRLF with LF */
  await streamEdit({
    file: filepath,
    separator: "\r\n",
    join: "\n"
  });

  /* parse ndjson */
  await streamEdit({
    from: createReadStream(filepath),
    to: new Writable({
      objectMode: true,
      write(parsedObj, _enc, cb) {
        // doSomething stands for your own async handler
        return (
          doSomething(parsedObj)
            .then(() => cb())
            .catch(cb)
        );
      }
    }),
    separator: "\n",
    readableObjectMode: true,
    postProcessing: part => JSON.parse(part),
    abortController: new AbortController() // ...
  });
})();

You can specify null as the separator to completely disable splitting.

Setting limits on Regular Expressions' maximum executed times

This is achieved by converting every replacement into a replacement function and wrapping it in layers of proxies.
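As a rough illustration of that wrapping idea, the sketch below counts invocations of a replacement and stops substituting once a cap is reached. limitReplacement is a hypothetical helper: it returns the match unchanged after the cap (whereas stream-editor removes the search), and it does not expand placeholders such as $1 in string replacements.

```javascript
// Hypothetical helper: wrap a replacement (string or function) in a counter.
function limitReplacement(replacement, maxTimes) {
  let count = 0;
  return (match, ...args) => {
    if (count >= maxTimes) return match; // cap reached: leave the text as is
    count++;
    return typeof replacement === "function"
      ? replacement(match, ...args)
      : replacement; // literal string, placeholders are not expanded here
  };
}

const capped = "a a a a".replace(/a/g, limitReplacement("b", 2));
console.log(capped);
```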

const { sed: updateFiles } = require("stream-editor");

/**
 * add "use strict" plus a compatible line ending
 * to the beginning of every commonjs file.
 */

// maxTimes version
updateFiles({
  files: commonjsFiles,
  match: /^().*(\r?\n)/,
  replacement: `"use strict";$2`,
  maxTimes: 1
});

// limit version
updateFiles({
  files: commonjsFiles,
  replace: [
    {
      match: /^().*(\r?\n)/,
      replacement: `"use strict";$2`,
      /**
       * a local limit,
       * applying restriction on certain match's maximum executed times.
       */
      limit: 1 
    }
  ],
  // a global limit, limit the maximum count of every search's executed times.
  limit: 1 
});

Once the limit specified by the limit option is reached, the underlying transform stream becomes a transparent PassThrough stream if the truncate option is falsy; otherwise, the remaining part is discarded. In contrast, maxTimes just removes that search.
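The difference between passing through and truncating can be sketched on an array of already-split parts. This is a simplified model that works at part granularity with a non-global search; processParts is a hypothetical helper and the exact cut-off point inside stream-editor may differ.

```javascript
// Hypothetical helper: apply a non-global search to each part until the
// limit is hit, then either pass the rest through or drop it.
function processParts(parts, search, replacement, { limit = Infinity, truncate = false } = {}) {
  const out = [];
  let count = 0;
  for (const part of parts) {
    if (count >= limit) {
      if (truncate) break; // discard everything after the limit
      out.push(part);      // transparent pass-through
      continue;
    }
    out.push(part.replace(search, () => {
      count++;
      return replacement;
    }));
  }
  return out;
}

const parts = ["a1\n", "a2\n", "a3\n"];
const passedThrough = processParts(parts, /a/, "b", { limit: 1 });
const truncated = processParts(parts, /a/, "b", { limit: 1, truncate: true });
console.log(passedThrough, truncated);
```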

Transcoding streams or files

streamEdit({
  from: createReadStream("gbk.txt"),
  to: createWriteStream("hex.txt"),
  decodeBuffers: "gbk",
  encoding: "hex"
});

Option decodeBuffers is the specific character encoding, like utf-8, iso-8859-2, koi8, cp1261, gbk, etc., used to decode the raw input buffers. Some encodings are only available when Node.js is built with the full ICU data, but the good news is that full-icu is the default in v14 and later (see https://github.com/nodejs/node/pull/29522).

Note that option decodeBuffers only makes sense when no encoding is assigned to the input stream and its data are passed as buffers. The following inputs are wrong because the stream already decodes its data to strings:

streamEdit({
  from: 
    createReadStream("gbk.txt").setEncoding("utf8"),
  to: createWriteStream("hex.txt"),
  decodeBuffers: "gbk",
  encoding: "hex"
});

streamEdit({
  from: 
    createReadStream("gbk.txt", "utf8"),
  to: createWriteStream("hex.txt"),
  decodeBuffers: "gbk",
  encoding: "hex"
});

Option encoding encodes all processed and joined strings to buffers with the specified encoding. Node.js supports the following options: ascii, utf8, utf-8, utf16le, ucs2, ucs-2, base64, latin1, binary, hex.
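The two steps can be sketched with Node.js built-ins: a WHATWG TextDecoder for the decodeBuffers side and Buffer.from for the encoding side. transcode is a hypothetical helper, not stream-editor's code, and it assumes a Node.js build with full ICU so that the gbk label is available.

```javascript
// Hypothetical helper: decode raw chunks with a WHATWG encoding label, then
// re-encode the resulting text with a Node.js Buffer encoding.
function transcode(chunks, from, to) {
  const decoder = new TextDecoder(from);
  let text = "";
  for (const chunk of chunks) {
    // stream: true keeps a multi-byte character that is split across chunks
    text += decoder.decode(chunk, { stream: true });
  }
  text += decoder.decode(); // flush the decoder
  return Buffer.from(text, to);
}

// "你好" in GBK is C4 E3 BA C3; the chunk boundaries cut both characters.
const gbkChunks = [
  Uint8Array.of(0xc4),
  Uint8Array.of(0xe3, 0xba),
  Uint8Array.of(0xc3)
];
const utf8Buffer = transcode(gbkChunks, "gbk", "utf8");
console.log(utf8Buffer.toString("utf8"));
```

If the input stream had already been given an encoding, the chunks would arrive as strings decoded with the wrong charset, which is why decodeBuffers needs raw buffers.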

Piping/teeing/confluencing streams with proper error handling & propagation

Confluence:

const yamlFiles = await (
  fsp.readdir(folderpath, { withFileTypes: true })
      .then(dirents =>
        dirents
          .filter(dirent => dirent.isFile() && dirent.name.endsWith(".yaml"))
          .sort((a, b) => a.name.localeCompare(b.name))
          .map(({ name }) => createReadStream(join(folderpath, name)))
      )
);

streamEdit({
  from: yamlFiles,
  to: createWriteStream(resultPath),
  contentJoin: "\n\n" // join streams
  // the encoding of contentJoin respects the `encoding` option
});
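For intuition, merging several readables into one output with a joiner between them can be sketched with an async generator and Readable.from. confluence and collect are hypothetical helpers for illustration; stream-editor's own piping and error handling are more involved.

```javascript
const { Readable } = require("node:stream");

// Hypothetical helper: yield every chunk of every source in order,
// inserting contentJoin between two consecutive sources.
async function* confluence(readables, contentJoin = "") {
  let first = true;
  for (const readable of readables) {
    if (!first && contentJoin !== "") yield contentJoin;
    first = false;
    yield* readable; // an error in a source rejects here and ends the output
  }
}

// Hypothetical helper: drain a stream into a single string.
async function collect(stream) {
  let text = "";
  for await (const chunk of stream) text += chunk;
  return text;
}

const merged = collect(
  Readable.from(
    confluence([Readable.from(["a: 1"]), Readable.from(["b: 2"])], "\n\n")
  )
);
merged.then(text => console.log(text));
```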

Teeing:

streamEdit({
  readableStream: new Readable({
    read(size) {
      // ...
    }
  }),
  writableStreams: new Array(6).fill(0).map((_, i) =>
    createWriteStream(join(resultFolderPath, `./test-source${i}`))
  )
});

You can have a look at the tests regarding error handling in the repository's test folder.

No dependency

stream-editor previously depended on rw-stream, but for some historical reasons, I refactored rw-stream and bundled it as a part of this package. See src/rw-stream.

Currently, stream-editor has zero dependencies.

High coverage tests

See https://github.com/edfus/stream-editor/tree/master/test.


  Normalize & Replace
    √ can handle sticky regular expressions
    √ can handle string match with special characters
    √ can handle partial replacement with placeholders
    √ can handle non-capture-group parenthesized pattern: Assertions
    √ can handle non-capture-group parenthesized pattern: Round brackets
    √ can handle pattern starts with a capture group
    √ can handle malformed (without capture groups) partial replacement
    √ can await replace partially with function
    √ recognize $\d{1,3} $& $` $' and check validity (throw warnings)
    √ produce the same result as String.prototype.replace

  Edit streams
    √ should check arguments
    √ should warn unknown/unneeded options
    √ should respect FORCE_COLOR, NO_COLOR, NODE_DISABLE_COLORS
    √ should pipe one Readable to multiple dumps (54ms)
    √ should replace CRLF with LF
    √ should have replaced /dum(b)/i to dumpling (while preserving dum's case)
    √ should have global and local limits on replacement amount
    √ should have line buffer maxLength
    √ should edit and combine multiple Readable into one Writable
    √ has readableObjectMode
    √ can handle async replacements
    √ can signal an unsuccessful substitution using beforeCompletion
    √ can declare a limit below which a substitution is considered failed for a search
    cancelation
      √ should check validity
      √ can abort a substitution before it has completed.
      √ can handle already aborted controller
      √ doesn't have memory leaks (is using WeakRef)
    truncation & limitation
      √ truncating the rest when limitations reached
      √ not: self rw-stream
      √ not: piping stream
    transcoding
      √ gbk to utf8 buffer
      √ gbk to hex with HWM
    error handling
      √ destroys streams properly when one of them closed prematurely
      √ destroys streams properly if errors occurred during initialization
      √ multiple-to-one: can correctly propagate errors emitted by readableStreams
      √ multiple-to-one: can handle prematurely destroyed readableStreams
      √ multiple-to-one: can correctly propagate errors emitted by writableStream
      √ multiple-to-one: can handle prematurely ended writableStream
      √ multiple-to-one: can handle prematurely destroyed writableStream
      √ one-to-multiple: can correctly propagate errors emitted by writableStreams
      √ one-to-multiple: can handle prematurely ended writableStreams
      √ one-to-multiple: can handle prematurely destroyed writableStreams
      √ can handle errors thrown from postProcessing
      √ can handle errors thrown from join functions
      √ can handle errors thrown from replacement functions
    corner cases
      √ can handle empty content
      √ can handle regular expressions that always match
      √ can handle non-string in a regExp separator's split result
    try-on
      √ can handle files larger than 64KiB


  49 passing (275ms)

API

Overview

This package has two named function exports: streamEdit and sed (an alias for streamEdit).

streamEdit returns a promise. For files, it resolves to void | void[]; for streams, it resolves to Writable | Writable[], keeping references to the output streams.

streamEdit accepts an options object with one or more of the following options:

Options for replacement

| name | alias | expect | safe to ignore | default |
| :--: | :-: | :-----: | :-: | :--: |
| search | match | string \| RegExp | ✔ | none |
| replacement | x | string \| [async] (wholeMatch, ...args) => string | ✔ | none |
| limit | x | number | ✔ | Infinity |
| maxTimes | x | number | ✔ | Infinity |
| minTimes | x | number | ✔ | 0 |
| required | x | boolean | ✔ | false |
| isFullReplacement | x | boolean | ✔ | false |
| disablePlaceholders | x | boolean | ✔ | false |
| replace | x | an Array of { search, replacement } | ✔ | none |
| defaultOptions | x | BasicReplaceOptions | ✔ | {} |
| join | x | string \| (part: string) => string \| null | ✔ | part => part |
| postProcessing | x | (part: string, isLastPart: boolean) => any | ✔ | none |
| beforeCompletion | x | () => Promise<void> \| void | ✔ | none |

type GlobalLimit = number;
type LocalLimit = number;

interface BasicReplaceOptions {
  /**
   * Perform a full replacement or not.
   * 
   * A RegExp search without capture groups or a search in string will be
   * treated as a full replacement silently.
   */
  isFullReplacement?: boolean;
  /**
   * Only valid for a string replacement.
   * 
   * Disable placeholders in replacement or not. Processed result shall be
   * exactly the same as the string replacement if set to true.
   * 
   * Default: false
   */
  disablePlaceholders?: boolean;
  /**
   * Apply restriction on certain search's maximum executed times.
   * 
   * Upon reaching the limit, if option `truncate` is falsy (false by default),
   * underlying transform stream will become a transparent passThrough stream.
   * 
   * Default: Infinity. 0 is considered as Infinity for this option.
   */
  limit?: LocalLimit;
  /**
   * Observe a certain search's executed times, remove that search right
   * after upper limit reached.
   * 
   * Default: Infinity. 0 is considered as Infinity for this option.
   */
  maxTimes?: number;
  /**
   * For the search you specified, add a limit below which the substitution
   * is considered failed.
   */
  minTimes?: number;
  /**
   * Sugar for minTimes = 1
   */
  required?: boolean;
}

interface SearchAndReplaceOptions extends BasicReplaceOptions {
  /**
   * Correspondence: `String.prototype.replaceAll`'s 1st argument.
   * 
   * Accepts a literal string or a RegExp object.
   * 
   * Will replace all occurrences by converting input into a global RegExp
   * object, which means that the according replacement might be invoked 
   * multiple times for each full match to be replaced.
   * 
   * Every `search` and `replacement` not arranged in pairs is silently
   * discarded in `options`, while in `options.replace` that will result in
   * an error thrown.
   */
  search?: string | RegExp;

  /**
   * Correspondence: String.prototype.replace's 2nd argument.
   * 
   * Replaces the according text for a given match, a string or
   * a function that returns the replacement text can be passed.
   * 
   * Special replacement patterns (parenthesized capture group placeholders)
   *  / async replacement functions are well supported.
   * 
   * For a partial replacement, $& (also the 1st supplied value to replace
   * function) and $1 (the 2nd param passed) always have the same value,
   * supplying the matched substring in the parenthesized capture group
   * you specified.
   */
  replacement?: string | ((wholeMatch: string, ...args: string[]) => string);
}

interface MultipleReplacementOptions {
  /**
   * Apply restriction on the maximum count of every search's executed times.
   * 
   * Upon reaching the limit, if option `truncate` is falsy (false by default),
   * underlying transform stream will become a transparent passThrough stream.
   * 
   * Default: Infinity. 0 is considered as Infinity for this option.
   */
  limit?: GlobalLimit;
  /**
   * Should be an array of { [ "match" | "search" ], "replacement" } pairs.
   * 
   * Possible `search|match` and `replacement` pair in `options` scope will be
   * prepended to `options.replace` array, if both exist.
   */
  replace?: Array<SearchAndReplaceOptions | MatchAndReplaceOptions>;
  /**
   * Default: {}
   */
  defaultOptions?: BasicReplaceOptions;
}

type ReplaceOptions = MultipleReplacementOptions `OR` SearchAndReplaceOptions;
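The rules described in the comments above (alias handling, prepending a top-level pair, defaultOptions, required as sugar, 0 meaning Infinity) can be summarized in a small normalization sketch. normalizeReplaceOptions is a hypothetical function written from those comments only; it is not the library's code, and it deliberately ignores per-search flags given next to a top-level pair.

```javascript
// Hypothetical helper: turn user options into a flat list of searches.
function normalizeReplaceOptions(options = {}) {
  const { replace = [], defaultOptions = {}, replacement } = options;
  const search = options.search ?? options.match; // `match` is an alias
  const pairs = [...replace];

  // A complete top-level pair is prepended; an incomplete one is dropped silently.
  if (search !== undefined && replacement !== undefined) {
    pairs.unshift({ search, replacement });
  }

  return pairs.map(pair => {
    const merged = { ...defaultOptions, ...pair };
    const pairSearch = merged.search ?? merged.match;
    if (pairSearch === undefined || merged.replacement === undefined) {
      // Inside options.replace, an incomplete pair is an error.
      throw new TypeError("every replace entry needs both search and replacement");
    }
    return {
      search: pairSearch,
      replacement: merged.replacement,
      isFullReplacement: Boolean(merged.isFullReplacement),
      limit: merged.limit || Infinity,       // 0 is treated as Infinity
      maxTimes: merged.maxTimes || Infinity, // 0 is treated as Infinity
      minTimes: merged.minTimes ?? (merged.required ? 1 : 0)
    };
  });
}

const normalized = normalizeReplaceOptions({
  match: /a/,
  replacement: "b",
  replace: [{ search: "c", replacement: "d", required: true }],
  defaultOptions: { maxTimes: 0 }
});
console.log(normalized.length);
```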

interface BasicOptions extends ReplaceOptions {
  /**
   * Correspondence: Array.prototype.join's 1st argument, though a function 
   * is also acceptable.
   * 
   * You can specify a literal string or a function that returns the post-processed
   * part.
   * 
   * Example function for appending a CRLF: part => part.concat("\r\n");
   * 
   * Default: part => part
   */
  join?: string | ((part: string) => string) | null;
  /**
   * A post-processing function that consumes transformed strings and returns a
   * string or a Buffer. This option has higher priority over option `join`.
   * 
   * If readableObjectMode is enabled, any object accepted by Node.js objectMode
   * streams can be returned.
   */
  postProcessing?: (part: string, isLastPart: boolean) => any;
  /**
   * This optional function will be called before the destination(s) close,
   * delaying the resolution of the promise returned by streamEdit() until
   * beforeCompletion resolves.
   * 
   * You can also return a rejected promise or simply raise an error to signal a
   * failure and destroy all streams.
   */
  beforeCompletion?: () => Promise<void> | void;
}

Options for stream transform

| name | alias | expect | safe to ignore | default |
| :--: | :-: | :-----: | :-: | :--: |
| separator | x | string \| RegExp \| null | ✔ | /(?<=\r?\n)/ |
| encoding | x | string \| null | ✔ | null |
| decodeBuffers | x | string | ✔ | "utf8" |
| truncate | x | boolean | ✔ | false |
| maxLength | x | number | ✔ | Infinity |
| readableObjectMode | x | boolean | ✔ | false |
| abortController | x | AbortController | ✔ | null |

Options that are only available under certain contexts:

| name | alias | expect | context | default |
| :--: | :-: | :-----: | :-: | :--: |
| readStart | x | number | file[s] | 0 |
| writeStart | x | number | file[s] | 0 |
| contentJoin | x | string \| Buffer | readableStreams | "" |

interface BasicOptions extends ReplaceOptions {
  /**
   * Correspondence: String.prototype.split's 1st argument.
   * 
   * Accepts a literal string or a RegExp object.
   * 
   * Used by underlying transform stream to split upstream data into separate
   * to-be-processed parts.
   * 
   * String.prototype.split will implicitly call `toString` on non-string &
   * non-regex & non-void values.
   * 
   * Specify `null` or `undefined` to process upstream data as a whole.
   * 
   * Default: /(?<=\r?\n)/. Line endings following lines.
   */
  separator?: string | RegExp | null;
  /**
   * Correspondence: encoding of Node.js Buffer.
   * 
   * If specified, then processed and joined strings will be encoded to buffers
   * with that encoding.
   *
   * Node.js currently supports the following options:
   * "ascii" | "utf8" | "utf-8" | "utf16le" | "ucs2" | "ucs-2" | "base64" | "latin1" | "binary" | "hex"
   * Default: null.
   */
  encoding?: BufferEncoding | null;
  /**
   * Correspondence: encodings of WHATWG Encoding Standard TextDecoder.
   * 
   * Accept a specific character encoding, like utf-8, iso-8859-2, koi8, cp1261,
   * gbk, etc for decoding the input raw buffer.
   * 
   * This option only makes sense when no encoding is assigned and stream data are 
   * passed as Buffer objects (that is, haven't done something like
   * readable.setEncoding('utf8'));
   * 
   * Example: streamEdit({
   *    from: createReadStream("gbk.txt"),
   *    to: createWriteStream("utf8.txt"),
   *    decodeBuffers: "gbk"
   * });
   * 
   * Some encodings are only available for Node embedded the entire ICU (full-icu).
   * See https://nodejs.org/api/util.html#util_class_util_textdecoder.
   * 
   * Default: "utf8".
   */
  decodeBuffers?: string;
  /**
   * Truncating the rest or not when limits reached.
   * 
   * Default: false.
   */
  truncate?: boolean;
  /**
   * The maximum size of the line buffer.
   * 
   * A line buffer is the buffer used for buffering the last incomplete substring
   * when dividing chunks (typically 64 KiB) by options.separator.
   * 
   * Default: Infinity.
   */
  maxLength?: number;
  /**
   * Correspondence: readableObjectMode option of Node.js stream.Transform
   * 
   * Options writableObjectMode and objectMode are not supported.
   * 
   * Default: false.
   */
  readableObjectMode?: boolean;
  /**
   * An optional controller object that allows you to abort one or more
   * substitutions as and when desired.
   * 
   * Node version >= 15.0.0 is required.
   */
  abortController?: AbortController;
}

interface UpdateFileOptions extends BasicOptions {
  file: string;
  readStart?: number;
  writeStart?: number;
}

interface UpdateFilesOptions extends BasicOptions {
  files: string[];
  readStart?: number;
  writeStart?: number;
}

interface MultipleReadablesToWritableOptions<T> extends BasicOptions {
  from: Array<Readable>;
  to: T;
  /**
   * Concatenate results of transformed Readables with the input value.
   * Accepts a literal string or a Buffer.
   * option.encoding will be passed along with contentJoin to Writable.write
   * Default: ""
   */
  contentJoin: string | Buffer;
}

interface MultipleReadablesToWritableOptionsAlias<T> extends BasicOptions {
  readableStreams: Array<Readable>;
  writableStream: T;
  contentJoin: string | Buffer;
}

Options for stream input/output

| name | alias | expect | with | default |
| :--: | :-: | :-----: | :-: | :--: |
| file | x | string | self | none |
| files | x | an Array of string | self | none |
| readableStream | from | Readable | writableStream[s] | none |
| writableStream | to | Writable | readableStream[s] | none |
| readableStreams | from | an Array of Readable | writableStream | none |
| writableStreams | to | an Array of Writable | readableStream | none |

file:

interface UpdateFileOptions extends BasicOptions {
  file: string;
  readStart?: number;
  writeStart?: number;
}
function streamEdit(options: UpdateFileOptions): Promise<void>;

files:

interface UpdateFilesOptions extends BasicOptions {
  files: string[];
  readStart?: number;
  writeStart?: number;
}

function streamEdit(options: UpdateFilesOptions): Promise<void[]>;

transform Readable:

interface TransformReadableOptions<T> extends BasicOptions {
  [ from | readableStream ]: Readable;
  [ to   | writableStream ]: T;
}

function streamEdit<T extends Writable>(
  options: TransformReadableOptions<T>
): Promise<T>;

readables -> writable:

interface MultipleReadablesToWritableOptions<T> extends BasicOptions {
  [ from | readableStreams ]: Array<Readable>;
  [ to   | writableStream  ]: T;
  contentJoin: string | Buffer;
}

function streamEdit<T extends Writable>(
  options: MultipleReadablesToWritableOptions<T>
): Promise< T >;

readable -> writables:

interface ReadableToMultipleWritablesOptions<T> extends BasicOptions {
  [ from | readableStream  ]: Readable;
  [ to   | writableStreams ]: Array<T>;
}

function streamEdit<T extends Writable>(
  options: ReadableToMultipleWritablesOptions<T>
): Promise< T[]>;

For further reading, take a look at the declaration file.

Examples

See ./examples and esm2cjs