chanpuru

v0.2.2

Parallel processing by Promise + Async Generator.

Downloads

55

  • Send and receive values with Promise and Async Generator (Chan)
  • Control concurrent execution of multiple Promises with Chan (workers() / payloads())
  • Select and merge Async Generators with Promise.race (select())
  • Cancel with Promise and AbortController (abortPromise() etc.)

These are influenced by Go channels.

Installation

$ npm install --save chanpuru

Usage

paralle-jobs.ts

Parallelize command execution by running zx's $ through a Chan.

External commands (sha256sum) being executed in parallel

Send

  1. Make a Channel with a buffer
  2. Asynchronously run a loop that sends the $ commands to the Channel
    • Send the Promise returned from $ (it returns a ProcessPromise)
      • $ keeps starting commands until the buffer is full
      • Buffered items are consumed by the receiver
    • Close the Channel after sending everything
  3. Return the Receiver of the Channel

Note that the buffer size does not limit the number of Promise executions. Refer to pass-promise-paralle.ts for details.

function computeHash(
  recvFiles: AsyncGenerator<string, void, void>,
  workerNum: number
): ChanRecv<ProcessOutput> {
  const ch = new Chan<Promise<ProcessOutput>>(workerNum - 1)

  ;(async () => {
    try {
      for await (const file of recvFiles) {
        console.log(chalk.greenBright(`start - ${file}`))
        await ch.send($`sha256sum ${file}`)
      }
    } catch (err) {
      console.error(err)
    } finally {
      ch.close()
    }
  })()

  return ch.receiver()
}
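
The note about buffer size comes down to how Promises behave in general: a Promise starts running when it is created, not when it is awaited or received. The following is a minimal sketch of that point only and does not use chanpuru. The names `job`, `runEager` and `runLimited` are hypothetical helpers invented for this illustration.

```typescript
// Hypothetical bookkeeping: how many jobs run at the same time.
let running = 0
let peak = 0

// Hypothetical job: "works" for `ms` milliseconds and tracks concurrency.
function job(ms: number): Promise<number> {
  running++
  peak = Math.max(peak, running)
  return new Promise<number>((resolve) =>
    setTimeout(() => {
      running--
      resolve(ms)
    }, ms)
  )
}

// Creating the Promises up front starts every job immediately,
// no matter how slowly the results are consumed afterwards.
async function runEager(count: number): Promise<number> {
  running = 0
  peak = 0
  const promises: Promise<number>[] = []
  for (let i = 0; i < count; i++) promises.push(job(10))
  for (const p of promises) await p // consuming one by one does not help
  return peak
}

// Passing functions instead defers the start, so the consumer decides
// how many jobs are in flight (here: at most `limit` per batch).
async function runLimited(count: number, limit: number): Promise<number> {
  running = 0
  peak = 0
  const thunks: (() => Promise<number>)[] = []
  for (let i = 0; i < count; i++) thunks.push(() => job(10))
  for (let i = 0; i < thunks.length; i += limit) {
    await Promise.all(thunks.slice(i, i + limit).map((start) => start()))
  }
  return peak
}
```

In this sketch `runEager(5)` reports a peak of 5 concurrent jobs, while `runLimited(6, 2)` never exceeds 2. Batching is only one simple way to bound concurrency and is used here as a stand-in, not as a description of how chanpuru does it.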

Receive

  1. Get the Receiver (Async Generator)
  2. Receive results with for await...of
    • Each result is awaited
    • Once the Channel buffer is no longer full, a pending await send is released
  3. Exit the loop after all commands (Promises) have been received

const recvResults = computeHash(recvFiles, workerNum)
for await (const f of recvResults) {
  console.log(chalk.blueBright(f.stdout))
}
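
To make the interplay between a blocked send and the receiving loop more concrete, here is a minimal sketch of a buffered channel built from Promises and an Async Generator. `MiniChan` is a hypothetical class written for this illustration. It is not chanpuru's implementation and assumes a single receiver.

```typescript
// Hypothetical MiniChan: a tiny buffered channel, for illustration only.
class MiniChan<T> {
  private items: { value: T; accept: () => void }[] = []
  private closed = false
  private wake: (() => void) | undefined
  private bufSize: number

  constructor(bufSize = 0) {
    this.bufSize = bufSize
  }

  // Resolves once the value fits into the buffer or has been taken by the receiver.
  send(value: T): Promise<void> {
    if (this.closed) return Promise.reject(new Error('send on closed channel'))
    return new Promise<void>((accept) => {
      this.items.push({ value, accept: () => accept() })
      if (this.items.length <= this.bufSize) accept()
      if (this.wake) this.wake()
    })
  }

  // After close() the receiver drains the remaining items and then finishes.
  close(): void {
    this.closed = true
    if (this.wake) this.wake()
  }

  async *receiver(): AsyncGenerator<T, void, void> {
    while (true) {
      const item = this.items.shift()
      if (item === undefined) {
        if (this.closed) return
        // Nothing queued yet: sleep until send() or close() wakes us up.
        await new Promise<void>((resolve) => (this.wake = () => resolve()))
        this.wake = undefined
        continue
      }
      // Release the sender of this item and of any items that now fit the buffer.
      item.accept()
      for (const waiting of this.items.slice(0, this.bufSize)) waiting.accept()
      yield item.value
    }
  }
}
```

With `new MiniChan<number>(1)`, the first `await ch.send(...)` returns immediately because the value fits into the buffer, and the second one stays pending until the `for await...of` loop on `ch.receiver()` takes an item.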

log-multiple-sources.ts

  • Merge the output from zx's $ with select()
  • Stop all commands if any command exits with an error status
  • Stop all commands on timeout as well

The ping execution status for multiple hosts is merged and displayed

Send

  1. Make a Channel to send command output
  2. Execute the command asynchronously with $
    • Send output line by line to the Channel
    • Kill the process when cancelPromise is settled
    • When an error occurs
      1. Send stderr etc. to the error channel
      2. Call cancel() to notify other asynchronous functions via cancelPromise
  3. Return the Receiver of the Channel

function ping(
  [cancelPromise, cancel]: [Promise<void>, () => void],
  sendErr: ChanSend<any>,
  host: string
) {
  const ch = new Chan<string | Buffer>()
  ;(async () => {
    let zxProc: ProcessPromise<ProcessOutput> | undefined = undefined
    // Handling for when cancel is triggered.
    let abortOwn = false
    const signalName = 'SIGTERM'
    cancelPromise
      .catch(() => {})
      .finally(() => {
        // Kill initiated from our own code.
        abortOwn = true
        zxProc && zxProc.kill(signalName)
      })
    try {
      // Start ping (without awaiting).
      zxProc = $`ping -c 7 ${host}`
      // Read ping's stdout line by line.
      const rl = createInterface({
        input: zxProc.stdout,
        crlfDelay: Infinity
      })
      for await (const s of rl) {
        // Send each line read as a log entry.
        ch.send(s)
      }
      // Wait for the process to complete.
      // If it has already been rejected at this point, this throws.
      await zxProc
        .catch((r) => {
          if (r instanceof ProcessOutput) {
            if (abortOwn && r.exitCode === null && r.signal === signalName) {
              // A kill initiated from our own code is not treated as an error here.
              ch.send(`aborted`)
              return
            }
          }
          throw r
        })
        .finally(() => {
          zxProc = undefined
        })
    } catch (err) {
      if (err instanceof ProcessOutput) {
        // Abnormal process termination.
        sendErr(
          `host: ${host}\nexitCode: ${err.exitCode}\nsignal: ${err.signal}\n${err.stderr}`
        )
      } else {
        // Other errors.
        sendErr(`host: ${host}, err: ${err}`)
      }
      cancel() // Cancel the overall processing.
    } finally {
      // Cleanup.
      ch.close()
    }
  })()

  return ch.receiver()
}
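
The `[cancelPromise, cancel]` pair passed to `ping` can be pictured as a Promise plus the function that settles it. The sketch below shows one way such a pair could be built, with an optional timeout that settles the same Promise. `makeCancel` and `stepUntilCancelled` are hypothetical names for this illustration. They are not chanpuru's `abortPromise()` and say nothing about its signature.

```typescript
// Hypothetical helper: returns [cancelPromise, cancel], optionally with a timeout.
function makeCancel(timeoutMs?: number): [Promise<void>, () => void] {
  let cancel: () => void = () => {}
  const cancelPromise = new Promise<void>((resolve) => {
    cancel = () => resolve()
  })
  if (timeoutMs !== undefined) {
    // A timeout settles the same Promise, so workers watch a single signal.
    const timer = setTimeout(cancel, timeoutMs)
    cancelPromise.then(() => clearTimeout(timer))
  }
  return [cancelPromise, cancel]
}

// Hypothetical worker: performs one step every `stepMs` until cancelled.
async function stepUntilCancelled(
  cancelPromise: Promise<void>,
  stepMs: number
): Promise<number> {
  let cancelled = false
  cancelPromise.then(() => {
    cancelled = true
  })
  let steps = 0
  while (!cancelled) {
    await new Promise<void>((resolve) => setTimeout(() => resolve(), stepMs))
    steps++
  }
  return steps
}
```

Because every worker holds the same `cancelPromise`, a single `cancel()` call from any of them (for example after an error) stops all the others, which is the role `cancel()` plays in the `catch` block of `ping`.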

Receive

  1. Run a loop that receives errors in an asynchronous function
    • When data is received, run the error handling
  2. Create an object with keys and Receivers for select()
  3. Receive logs via select() with for await...of
    • done is passed from each Async Generator via select()
    • Process by source key (host)

// Loop that receives errors.
// Kept running asynchronously.
;(async () => {
  for await (const err of errCh.receiver()) {
    console.error(chalk.redBright(`${err}`))
    // An error occurred, so change the exit code.
    exitStatus = 1
  }
})()

// Object holding the keys and Async Generators passed to select.
const jobs: Record<string, ChanRecv<string | Buffer>> = {}
// Set the Receiver (Async Generator) of the logs sent from ping.
Object.entries(hosts).forEach(([k, v]) => {
  jobs[k] = ping([cancelPromise, cancel], errCh.send, v)
})

// Loop that receives logs.
for await (const [host, value] of select(jobs)) {
  if (!value.done) {
    console.log(`[${decorate(host)(host)}] ${value.value.toString('utf-8')}`)
  }
}
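
Merging several Async Generators with Promise.race can be sketched as follows. `miniSelect` and `ticker` are hypothetical names for this illustration, and this is not chanpuru's `select()`. The `[key, result]` tuple only mirrors the shape used in the loop above, where `value.done` is checked per source.

```typescript
type Tagged<T> = [string, IteratorResult<T, void>]

// Hypothetical miniSelect: yields [key, result] from whichever source settles first.
async function* miniSelect<T>(
  sources: Record<string, AsyncGenerator<T, void, void>>
): AsyncGenerator<Tagged<T>, void, void> {
  // Keep exactly one pending next() per source, tagged with its key.
  const pending = new Map<string, Promise<Tagged<T>>>()
  const pull = (key: string): void => {
    const tagged = sources[key].next().then((r): Tagged<T> => [key, r])
    pending.set(key, tagged)
  }
  for (const key of Object.keys(sources)) pull(key)

  while (pending.size > 0) {
    const [key, result] = await Promise.race(Array.from(pending.values()))
    if (result.done) {
      pending.delete(key) // this source is finished, stop racing it
    } else {
      pull(key) // request the next value from the same source
    }
    yield [key, result]
  }
}

// Hypothetical source: emits `count` labelled values, one every `ms` milliseconds.
async function* ticker(
  label: string,
  ms: number,
  count: number
): AsyncGenerator<string, void, void> {
  for (let i = 0; i < count; i++) {
    await new Promise<void>((resolve) => setTimeout(() => resolve(), ms))
    yield `${label}${i}`
  }
}
```

A consumer would iterate it the same way as the log loop above, for example `for await (const [key, result] of miniSelect({ a: ticker('a', 5, 3), b: ticker('b', 8, 2) }))`, skipping entries where `result.done` is true. Values from one source keep their order, while the interleaving between sources depends on timing.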

Examples

Further examples

API

API document

License

MIT License

Copyright (c) 2022 hankei6km