@distn/redis-streams
v0.1.13
Published
A class for convenient work with reading and writing messages to redis streams.
Downloads
12
Readme
Вспомогательный класс для работы со стримами redis.
Основные функции
На данный момент реализовано
- Организация чтения новых сообщений стрима или ссписка стримов (через блокирующий режим XREADGROUP).
- Запуск обработчика для каждого нового сообщения
- Повторных запуск обработчика для всех полученных, но еще не обработанных сообщениях (режим восстановления после сбоя)
- Отправка сообщений в стрим
Использование списка стримов
Возможность использования списка стримов сделана для возможности чтения сообщений сразу из нескольких стримов через одно соединение с redis (такая возможность есть в redis при использовании XREAD и XREADGROUP). При этом все дополнительные опции и настройки инициализации применяются одинаково к каждому из стримов, указанных в этом списке.
Логика по повторному процессингу необработанных сообщений стрима
Сообщения стрима помечается как успешно обработанное (удаляется из pending списка), когда успешно выполнена функция обработчик (которая передана в параметре callbackFunction
).
Если функция обработчик вызывается в асинхронном режиме (isAsyncRun: true
), то мы сам факт ее вызова считаем равным успешной обработке сообщения. То есть в аинхронном режиме сообщения вообще никогда не должны задерживаться в pending списке.
Если же вызов функции обработчика идет синхронный (isAsyncRun: false
), то мы дожидаемся окончания работы функции-обработки, и только потом считаем сообщение успешно обработанным (удаляем его из pending списка этой группы в этом стриме).
При этом, мы имеем возможность вернуть функцией обработчиком значение = false - и в этом случае сообщение не будет помечаться как обработанное, даже после завершения работы функции обработчика (т.е. оно останется в pending списке).
Пример того, как выглядит функция обработчик, которая может оставлять плохо обработанное сообщение в pending-списке (например, для их повторной обработки позже, через reRunPendingTimeout)
// функция обработчик
async function processMessage(message, streamName) {
if (!condition) {
return false
}
}
// инициализируем класс
const testStreams = new RedisStreams({
ioredis: global.redisGlobal,
streamsNames: ['candle:5s:binanceusdm:DOT/USDT', 'candle:5s:binanceusdm:SAND/USDT'],
clearStreams: true,
clearStreamsTimeout: 20*60*1000,
})
// инициализируем чтение
testStreams.readGroup({
groupName: 'tester',
consumerName: 'tester_app',
callbackFunction: async (message, streamName) => {return await processMessage(message, streamName) },
isAsyncRun: false,
clearOldPending: true,
clearOldPendingTimeout: 12*3600*1000,
reRunPending: true,
reRunPendingTimeout: 20*1000,
reRunPendingCheckPeriod: 60*1000
})
NB: важно не забыть про return
перед await processMessage
, когда processMessage оборачивается в безымянную функцию.
Указание id первого сообщения при чтении
По-умолчанию, readGroup читает сообщения, которые были добавлены в стрим после создания группы потребителей стрима и еще не были прочитаны ей.
Но бывают ситуации:
- когда нужно получить старые сообщения стрима;
- либо наоборот, указать, что для уже существующей группы нужно пропустить накопленные в стриме сообщения и начать чтение сообщений, добавленных только с текущего момента.
Для того, чтобы изменить дефолтное поведения есть 2 параметра, передаваемые в функцию readGroup
:
startMessageId
- параметр, который позволяет указать, с какой айдишки мы будем начинать чтение стримов при каждом запуске readGroup;newGroupStartMessageId
- параметр, который работает, как предыдущий, но применяется только один раз (для каждой группы стрима) - в момент создания этой группы (а не при каждом вызовеreadGroup
).
Если в качестве параметров указать 0 - то будут получены все сообщения стрима с самого начала (если они еще не удалены, конечно). А если указать '$', то будут получены только сообщения, добавленные с текущего момента.
Обычно, на практике, удобно использовать значение времени в миллисекундах (т.к. для дефолтных айдишек в сообщениях стримов используется именно время). Вот пример, как сдвинуть оконо чтения на 3 минуты назад:
testStreams.readGroup({
groupName: 'tester',
consumerName: 'tester_app',
callbackFunction: async (message, streamName) => {return await processMessage(message, streamName) },
isAsyncRun: false,
clearOldPending: true,
clearOldPendingTimeout: 12*3600*1000,
reRunPending: true,
reRunPendingTimeout: 20*1000,
reRunPendingCheckPeriod: 60*1000,
startMessageId: await this.redisTime.now() - 3*60*1000,
// newGroupStartMessageId: await this.redisTime.now() - 3*60*1000,
})
(параметр startMessageId
более "сильный", он применяется всегда, даже при использовании уже существующей группы потребителей - поэтому когда он задан, нет смысла использовать newGroupStartMessageId
, который применяется только в момент создания новой группы потребителей стрима).
Возможность корректировать список стримов в процессе чтения
Иногда бывают ситуации, когда в процессе работы нужно изменить список стримов (которые были переданы в конструктор RedisStreams). Чтобы сделать это бесшовно, без пересоздания объекта RedisStream, сделана функция changeStreamsList()
.
Она позволяет изменить список стримов даже в момент, когда уже запущено блокирующее чтение. Новый список стримов применится, как только произойдет последнее чтение по старому списку - вызов нового блокирующего чтения (происходит в рекурсии) будет произведен уже по новому списку стримов.
Пример вызова:
testStreams.changeStreamsList({
streamsNames: newStreamsToReadArray
})
Параметры startMessageId
и newGroupStartMessageId
, описанные выше для функции readGroup, и применяемые для указания id сообщения, которое будет прочитано первым, также могут использоваться при вызове функции changeStreamsList.
В этом случае, если будет передан startMessageId
, он применится к чтению сообщений из всех стримов (перезапишется значение этой настройки, которое могло быть передано ранее при вызове .readGroup()). Соответственно, если значение startMessageId
не задано вообще, то для новых стримов будет применено значение, которое могло быть передано ранее при вызове .readGroup().
А newGroupStartMessageId
применится только для списка новых добавляемых стримов, когда для них будут создаваться группы (и при условии, что их еще нет). На старые стримы, соответственно, этот параметр никак не повлияет.
testStreams.changeStreamsList({
streamsNames: newStreamsToReadArray,
startMessageId: await this.redisTime.now() - 5*60*1000,
// newGroupStartMessageId: await this.redisTime.now() - 3*60*1000
})
Этот функционал применяется в приложении archiver
, чтобы обеспечить непрерывное чтение из старых стримов придобавлении задание на чтение новых (чтение из всех стримов там выполняется одной командой).
Уменьшение периодичности чтения
По-умолчанию блокирующее чтение с помощью вызова redisStreams.readGroup(...)
(и внутри вызова XREADGROUP
) происходит в режиме реального времени. То есть как только появляется новое сообщение в любом из стримов, которые переданны на вход этой функции для мониторинга, срабатывает передача этого сообщение в колбэк-функцию. После чего сразу происходит повторный запуск блокирующего чтения (XREADGROUP
).
В некоторых ситуациях, когда у нас нет требования к оперативности получения новых сообщений, это избыточно и влечет ненужную доп. нагрузку. И тогда мы можем перезапускать это повторное блокирующее чтение через какую-то паузу.
Это применяется сейчас в приложении archiver
- там функционал подразумевает одновременное чтение из десятков стримов (одной командой XREADGROUP
), и оно возвращается результат, как только появилось новое сообщение хотя бы в одном из этих десятков стримов. Это избыточно - XREADGROUP
перезапускается слишком часто. В случае с приложением archiver
, где нет требований по оперативности получения данных, допустимо запускать XREADGROUP
, например, раз в 5 секунд, и получать сразу все сообщения, накопленные во всех стримах за этот период.
Пример применения параметра readGroupInterval
testStreams.readGroup({
...
readGroupInterval: 5*1000
...
})