@targetprocess/balancer-core
v1.5.2
Published
Message balancer that allow to process messages grouped by partition key in a fair way
Downloads
3,678
Readme
Balancer core
Usage sample
import {
MessageBalancer,
MessageCache,
MessageStorage,
Db,
migrateDb,
createMethodPerLevelLoggerAdapter,
DefaultMessageBalancerDiagnosticsAdapter,
DefaultMessageCacheDiagnosticsAdapter,
DefaultMessageStorageDiagnosticsAdapter,
ProcessMessageBatchResultItem
} from '@targetprocess/balancer-core'
import {makeLogger} from 'loggerism'
import {Pool} from 'pg'
import * as promClient from 'prom-client'
type MessageBalancerA = MessageBalancer<{data?: string; retry?: number}>
type MessageBalancerB = MessageBalancer<{data?: string}>
async function main() {
const [balancerA, balancerB] = await createAndInitBalancers()
await balancerA.storeMessage({
partitionKey: 'partition#1',
content: Buffer.alloc(128),
properties: {data: 'some arbitrary data'}
})
await balancerB.storeMessage({
partitionKey: 'partition#2',
content: Buffer.alloc(128),
properties: {data: 'some arbitrary data'}
})
await balancerA.processNextMessage(async message => {
try {
const {partitionGroup, partitionKey} = message
console.log(`Processed message from partition "${partitionGroup}/${partitionKey}"`)
return {type: 'Ok'}
} catch {
const properties = {
...message.properties,
retry: (message.properties?.retry || 0) + 1
}
// Push message back with updated properties
return {type: 'Requeue', update: {properties}}
}
})
// Using batch API
balancerB.processNextMessageBatch(
async messages => {
const results = [] as ProcessMessageBatchResultItem<{data?: string}>[]
for (const message of messages) {
try {
const {partitionGroup, partitionKey} = message
console.log(`Processed message from partition "${partitionGroup}/${partitionKey}"`)
results.push({messageId: message.messageId, type: 'Ok'})
} catch {
// Push message back with no properties update
results.push({messageId: message.messageId, type: 'Requeue'})
}
}
return results
},
{maxBatchSize: 10}
)
}
async function createAndInitBalancers(): Promise<[MessageBalancerA, MessageBalancerB]> {
const pool = new Pool({
connectionString: process.env.POSTGRES_CONNECTION_STRING,
max: process.env.POSTGRES_POOL_MAX
})
pool.on('error', error => {
// Handle error here
console.error(error)
})
await migrateDb({pool})
const logger = createMethodPerLevelLoggerAdapter(
makeLogger({
logLevel: process.env.LOG_LEVEL,
handleExceptions: false
})
)
const db = new Db({pool})
const storage = new MessageStorage({
db,
diagnostics: new DefaultMessageStorageDiagnosticsAdapter({
logger,
createMessagesDurationMetric: summaryMetric('create_messages_duration_in_ms'),
updateMessagesDurationMetric: summaryMetric('update_messages_duration_in_ms'),
removeMessagesDurationMetric: summaryMetric('remove_messages_duration_in_ms'),
readMessagesDurationMetric: summaryMetric('read_messages_duration_in_ms')
})
})
const cache = new MessageCache({
maxMessageSize: Number(process.env.MESSAGE_CACHE_MAX_MESSAGE_SIZE),
maxSize: Number(process.env.MESSAGE_CACHE_MAX_SIZE),
diagnostics: new DefaultMessageCacheDiagnosticsAdapter({
logger,
messageCountMetric: gaugeMetric('cache_message_count'),
messageSizeMetric: gaugeMetric('cache_message_size')
})
})
const balancerA = new MessageBalancer<{data?: string; retry?: number}>({
partitionGroup: 'A',
lockPartition: true,
storage,
cache,
diagnostics: new DefaultMessageBalancerDiagnosticsAdapter({
logger,
endToEndMessageProcessingDurationMetric: summaryMetric('balancer_a_end_to_end_processing_duration_in_ms'),
centrifugePartitionCountMetric: gaugeMetric('balancer_a_centrifuge_partition_count'),
centrifugeMessageCountMetric: gaugeMetric('balancer_a_centrifuge_message_count')
})
})
const balancerB = new MessageBalancer<{data?: string}>({
partitionGroup: 'B',
partitionSizeLimit: Number(process.env.PARTITION_SIZE_LIMIT),
lockPartition: true,
storage,
cache,
diagnostics: new DefaultMessageBalancerDiagnosticsAdapter({
logger,
endToEndMessageProcessingDurationMetric: summaryMetric('balancer_b_end_to_end_processing_duration_in_ms'),
centrifugePartitionCountMetric: gaugeMetric('balancer_b_centrifuge_partition_count'),
centrifugeMessageCountMetric: gaugeMetric('balancer_b_centrifuge_message_count')
})
})
await balancerA.init()
await balancerB.init()
return [balancerA, balancerA]
}
function summaryMetric(name: string) {
return new promClient.Summary({
name,
help: 'Write it yourself',
percentiles: [0.1, 0.5, 0.9, 0.99],
maxAgeSeconds: 10 * 60,
ageBuckets: 10
})
}
function gaugeMetric(name: string) {
return new promClient.Gauge({
name,
help: 'Write it yourself'
})
}
main()