npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@chumager/mongoose-mutex

v2.0.4

Published

A mutex node.js module who uses mongoose for locking

Downloads

19

Readme

mongoose-mutex

A mutex node.js module who uses mongoose for locking

Why

After searching in npm and realize that the only module for mutex with mongoose was deprecated I decided to do one for myself. The module it's very simple yet powerful, it uses the unique index for _id and TTL index to prevent mutex to stay locked forever.

Install

pnpm add @chumager/mongoose-mutex
#or
yarn add @chumager/mongoose-mutex

Use

Basics.

With function parameter.

import MutexSchema from "@chumager/mongoose-mutex";
//use mongoose to create the model.
const Mutex = mongoose.model("Mutex", MutexSchema);
//ensure indexes in case you need it.
await Mutex.ensureIndexes();

//lock
try {
  const result = await Mutex.lock({
    lockName: "lock", //required
    async fn() {
      //function definition with some data returned
    }
  });
  //there is no need to unlock, the module does it for you...
  console.log("result!!! 😬", result);
}catch(e){
  //check for locking error
  if(e.code !== "LOCK_TAKEN") console.error(e);
}

Without function parameter.

import MutexSchema from "@chumager/mongoose-mutex";
//use mongoose to create the model.
const Mutex = mongoose.model("Mutex", MutexSchema);
//ensure indexes in case you need it.
await Mutex.ensureIndexes();

//lock
try {
  const unlock = await Mutex.lock({
    lockName: "lock" //required
  });
  //do your stuff...
  await unlock(); 
  //await is only needed if you'll disconnect to 
  //the db any time soon to avoid trying to reach the db when disconnected;
}catch(e){
  //release the lock
  if (typeof unlock === "function") await unlock();
  console.log(Date.now(), "error", threadId, delay, e.message);
}

General Usage.

Exclusive lock with inner function

Only one process (worker), can take the lock, if a process try to lock and is taken will exit. In this example the lock take a function as parameter so there is no need to unlock. There is a 50% chance function will fail.

import {Worker, isMainThread, threadId} from "worker_threads";

//to support Promise.delay()
import {promiseHelpers} from "@chumager/promise-helpers";
promiseHelpers();

//the mutex schema
import MutexSchema from "../src/index.js";

//es6 doesn't support native __filename
import {URL} from "url";
const __filename = new URL("", import.meta.url).pathname;

//no mongoose, no mutex 😬
import db from "mongoose";

function createWorker() {
  const worker = new Worker(__filename, {env: process.env});
  worker.on("error", err => {
    console.log(Date.now(), "worker error", worker.threadId, err.message);
  });
}
async function main() {
  if (isMainThread) {
    [...Array(20).keys()].forEach(() => createWorker());
  } else {
    let delay;
    console.log(Date.now(), "start", threadId);

    await db.connect("mongodb://127.0.0.1:27018,127.0.0.1:27019,127.0.0.1:27020/test?replicaSet=rs0");
    const Mutex = db.model("Mutex", MutexSchema(db));
    await Mutex.ensureIndexes(); //to avoid test db problems and a good practice
    let start;
    delay = Math.round(Math.random() * 1e3);
    try {
      const result = await Mutex.lock({
        lockName: "lock1", //required
        async fn() {
          start = Date.now();
          console.log(Date.now(), "locked", threadId, delay);
          if (Math.random() > 0.5) throw new Error("function fails");
          await Promise.delay(delay); //simulate some execution time.
          return delay * 2; //hard math processing
        }
      });
      console.log(Date.now(), "done", threadId, result, Date.now() - start);
    } catch (e) {
      if (e.code === "LOCK_TAKEN") console.log(Date.now(), "lock taken, bye", threadId);
      else console.log(Date.now(), "error", threadId, delay, e.message);
    } finally {
      await db.disconnect();
    }
  }
}
main();

Exclusive lock with outher function

Only one process (worker), can take the lock, if a process try to lock and is taken will exit. In this example the lock doesn't take a function as parameter so you need to unlock checking for errors. There is a 50% chance function will fail.

import {Worker, isMainThread, threadId} from "worker_threads";

//to support Promise.delay()
import {promiseHelpers} from "@chumager/promise-helpers";
promiseHelpers();

//the mutex schema
import MutexSchema from "../src/index.js";

//es6 doesn't support native __filename
import {URL} from "url";
const __filename = new URL("", import.meta.url).pathname;

//no mongoose, no mutex 😬
import db from "mongoose";

function createWorker() {
  const worker = new Worker(__filename, {env: process.env});
  worker.on("error", err => {
    console.log(Date.now(), "worker error", worker.threadId, err.message);
  });
}
async function main() {
  if (isMainThread) {
    [...Array(20).keys()].forEach(() => createWorker());
  } else {
    console.log(Date.now(), "start", threadId);

    await db.connect("mongodb://127.0.0.1:27018,127.0.0.1:27019,127.0.0.1:27020/test?replicaSet=rs0");
    const Mutex = db.model("Mutex", MutexSchema(db));
    await Mutex.ensureIndexes(); //to avoid test db problems and a good practice
    const delay = Math.round(Math.random() * 1e3);
    let unlock;
    try {
      unlock = await Mutex.lock({
        lockName: "lock1" //required
      });
      const start = Date.now();
      console.log(Date.now(), "locked", threadId, delay);
      //do your stuff.
      if (Math.random() > 0.5) throw new Error("function fails");
      await Promise.delay(delay);
      console.log(Date.now(), "done", threadId, Date.now() - start);
      await unlock(); //await needed because we disconnect immediately
    } catch (e) {
      if (e.code === "LOCK_TAKEN") console.log(Date.now(), "lock taken, bye", threadId);
      else {
        //release the lock
        if (typeof unlock === "function") await unlock();
        console.log(Date.now(), "error", threadId, delay, e.message);
      }
    } finally {
      await db.disconnect();
    }
  }
}
main();

Mutex with inner function

Only one process (worker), can take the lock, if a process try to lock and is taken will wait until it could be taken. In this example the lock take a function as parameter so there is no need to unlock. There is a 50% chance function will fail.

import {Worker, isMainThread, threadId} from "worker_threads";

//to support Promise.delay()
import {promiseHelpers} from "@chumager/promise-helpers";
promiseHelpers();

//the mutex schema
import MutexSchema from "../src/index.js";

//es6 doesn't support native __filename
import {URL} from "url";
const __filename = new URL("", import.meta.url).pathname;

//no mongoose, no mutex 😬
import db from "mongoose";

function createWorker() {
  const worker = new Worker(__filename, {env: process.env});
  worker.on("error", err => {
    console.log(Date.now(), "worker error", worker.threadId, err.message);
  });
}
async function main() {
  if (isMainThread) {
    [...Array(20).keys()].forEach(() => createWorker());
  } else {
    let delay;
    console.log(Date.now(), "start", threadId);

    await db.connect("mongodb://127.0.0.1:27018,127.0.0.1:27019,127.0.0.1:27020/test?replicaSet=rs0");
    const Mutex = db.model("Mutex", MutexSchema(db));
    await Mutex.ensureIndexes(); //to avoid test db problems and a good practice
    let start;
    delay = Math.round(Math.random() * 1e3);
    try {
      const result = await Mutex.waitLock({
        lockName: "lock1",
        async fn() {
          start = Date.now();
          console.log(Date.now(), "locked", threadId, delay);
          if (Math.random() > 0.5) throw new Error("function fails");
          await Promise.delay(delay); //simulate some execution time.
          return delay * 2; //hard math processing
        }
      });
      console.log(Date.now(), "done", threadId, result, Date.now() - start);
    } catch (e) {
      console.log(Date.now(), "error", threadId, delay, e.message);
    } finally {
      await db.disconnect();
    }
  }
}
main();

Mutex with inner function and timeout

Only one process (worker), can take the lock, if a process try to lock and is taken will wait until it could be taken with a timeout. In this example the lock take a function as parameter so there is no need to unlock. There is a 50% chance function will fail.

import {Worker, isMainThread, threadId} from "worker_threads";

//to support Promise.delay()
import {promiseHelpers} from "@chumager/promise-helpers";
promiseHelpers();

//the mutex schema
import MutexSchema from "../src/index.js";

//es6 doesn't support native __filename
import {URL} from "url";
const __filename = new URL("", import.meta.url).pathname;

//no mongoose, no mutex 😬
import db from "mongoose";

function createWorker() {
  const worker = new Worker(__filename, {env: process.env});
  worker.on("error", err => {
    console.log(Date.now(), "worker error", worker.threadId, err.message);
  });
}
async function main() {
  if (isMainThread) {
    [...Array(20).keys()].forEach(() => createWorker());
  } else {
    let delay;
    console.log(Date.now(), "start", threadId);

    await db.connect("mongodb://127.0.0.1:27018,127.0.0.1:27019,127.0.0.1:27020/test?replicaSet=rs0");
    const Mutex = db.model("Mutex", MutexSchema(db));
    await Mutex.ensureIndexes(); //to avoid test db problems and a good practice
    let start;
    delay = Math.round(Math.random() * 1e3);
    try {
      const result = await Mutex.waitLock({
        lockName: "lock1",
        timeout: 2000,
        async fn() {
          start = Date.now();
          console.log(Date.now(), "locked", threadId, delay);
          if (Math.random() > 0.5) throw new Error("function fails");
          await Promise.delay(delay); //simulate some execution time.
          return delay * 2; //hard math processing
        }
      });
      console.log(Date.now(), "done", threadId, result, Date.now() - start);
    } catch (e) {
      if (e.code === "TIMEOUT") console.log(Date.now(), "timeout, bye...", threadId);
      else console.log(Date.now(), "error", threadId, delay, e.message);
    } finally {
      await db.disconnect();
    }
  }
}
main();

Why

I don't need a mutex (for now), what I needed was a way to avoid other processes to consume an API, but this way it'll help others developers...

Mutex.

The Mutex functions allows to define the mongoose model to use to lock.

Static Model Functions

.lock()

Lock function only allows one lock simultaneously and rejects if is taken. Option | Type | Default | Required | Definition ------ | ---- | ------- | -------- | ---------- options | Object | {TTL: 60} | yes | the options object. options.lockName | String | undefined | yes | the name of the lock, same locking logic, uses the same lockName. options.description | String | undefined | no | add a description to the lock document in the db, just for debug. options.metadata | Object | undefined | no | an object that allows to inserta any data you want to the lock. options.fn | Function | undefined | no | the function to be executed after lock and unlock after complete. options.TTL | Number | 60 | no | Number of seconds to wait until the lock is released, uses the mongodb TTL index logic. If you expect your code last more than 60 seconds then you must change this value, otherwise other process will take the lock and eventually the first one will release the second one.

Returns.

If there is a fn key in options, then the result will be the fulfilled o rejected value of this function. If there is no fn key, an unlock function will be returned. In case there is a lock error, it will reject with an error with core equals to "LOCK_TAKEN"

.waitLock()

WaitLock function only allows one lock simultaneously and waits until lock is released or timeout is accomplished.

Option | Type | Default | Required | Definition ------ | ---- | ------- | -------- | ---------- options | Object | {TTL: 60} | yes | the options object. options.lockName | String | undefined | yes | the name of the lock, same locking logic, uses the same lockName. options.description | String | undefined | no | add a description to the lock document in the db, just for debug. options.metadata | Object | undefined | no | an object that allows to inserta any data you want to the lock. options.fn | Function | undefined | no | the function to be executed after lock and unlock after complete. options.TTL | Number | 60 | no | Number of seconds to wait until the lock is released, uses the mongodb TTL index logic. If you expect your code last more than 60 seconds then you must change this value, otherwise other process will take the lock and eventually the first one will release the second one. options.timeout | Number | undefined | no | If the timeout is defined and the lock is taken, waits until timeout ms to take the lock, otherwise reject.

Returns.

If there is a fn key in options, then the result will be the fulfilled o rejected value of this function. If there is no fn key, an unlock function will be returned. In case there is a timeout, it will reject with an error with core equals to "TIMEOUT"

.isLocked(lockName)

Returns.

It return if the lock is taken or not, but you have to realize the lock could be taken or released microseconds after it returns, , so use it carefully.

Examples (not code).

  • If you have a multi process (workers) service and all want to do one task, you can use .lock() so only one process will do the task.
    • Multiprocess service who uses Model.ensureIndexes()
  • If you need an infinite queue, you can use .waitLock() so all the task will be serialized.
    • A service only allows one request at a time.

Caveats.

  • .lock() and .waitLock() uses the unique mongodb error code 11000 to acknowledge if a lock is taken, is you use some unique validation plugin, it could implies the error will not be caught.
  • The timeout uses microtasks and event loop to reject, if you have a lot of sync operations in between (maybe use of then/catch for lock/waitLock) your timeout could be very past the value.