redis-semaphore-es
v5.3.0
Published
Distributed mutex and semaphore based on Redis
Downloads
2
Readme
redis-semaphore
Mutex and Semaphore implementations based on Redis ready for distributed systems
Features
- Fail-safe (all actions performed by LUA scripts (atomic))
Usage
Installation
npm install --save redis-semaphore ioredis
# or
yarn add redis-semaphore ioredis
Mutex
new Mutex(redisClient, key [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
redisClient
- required, configuredredis
clientkey
- required, key for locking resource (final key in redis:mutex:<key>
)options
- optionallockTimeout
- optional ms, time after mutex will be auto released (expired)acquireTimeout
- optional ms, max timeout for.acquire()
callacquireAttemptsLimit
- optional max number of attempts to be made in.acquire()
callretryInterval
- optional ms, time between acquire attempts if resource lockedrefreshInterval
- optional ms, auto-refresh interval; to disable auto-refresh behaviour set0
externallyAcquiredIdentifier
- optional uuid, previously acquired mutex identifier (useful for lock sharing between processes: acquire in scheduler, refresh and release in handler)onLockLost
- optional function, called when lock loss is detected due refresh cycle; default onLockLost throws unhandled LostLockError
Example
const Mutex = require('redis-semaphore').Mutex
const Redis = require('ioredis')
// TypeScript
// import { Mutex } from 'redis-semaphore'
// import Redis from 'ioredis'
const redisClient = new Redis()
async function doSomething() {
const mutex = new Mutex(redisClient, 'lockingResource')
await mutex.acquire()
try {
// critical code
} finally {
await mutex.release()
}
}
Example with lost lock handling
async function doSomething() {
const mutex = new Mutex(redisClient, 'lockingResource', {
// By default onLockLost throws unhandled LostLockError
onLockLost(err) {
console.error(err)
}
})
await mutex.acquire()
try {
while (mutex.isAcquired) {
// critical cycle iteration
}
} finally {
// It's safe to always call release, because if lock is no longer belongs to this mutex, .release() will have no effect
await mutex.release()
}
}
Example with optional lock
async function doSomething() {
const mutex = new Mutex(redisClient, 'lockingResource', {
acquireAttemptsLimit: 1
})
const lockAcquired = await mutex.tryAcquire()
if (!lockAcquired) {
return
}
try {
while (mutex.isAcquired) {
// critical cycle iteration
}
} finally {
// It's safe to always call release, because if lock is no longer belongs to this mutex, .release() will have no effect
await mutex.release()
}
}
Example with temporary refresh
async function doSomething() {
const mutex = new Mutex(redisClient, 'lockingResource', {
lockTimeout: 120000,
refreshInterval: 15000
})
const lockAcquired = await mutex.tryAcquire()
if (!lockAcquired) {
return
}
try {
// critical cycle iteration
} finally {
// We want to let lock expire over time after operation is finished
await mutex.stopRefresh()
}
}
Example with externallyAcquiredIdentifier
const Mutex = require('redis-semaphore').Mutex
const Redis = require('ioredis')
// TypeScript
// import { Mutex } from 'redis-semaphore'
// import Redis from 'ioredis'
const redisClient = new Redis()
// scheduler app code
async function every10MinutesCronScheduler() {
const mutex = new Mutex(redisClient, 'lockingResource', {
lockTimeout: 30 * 60 * 1e3, // lock for 30min
refreshInterval: 0
})
if (await mutex.tryAcquire()) {
someQueue.publish({ mutexIdentifier: mutex.identifier })
} else {
logger.info('Job already scheduled. Do nothing in current cron cycle')
}
}
// handler app code
async function queueHandler(queueMessageData) {
const { mutexIdentifier } = queueMessageData
const mutex = new Mutex(redisClient, 'lockingResource', {
lockTimeout: 10 * 1e3, // 10sec
externallyAcquiredIdentifier: mutexIdentifier
})
// actually will do `refresh` with new lockTimeout instead of acquire
// if mutex was locked by another process or lock was expired - exception will be thrown (default refresh behavior)
await mutex.acquire()
try {
// critical code
} finally {
await mutex.release()
}
}
Semaphore
This implementation is slightly different from the algorithm described in the book, but the main idea has not changed.
zrank
check replaced with zcard
, so now it is fair as RedisLabs: Fair semaphore (see tests).
In edge cases (node time difference is greater than lockTimeout
) both algorithms are not fair due cleanup stage (removing expired members from sorted set), so FairSemaphore
API has been removed (it's safe to replace it with Semaphore
).
Most reliable way to use: lockTimeout
is greater than possible node clock differences, refreshInterval
is not 0 and is less enough than lockTimeout
(by default is lockTimeout * 0.8
)
new Semaphore(redisClient, key, maxCount [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
redisClient
- required, configuredredis
clientkey
- required, key for locking resource (final key in redis:semaphore:<key>
)maxCount
- required, maximum simultaneously resource usage countoptions
optional SeeMutex
options
Example
const Semaphore = require('redis-semaphore').Semaphore
const Redis = require('ioredis')
// TypeScript
// import { Semaphore } from 'redis-semaphore'
// import Redis from 'ioredis'
const redisClient = new Redis()
async function doSomething() {
const semaphore = new Semaphore(redisClient, 'lockingResource', 5)
await semaphore.acquire()
try {
// maximum 5 simultaneously executions
} finally {
await semaphore.release()
}
}
MultiSemaphore
Same as Semaphore
with one difference - MultiSemaphore will try to acquire multiple permits instead of one.
MultiSemaphore
and Semaphore
shares same key namespace and can be used together (see test/src/RedisMultiSemaphore.test.ts).
new MultiSemaphore(redisClient, key, maxCount, permits [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
redisClient
- required, configuredredis
clientkey
- required, key for locking resource (final key in redis:semaphore:<key>
)maxCount
- required, maximum simultaneously resource usage countpermits
- required, number of acquiring permitsoptions
optional SeeMutex
options
Example
const MultiSemaphore = require('redis-semaphore').MultiSemaphore
const Redis = require('ioredis')
// TypeScript
// import { MultiSemaphore } from 'redis-semaphore'
// import Redis from 'ioredis'
const redisClient = new Redis()
async function doSomething() {
const semaphore = new MultiSemaphore(redisClient, 'lockingResource', 5, 2)
await semaphore.acquire()
try {
// make 2 parallel calls to remote service which allow only 5 simultaneously calls
} finally {
await semaphore.release()
}
}
RedlockMutex
Distributed Mutex
version
new RedlockMutex(redisClients, key [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
redisClients
- required, array of configuredredis
client connected to independent nodeskey
- required, key for locking resource (final key in redis:mutex:<key>
)options
optional SeeMutex
options
Example
const RedlockMutex = require('redis-semaphore').RedlockMutex
const Redis = require('ioredis')
// TypeScript
// import { RedlockMutex } from 'redis-semaphore'
// import Redis from 'ioredis'
const redisClients = [
new Redis('127.0.0.1:6377'),
new Redis('127.0.0.1:6378'),
new Redis('127.0.0.1:6379')
] // or cluster.nodes('master')
async function doSomething() {
const mutex = new RedlockMutex(redisClients, 'lockingResource')
await mutex.acquire()
try {
// critical code
} finally {
await mutex.release()
}
}
RedlockSemaphore
Distributed Semaphore
version
new RedlockSemaphore(redisClients, key, maxCount [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
redisClients
- required, array of configuredredis
client connected to independent nodeskey
- required, key for locking resource (final key in redis:semaphore:<key>
)maxCount
- required, maximum simultaneously resource usage countoptions
optional SeeMutex
options
Example
const RedlockSemaphore = require('redis-semaphore').RedlockSemaphore
const Redis = require('ioredis')
// TypeScript
// import { RedlockSemaphore } from 'redis-semaphore'
// import Redis from 'ioredis'
const redisClients = [
new Redis('127.0.0.1:6377'),
new Redis('127.0.0.1:6378'),
new Redis('127.0.0.1:6379')
] // or cluster.nodes('master')
async function doSomething() {
const semaphore = new Semaphore(redisClients, 'lockingResource', 5)
await semaphore.acquire()
try {
// maximum 5 simultaneously executions
} finally {
await semaphore.release()
}
}
RedlockMultiSemaphore
Distributed MultiSemaphore
version
new RedlockMultiSemaphore(redisClients, key, maxCount, permits [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
redisClients
- required, array of configuredredis
client connected to independent nodeskey
- required, key for locking resource (final key in redis:semaphore:<key>
)maxCount
- required, maximum simultaneously resource usage countpermits
- required, number of acquiring permitsoptions
optional SeeMutex
options
Example
const RedlockMultiSemaphore = require('redis-semaphore').RedlockMultiSemaphore
const Redis = require('ioredis')
// TypeScript
// import { RedlockMultiSemaphore } from 'redis-semaphore'
// import Redis from 'ioredis'
const redisClients = [
new Redis('127.0.0.1:6377'),
new Redis('127.0.0.1:6378'),
new Redis('127.0.0.1:6379')
] // or cluster.nodes('master')
async function doSomething() {
const semaphore = new RedlockMultiSemaphore(
redisClients,
'lockingResource',
5,
2
)
await semaphore.acquire()
try {
// make 2 parallel calls to remote service which allow only 5 simultaneously calls
} finally {
await semaphore.release()
}
}
License
MIT