@watchable/nevermore

v1.0.0

Rate limits and concurrency using generators of promiseFactories

@watchable/nevermore - async scheduling and retry

What is nevermore?

The nevermore scheduler adds rate-limiting, concurrency control, timeout, retry and backoff to your async functions without changing their signature or implementation.

Usage

Define limits by passing option values to one of the two core nevermore APIs. The example below uses the executor API to wrap your own vanilla async function...

import { createExecutorStrategy } from "@watchable/nevermore";
import { myFn } from "./myFn.ts";

const { createExecutor } = createExecutorStrategy({
  concurrency: 1,
  intervalMs: 100,
  backoffMs: 50,
  timeoutMs: 3000,
  retries: 3,
});

const myLimitedFn = createExecutor(myFn);
const myResult = await myLimitedFn("my", "typed", "arguments");

nevermore has two core APIs which accept the same strategy options...

  • createExecutorStrategy - wraps async functions without changing your code
  • createSettlementSequence - pulls from generators creating jobs just-in-time

The execution of Jobs is controlled through composable scheduling primitives known as strategies. Multiple strategies are already implemented as individual composable blocks which can be freely combined. You can further extend nevermore by writing your own strategies.

See more detail about the two API signatures in the APIs section later in this document.

Available strategies

A concurrency Strategy accepts another job only when the number of pending jobs goes below concurrency. When there is a free slot (previously pending jobs have settled as resolved or rejected), the strategy will accept a new pending job. To activate this strategy, provide a concurrency number in the options.
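
The slot accounting described above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not nevermore's implementation: createConcurrencyLimiter, run, pendingCount and waitingCount are hypothetical names invented for this example.

```typescript
// Hypothetical helper for illustration only; not part of the nevermore API.
type AsyncJob<T> = () => Promise<T>;

function createConcurrencyLimiter(concurrency: number) {
  let pending = 0; // jobs currently holding a slot
  const waiting: Array<() => void> = []; // callers queued for a slot

  async function run<T>(job: AsyncJob<T>): Promise<T> {
    if (pending < concurrency) {
      pending++; // a slot is free, so take it immediately
    } else {
      // No free slot: wait until a settling job hands its slot over.
      await new Promise<void>((resolve) => waiting.push(resolve));
    }
    try {
      return await job();
    } finally {
      const next = waiting.shift();
      if (next) {
        next(); // pass the slot straight to the next waiter
      } else {
        pending--; // nobody is waiting, so free the slot
      }
    }
  }

  return {
    run,
    pendingCount: () => pending,
    waitingCount: () => waiting.length,
  };
}
```

Handing the slot directly to the next waiter, rather than decrementing and re-incrementing the counter, means a newly arriving caller cannot take the slot before the waiter resumes.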

A rate Strategy implements rate limiting by launching the next job only when there is a free slot within the intervalMs. Every execution of a job uses up one slot in the interval. When an interval's slots are exhausted, the strategy calculates when the next slot will become free, and sleeps for that duration before accepting the next job. To activate this strategy, provide an intervalMs number in the options. The number of slots per interval is set by intervalLaunches, which defaults to 1 launch per interval.
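
The "calculate when the next slot will become free" step can be sketched as a pure function. This sketch assumes a sliding window over recent launch timestamps; nevermore's actual bookkeeping may differ, and msUntilNextLaunch is a hypothetical name.

```typescript
// Hypothetical helper for illustration only; not part of the nevermore API.
// Returns how long to sleep before another job may launch (0 if a slot is free).
function msUntilNextLaunch(
  launchTimes: number[], // timestamps (ms) of previous launches
  now: number, // current timestamp (ms)
  intervalMs: number,
  intervalLaunches = 1
): number {
  // Only launches inside the window ending at `now` still occupy a slot.
  const recent = launchTimes
    .filter((time) => now - time < intervalMs)
    .sort((a, b) => a - b);
  if (recent.length < intervalLaunches) {
    return 0; // a slot is free right now
  }
  // A slot frees up when this launch falls out of the window.
  const blocking = recent[recent.length - intervalLaunches];
  return blocking + intervalMs - now;
}
```

With intervalMs of 100 and one launch per interval, a job launched at time 0 makes a caller at time 30 wait 70ms.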

A timeout Strategy wraps jobs in a timeout job (throwing an error if the job hasn't settled before timeoutMs) then passes the job to downstream strategies. On receiving a settlement (fulfilment, rejection or timeout) it unwraps the timeout job, yielding a JobSettlement pointing to the original job, not the substitute. To activate this strategy, provide a timeoutMs number in the options and remember your wrapped function may now throw a nevermore TimeoutError.
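
The wrapping idea can be sketched with Promise.race. This is an illustration only: withTimeout and SketchTimeoutError are hypothetical names, and SketchTimeoutError is a stand-in, not nevermore's own TimeoutError class.

```typescript
// Hypothetical names for illustration only; not part of the nevermore API.
class SketchTimeoutError extends Error {
  constructor(timeoutMs: number) {
    super(`Job did not settle within ${timeoutMs}ms`);
    this.name = "SketchTimeoutError";
  }
}

// Wraps a job so that it rejects if it has not settled before timeoutMs.
function withTimeout<T>(
  job: () => Promise<T>,
  timeoutMs: number
): () => Promise<T> {
  return async () => {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(
        () => reject(new SketchTimeoutError(timeoutMs)),
        timeoutMs
      );
    });
    try {
      // Whichever settles first decides the outcome.
      return await Promise.race([job(), timeout]);
    } finally {
      clearTimeout(timer); // avoid leaving a stray timer behind
    }
  };
}
```

Note that this sketch only stops waiting for the job. It does not cancel the underlying work, which would need something like an AbortSignal passed into the job.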

A retry Strategy repeatedly calls failing jobs until the number of failures equals retries. It wraps jobs in a retry job before launching them, storing the count of retries attempted. JobResolved settlements are unwrapped yielding a JobResolved pointing to the original job. By contrast, JobRejected events trigger further retries until reaching the maximum number of retries for that job, and the last failure is passed back as the job's settlement. To activate this strategy, provide a retries number in the options.
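
The counting rule above (stop once the number of failures equals retries, and report the last failure) can be sketched as a simple loop. runWithRetries is a hypothetical helper written for this example, not a nevermore export.

```typescript
// Hypothetical helper for illustration only; not part of the nevermore API.
async function runWithRetries<T>(
  job: () => Promise<T>,
  retries: number
): Promise<T> {
  let failures = 0;
  for (;;) {
    try {
      return await job(); // success ends the loop immediately
    } catch (error) {
      failures++;
      if (failures >= retries) {
        throw error; // the last failure becomes the job's settlement
      }
    }
  }
}
```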

A backoff Strategy repeatedly calls failing jobs with an increasing backoff delay (based on an exponential function). See the section on 'retry' for more detail of the approach. To activate this strategy, provide a backoffMs number in the options. To get eventual feedback from continually failing jobs, you need to set a retries option. To get backpressure from createSettlementSequence pulling just-in-time, you need to set a concurrency option to prevent indefinitely many jobs from being queued.
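
As a rough sketch of what an exponential delay looks like, the function below doubles the wait after each failure. The doubling factor and the name backoffDelayMs are assumptions made for this example; the README only says the delay is based on an exponential function, so nevermore's exact curve may differ.

```typescript
// Hypothetical helper for illustration only; not part of the nevermore API.
// Assumption: the delay starts at backoffMs and is multiplied by `factor`
// after every further failure.
function backoffDelayMs(
  backoffMs: number,
  failures: number,
  factor = 2
): number {
  if (failures <= 0) {
    return 0; // no failures yet, so no delay
  }
  return backoffMs * factor ** (failures - 1);
}
```

Under these assumptions, a backoffMs of 50 gives waits of 50ms, 100ms and 200ms after the first, second and third failure.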

Install

npm install @watchable/nevermore

APIs

Ad Hoc (async function) API

An ExecutorStrategy can transform a normal async function into a function that is regulated by a nevermore pipeline.

Create a strategy and get back a createExecutor() function. The strategy shown below exercises most of the options - concurrency-limits, rate-limits, backoff, timeouts and retries...

import { createExecutorStrategy } from "@watchable/nevermore";
const { createExecutor } = createExecutorStrategy({
  concurrency: 1,
  intervalMs: 100,
  timeoutMs: 3000,
  backoffMs: 1000,
  retries: 3,
});

You can then use createExecutor to turn an ordinary function into a regulated function that respects the constraints of the strategy you configured...

async function getStarWars(filmId: number) {
  return await fetch(`https://swapi.dev/api/films/${filmId}/`, {
    method: "get",
  });
}

const getStarWarsExecutor = createExecutor(getStarWars);

// the below invocation has intellisense for
// autocompleting args for getStarWars and...
// * will allow only one concurrent retrieval
// * will allow only one retrieval every 100ms
// * will timeout individual attempts after 3000ms
// * will attempt up to 3 times if getStarWars throws
const [episode4, episode5, episode6] = await Promise.allSettled([
  getStarWarsExecutor(1),
  getStarWarsExecutor(2),
  getStarWarsExecutor(3),
]);

Batch (generator) API

For batch routines (or potentially infinite sets), createSettlementSequence provides an alternative API based on iterable sequences of async functions that are created on-the-fly as they are needed.

Creating jobs only on demand produces backpressure, which limits the growth of memory in your app by limiting the creation of new tasks according to downstream rate and concurrency limits.

Exactly the same scheduling options (concurrency, retry etc.) are supported as in the createExecutorStrategy API.

Explanation

If you eventually need to satisfy a million requests, you don't want to spawn them all as pending promises in memory while they are slowly processed at 100 per second. The resources dedicated to pending jobs should be allocated just-in-time.

The createExecutor approach described above is very convenient for adding seamless scheduling of hundreds of parallel tasks without having to change your code. Unfortunately this makes the scheduling opaque and there is therefore no mechanism to provide backpressure when jobs aren't completing quickly.

By contrast, createSettlementSequence allows developers to respect 'backpressure' from a pipeline's limited capacity. Each Job is yielded from your iterator just-in-time as capacity becomes available. Between yields your iterator is halted, holding only its stack in memory. An iteration procedure for 1 million requests will therefore only progress as fast as the pipeline allows, and the only promises in memory are those which have been scheduled.
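
The just-in-time behaviour comes from how generators work in JavaScript, independent of nevermore. The sketch below uses a synchronous generator for brevity and a hypothetical createCountingSequence helper to show that a sequence able to produce a million jobs only creates the ones that are actually pulled.

```typescript
// Hypothetical helper for illustration only; not part of the nevermore API.
function createCountingSequence(total: number) {
  let created = 0;
  function* generate(): Generator<() => Promise<number>> {
    for (let id = 0; id < total; id++) {
      created++; // only runs when the consumer asks for another job
      yield async () => id;
    }
  }
  return { jobs: generate(), createdCount: () => created };
}

const sequence = createCountingSequence(1000000);

// Simulate a pipeline that only has capacity for three jobs.
const pulled = [
  sequence.jobs.next(),
  sequence.jobs.next(),
  sequence.jobs.next(),
];

// 3 jobs pulled, 3 jobs created; the other 999997 do not exist yet.
console.log(pulled.length, sequence.createdCount());
```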

An example of a sequence yielding Job callbacks one-by-one is shown below.

import { createSettlementSequence } from "@watchable/nevermore";

// define a sequence of zero-arg functions
async function* createJobSequence() {
  for (;;) {
    yield async () => {
      const result = await fetch(
        `https://timeapi.io/api/TimeZone/zone?timeZone=Europe/London`
      );
      if (result.status !== 200) {
        throw new Error("Failure retrieving time");
      }
      // await the parsed body before asserting its shape
      return (await result.json()) as {
        timeZone: string;
        currentLocalTime: string;
      };
    };
  }
}

// create a sequence of settlements (limited by specified options)
const settlementSequence = createSettlementSequence(
  {
    concurrency: 1,
    intervalMs: 1000,
    timeoutMs: 3000,
    retries: 3,
  },
  createJobSequence
);

// consume the settlements (like Promise.allSettled())
for await (const settlement of settlementSequence) {
  if (settlement.status === "fulfilled") {
    console.log(`Time in London is ${settlement.value.currentLocalTime}`);
  } else {
    console.error(
      `Gave up retrying. Last error was: ${settlement.reason?.message}`
    );
  }
}

Extending Settlement

The type of settlements yielded from a settlement sequence aligns with Promise.allSettled(), but with an extra job member.

The type of your job J is preserved in JobSettlement<J>, meaning you can get the annotations back at settlement time.

Annotating a job and creating an inferrable J is trivial. Instead of...

yield () => getStarWars(filmId);

...add properties to the yielded no-arg function with Object.assign...

yield Object.assign(() => getStarWars(filmId), { filmId });

Then you can get the extra information back from the type-safe job in the settlement...

// consume the settlements (like Promise.allSettled())
for await (const settlement of settlementSequence) {
  const { filmId } = settlement.job;
  if (settlement.status === "fulfilled") {
    console.log(`Success for ${filmId} : response was ${settlement.value}`);
  } else {
    console.error(
      `Failure for ${filmId}: last error was ${settlement.reason?.message}`
    );
  }
}
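
The reason the annotation survives is that Object.assign returns the intersection of its argument types, so the yielded value is both callable and carries the extra properties. A self-contained sketch, using a hypothetical getFilmTitle stand-in instead of a real network call:

```typescript
// Hypothetical stand-in for an async function such as getStarWars.
async function getFilmTitle(filmId: number): Promise<string> {
  return `film-${filmId}`;
}

function createAnnotatedJob(filmId: number) {
  // Inferred type: (() => Promise<string>) & { filmId: number }
  return Object.assign(() => getFilmTitle(filmId), { filmId });
}

const annotatedJob = createAnnotatedJob(4);

// The job is still a zero-arg async function...
const titlePromise: Promise<string> = annotatedJob();

// ...and its annotation is available with the correct type.
const annotatedFilmId: number = annotatedJob.filmId;
```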

Writing your own nevermore strategies

Developers can add e.g. a CircuitBreaker strategy of their own to extend the richness of their nevermore pipeline.

For reference a passthru Strategy is included in source. This is a no-op strategy that is suitable as a starting point for your own strategies. Its implementation is shown in full below to illustrate the Strategy formalism.

  • launchJob() asks to schedule a Job, returning a promise that resolves once the job has been first invoked.
  • launchesDone() is a signal called on your strategy when no further launches will take place, allowing it to track remaining pending jobs, and finally clean up resources.
  • next(): Promise<IteratorResult<JobSettlement<J>>> implements an AsyncIterator allowing your strategy to pass back the eventual settlements of launched jobs (when they are eventually fulfilled or rejected).

export function createPassthruStrategy<J extends Job<unknown>>(
  downstream: Strategy<J>
) {
  return {
    launchJob(job) {
      return downstream.launchJob(job);
    },
    launchesDone() {
      downstream.launchesDone();
    },
    next() {
      return downstream.next();
    },
  } satisfies Strategy<J>;
}
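
As a small step beyond passthru, the sketch below counts launches and settlements while delegating everything downstream. To keep it self-contained, the Job, JobSettlement and Strategy types here are simplified stand-ins written from the three-method description above; they are assumptions, not nevermore's real type definitions, and createCountingStrategy is a hypothetical name.

```typescript
// Simplified stand-in types (assumptions); nevermore's real types may differ.
type Job<T> = () => Promise<T>;
type JobSettlement<J> =
  | { status: "fulfilled"; value: unknown; job: J }
  | { status: "rejected"; reason: unknown; job: J };
interface Strategy<J extends Job<unknown>> {
  launchJob(job: J): Promise<unknown>;
  launchesDone(): void;
  next(): Promise<IteratorResult<JobSettlement<J>>>;
}

// Hypothetical strategy: observes traffic without changing scheduling.
function createCountingStrategy<J extends Job<unknown>>(
  downstream: Strategy<J>
) {
  let launched = 0;
  let settled = 0;
  return {
    launchJob(job: J) {
      launched++; // count the request, then delegate
      return downstream.launchJob(job);
    },
    launchesDone() {
      downstream.launchesDone();
    },
    async next() {
      const result = await downstream.next();
      if (!result.done) {
        settled++; // count each settlement passed back upstream
      }
      return result;
    },
    counts: () => ({ launched, settled }),
  };
}
```

A real strategy such as a circuit breaker would use the same three hooks, but would decide in launchJob whether to delegate, delay or reject the job.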

Changing strategy sequence

You can pass piped strategies in the pipes option to be placed upstream of strategies specified in the other options. If there are no other options, nevermore simply sequences the pipes you choose. nevermore exports factories for its core pipes, e.g. createConcurrencyPipe() and createTimeoutPipe(), so that you can interleave them with your own.

This would be needed if you want to sequence your own strategies differently than the default sequence (found in the core sequence.ts file). For example, in the default sequence backoff is placed before concurrency. This ensures that backed off tasks don't consume a slot, meaning concurrency only limits executing jobs. Placing concurrency before backoff means a slot is used by a scheduled task during its whole lifecycle (including between retries).

See also

  • p-limit
  • p-queue
  • p-retry
  • promise-pool