each
v2.7.2
Chained and parallel async iterator in one elegant function.
About
Each is a single elegant function to iterate over values in sequential, parallel, and concurrent modes. It is a powerful and mature library.
Main functionalities include:
- User-defined concurrency level: sequential, parallel, or custom
- Iteration over a list of functions
- Iteration over a list of promises
- Iteration with any type of values handled by a user-defined function
- Full promise support
- ESM package distributed as CommonJS, ESM, and UMD
- Full test coverage
- Zero dependency
Getting started
Installation
Use your favorite package manager to install the each package:
npm install each
With ESM:
import each from "each";
With CommonJS:
const each = require("each");
Simple example
In its simplest form, Each is used as a single function, a bit like Promise.all or Promise.allSettled but, arguably, more flexible and easier to read.
This example defines a list of 3 items to process, followed by the concurrency level, and a function handler as its last argument:
const stack = [];
const result = await each(
[
{ message: "Is", timeout: 30 },
{ message: "Gollum", timeout: 20 },
{ message: "Around", timeout: 10 },
],
{ concurrency: true },
({ message, timeout }) =>
new Promise((resolve) =>
setTimeout(() => stack.push(message) && resolve(message), timeout),
),
);
assert.equal(result.join(" "), "Is Gollum Around");
assert.equal(stack.join(" "), "Around Gollum Is");
It is equivalent to passing items as functions without a function handler and with the concurrency level defined as true.
const stack = [];
const result = await each(
[
() =>
new Promise((resolve) => {
setTimeout(() => stack.push("Is") && resolve("Is"), 30);
}),
() =>
new Promise((resolve) => {
setTimeout(() => stack.push("Gollum") && resolve("Gollum"), 20);
}),
() =>
new Promise((resolve) => {
setTimeout(() => stack.push("Around") && resolve("Around"), 10);
}),
],
true,
);
assert.equal(result.join(" "), "Is Gollum Around");
assert.equal(stack.join(" "), "Around Gollum Is");
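For readers more familiar with the native API, the parallel examples above can be compared with a plain Promise.all version. This is only a rough sketch: the runAll helper is hypothetical and not part of Each, and it offers none of the concurrency control or scheduler features described later.

```javascript
// Hypothetical helper (not part of Each): start every item at once with Promise.all.
const runAll = async (items) => {
  const completionOrder = [];
  const result = await Promise.all(
    items.map(
      ({ message, timeout }) =>
        new Promise((resolve) =>
          setTimeout(() => {
            completionOrder.push(message); // record when the item finishes
            resolve(message);
          }, timeout),
        ),
    ),
  );
  // `result` follows input order, `completionOrder` follows the timers.
  return { result, completionOrder };
};

runAll([
  { message: "Is", timeout: 30 },
  { message: "Gollum", timeout: 20 },
  { message: "Around", timeout: 10 },
]).then(({ result, completionOrder }) => {
  console.log(result.join(" "));
  console.log(completionOrder.join(" "));
});
```

As with Each, the resolved values keep the order of the input while the completion order depends on the timeouts.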
Advanced usage
In its advanced form, Each is a scheduler with advanced functionalities to control the execution process.
const scheduler = each({ concurrency: true });
const result = await Promise.all([
scheduler.call([
() => new Promise((resolve) => resolve(1)),
() => new Promise((resolve) => resolve(2)),
]),
scheduler.call([
() => new Promise((resolve) => resolve(3)),
() => new Promise((resolve) => resolve(4)),
]),
]);
assert.deepStrictEqual(result, [
[1, 2],
[3, 4],
]);
Usage
Initialisation
Signature is each(...[items|options|concurrency|handler]).
All arguments are optional and can be defined in any order.
Multiple items (arrays) are merged. Multiple options (objects) are merged as well.
- items: array
  An array containing any type of value. Functions are executed and may return a promise. Promises are awaited until they resolve. Any other type is returned as is or passed as an argument to the handler function.
- options: object
  An options object. See below for the list of supported options.
- concurrency: boolean | integer
  A boolean or an integer value. Similar to setting the concurrency option property. Jump to the concurrency section below.
- handler: function
  A function which takes each item as an argument.
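The order-independent signature can be pictured as a classification of arguments by type. The sketch below is an assumption about how such a dispatch might work, not the library's actual code: normalizeArguments is a hypothetical name, and merging arrays by concatenation is a guess based on the description above.

```javascript
// Hypothetical sketch (not the library's code): classify arguments by type.
function normalizeArguments(...args) {
  const normalized = { items: [], options: {}, handler: undefined };
  for (const arg of args) {
    if (Array.isArray(arg)) {
      // Multiple items arrays are merged (assumed here to mean concatenated).
      normalized.items.push(...arg);
    } else if (typeof arg === "function") {
      normalized.handler = arg;
    } else if (typeof arg === "boolean" || Number.isInteger(arg)) {
      // A bare boolean or integer is shorthand for the concurrency option.
      normalized.options.concurrency = arg;
    } else if (arg !== null && typeof arg === "object") {
      // Multiple options objects are merged, later keys overriding earlier ones.
      Object.assign(normalized.options, arg);
    }
  }
  return normalized;
}

console.log(normalizeArguments([1, 2], true, { pause: true }, [3]));
```

Dispatching on type is what makes the argument order irrelevant: an array can only be items, a function can only be the handler, and so on.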
Options
- concurrency (default 1)
  An integer value defining the number of functions executed in parallel or a boolean value. Value false is converted to 1 where functions are executed sequentially. The value true is converted to -1 where all functions run simultaneously.
- fluent (default true)
  Expose a fluent API where the functions may be chained.
- pause (default false)
  Delay the execution of functions until resume is called.
- relax (default false)
  Keep scheduling new functions when call is further executed.
Functions
- call(handler)
  Execute one or several items and return a promise with the resolved value(s). Unless the fluent option is false, it is also possible to chain additional functions.
- concurrency([level])
  Change the number of items executed in parallel.
- end([error|options])
  Close the scheduler and ensure no additional item is registered. The returned promise is resolved once all the scheduled items resolve.
- error(error|null)
  Place the scheduler in an error state; all items registered afterwards will be rejected. Use null to set the scheduler back to a normal state.
- options
  Return a promise with all options when called with no argument, or with a single option value when called with one argument. When two arguments are provided as key and value, the promise is resolved when the value is effective.
- pause
  Pause the scheduling of new functions, see the throttling section.
- resume
  Resume the scheduling of new functions, see the throttling section. It returns a promise that resolves once all previously scheduled and paused items are resolved.
Iteration
Resolution order
Output order is consistent with input order. The value returned by a function or resolved by a promise is always returned in the same position as it was originally defined.
Iteration with any type of values
Each iterates over any type of item. If no handler is defined, functions and promises get special treatment: functions are executed and may return a promise, and promises are resolved. Any other type is returned as is unless a handler function is defined.
Here is a quick example:
const result = await each([
// Item is a value
"a",
// Item is a function
() => new Promise((resolve) => resolve("b")),
// Item is a promise
new Promise((resolve) => resolve("c")),
]);
assert.deepStrictEqual(result, ["a", "b", "c"]);
Note, in the majority of cases, items (arrays) which do not contain functions and promises are handled with a handler function.
Iteration over a list of functions
Functions are executed. Each handles both synchronous and asynchronous functions. In the latter case, functions return a Promise and Each waits for their resolution.
Here are various ways to declare functions:
const result = await each([
// Synchronous function
function () {
return "a";
},
// Synchronous function with the fat arrow syntax
() => "b",
// Asynchronous function
() => new Promise((resolve) => resolve("c")),
// Asynchronous function which resolves after some delay
() => new Promise((resolve) => setTimeout(() => resolve("d"), 100)),
]);
assert.deepStrictEqual(result, ["a", "b", "c", "d"]);
Iteration over a list of promises
Each waits for all promises to be resolved before returning their results. Just like with Promise.all, the order of the results respects the order of registration.
const result = await each([
// Instant resolution
new Promise((resolve) => resolve("a")),
// Delayed resolution
new Promise((resolve) => setTimeout(() => resolve("b"), 100)),
// Instant resolution
new Promise((resolve) => resolve("c")),
]);
assert.deepStrictEqual(result, ["a", "b", "c"]);
Synchronous and asynchronous functions
A function can be an item to iterate over or be defined as the handler. In both cases, the behavior is the same.
A function defined as an item:
console.info(await each([() => 1]));
A function handling an item:
console.info(await each([1], (item) => item));
Handlers are called with the item as the first argument and the index number as the second argument.
Synchronous functions return a value. Asynchronous functions return a Promise.
Here is a synchronous handler function:
const result = await each(
[{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
(item, index) => `${item.id}@${index}`,
);
assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
Here is an asynchronous handler function:
const result = await each(
[{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
(item, index) =>
new Promise((resolve) =>
setTimeout(() => resolve(`${item.id}@${index}`), 100),
),
);
assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
Concurrency modes
- sequential
  Concurrency is false or 1. It is the default concurrency mode.
- parallel
  Concurrency is true or -1. In asynchronous mode, all the items are executed in parallel.
- concurrent
  Concurrency is a number. It defines the maximum number of functions running in parallel at a given time.
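The mapping between concurrency values and modes can be summarized in a few lines. This is a sketch based on the conversions described in this document (false to 1, true to -1); the concurrencyMode helper is hypothetical and does not exist in the package, and the treatment of values such as 0 is not covered by the text, so they simply fall through to the concurrent case here.

```javascript
// Hypothetical helper: derive the numeric level and the mode name from the option value.
function concurrencyMode(value = 1) {
  // Booleans are converted to their numeric equivalents.
  const level = value === true ? -1 : value === false ? 1 : value;
  if (level === 1) return { level, mode: "sequential" };
  if (level === -1) return { level, mode: "parallel" };
  return { level, mode: "concurrent" };
}

for (const value of [undefined, false, 1, true, -1, 4]) {
  console.log(value, concurrencyMode(value));
}
```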
Sequential mode (default)
When the concurrency option is undefined, false, or 1, items are executed in order, one after the other.
let running = 0;
const result = await each(
[{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
function (item, index) {
running++;
if (running !== 1) {
throw Error("Invalid execution");
}
return new Promise((resolve) =>
setTimeout(() => {
if (running !== 1) {
throw Error("Invalid execution");
}
running--;
resolve(`${item.id}@${index}`);
}, 100),
);
},
);
assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
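Conceptually, sequential mode resembles awaiting each item inside a loop. The sketch below uses plain JavaScript to illustrate the idea; runSequentially is a hypothetical name and this is not how the library is necessarily implemented.

```javascript
// Hypothetical sketch: handle items one after the other, passing the item and its index.
async function runSequentially(items, handler) {
  const results = [];
  for (const [index, item] of items.entries()) {
    // The next handler only starts once the previous one has resolved.
    results.push(await handler(item, index));
  }
  return results;
}

runSequentially([{ id: "a" }, { id: "b" }], (item, index) =>
  new Promise((resolve) => setTimeout(() => resolve(`${item.id}@${index}`), 10)),
).then((result) => console.log(result));
```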
Parallel mode
When the concurrency option is true or -1, items are all scheduled at the same time and run in parallel.
let running = 0;
const result = await each(
[{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
true,
function (item, index) {
if (running !== index) {
throw Error("Invalid execution");
}
running++;
return new Promise((resolve) =>
setTimeout(() => {
if (running !== 4 - index) {
throw Error("Invalid execution");
}
running--;
resolve(`${item.id}@${index}`);
}, 100),
);
},
);
assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
Concurrent mode
When the concurrency option is a value above 1, the number of items running simultaneously is bounded by that value.
let running = 0;
const result = await each(
[{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
2,
function (item, index) {
running++;
if (running > 2) {
throw Error("At most 2 running tasks");
}
return new Promise((resolve, reject) =>
setTimeout(() => {
running--;
if (running > 2) {
reject(Error("At most 2 running tasks"));
} else {
resolve(`${item.id}@${index}`);
}
}, 100),
);
},
);
assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
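To make the bounded behavior more concrete, here is a minimal worker-pool sketch in plain JavaScript. It is an illustration of the general technique under the assumption that limit is at least 1, not the library's implementation; runWithLimit is a hypothetical name.

```javascript
// Hypothetical sketch of a bounded pool: at most `limit` handlers run at any time.
async function runWithLimit(items, limit, handler) {
  const results = new Array(items.length);
  let next = 0; // index of the next item to pick up
  const worker = async () => {
    while (next < items.length) {
      const index = next++;
      // Store by index so the output order matches the input order.
      results[index] = await handler(items[index], index);
    }
  };
  // Start `limit` workers; each picks a new item as soon as it is free.
  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

runWithLimit([{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }], 2, (item, index) =>
  new Promise((resolve) => setTimeout(() => resolve(`${item.id}@${index}`), 10)),
).then((result) => console.log(result));
```

Because JavaScript runs these callbacks on a single thread, the shared next counter needs no locking.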
Manual throttling
Use the pause and resume functions to throttle the iteration.
The pause option defines the initial status. Its value defaults to false.
On pause, functions that are already running continue to completion and no further function is scheduled for execution.
When the iteration's state is paused, newly scheduled items will not resolve the returned promise until the iteration is resumed.
let state = "paused";
const scheduler = each({ pause: true });
scheduler.then(() => assert.deepStrictEqual(state, "resumed"));
setTimeout(() => {
state = "resumed";
scheduler.resume();
}, 100);
The resume and end methods return a promise that resolves once the execution of all elements is complete. This is an example using the resume function.
const stack = [];
const scheduler = each({ pause: true });
scheduler.call(
() =>
new Promise((resolve) => {
stack.push(1);
resolve();
}),
);
scheduler.call(
() =>
new Promise((resolve) => {
stack.push(2);
resolve();
}),
);
setTimeout(async () => {
// Before resume, no processing occurs
assert.deepStrictEqual(stack, []);
// Resume and wait for execution
await scheduler.resume();
// After resume, every element was processed
assert.deepStrictEqual(stack, [1, 2]);
}, 100);
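One way to picture pause and resume is as a gate that tasks must pass before they start. The following sketch is purely illustrative and built on plain promises; createGate and its methods are hypothetical names, not part of the Each API.

```javascript
// Hypothetical sketch: a gate that blocks waiting tasks while paused.
function createGate(paused = false) {
  let release = null;
  let waiting = null;
  const close = () => {
    waiting = new Promise((resolve) => {
      release = resolve;
    });
  };
  if (paused) close();
  return {
    pause() {
      if (!waiting) close();
    },
    resume() {
      if (waiting) {
        release(); // let every blocked task continue
        waiting = null;
        release = null;
      }
    },
    // Resolves immediately when open, otherwise once resume() is called.
    wait() {
      return waiting ?? Promise.resolve();
    },
  };
}

const demoGate = createGate(true);
const demoStack = [];
const demoTasks = [1, 2].map(async (id) => {
  await demoGate.wait(); // blocked until resume() is called
  demoStack.push(id);
});
setTimeout(async () => {
  demoGate.resume();
  await Promise.all(demoTasks);
  console.log(demoStack);
}, 50);
```

Tasks that are already past the gate are unaffected by a later pause, which mirrors the statement above that running functions continue to completion.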
Dealing with errors
Iterations are stopped on error.
With synchronous functions or when the concurrency mode is sequential, it behaves like Promise.all. On error, no additional function is scheduled for execution and the returned promise is rejected.
With asynchronous functions executed concurrently, no additional functions are scheduled. Functions that are already running still resolve or reject their promise, but the result is discarded.
Whether the items array is provided at initialization or with the call function, the behavior is the same:
try {
await each(2).call([
() => new Promise((resolve) => setImmediate(() => resolve("ok"))),
() =>
new Promise((resolve, reject) =>
setImmediate(() => reject(Error("Catchme"))),
),
() => new Promise((resolve) => setImmediate(() => resolve("ok"))),
]);
} catch (error) {
assert.equal(error.message, "Catchme");
}
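The stop-on-error behavior can be sketched with a small pool that stops picking up work after the first failure. This is an illustration in plain JavaScript under that assumption, not the library's code; runUntilError is a hypothetical name.

```javascript
// Hypothetical sketch: bounded pool that stops scheduling after the first error.
async function runUntilError(tasks, limit) {
  const results = new Array(tasks.length);
  let next = 0;
  let failed = false;
  const worker = async () => {
    while (!failed && next < tasks.length) {
      const index = next++;
      try {
        results[index] = await tasks[index]();
      } catch (error) {
        failed = true; // no further task is started by any worker
        throw error; // rejects the promise returned by runUntilError
      }
    }
  };
  const workerCount = Math.min(limit, tasks.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

runUntilError(
  [
    () => new Promise((resolve) => setTimeout(() => resolve("ok"), 30)),
    () => new Promise((resolve, reject) => setTimeout(() => reject(Error("Catchme")), 10)),
    () => Promise.resolve("never started"),
  ],
  2,
).catch((error) => console.log(error.message));
```

In this sketch the first task is still running when the second one fails; it completes, but its value is never returned to the caller.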
API concurrency
concurrency([level])
- level <integer|boolean>
  New concurrency value
Calling the concurrency function changes the number of items executed in parallel. Previously scheduled items are not affected. Only the items scheduled after calling the concurrency function honor the new value.
This example changes the concurrency level. The first 2 items are executed in parallel and the next 2 items are executed sequentially.
import assert from "assert";
import each from "each";
const history = [];
const handler = (id) => {
history.push(`${id}:start`);
return new Promise((resolve) =>
setTimeout(() => {
history.push(`${id}:end`);
resolve();
}, 20),
);
};
const scheduler = each(-1);
// Schedule parallel execution
scheduler.call(() => handler(1));
scheduler.call(() => handler(2));
// Change the concurrency level
scheduler.concurrency(1);
// Schedule sequential execution
scheduler.call(() => handler(4));
scheduler.call(() => handler(5));
// Wait for completion
await scheduler.end();
assert.deepStrictEqual(history, [
// Parallel execution
"1:start",
"2:start",
"1:end",
"2:end",
// Sequential execution
"4:start",
"4:end",
"5:start",
"5:end",
]);
API end
end([error|options])
- error
  Reject the returned promise, and every registered item that is not yet executed, with an error. In relax mode, only the promise returned by end is rejected with an error.
- force
  Skip the execution of registered items that are not yet scheduled for execution. The items resolve with undefined or the value associated with the error option.
Close the scheduler. The returned promise waits for all previously scheduled items to resolve.
No further items are allowed to register with call. In such a case, the returned promise is rejected. When end is called and the scheduler is in a paused state, all paused items are resolved with undefined or an error if any.
This example waits for the completion of two scheduled items.
import assert from "assert";
import each from "each";
const history = [];
const handler = (id) => {
return new Promise((resolve) =>
setTimeout(() => {
history.push(`${id}:end`);
resolve();
}, 20),
);
};
const scheduler = each(-1);
// Schedule parallel execution
scheduler.call(() => handler(1));
scheduler.call(() => handler(2));
// Wait for completion
await scheduler.end();
assert.deepStrictEqual(history, ["1:end", "2:end"]);
Option fluent
The fluent option applies when using the each().call function. By default, it is enabled. The API is designed to allow multiple calls to be chained where the value of the last call is returned:
const result = await each()
.call(() => new Promise((resolve) => resolve(1)))
.call(() => new Promise((resolve) => resolve(2)))
.call(() => new Promise((resolve) => resolve(3)));
assert.strictEqual(result, 3);
The returned promise is enriched with the same functions as the promise returned by each(), thus exposing the Each API.
Set the fluent option to false to avoid enriching the returned promise with the Each API:
const promise = each({ fluent: false }).call(
() => new Promise((resolve) => resolve(1)),
);
assert.strictEqual(promise.call, undefined);
assert.strictEqual(promise.options, undefined);
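The idea of a promise enriched with extra methods can be sketched in a few lines. This is a simplified illustration, assuming sequential execution and a single call method; createChain is a hypothetical name and the real library exposes more functions than this.

```javascript
// Hypothetical sketch: each returned promise carries a `call` method for chaining.
function createChain() {
  let tail = Promise.resolve();
  const call = (task) => {
    // Run the task once the previously registered one has resolved.
    const promise = tail.then(() => task());
    tail = promise;
    promise.call = call; // enrich the promise so calls can be chained
    return promise;
  };
  return { call };
}

createChain()
  .call(() => new Promise((resolve) => resolve(1)))
  .call(() => new Promise((resolve) => resolve(2)))
  .call(() => new Promise((resolve) => resolve(3)))
  .then((result) => console.log(result));
```

The chain resolves with the value of the last call, as in the fluent example above. Because each task waits on the previous promise, a rejection also rejects every task registered after it.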
Option pause
The pause option sets the initial mode of the scheduler. It is false by default. Starting the scheduler in pause mode implies calling resume to start the execution.
Option relax
When the relax option is active, the internal scheduler permits the registration of new items with the call function even after an error.
It doesn't affect the processing of an items list. An error while handling one of the items prevents additional execution and rejects the items' promise. However, it provides the ability to register and execute new items with call.
This is an example with the default behavior:
const scheduler = each();
const prom1 = scheduler.call(() => new Promise((resolve) => resolve(1)));
const prom2 = scheduler.call(() => new Promise((resolve, reject) => reject(2)));
const prom3 = scheduler.call(() => new Promise((resolve) => resolve(3)));
const result = await Promise.allSettled([prom1, prom2, prom3]);
assert.deepStrictEqual(result, [
{ status: "fulfilled", value: 1 },
{ status: "rejected", reason: 2 },
{ status: "rejected", reason: 2 },
]);
This is an example with the relax
option in action:
const scheduler = each({ relax: true });
const prom1 = scheduler.call(() => new Promise((resolve) => resolve(1)));
const prom2 = scheduler.call(() => new Promise((resolve, reject) => reject(2)));
const prom3 = scheduler.call(() => new Promise((resolve) => resolve(3)));
const result = await Promise.allSettled([prom1, prom2, prom3]);
assert.deepStrictEqual(result, [
{ status: "fulfilled", value: 1 },
{ status: "rejected", reason: 2 },
{ status: "fulfilled", value: 3 },
]);
Developers
Tests are executed with Mocha. To install the mocha package and its dependencies, run npm install.
npm run test
# Or
yarn run test
To automatically generate a new version and publish it:
yarn run release
Package publication is handled by the CI/CD with GitHub Actions.
History
- Version 2 is a complete rewrite based on promises.
- Versions above 0.8.0 renamed "then" to "next".
- Versions above 0.2.x changed the arguments of the callback.