chanpuru
v0.2.2
Parallel processing by Promise + Async Generator.
- Sending and receiving values by `Promise` and Async Generator (`Chan`)
- Control multiple `Promise` concurrent executions by `Chan` (`workers()` / `payloads()`)
- Selecting and merging Async Generators with `Promise.race` (`select()`)
- Cancellation by `Promise` and AbortController (`abortPromise()` etc.)
These are influenced by Go channels.
Installation
$ npm install --save chanpuru
Usage
paralle-jobs.ts
Parallelize command execution by `$` of zx with `Chan`.
Send
- Make a Channel with a buffer
- Asynchronously execute a loop that sends the `$` command to the Channel
  - Send the `Promise` returned from `$` (it returns a `ProcessPromise`)
  - `$` keeps executing commands until the buffer is filled
  - Buffered items are consumed by the receiver
  - Close the Channel after sending everything
- Return the Receiver of the Channel
Note that the buffer size does not limit the number of `Promise` executions.
Refer to pass-promise-paralle.ts for details.
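One way to read the note above: a `Promise` starts its work when it is created, not when it is received, so queuing promises does not by itself hold back execution. In `computeHash`, for instance, the `$` call is evaluated before `ch.send()` gets a chance to block. The sketch below illustrates only this general property of promises; `task` and the plain array used as a buffer are hypothetical stand-ins, not part of chanpuru.

```typescript
// Hypothetical task for illustration: it records its start synchronously.
const started: string[] = []

function task(name: string): Promise<string> {
  started.push(name) // runs at creation time, before anyone awaits the Promise
  return new Promise((resolve) => setTimeout(() => resolve(name), 10))
}

// A plain array stands in for a channel buffer here (assumption for the demo).
const buffered: Promise<string>[] = []
for (const name of ['a', 'b', 'c']) {
  buffered.push(task(name))
}

// Nothing has been received or awaited yet, but every task is already running.
const startedBeforeReceive = started.length
console.log(startedBeforeReceive) // 3
```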
function computeHash(
  recvFiles: AsyncGenerator<string, void, void>,
  workerNum: number
): ChanRecv<ProcessOutput> {
  const ch = new Chan<Promise<ProcessOutput>>(workerNum - 1)
  ;(async () => {
    try {
      for await (const file of recvFiles) {
        console.log(chalk.greenBright(`start - ${file}`))
        await ch.send($`sha256sum ${file}`)
      }
    } catch (err) {
      console.error(err)
    } finally {
      ch.close()
    }
  })()
  return ch.receiver()
}
Receive
- Get the Receiver (Async Generator)
- Receive results with `for await...of`
  - Each result is awaited, so the loop receives the resolved `ProcessOutput`
  - Once the Channel's buffer is no longer full, the pending `await send` is released
- Exit the loop after all commands (`Promise`) have been received
const recvResults = computeHash(recvFiles, workerNum)
for await (const f of recvResults) {
  console.log(chalk.blueBright(f.stdout))
}
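The Send and Receive steps above can be sketched without any dependency. The class below is a minimal illustration of a buffered channel built from `Promise` and an Async Generator; it is not chanpuru's implementation, and `MiniChan`, its method bodies, and `demo` are assumptions made for this example. Here `send()` resolves once the value fits in the buffer or has been taken by the receiver.

```typescript
// Minimal illustrative channel; NOT chanpuru's implementation.
class MiniChan<T> {
  private queue: T[] = []
  private closed = false
  private waiters: Array<() => void> = []

  constructor(private readonly bufSize: number) {}

  // Wake everything that is waiting for the channel state to change.
  private notify(): void {
    const ws = this.waiters
    this.waiters = []
    ws.forEach((w) => w())
  }

  private wait(): Promise<void> {
    return new Promise((resolve) => this.waiters.push(resolve))
  }

  // Enqueue the value, then block while the queue exceeds the buffer size.
  async send(value: T): Promise<void> {
    if (this.closed) throw new Error('send on closed channel')
    this.queue.push(value)
    this.notify() // wake a waiting receiver
    while (this.queue.length > this.bufSize) {
      await this.wait()
    }
  }

  close(): void {
    this.closed = true
    this.notify()
  }

  // Yield awaited values in send order; finish once closed and drained.
  async *receiver(): AsyncGenerator<Awaited<T>, void, void> {
    while (true) {
      if (this.queue.length > 0) {
        const item = this.queue.shift() as T
        this.notify() // a slot was freed, so a pending send can continue
        yield await item
      } else if (this.closed) {
        return
      } else {
        await this.wait()
      }
    }
  }
}

async function demo(): Promise<string[]> {
  const log: string[] = []
  const ch = new MiniChan<Promise<number>>(1)
  ;(async () => {
    for (const n of [1, 2, 3]) {
      await ch.send(Promise.resolve(n * 10))
      log.push(`sent ${n}`)
    }
    ch.close()
  })()
  for await (const v of ch.receiver()) {
    log.push(`received ${v}`)
  }
  return log
}
```

Calling `demo()` resolves to a log containing three `sent` entries and the values 10, 20, 30 in receive order; how the `sent` and `received` entries interleave depends on scheduling.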
log-multiple-sources.ts
- Merge the output from `$` of zx with `select()`
- Stop all commands if any command exits with an error status
- Also stop all commands on timeout
Send
- Make a Channel to send command output
- Execute the command asynchronously with `$`
  - Send output line by line to the Channel
  - Kill the process when `cancelPromise` is settled
- When an error occurs
  - Send `stderr` etc. to the error channel
  - Call `cancel()` to notify other asynchronous functions via `cancelPromise`
- Return the Receiver of the Channel
function ping(
  [cancelPromise, cancel]: [Promise<void>, () => void],
  sendErr: ChanSend<any>,
  host: string
) {
  const ch = new Chan<string | Buffer>()
  ;(async () => {
    let zxProc: ProcessPromise<ProcessOutput> | undefined = undefined
    // Handling for when the job is cancelled.
    let abortOwn = false
    const signalName = 'SIGTERM'
    cancelPromise
      .catch(() => {})
      .finally(() => {
        // Kill issued from the code side.
        abortOwn = true
        zxProc && zxProc.kill(signalName)
      })
    try {
      // Start ping (do not await).
      zxProc = $`ping -c 7 ${host}`
      // Read ping's stdout line by line.
      const rl = createInterface({
        input: zxProc.stdout,
        crlfDelay: Infinity
      })
      for await (const s of rl) {
        // Send each line read as a log entry.
        ch.send(s)
      }
      // Wait for the process to finish.
      // If it has been rejected by this point, this throws.
      await zxProc
        .catch((r) => {
          if (r instanceof ProcessOutput) {
            if (abortOwn && r.exitCode === null && r.signal === signalName) {
              // Here, a kill issued from the code side is not treated as an error.
              ch.send(`aborted`)
              return
            }
          }
          throw r
        })
        .finally(() => {
          zxProc = undefined
        })
    } catch (err) {
      if (err instanceof ProcessOutput) {
        // Abnormal process termination.
        sendErr(
          `host: ${host}\nexitCode: ${err.exitCode}\nsignal: ${err.signal}\n${err.stderr}`
        )
      } else {
        // Other errors.
        sendErr(`host: ${host}, err: ${err}`)
      }
      cancel() // Cancel the whole operation.
    } finally {
      // Cleanup.
      ch.close()
    }
  })()
  return ch.receiver()
}
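`ping` takes a `[cancelPromise, cancel]` pair, and how that pair is created is not shown here. The sketch below shows one possible way to build such a pair on top of AbortController and share it between several asynchronous functions. `makeCancel`, `countUntilCancelled`, and `demoCancel` are hypothetical names for this example, not chanpuru's `abortPromise()`. This version resolves the promise on cancel, which is an assumption; a rejecting variant would also work with the `.catch().finally()` chain used in `ping`.

```typescript
// Hypothetical helper, not chanpuru's abortPromise(): builds a
// [cancelPromise, cancel] pair on top of AbortController.
function makeCancel(): [Promise<void>, () => void] {
  const ac = new AbortController()
  const cancelPromise = new Promise<void>((resolve) => {
    ac.signal.addEventListener('abort', () => resolve(), { once: true })
  })
  return [cancelPromise, () => ac.abort()]
}

// A stand-in worker that keeps ticking until the shared promise settles.
async function countUntilCancelled(cancelPromise: Promise<void>): Promise<number> {
  let cancelled = false
  cancelPromise.finally(() => {
    cancelled = true
  })
  let ticks = 0
  while (!cancelled) {
    ticks++
    await new Promise((resolve) => setTimeout(resolve, 5))
  }
  return ticks
}

async function demoCancel(): Promise<number[]> {
  const [cancelPromise, cancel] = makeCancel()
  // Both workers watch the same promise, so one cancel() stops them all.
  const workers = [
    countUntilCancelled(cancelPromise),
    countUntilCancelled(cancelPromise)
  ]
  setTimeout(cancel, 30)
  return Promise.all(workers)
}
```

Because every worker observes the same promise, any of them could also call `cancel()` itself to stop the others, which is the role `cancel()` plays in the `catch` block of `ping`.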
Receive
- Run a loop in an asynchronous function that receives errors
  - When data is received, run the error handling
- Create an object of keys and Receivers for `select()`
- Receive logs via `select()` with `for await...of`
  - `done` is passed from each Async Generator via `select()`
  - Process by source key (`host`)
// Loop that receives errors.
// Left running asynchronously.
;(async () => {
  for await (const err of errCh.receiver()) {
    console.error(chalk.redBright(`${err}`))
    // An error occurred, so change the exit code.
    exitStatus = 1
  }
})()
// Object holding the keys and Async Generators passed to select.
const jobs: Record<string, ChanRecv<string | Buffer>> = {}
// Set the Receiver (Async Generator) of the logs sent from ping.
Object.entries(hosts).forEach(([k, v]) => {
  jobs[k] = ping([cancelPromise, cancel], errCh.send, v)
})
// Loop that receives logs.
for await (const [host, value] of select(jobs)) {
  if (!value.done) {
    console.log(`[${decorate(host)(host)}] ${value.value.toString('utf-8')}`)
  }
}
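To make the merging step more concrete, here is a minimal sketch of combining keyed Async Generators with `Promise.race`, yielding `[key, IteratorResult]` pairs in the same shape the loop above consumes. It is an illustration under assumptions: `miniSelect`, `emit`, and `demoSelect` are hypothetical names, and chanpuru's `select()` may differ in signature and behavior.

```typescript
type Keyed<T> = [string, IteratorResult<T, void>]

// Illustrative only; chanpuru's select() may differ in signature and behavior.
async function* miniSelect<T>(
  sources: Record<string, AsyncGenerator<T, void, void>>
): AsyncGenerator<Keyed<T>, void, void> {
  // At most one pending next() per source that is still alive.
  const pending = new Map<string, Promise<Keyed<T>>>()
  const pull = (key: string): void => {
    pending.set(
      key,
      sources[key].next().then((r): Keyed<T> => [key, r])
    )
  }
  Object.keys(sources).forEach(pull)
  while (pending.size > 0) {
    // Whichever source settles first wins this round.
    const item = await Promise.race(pending.values())
    const [key, result] = item
    if (result.done) {
      pending.delete(key) // this source is finished
    } else {
      pull(key) // request the next value from the same source
    }
    yield item
  }
}

// Hypothetical source that emits items with a fixed delay.
async function* emit(
  items: string[],
  delayMs: number
): AsyncGenerator<string, void, void> {
  for (const item of items) {
    await new Promise((resolve) => setTimeout(resolve, delayMs))
    yield item
  }
}

async function demoSelect(): Promise<string[]> {
  const lines: string[] = []
  const jobs = { a: emit(['a1', 'a2'], 10), b: emit(['b1'], 15) }
  for await (const [key, result] of miniSelect(jobs)) {
    if (!result.done) lines.push(`[${key}] ${result.value}`)
  }
  return lines
}
```

Passing the `done` results through, rather than hiding them, lets the consumer notice when an individual source has ended while the others keep running.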
Examples
API
License
MIT License
Copyright (c) 2022 hankei6km