npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@heap-code/concurrency-synchronization

v1.0.1

Published

Manage concurrency in Javascript "threads" with promises.

Downloads

127

Readme

Concurrency synchronization

CI npm version Code coverage Comment coverage

Manage concurrency in Javascript "threads" with promises.

Preface

The aim of this package is to mimic the various tools used for concurrency synchronization, as multithreading does not exist in Javascript in the same way as in C, Java or other languages.

From the mozilla documentation:

Note that JavaScript is single-threaded by nature, so at a given instant, only one task will be executing, although control can shift between different promises, making execution of the promises appear concurrent. Parallel execution in JavaScript can only be achieved through worker threads.

A "threads" is, in this context, an asynchronous task.

Installation

Simply run:

npm i @heap-code/concurrency-synchronization

CDN

Thanks to jsdelivr, this package can easily be used in browsers like this:

<script
 src="https://cdn.jsdelivr.net/npm/@heap-code/concurrency-synchronization/dist/bundles/concurrency-synchronization.umd.js"
 type="application/javascript"
></script>

Note:
It is recommended to use a minified and versioned bundle.

For example:

<script
 src="https://cdn.jsdelivr.net/npm/@heap-code/[email protected]/dist/bundles/concurrency-synchronization.umd.min.js"
 type="application/javascript"
></script>

More at this jsdelivr package page.

Usages

In the code examples, the sleep function is the following:

function sleep(time: number) {
  return new Promise(resolve => setTimeout(resolve, time));
}

This is a placeholder for any asynchronous task.

Note:
Avoid using this package on "production" code.
Go here to understand why.

Mutex

A mutex is a mechanism that enforces limits on access to a resource. It generally protects the access to shared variables.

Use semaphores for synchronization rather than a mutex.


With the given example:

const myVar = { i: 0 };

async function do1() {
  await sleep(200);
  myVar.i += 1;
}

async function do2() {
  await sleep(50);
  myVar.i *= 3;
}

async function bootstrap() {
  await Promise.all([do1(), sleep(10).then(() => do2())]);
  console.log(myVar.i); // => 1
}
bootstrap();

Even with the sleep, we could expect that myVar.i += 1 is done before myVar.i *= 3 as it is called before.
However myVar.i is not protected, then the final value is 1.


If a mutex locks the task, then the variable is protected:

import { Mutex } from "@heap-code/concurrency-synchronization";

const mutex = new Mutex();
const myVar = { i: 0 };

async function do1() {
  await mutex.lock();

  await sleep(200);
  myVar.i += 1;

  await mutex.unlock();
}

async function do2() {
  await mutex.lock();

  await sleep(50);
  myVar.i *= 3;

  await mutex.unlock();
}

async function bootstrap() {
  await Promise.all([do1(), sleep(10).then(() => do2())]);
  console.log(myVar.i); // => 3
}
bootstrap();

From this wikipedia section:

The task that locked the mutex is supposed to unlock it.

Mutex tryLock

It is possible to try to lock a mutex in a given time limit.
The function will then throw an exception if the mutex could not lock in time:

import { ConcurrencyExceedTimeoutException } from "@heap-code/concurrency-synchronization";

mutex.tryLock(250).catch((error: unknown) => {
  if (error instanceof ConcurrencyExceedTimeoutException) {
    console.log("Could not lock in the given time.");
  }

  throw error;
});

Mutex interrupt

A mutex can be interrupted at any time.
All awaiting "threads" will then receive an exception:

import { ConcurrencyInterruptedException, Mutex } from "@heap-code/concurrency-synchronization";

const mutex = new Mutex();
const myVar = { i: 0 };

async function do1() {
  await mutex.lock().catch((error: unknown) => {
    if (error instanceof ConcurrencyInterruptedException) {
      console.log("The mutex has been interrupted", error.getReason());
    }

    throw error;
  });

  await sleep(200);
  myVar.i += 1;

  await mutex.unlock();
}

async function bootstrap() {
  await Promise.all([
    do1(),
    do1(),
    sleep(20).then(() => mutex.interrupt({ message: "Take too much time" }))
  ]);
}
bootstrap();

Semaphore

From wikipedia:

Semaphores are a type of synchronization primitive.

They can be used to protect certain resources (like mutexes), but are generally used for synchronization:

import { Semaphore } from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const semaphore = new Semaphore(0);

  const time1 = 100;
  const time2 = 150;

  sleep(time1).then(() => semaphore.release());
  sleep(time2).then(() => semaphore.release());

  const maxTime = Math.max(time1, time2);

  const before = performance.now();
  await semaphore.acquire(2); // waiting until releases
  const after = performance.now();

  const elapsed = after - before; // ~150
  console.log("Done. took %dms with expected %dms", elapsed, maxTime);
}
bootstrap();

Semaphore tryAcquire

It is possible to try to acquire a semaphore in a given time limit.
The function will then throw an exception if the semaphore could not acquire in time:

import {
 ConcurrencyExceedTimeoutException,
 Semaphore
} from "@heap-code/concurrency-synchronization";

async function bootstrap() {
 const semaphore = new Semaphore(2);

 const acquired1 = await semaphore.tryAcquire(100).then(() => true);
 const acquired2 = await semaphore.tryAcquire(100, 2).catch((error: unknown) => {
   if (error instanceof ConcurrencyExceedTimeoutException) {
     return false;
   }
 
   throw error;
 });

 console.log(acquired1); // true
 console.log(acquired2); // false
}
bootstrap();

Semaphore interrupt

A semaphore can be interrupted at any time.
All awaiting "threads" will then receive an exception:

import { ConcurrencyInterruptedException, Semaphore } from "@heap-code/concurrency-synchronization";

async function bootstrap() {
 const semaphore = new Semaphore(1);

 void sleep(100).then(() => semaphore.interrupt({ code: 502 }, 2));

 const succeed = await Promise.all([
   semaphore.acquire(),
   semaphore.acquire(2),
   semaphore.tryAcquire(200),
   semaphore.tryAcquire(200, 1)
 ]).catch((error: unknown) => {
   if (error instanceof ConcurrencyInterruptedException) {
     return false;
   }
   throw error;
 });

 console.log(succeed); // false
 console.log(semaphore.permitsAvailable); // 2
}
bootstrap();

Semaphore releaseAll

Very similar to interrupt, but it does not throw an exception.

import { Semaphore } from "@heap-code/concurrency-synchronization";

async function bootstrap() {
 const semaphore = new Semaphore(1);

 void sleep(100).then(() => semaphore.releaseAll(3));

 await Promise.all([
   semaphore.acquire(),
   semaphore.acquire(2),
   semaphore.tryAcquire(200),
   semaphore.tryAcquire(200, 1)
 ]);

 console.log("ok");
 console.log(semaphore.permitsAvailable); // 3
}
bootstrap()

Note: Unless it is really desired, prefer interrupt over releaseAll.

Producer-Consumer

The ProducerConsumer looks a lot like a Semaphore, but it returns values on acquire.

By default, all readings use an array:

import { ProducerConsumer } from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const producerConsumer = new ProducerConsumer([1]);

  const time1 = 100;
  const time2 = 150;

  sleep(time1).then(() => producerConsumer.write(3, 4));
  sleep(time2).then(() => producerConsumer.write(2));

  const maxTime = Math.max(time1, time2);

  const before = performance.now();
  const valuesRead = await producerConsumer.read(4); // waiting until all is read
  const after = performance.now();

  const elapsed = after - before; // ~150
  console.log("Done. took %dms with expected %dms", elapsed, maxTime);
  console.log(valuesRead) // [1, 3, 4, 2]
}
bootstrap();

Producer-Consumer tryRead

It is possible to try to read some values in a given time limit.
The function will then throw an exception if it could not read in time:

import {
  ConcurrencyExceedTimeoutException,
  ProducerConsumer
} from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const producerConsumer = new ProducerConsumer([1, 2, 3]);

  const success1 = await producerConsumer.tryRead(100, 2).then(() => true);
  const success2 = await producerConsumer.tryRead(100, 2).catch((error: unknown) => {
    if (error instanceof ConcurrencyExceedTimeoutException) {
      return false;
    }
  
    throw error;
  });

  console.log(success1); // true
  console.log(success2); // false
}
bootstrap();

Producer-Consumer readOne

The read and tryRead have their "one"-method that do the same thing but return only one value instead of an array:

import { ProducerConsumer } from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const producerConsumer = new ProducerConsumer([1, 2]);

  // const [value1] = producerConsumer.read(1);
  // can be written:
  const value1 = await producerConsumer.readOne();

  // const [value2] = producerConsumer.tryRead(100, 1);
 // can be written:
  const value2 = await producerConsumer.tryReadOne(100);
  
  console.log(value1, value2); // 1 2
}
bootstrap();

Producer-Consumer interrupt

A ProducerConsumer can be interrupted at any time.
All awaiting "threads" will then receive an exception:

import {
  ConcurrencyInterruptedException,
  ProducerConsumer
} from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const producerConsumer = new ProducerConsumer([1]);

  void sleep(100).then(() => producerConsumer.interrupt({ code: 502 }, [1, 2, 3]));

  const succeed = await Promise.all([
    producerConsumer.read(3),
    producerConsumer.readOne(),
    producerConsumer.tryRead(200, 3),
    producerConsumer.tryReadOne(200)
  ]).catch((error: unknown) => {
    if (error instanceof ConcurrencyInterruptedException) {
      return false;
    }
    throw error;
  });

  console.log(succeed); // false
  console.log(producerConsumer.permitsAvailable); // 3
}
bootstrap();

When to use

This package can be useful when writing test and wanting to synchronize events.

For example, the RxJs observable behavior slightly differs if it comes from regular observable or subjects.

firstValueFrom returns the value immediately.

So this difference can be omitted with the following:

import { ProducerConsumer } from "@heap-code/concurrency-synchronization";

describe("My test", () => {
  it("should work", async () => {
    const producerConsumer = new ProducerConsumer();
    const subscription = myObservable.subscribe(value => producerConsumer.write(value));
    // something that updates the observable
 
    // Need to pass 2 times in the event
    const [r1, r2] = await producerConsumer.tryRead(500, 2);
 
    expect(r1).toBe(1);
    expect(r2).toBe(2);
    subscription.unsubscribe()
  });
});

However, these synchronizations are generally not wanted in production code as it is wanted to keep the javascript code as "parallelized" as possible, to not block code branches.
Moreover, better solutions might exist for these problems, such as firstValueFrom and lastValueFrom for RxJs.

Releases

See information about breaking changes and release notes here.