npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

prom-utils

v0.14.0

Published

Promise utilities: rate limiting, queueing/batching, defer, etc.

Downloads

2,118

Readme

prom-utils

Promise utilities designed for looping.

rateLimit

Limit the concurrency of promises. This can be used to control how many requests are made to a server, for example. Note: exceptions will be swallowed in order to prevent an UnhandledPromiseRejection from being thrown in the case where the promise rejects before the limit is reached. Therefore, you must handle exceptions on a per promise basis. Wrapping rateLimit method calls in a try/catch will not work.

// Limit concurrency to at most 3
const limiter = rateLimit(3)

for (const url of urls) {
    // Will wait for one promise to finish if limit is reached
    await limiter.add(fetch(url))
}
// Wait for unresolved promises to resolve
await limiter.finish()

batchQueue

Batch calls via a local queue. This can be used to batch values before writing to a database, for example.

Calls fn when either batchSize, batchBytes, or timeout is reached. batchSize defaults to 500 and therefore will always be in effect if no options are provided. You can pass Infinity to disregard batchSize. If timeout is passed, the timer will be started when the first item is enqueued and reset when flush is called explicitly or implicitly.

Use maxItemsPerSec and/or maxBytesPerSec to limit throughput. Call queue.getStats() to get the items/sec and bytes/sec rates.

Call queue.flush() to flush explicitly.

The last result of calling fn can be obtained by referencing lastResult on the returned object.

The cause of the last automatic queue flush can be obtained by referencing lastFlush on the returned object.

const writeToDatabase = async (records) => {...}

const queue = batchQueue(writeToDatabase)
for (const record of records) {
  // Will call `fn` when a threshold is met
  await queue.enqueue(record)
}
// Call `fn` with remaining queued items
await queue.flush()

Types

export type QueueResult<A, B> = {
    /** Call `fn` with the items in the queue. */
    flush(): Promise<void>
    /** Add an item to the queue. When a queue condition is met `flush` will be called. */
    enqueue(item: A): Promise<void>
    /** The last result returned from calling `fn`. */
    lastResult?: Awaited<B>
    /** Get the current throughput rates. */
    getStats(): QueueStats
}

export interface QueueOptions {
    /** Wait for the batch to reach this number of elements before flushing the queue. */
    batchSize?: number
    /** Wait for the batch to reach this size in bytes before flushing the queue. */
    batchBytes?: number
    /** Wait this long in ms before flushing the queue. */
    timeout?: number
    /** Maximum throughput allowed (item/sec). */
    maxItemsPerSec?: number
    /** Maximum throughput allowed (bytes/sec). */
    maxBytesPerSec?: number
}

Example

const writeToDatabase = async (records) => {...}
const batchSize = 250

const queue = batchQueue(writeToDatabase, { batchSize })
for (const record of records) {
  await queue.enqueue(record)
}
await queue.flush()

throughputLimiter

Limit throughput by sleeping until the rate (units/sec) is less than or equal to maxUnitsPerSec. Units is intentionally abstract since it could represent records/sec or bytes/sec, for example.

Example

// Limit to at most 1000 items/sec
const limiter = throughputLimiter(1000)

for (const batch of batches) {
    // Will wait until the rate is <= `maxUnitsPerSec`
    await limiter.throttle(batch.length)
    console.log('Items/sec %d', limiter.getCurrentRate())
}

Types

export interface ThroughputLimiterOptions {
    /** The maximum number of start invocations to hold in memory. */
    windowLength?: number
    /** Number of ms to sleep before checking the rate again. Defaults to 100. */
    sleepTime?: number
}

pausable

Pause a loop by awaiting maybeBlock. When pause is called maybeBlock will return a promise that is resolved when resume is called. Otherwise, maybeBlock will return immediately. If timeout is passed, resume will be called after timeout if it is not manually called first.

const shouldProcess = pausable()

onSomeCondition(shouldProcess.pause)
onSomeOtherCondition(shouldProcess.resume)

for (const record of records) {
    await shouldProcess.maybeBlock()
    await processRecord(record)
}

defer

Defer resolving a promise until done is called.

const delay = (milliseconds: number) => {
    const deferred = defer()
    setTimeout(deferred.done, milliseconds, '🦄')
    return deferred.promise
}

sleep

Sleep for time ms before resolving the Promise.

// Sleep for one second
await sleep(1000)

pacemaker

Call heartbeatFn every interval until promise resolves or rejects. interval defaults to 1000.

Returns the value of the resolved promise.

const heartbeatFn = () => {
    // Emit heartbeat
}

const result = await pacemaker(heartbeatFn, someProm)

waitUntil

Wait until the predicate returns truthy or the timeout expires. Returns a promise.

Types

export interface WaitOptions {
    /** Wait this long in ms before rejecting. Defaults to 5000 ms. */
    timeout?: number
    /** Check the predicate with this frequency. Defaults to 50 ms. */
    checkFrequency?: number
}

Example

let isTruthy = false
setTimeout(() => {
    isTruthy = true
}, 250)
await waitUntil(() => isTruthy)

raceTimeout

Returns the value of the promise if the promise resolves prior to timeout. If the timeout happens first, the exported TIMEOUT symbol is returned.

const winner = await raceTimeout(someProm, 5)

if (winner === TIMEOUT) {
  // Do something
}