npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

push-buffer

v1.1.1

Published

Abstraction for converting between push/pull APIs, and managing asynchronous value distribution

Downloads

184

Readme

push-buffer

An abstraction for handling buffering of asynchronous pull-based and push-based operations.

  • Request a value from the buffer, receive a Promise.
  • Depending on mode of operation, values are either acquired through an asynchronous callback (pull), or provided externally at a later point (push).
  • Values that come in when there are no outstanding request are buffered, to be reconciled with later requests; ie. the abstraction matches both requests and results, and reconciles them in an order-preserving manner.
  • Support for dividing values into 'lanes'; ie. independent request queues where values get assigned to specific lanes based on some predicate function, for implementing complex value distribution patterns.
  • Supports @promistream/NoValue to explicitly represent cases where no value is produced; this means you can send null and undefined to a queue just like any other value.
  • Values and errors can be sent to one lane, multiple lanes (multicast), or all lanes (broadcast).

Typical usecases

  • Implementing Promistreams, particularly forking streams.
  • Implementing asynchronous task queues and task distribution systems.
  • Splitting out asynchronously obtain values into different worker threads.
  • Converting push-based APIs (eg. EventEmitters) into pull-based APIs (asynchronous iterators, Promistreams, pull-streams, etc.)

Caveats

  • The internal queues are unbounded, and so may use an arbitrary amount of memory. You should ensure that you are continuously requesting values from all lanes, so that they never become too full.
  • Likewise, there is nothing that prevents results from sitting in a lane forever and never actually being read, even when the process terminates; you must continuously drive reads to ensure you are not missing any values.
  • The buffer can currently only operate in either push or pull mode, because there are no obviously-correct semantics for a combined mode; if you still need a combined mode, please file an issue describing your exact usecase, so that I can better understand what the real-world requirements of such cases are.
  • While using this abstraction is much less error-prone than manually implementing asynchronous reconciliation, it is still somewhat difficult to get right, and very difficult to debug when you don't; if possible, prefer using higher-level off-the-shelf libraries for your usecase instead.

Lanes

This library supports a concept of 'lanes'. Each lane is an independent queue of values and errors; you make a request for a value to a specific lane, and values and errors are assigned to specific lane, even though the source of values is shared among all of them. Additionally, it is possible to broadcast errors or values to all lanes.

If you just want to convert a push-based API to a pull-based API, you probably won't need lanes, and you can safely ignore all options related to it. However, if you are implementing some kind of value or task distribution mechanism, you will probably want to use lanes to ensure that the right values end up at the right place. It's up to you what to do with the lane assignments; they're just a list of zero-indexed queues internally, and you can map the lane indexes to some sort of other lookup table in your own code if needed.

If you are implementing a forking Promistream, the most useful lane configuration will most likely be: one lane per output stream, with error broadcasts enabled.

Example

API

let buffer = pushBuffer(options)

Note that using lanes is optional; all options and functions default to lane 0, which is the only lane when no lane count is specified (and so it functions as if lanes didn't exist).

  • options: The settings for this pushBuffer.
    • mode: Default: "pull". The mode to operate in. One of "push", "pull".
    • lanes Default: 1. The amount of lanes to create. Each lane is an independent queue of values and errors, though broadcasts are possible.
    • pull: Required only in pull mode. An (async) callback that's called to acquire a new value/result, and which is expected to return a Promise. This should be providing your values in pull mode.
    • sequential: Default: false. Only used in pull mode. Whether to handle requests sequentially; that is, a second pull will not be started until the previous pull has completed, even if a second request is made.
    • select: Default: 0. A callback that receives a value, and returns the index of the lane to assign it to (or an array of such indexes). The callback may return a Promise.
    • selectError: Default: 0. Like select, but it receives errors instead of values.
    • broadcastValues: Default: false. Whether to broadcast all values to all lanes by default. The select callback will not be called for broadcast values. In push mode, this can be overridden for individual values.
    • broadcastErrors: Default: true. Like broadcastValues, but for errors. The selectError callback will not be called for broadcast errors. In push mode, this can be overridden for individual errors.

Returns: A new pushBuffer.

buffer.request(lane)

Requests the next value (for a given lane).

  • lane: Default: 0. The lane to request a value for.

Returns: a Promise that will eventually either resolve or reject, depending on the value/error acquired.

buffer.push(value, broadcast)

Pushes a value (or a Promise) to the next pending request.

  • value: Required. The value to push.
  • broadcast: Default: the broadcastValues setting. Whether to broadcast this value to all lanes.

buffer.pushError(error, broadcast)

Pushes an error to the next pending request. Note that if you wish to push a Promise that may fail (rather than an error you already have), you should use buffer.push instead; it will automatically be handled as an error if it ends up rejecting.

  • error: Required. The value to push.
  • broadcast: Default: the broadcastErrors setting. Whether to broadcast this error to all lanes.

buffer.countLane(lane)

Provides queue lengths for the given lane.

  • lane: Default: 0. The lane to request a value for.

Returns: an object with queue length properties:

  • values: The amount of pending results that have not been reconciled with a request yet - note that this includes errors/rejections!
  • requests: The amount of pending requests that have not been reconciled with a result yet.