npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@mia-platform/flow-manager-client

v1.1.1

Published

A library to simplify the interaction with the Flow Manager

Downloads

1,043

Readme

Flow Manager Client

Build Status javascript style guide Coverage Status

This library simplifies the interaction between a generic microservice and the Flow Manager service.

Installation

npm i --save @mia-platform/flow-manager-client

Testing locally

Create a network connection

docker network create app --driver bridge

Pull the images

docker pull bitnami/zookeeper
docker pull bitnami/kafka

Run the images

docker run -d --rm --name zookeeper --network=app -e ALLOW_ANONYMOUS_LOGIN=yes -p 2180:2181 bitnami/zookeeper

docker run --rm \
  --network app \
  --name=kafka \
  -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS='PLAINTEXT://127.0.0.1:9092,INTERNAL://localhost:9093' \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP='PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT' \
  -e KAFKA_CFG_LISTENERS='PLAINTEXT://0.0.0.0:9092,INTERNAL://0.0.0.0:9093' \
  -e KAFKA_INTER_BROKER_LISTENER_NAME='INTERNAL' \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -p 2181:2181 \
  -p 443:9092 \
  -p 9092:9092 \
  -p 9093:9093 \
  bitnami/kafka

Run tests

npm test

Configuration

The service relies upon a client builder, which allows to configure the underling Kafka client options and the different enabled components.

The library use KafkaJS to actually connect to Kafka. It is recommended to have read its documentation in case some parameter or configuration is not clear.

Below are reported which properties of each Kafka component can be customized:

kafkaConfig = {
  clientId: { type: 'string', minLength: 1 },
  brokers: { type: 'string', minLength: 1, description: 'string of comma separated brokers address' },
  authMechanism: { type: 'string', nullable: true },
  username: { type: 'string', nullable: true },
  password: { type: 'string', nullable: true },
  connectionTimeout: { type: 'integer', default: 10000, description: 'milliseconds' },
  authenticationTimeout: { type: 'integer', default: 10000, description: 'milliseconds' },
  connectionRetries: { type: 'integer', default: 5, description: 'number of times the client should try to connect'}
}

consumerConfig = {
  groupId: { type: 'string' }, // required
  sessionTimeout: { type: 'integer', default: 30000 },
  rebalanceTimeout: { type: 'integer', default: 60000 },
  heartbeatInterval: { type: 'integer', default: 3000 },
  allowAutoTopicCreation: { type: 'boolean', default: false },
}

producerConfig = {
  allowAutoTopicCreation: { type: 'boolean', default: false },
}

Example:

const { FMClientBuilder } = require('@mia-platform/flow-manager-client')

const client = new FMClientBuilder(pinoLogger, kafkaConfig)
  .configureCommandsExecutor(commandsTopic, consumerConf, partitionsConsumedConcurrently)
  .configureEventsEmitter(eventsTopic, producerConf)
  .build()

// define which action should be exected when the specified command is received
client.onCommand(
  'COMMAND',
  async (sagaId, metadata, emitEvent) => { /* do something*/ },
  async (sagaId, error, commit) =>  {
    /* do something else */

    await commit()  // execute in case the message should be skipped
  }
)

await client.start()

await client.emit('EVENT', sagaId, metadata)

await client.stop()

Notes:

  • it is not necessary to configure both components. This allows to enable only the needed features. However, please note that calling methods of components not configured results in an error.
  • it is recommended to define all the commands actions before starting the client. Nonetheless, they can be added or updated also at client runtime.

Client Methods

async start()

start function connects underlying components to Kafka, subscribe the consumer to the commands topic and allows the emission of events.

async stop()

stop function stops and disconnects underlying components from Kafka. It is resilient to disconnection errors. After its execution, methods isHealthy and isReady return false.

onCommand(command, commandFunction, errorHandler)

onCommand defines which action should be executed in case a specific command is received by the Flow Manager. It is also possible to define an error handler which takes as input the processing error.

Here is reported the signature of the two methods associated to a command:

  • commandFunction -> [async] Function(sagaId: string, commandMetadata: Object, emitEvent: function, heartbeat: function)
  • errorHandler -> [async] Function(sagaId: string, error: Error, commit: async Function)

Notes:

  • before executing a command, a parsing step is carried out. In case the command message can not be parsed as a Flow Manager message (e.g. the key does not contain any sagaId or the value does not provide messageLabel), the processing of that message is skipped altogether.
  • when a command is handled, it is also provided the possibility to emit a new event to notify the end of command execution. This is achieved by calling the emitEvent function given as argument of the commandFunction. Its signature is emitEvent(event, metadata). In this case sagaId is not needed since it exploits the same of executed command.
  • by default error risen during the processing step cause messages to be retried until the execution is successful. This behaviour can be fine in case the command action is idempotent or its repetition does not cause potential conflicts. In order to change it and skip messages whose processing throw an error, it is sufficient to call the commit function within the error handler. That function is provided as a parameter to the error handler, in conjunction with the processing error.
  • by default commands are read sequentially but, if you have multiple partitions assigned to the same client, you can set the partitionsConsumedConcurrently property of the commandExecutor to parallelize the execution of the commands in different partitions. This can improve performances if the command consists of asynchronous work. For reference: Partition-aware concurrency

async emit(event, sagaId, metadata)

emit allows to publish a new message in the events topic. It can throw in case sending a message results in a failure.

isHealthy()

isHealthy provides client status. In particular, it returns true in case:

  • the client has just been initialized, but not started
  • the client has been started and is running properly
  • the client has been started and it has crashed or stopped, but not disconnected from Kafka

Once the underlying components are disconnected from Kafka, either due to an error or due to calling stop method, the client status transitions to false (unhealthy). The only manners to get back into a healthy status is to either call again the start method (not recommended) or to wait that underlying components reconnect by themselves.

isReady()

isReady provides client running status. It returns true only in case the service is running properly.

Metrics

The library exposes also a function getMetrics(prometheusClients) which can be used to generate useful metrics related the commands and events handled by the client.

The metrics employed within the library are:

  • fm_client_commands_total - count how many commands issued by the Flow Manager have been processed. It has two labels:
    • command displays which command has been processed
    • result displays the result of processing this command
  • fm_client_events_total - count how many events have been sent to the Flow Manager. It has one label event that indicates which event has been fired.

Example of client building with metrics enabled:

const prometheusClient = require('prom-client')

const { FMClientBuilder, getMetrics } = require('@mia-platform/flow-manager-client')

const metrics = getMetrics(prometheusClient)

const client = new FMClientBuilder(pinoLogger, kafkaConfig)
  .configureCommandsExecutor(commandsTopic, consumerConf)
  .configureEventsEmitter(eventsTopic, producerConf)
  .enableMetrics(metrics)
  .build()

// expose metrics afterwards

Note: users of custom-plugin-lib can directly expose the getMetrics function and find these metrics decorated in the service customMetrics object.

const { FMClientBuilder, getMetrics } = require('@mia-platform/flow-manager-client')

const customService = require('@mia-platform/custom-plugin-lib')(
  { /* schema */ }
)

// export by default your service
module.exports = customService(async function index(service) {
  const { log, customMetrics } = service
  
  const client = new FMClientBuilder(log, kafkaConfig)
    .configureCommandsExecutor(commandsTopic, consumerConf)
    .configureEventsEmitter(eventsTopic, producerConf)
    .enableMetrics(customMetrics)
    .build()
  
  // your plugin logic
})

// export the function used by lc39 to add your custom metrics
// Note: flow-manager-client getMetrics function can be extended
// to include further metrics to be used in service
module.exports.getMetrics = getMetrics