npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

redis-semaphore

v5.6.1

Published

Distributed mutex and semaphore based on Redis

Downloads

1,595,271

Readme

redis-semaphore

NPM version Build status FOSSA Status Coverage Status Maintainability Known Vulnerabilities FOSSA Status

Mutex and Semaphore implementations based on Redis ready for distributed systems

Features

  • Fail-safe (all actions performed by LUA scripts (atomic))

Usage

Installation

npm install --save redis-semaphore ioredis
# or
yarn add redis-semaphore ioredis

ioredis is the officially supported Redis client. This library's test code runs on it.

Users of other Redis clients should ensure ioredis-compatible API (see src/types.ts) when creating lock objects.

Mutex

See RedisLabs: Locks with timeouts

new Mutex(redisClient, key [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8, identifier = crypto.randomUUID() }])
  • redisClient - required, configured redis client
  • key - required, key for locking resource (final key in redis: mutex:<key>)
  • options - optional
    • lockTimeout - optional ms, time after mutex will be auto released (expired)
    • acquireTimeout - optional ms, max timeout for .acquire() call
    • acquireAttemptsLimit - optional max number of attempts to be made in .acquire() call
    • retryInterval - optional ms, time between acquire attempts if resource locked
    • refreshInterval - optional ms, auto-refresh interval; to disable auto-refresh behaviour set 0
    • identifier - optional uuid, custom mutex identifier. Must be unique between parallel executors, otherwise multiple locks with same identifier can be treated as the same lock holder. Override only if you know what you are doing (see acquiredExternally option).
    • acquiredExternally - optional true, If identifier provided and acquiredExternally is true then _refresh will be used instead of _acquire in .tryAcquire()/.acquire(). Useful for lock sharing between processes: acquire in scheduler, refresh and release in handler.
    • onLockLost - optional function, called when lock loss is detected due refresh cycle; default onLockLost throws unhandled LostLockError

Example

const Mutex = require('redis-semaphore').Mutex
const Redis = require('ioredis')

// TypeScript
// import { Mutex } from 'redis-semaphore'
// import Redis from 'ioredis'

const redisClient = new Redis()

async function doSomething() {
  const mutex = new Mutex(redisClient, 'lockingResource')
  await mutex.acquire()
  try {
    // critical code
  } finally {
    await mutex.release()
  }
}

Example with lost lock handling

async function doSomething() {
  const mutex = new Mutex(redisClient, 'lockingResource', {
    // By default onLockLost throws unhandled LostLockError
    onLockLost(err) {
      console.error(err)
    }
  })
  await mutex.acquire()
  try {
    while (mutex.isAcquired) {
      // critical cycle iteration
    }
  } finally {
    // It's safe to always call release, because if lock is no longer belongs to this mutex, .release() will have no effect
    await mutex.release()
  }
}

Example with optional lock

async function doSomething() {
  const mutex = new Mutex(redisClient, 'lockingResource', {
    acquireAttemptsLimit: 1
  })
  const lockAcquired = await mutex.tryAcquire()
  if (!lockAcquired) {
    return
  }
  try {
    while (mutex.isAcquired) {
      // critical cycle iteration
    }
  } finally {
    // It's safe to always call release, because if lock is no longer belongs to this mutex, .release() will have no effect
    await mutex.release()
  }
}

Example with temporary refresh

async function doSomething() {
  const mutex = new Mutex(redisClient, 'lockingResource', {
    lockTimeout: 120000,
    refreshInterval: 15000
  })
  const lockAcquired = await mutex.tryAcquire()
  if (!lockAcquired) {
    return
  }
  try {
    // critical cycle iteration
  } finally {
    // We want to let lock expire over time after operation is finished
    await mutex.stopRefresh()
  }
}

Example with dynamically adjusting existing lock

const Mutex = require('redis-semaphore').Mutex
const Redis = require('ioredis')

// TypeScript
// import { Mutex } from 'redis-semaphore'
// import Redis from 'ioredis'

const redisClient = new Redis()

// This creates an original lock
const preMutex = new Mutex(redisClient, 'lockingResource', {
  lockTimeout: 10 * 1e3, // lock for 10s
  refreshInterval: 0
});

// This modifies lock with a new TTL and starts refresh
const mutex = new Mutex(redisClient, 'lockingResource', {
  identifier: preMutex.identifier,
  acquiredExternally: true, // required in this case
  lockTimeout: 30 * 60 * 1e3, // lock for 30min
  refreshInterval: 60 * 1e3
});

Example with shared lock between scheduler and handler apps

const Mutex = require('redis-semaphore').Mutex
const Redis = require('ioredis')

// TypeScript
// import { Mutex } from 'redis-semaphore'
// import Redis from 'ioredis'

const redisClient = new Redis()

// scheduler app code
async function every10MinutesCronScheduler() {
  const mutex = new Mutex(redisClient, 'lockingResource', {
    lockTimeout: 30 * 60 * 1e3, // lock for 30min
    refreshInterval: 0
  })
  if (await mutex.tryAcquire()) {
    someQueue.publish({ mutexIdentifier: mutex.identifier })
  } else {
    logger.info('Job already scheduled. Do nothing in current cron cycle')
  }
}

// handler app code
async function queueHandler(queueMessageData) {
  const { mutexIdentifier } = queueMessageData
  const mutex = new Mutex(redisClient, 'lockingResource', {
    lockTimeout: 10 * 1e3, // 10sec
    identifier: mutexIdentifier,
    acquiredExternally: true // required in this case
  })

  // actually will do `refresh` with new lockTimeout instead of acquire
  // if mutex was locked by another process or lock was expired - exception will be thrown (default refresh behavior)
  await mutex.acquire()

  try {
    // critical code
  } finally {
    await mutex.release()
  }
}

Semaphore

See RedisLabs: Basic counting sempahore

This implementation is slightly different from the algorithm described in the book, but the main idea has not changed.

zrank check replaced with zcard, so now it is fair as RedisLabs: Fair semaphore (see tests).

In edge cases (node time difference is greater than lockTimeout) both algorithms are not fair due cleanup stage (removing expired members from sorted set), so FairSemaphore API has been removed (it's safe to replace it with Semaphore).

Most reliable way to use: lockTimeout is greater than possible node clock differences, refreshInterval is not 0 and is less enough than lockTimeout (by default is lockTimeout * 0.8)

new Semaphore(redisClient, key, maxCount [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
  • redisClient - required, configured redis client
  • key - required, key for locking resource (final key in redis: semaphore:<key>)
  • maxCount - required, maximum simultaneously resource usage count
  • options optional See Mutex options

Example

const Semaphore = require('redis-semaphore').Semaphore
const Redis = require('ioredis')

// TypeScript
// import { Semaphore } from 'redis-semaphore'
// import Redis from 'ioredis'

const redisClient = new Redis()

async function doSomething() {
  const semaphore = new Semaphore(redisClient, 'lockingResource', 5)
  await semaphore.acquire()
  try {
    // maximum 5 simultaneously executions
  } finally {
    await semaphore.release()
  }
}

MultiSemaphore

Same as Semaphore with one difference - MultiSemaphore will try to acquire multiple permits instead of one.

MultiSemaphore and Semaphore shares same key namespace and can be used together (see test/src/RedisMultiSemaphore.test.ts).

new MultiSemaphore(redisClient, key, maxCount, permits [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
  • redisClient - required, configured redis client
  • key - required, key for locking resource (final key in redis: semaphore:<key>)
  • maxCount - required, maximum simultaneously resource usage count
  • permits - required, number of acquiring permits
  • options optional See Mutex options

Example

const MultiSemaphore = require('redis-semaphore').MultiSemaphore
const Redis = require('ioredis')

// TypeScript
// import { MultiSemaphore } from 'redis-semaphore'
// import Redis from 'ioredis'

const redisClient = new Redis()

async function doSomething() {
  const semaphore = new MultiSemaphore(redisClient, 'lockingResource', 5, 2)

  await semaphore.acquire()
  try {
    // make 2 parallel calls to remote service which allow only 5 simultaneously calls
  } finally {
    await semaphore.release()
  }
}

RedlockMutex

Distributed Mutex version

See The Redlock algorithm

new RedlockMutex(redisClients, key [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
  • redisClients - required, array of configured redis client connected to independent nodes
  • key - required, key for locking resource (final key in redis: mutex:<key>)
  • options optional See Mutex options

Example

const RedlockMutex = require('redis-semaphore').RedlockMutex
const Redis = require('ioredis')

// TypeScript
// import { RedlockMutex } from 'redis-semaphore'
// import Redis from 'ioredis'

const redisClients = [
  new Redis('127.0.0.1:6377'),
  new Redis('127.0.0.1:6378'),
  new Redis('127.0.0.1:6379')
] // "Those nodes are totally independent, so we don’t use replication or any other implicit coordination system."

async function doSomething() {
  const mutex = new RedlockMutex(redisClients, 'lockingResource')
  await mutex.acquire()
  try {
    // critical code
  } finally {
    await mutex.release()
  }
}

RedlockSemaphore

Distributed Semaphore version

See The Redlock algorithm

new RedlockSemaphore(redisClients, key, maxCount [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
  • redisClients - required, array of configured redis client connected to independent nodes
  • key - required, key for locking resource (final key in redis: semaphore:<key>)
  • maxCount - required, maximum simultaneously resource usage count
  • options optional See Mutex options

Example

const RedlockSemaphore = require('redis-semaphore').RedlockSemaphore
const Redis = require('ioredis')

// TypeScript
// import { RedlockSemaphore } from 'redis-semaphore'
// import Redis from 'ioredis'

const redisClients = [
  new Redis('127.0.0.1:6377'),
  new Redis('127.0.0.1:6378'),
  new Redis('127.0.0.1:6379')
] // "Those nodes are totally independent, so we don’t use replication or any other implicit coordination system."

async function doSomething() {
  const semaphore = new Semaphore(redisClients, 'lockingResource', 5)
  await semaphore.acquire()
  try {
    // maximum 5 simultaneously executions
  } finally {
    await semaphore.release()
  }
}

RedlockMultiSemaphore

Distributed MultiSemaphore version

See The Redlock algorithm

new RedlockMultiSemaphore(redisClients, key, maxCount, permits [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
  • redisClients - required, array of configured redis client connected to independent nodes
  • key - required, key for locking resource (final key in redis: semaphore:<key>)
  • maxCount - required, maximum simultaneously resource usage count
  • permits - required, number of acquiring permits
  • options optional See Mutex options

Example

const RedlockMultiSemaphore = require('redis-semaphore').RedlockMultiSemaphore
const Redis = require('ioredis')

// TypeScript
// import { RedlockMultiSemaphore } from 'redis-semaphore'
// import Redis from 'ioredis'

const redisClients = [
  new Redis('127.0.0.1:6377'),
  new Redis('127.0.0.1:6378'),
  new Redis('127.0.0.1:6379')
] // "Those nodes are totally independent, so we don’t use replication or any other implicit coordination system."

async function doSomething() {
  const semaphore = new RedlockMultiSemaphore(
    redisClients,
    'lockingResource',
    5,
    2
  )

  await semaphore.acquire()
  try {
    // make 2 parallel calls to remote service which allow only 5 simultaneously calls
  } finally {
    await semaphore.release()
  }
}

Development

yarn --immutable
./setup-redis-servers.sh
yarn dev

License

MIT

FOSSA Status