@heap-code/concurrency-synchronization
v1.0.1
Published
Manage concurrency in Javascript "threads" with promises.
Downloads
127
Maintainers
Readme
Concurrency synchronization
Manage concurrency in Javascript "threads" with promises.
Preface
The aim of this package is to mimic the various tools used for concurrency synchronization, as multithreading does not exist in Javascript in the same way as in C, Java or other languages.
From the mozilla documentation:
Note that JavaScript is single-threaded by nature, so at a given instant, only one task will be executing, although control can shift between different promises, making execution of the promises appear concurrent. Parallel execution in JavaScript can only be achieved through worker threads.
A "threads" is, in this context, an asynchronous task.
Installation
Simply run:
npm i @heap-code/concurrency-synchronization
CDN
Thanks to jsdelivr, this package can easily be used in browsers like this:
<script
src="https://cdn.jsdelivr.net/npm/@heap-code/concurrency-synchronization/dist/bundles/concurrency-synchronization.umd.js"
type="application/javascript"
></script>
Note:
It is recommended to use a minified and versioned bundle.For example:
<script src="https://cdn.jsdelivr.net/npm/@heap-code/[email protected]/dist/bundles/concurrency-synchronization.umd.min.js" type="application/javascript" ></script>
More at this jsdelivr package page.
Usages
In the code examples, the sleep
function is the following:
function sleep(time: number) {
return new Promise(resolve => setTimeout(resolve, time));
}
This is a placeholder for any asynchronous task.
Note:
Avoid using this package on "production" code.
Go here to understand why.
Mutex
A mutex is a mechanism that enforces limits on access to a resource. It generally protects the access to shared variables.
Use semaphores for synchronization rather than a mutex.
With the given example:
const myVar = { i: 0 };
async function do1() {
await sleep(200);
myVar.i += 1;
}
async function do2() {
await sleep(50);
myVar.i *= 3;
}
async function bootstrap() {
await Promise.all([do1(), sleep(10).then(() => do2())]);
console.log(myVar.i); // => 1
}
bootstrap();
Even with the sleep
, we could expect that myVar.i += 1
is done
before myVar.i *= 3
as it is called before.
However myVar.i
is not protected, then the final value is 1
.
If a mutex locks the task, then the variable is protected:
import { Mutex } from "@heap-code/concurrency-synchronization";
const mutex = new Mutex();
const myVar = { i: 0 };
async function do1() {
await mutex.lock();
await sleep(200);
myVar.i += 1;
await mutex.unlock();
}
async function do2() {
await mutex.lock();
await sleep(50);
myVar.i *= 3;
await mutex.unlock();
}
async function bootstrap() {
await Promise.all([do1(), sleep(10).then(() => do2())]);
console.log(myVar.i); // => 3
}
bootstrap();
From this wikipedia section:
The task that locked the mutex is supposed to unlock it.
Mutex tryLock
It is possible to try to lock a mutex in a given time limit.
The function will then throw an exception if the mutex could not lock in time:
import { ConcurrencyExceedTimeoutException } from "@heap-code/concurrency-synchronization";
mutex.tryLock(250).catch((error: unknown) => {
if (error instanceof ConcurrencyExceedTimeoutException) {
console.log("Could not lock in the given time.");
}
throw error;
});
Mutex interrupt
A mutex can be interrupted at any time.
All awaiting "threads" will then receive an exception:
import { ConcurrencyInterruptedException, Mutex } from "@heap-code/concurrency-synchronization";
const mutex = new Mutex();
const myVar = { i: 0 };
async function do1() {
await mutex.lock().catch((error: unknown) => {
if (error instanceof ConcurrencyInterruptedException) {
console.log("The mutex has been interrupted", error.getReason());
}
throw error;
});
await sleep(200);
myVar.i += 1;
await mutex.unlock();
}
async function bootstrap() {
await Promise.all([
do1(),
do1(),
sleep(20).then(() => mutex.interrupt({ message: "Take too much time" }))
]);
}
bootstrap();
Semaphore
From wikipedia:
Semaphores are a type of synchronization primitive.
They can be used to protect certain resources (like mutexes), but are generally used for synchronization:
import { Semaphore } from "@heap-code/concurrency-synchronization";
async function bootstrap() {
const semaphore = new Semaphore(0);
const time1 = 100;
const time2 = 150;
sleep(time1).then(() => semaphore.release());
sleep(time2).then(() => semaphore.release());
const maxTime = Math.max(time1, time2);
const before = performance.now();
await semaphore.acquire(2); // waiting until releases
const after = performance.now();
const elapsed = after - before; // ~150
console.log("Done. took %dms with expected %dms", elapsed, maxTime);
}
bootstrap();
Semaphore tryAcquire
It is possible to try to acquire a semaphore in a given time limit.
The function will then throw an exception if the semaphore could not acquire in time:
import {
ConcurrencyExceedTimeoutException,
Semaphore
} from "@heap-code/concurrency-synchronization";
async function bootstrap() {
const semaphore = new Semaphore(2);
const acquired1 = await semaphore.tryAcquire(100).then(() => true);
const acquired2 = await semaphore.tryAcquire(100, 2).catch((error: unknown) => {
if (error instanceof ConcurrencyExceedTimeoutException) {
return false;
}
throw error;
});
console.log(acquired1); // true
console.log(acquired2); // false
}
bootstrap();
Semaphore interrupt
A semaphore can be interrupted at any time.
All awaiting "threads" will then receive an exception:
import { ConcurrencyInterruptedException, Semaphore } from "@heap-code/concurrency-synchronization";
async function bootstrap() {
const semaphore = new Semaphore(1);
void sleep(100).then(() => semaphore.interrupt({ code: 502 }, 2));
const succeed = await Promise.all([
semaphore.acquire(),
semaphore.acquire(2),
semaphore.tryAcquire(200),
semaphore.tryAcquire(200, 1)
]).catch((error: unknown) => {
if (error instanceof ConcurrencyInterruptedException) {
return false;
}
throw error;
});
console.log(succeed); // false
console.log(semaphore.permitsAvailable); // 2
}
bootstrap();
Semaphore releaseAll
Very similar to interrupt, but it does not throw an exception.
import { Semaphore } from "@heap-code/concurrency-synchronization";
async function bootstrap() {
const semaphore = new Semaphore(1);
void sleep(100).then(() => semaphore.releaseAll(3));
await Promise.all([
semaphore.acquire(),
semaphore.acquire(2),
semaphore.tryAcquire(200),
semaphore.tryAcquire(200, 1)
]);
console.log("ok");
console.log(semaphore.permitsAvailable); // 3
}
bootstrap()
Note: Unless it is really desired, prefer interrupt over
releaseAll
.
Producer-Consumer
The ProducerConsumer
looks a lot like a Semaphore,
but it returns values on acquire.
By default, all readings use an array:
import { ProducerConsumer } from "@heap-code/concurrency-synchronization";
async function bootstrap() {
const producerConsumer = new ProducerConsumer([1]);
const time1 = 100;
const time2 = 150;
sleep(time1).then(() => producerConsumer.write(3, 4));
sleep(time2).then(() => producerConsumer.write(2));
const maxTime = Math.max(time1, time2);
const before = performance.now();
const valuesRead = await producerConsumer.read(4); // waiting until all is read
const after = performance.now();
const elapsed = after - before; // ~150
console.log("Done. took %dms with expected %dms", elapsed, maxTime);
console.log(valuesRead) // [1, 3, 4, 2]
}
bootstrap();
Producer-Consumer tryRead
It is possible to try to read some values in a given time limit.
The function will then throw an exception if it could not read in time:
import {
ConcurrencyExceedTimeoutException,
ProducerConsumer
} from "@heap-code/concurrency-synchronization";
async function bootstrap() {
const producerConsumer = new ProducerConsumer([1, 2, 3]);
const success1 = await producerConsumer.tryRead(100, 2).then(() => true);
const success2 = await producerConsumer.tryRead(100, 2).catch((error: unknown) => {
if (error instanceof ConcurrencyExceedTimeoutException) {
return false;
}
throw error;
});
console.log(success1); // true
console.log(success2); // false
}
bootstrap();
Producer-Consumer readOne
The read
and tryRead
have their "one"-method
that do the same thing but return only one value instead of an array:
import { ProducerConsumer } from "@heap-code/concurrency-synchronization";
async function bootstrap() {
const producerConsumer = new ProducerConsumer([1, 2]);
// const [value1] = producerConsumer.read(1);
// can be written:
const value1 = await producerConsumer.readOne();
// const [value2] = producerConsumer.tryRead(100, 1);
// can be written:
const value2 = await producerConsumer.tryReadOne(100);
console.log(value1, value2); // 1 2
}
bootstrap();
Producer-Consumer interrupt
A ProducerConsumer
can be interrupted at any time.
All awaiting "threads" will then receive an exception:
import {
ConcurrencyInterruptedException,
ProducerConsumer
} from "@heap-code/concurrency-synchronization";
async function bootstrap() {
const producerConsumer = new ProducerConsumer([1]);
void sleep(100).then(() => producerConsumer.interrupt({ code: 502 }, [1, 2, 3]));
const succeed = await Promise.all([
producerConsumer.read(3),
producerConsumer.readOne(),
producerConsumer.tryRead(200, 3),
producerConsumer.tryReadOne(200)
]).catch((error: unknown) => {
if (error instanceof ConcurrencyInterruptedException) {
return false;
}
throw error;
});
console.log(succeed); // false
console.log(producerConsumer.permitsAvailable); // 3
}
bootstrap();
When to use
This package can be useful when writing test and wanting to synchronize events.
For example, the RxJs observable behavior slightly differs if it comes from regular observable or subjects.
firstValueFrom returns the value immediately.
So this difference can be omitted with the following:
import { ProducerConsumer } from "@heap-code/concurrency-synchronization";
describe("My test", () => {
it("should work", async () => {
const producerConsumer = new ProducerConsumer();
const subscription = myObservable.subscribe(value => producerConsumer.write(value));
// something that updates the observable
// Need to pass 2 times in the event
const [r1, r2] = await producerConsumer.tryRead(500, 2);
expect(r1).toBe(1);
expect(r2).toBe(2);
subscription.unsubscribe()
});
});
However, these synchronizations are generally not wanted in production code as it is wanted to
keep the javascript code as "parallelized" as possible, to not block code branches.
Moreover, better solutions might exist for these problems,
such as firstValueFrom
and lastValueFrom
for RxJs.
Releases
See information about breaking changes and release notes here.