microservice-kit-atg

v0.4.4

Utility belt for building microservices

Note

This is a fork of SignAlive's version, created to publish the package to npm and to add Boom error support.

Quick Start

API Reference

Class MicroserviceKit

This is the main class, the entry point to microservice-kit-atg. To use it, you just need to import microservice-kit-atg:

const MicroserviceKit = require('microservice-kit-atg');

To create an instance, see the constructor below. A microservice-kit-atg instance is simply a collection of an AmqpKit instance and a ShutdownKit instance.

new MicroserviceKit(options={})

Params

Name|Type|Description
----|----|-----------
options.type="microservice"|String|Type of the microservice. This name is used as a prefix when generating the unique instance name, which helps to differentiate microservice instances.
options.amqp|Object|This object is passed to AmqpKit when creating its instance. See AmqpKit's docs for details.
[options.shutdown.killTimeout=Infinity]|Number|This number is passed to the ShutdownKit.setOptions method.

Sample
const microserviceKit = new MicroserviceKit({
    type: 'core-worker',
    amqp: {
        url: "amqp://localhost",
        queues: [
            {
                name: "core",
                options: {durable: true}
            }
        ],
        exchanges: []
    },
    shutdown: {
      killTimeout: Infinity
    }
});

MicroserviceKit.prototype.amqpKit

This amqpKit instance is automatically created for the microservice. See AmqpKit for details.

const coreQueue = microserviceKit.amqpKit.getQueue('core');

MicroserviceKit.prototype.shutdownKit

This shutdownKit (singleton) instance is automatically created for the microservice. See ShutdownKit for details.

microserviceKit.shutdownKit.addJob(someFunction);

MicroserviceKit.prototype.init() -> Promise

A newly created instance is not ready yet; it still has to connect to RabbitMQ. Call this method when booting your app.

microserviceKit
  .init()
  .then(() => {
    console.log("Initialized microservicekit!");
  })
  .catch((err) => {
    console.log("Cannot initialize microservicekit!", err);
  });

MicroserviceKit.prototype.getName() -> String

This is the unique name of the created instance. It begins with the microservice type, followed by a random string. Ex: socket-worker-54a98630
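The naming scheme described above can be sketched as follows. This is only an illustration: makeInstanceName is a hypothetical helper, and the use of 4 random bytes rendered as hex is an assumption based on the example name, not the library's confirmed implementation.

```javascript
const { randomBytes } = require('crypto');

// Hypothetical helper (not part of microservice-kit-atg):
// builds "<type>-<8 hex characters>", similar in shape to "socket-worker-54a98630".
function makeInstanceName(type = 'microservice') {
  const suffix = randomBytes(4).toString('hex'); // 4 random bytes -> 8 hex characters
  return `${type}-${suffix}`;
}

console.log(makeInstanceName('socket-worker'));
```

Because the suffix is random, two instances of the same type end up with different names, which is what makes them distinguishable in logs.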

Class AmqpKit

The AmqpKit class aims to simplify communication over RabbitMQ. Main features:

  • Use callbacks natively instead of building low-level RabbitMQ RPC topologies.
  • Send & receive events instead of messages. Events are just a special message hierarchy.
  • Send & receive payloads in native JSON format instead of buffers.
  • Progress support: a consumer can report its progress to the producer.
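To make the "JSON instead of buffers" point concrete, here is a minimal sketch of the conversion that a wrapper of this kind has to perform, since AMQP message bodies are raw bytes. The envelope shape ({eventName, payload}) and the helper names encodeEvent and decodeEvent are assumptions for illustration; the actual wire format used by AmqpKit is not documented here.

```javascript
// Hypothetical wire format: an {eventName, payload} envelope serialized as UTF-8 JSON.
function encodeEvent(eventName, payload = {}) {
  return Buffer.from(JSON.stringify({ eventName, payload }), 'utf8');
}

// Reverses encodeEvent: turns the raw message body back into a plain object.
function decodeEvent(buffer) {
  return JSON.parse(buffer.toString('utf8'));
}

const wire = encodeEvent('get-device', { id: 5 });
const event = decodeEvent(wire);
console.log(event.eventName, event.payload); // get-device { id: 5 }
```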

AmqpKit uses amqplib under the hood. Take a look at its documentation; we will refer to it a lot.

const AmqpKit = require('microservice-kit-atg').AmqpKit;

You can access the AmqpKit class as shown above. However, if you create a MicroserviceKit instance, you don't need to access AmqpKit directly; an AmqpKit instance is created for you automatically.

new AmqpKit([options={}])

Only use this constructor for advanced usage! An AmqpKit instance is created automatically if you use the new MicroserviceKit(options) constructor. In that case, options.amqp is used when creating the AmqpKit instance.

Params

Param|Type|Description
-----|----|-----------
[options.url]|String|AMQP connection string. Ex: amqp://localhost
[options.rpc=true]|Boolean|If you don't need callbacks for AMQP communication, you can pass false. In that case, the extra RPC channel and queue are not created. Default true.
options.queues=[]|Array|These queues are asserted in the init flow.
[options.queues[].name]|String|Name of the queue on RabbitMQ. Optional. Omit it if you want to create an exclusive queue; the name is then generated automatically.
options.queues[].key|String|Key used to look up the queue reference with AmqpKit.prototype.getQueue.
options.queues[].options|Object|Options for the queue. See the official amqplib assertQueue reference.
options.exchanges=[]|Array|These exchanges are asserted in the init flow.
options.exchanges[].name|String|Name of the exchange on RabbitMQ.
options.exchanges[].key|String|Key used to look up the exchange reference with AmqpKit.prototype.getExchange.
options.exchanges[].type|String|fanout, direct or topic
options.exchanges[].options|Object|Options for the exchange. See the official amqplib assertExchange reference.
[options.logger=null]|Function|AmqpKit can log incoming and outgoing events. It also logs how much time is spent consuming events or waiting for callbacks. You can simply use console.log.bind(console).

Sample
const amqpKit = new AmqpKit({
  queues: [
      {
          key: 'broadcast',
          options: {exclusive: true}
      },
      {
          key: 'direct',
          options: {exclusive: true}
      }
  ],
  exchanges: [
      {
          name: 'socket-broadcast',
          key: 'socket-broadcast',
          type: 'fanout',
          options: {}
      },
      {
          name: 'socket-direct',
          key: 'socket-direct',
          type: 'direct',
          options: {}
      }
  ],
  logger: function() {
      var args = Array.prototype.slice.call(arguments);
      args.unshift('[amqpkit]');
      console.log.apply(console, args);
  }
});

AmqpKit.prototype.prefetch(count, [global])

AmqpKit has two channels by default. The common channel is used for receiving and sending messages in your microservice. The other channel is for getting RPC callbacks and is used exclusively inside AmqpKit. This method limits the number of unacknowledged messages on the common channel. Once this limit is reached, RabbitMQ won't send any more events to the microservice until some are acknowledged.

Params

Param|Type|Description
-----|----|-----------
count|Number|Set the prefetch count for the channel. The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged. A falsey value for count indicates no such limit.
[global]|Boolean|Use the global flag to get the per-channel behaviour. Use true if you want to limit the whole microservice. The RPC channel is separate, so don't worry about callbacks.

Sample
microserviceKit.amqpKit.prefetch(100, true);

This microservice can process at most 100 events at the same time, regardless of event type. RabbitMQ won't send any more messages to the microservice until it completes some jobs.
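The effect of a prefetch limit can be illustrated with a small in-memory model. This is not AmqpKit or amqplib code; PrefetchSimulator is a hypothetical class that only mimics the rule described above: the broker stops delivering once count messages are unacknowledged, and resumes when one is acknowledged.

```javascript
// In-memory model of a prefetch limit (hypothetical, for illustration only).
class PrefetchSimulator {
  constructor(count) {
    this.count = count;        // maximum unacknowledged messages; falsy means no limit
    this.pending = [];         // messages still waiting on the "broker"
    this.unacked = new Set();  // messages delivered but not yet acknowledged
  }

  publish(message) {
    this.pending.push(message);
    this.deliver();
  }

  ack(message) {
    this.unacked.delete(message);
    this.deliver(); // acknowledging frees a slot for the next pending message
  }

  deliver() {
    while (this.pending.length > 0 && (!this.count || this.unacked.size < this.count)) {
      this.unacked.add(this.pending.shift());
    }
  }
}

const sim = new PrefetchSimulator(2);
['a', 'b', 'c'].forEach((m) => sim.publish(m));
console.log(sim.unacked.size, sim.pending.length); // 2 1
sim.ack('a');
console.log(sim.unacked.size, sim.pending.length); // 2 0
```

With a limit of 2, the third message stays on the broker side until the consumer acknowledges one of the first two.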

AmqpKit.prototype.getQueue(key) -> AmqpKit.Queue

Gets queue instance by key.

Param|Type|Description
-----|----|-----------
key|String|Unique queue key.

AmqpKit.prototype.getExchange(key) -> AmqpKit.Exchange

Gets exchange instance by key.

Param|Type|Description
-----|----|-----------
key|String|Unique exchange key.
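The difference between a key (the local handle passed to getQueue and getExchange) and a name (what the queue or exchange is called on RabbitMQ) can be sketched with a simple lookup table. KeyRegistry is a hypothetical class and says nothing about AmqpKit's internals; the broker name company.audit.v2 is made up, and throwing on a duplicate key is a choice made for this sketch only.

```javascript
// Hypothetical registry, not AmqpKit's internals: local key -> broker-side name.
class KeyRegistry {
  constructor() {
    this.entries = new Map();
  }

  // Registering the same key twice throws here; the library may behave differently.
  register(key, name) {
    if (this.entries.has(key)) {
      throw new Error(`Key already registered: ${key}`);
    }
    this.entries.set(key, { key, name });
    return this.entries.get(key);
  }

  // Returns the entry for `key`, or undefined if nothing was registered under it.
  get(key) {
    return this.entries.get(key);
  }
}

const exchanges = new KeyRegistry();
exchanges.register('socket-broadcast', 'socket-broadcast');
exchanges.register('audit', 'company.audit.v2'); // key and broker name may differ
console.log(exchanges.get('audit').name); // company.audit.v2
```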

AmqpKit.prototype.createQueue(key, name, options={}) -> Promise.<AmqpKit.Queue>

Creates (asserts) a queue.

Param|Type|Description
-----|----|-----------
key|String|Unique queue key.
[name]|String|Name of the queue on RabbitMQ. Optional. Pass an empty string if you want to create an exclusive queue; the name is then generated automatically.
options|Object|Options for the queue. See the official amqplib assertQueue reference.

AmqpKit.prototype.createExchange(key, name, type, options={}) -> Promise.<AmqpKit.Exchange>

Creates (asserts) an exchange.

Param|Type|Description
-----|----|-----------
key|String|Unique exchange key.
name|String|Name of the exchange on RabbitMQ.
type|String|fanout, direct or topic
options|Object|Options for the exchange. See the official amqplib assertExchange reference.

AmqpKit.prototype.connection

Native amqplib connection. See the official docs.

AmqpKit.prototype.channel

Native amqplib channel instance that is used as the common channel. See the official docs.

Class AmqpKit.Queue

This class is not exposed to the user. When you call amqpKit.getQueue() or amqpKit.createQueue(), what you get is an instance of this class.

AmqpKit.Queue.prototype.consumeEvent(eventName, callback, [options={}])

Registers a handler that consumes the given event from the queue.

Params

Param|Type|Description
-----|----|-----------
eventName|String|Event name.
callback|Function|Handler function. It takes 3 parameters: payload, done, progress. Payload is the event payload. Done is a node-style callback that finalizes the event: done(err, payload). Both error and payload are optional. The error should be an instance of the native Error class! Progress is an optional callback for sending progress events: progress(payload). Progress events do not finalize the event!
[options={}]|Object|Consume options. See the official amqplib consume docs.

Sample
const coreQueue = microserviceKit.amqpKit.getQueue('core');

coreQueue.consumeEvent('get-device', (payload, done, progress) => {
  // Optional progress events!
  let count = 0;
  let interval = setInterval(() => {
      progress({data: 'Progress ' + (++count) + '/5'});
  }, 1000);

  // complete job.
  setTimeout(() => {
      clearInterval(interval);
      done(null, {some: 'Response!'});
  }, 5000);
}, {});
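The contract of the handler (payload, done, progress) can be exercised without a broker. The sketch below uses a hypothetical runHandler adapter, which is not part of the library; it calls a handler of the same shape, collects progress events, and settles a Promise when done is called. Ignoring repeated done() calls is an assumption of this sketch.

```javascript
// Hypothetical adapter: runs a consumeEvent-style handler and reports what it emitted.
function runHandler(handler, payload) {
  return new Promise((resolve, reject) => {
    const progressEvents = [];
    let finished = false;

    const progress = (data) => {
      if (!finished) progressEvents.push(data); // progress never finalizes the event
    };

    const done = (err, response) => {
      if (finished) return; // ignore repeated done() calls
      finished = true;
      if (err) reject(err);
      else resolve({ response, progressEvents });
    };

    handler(payload, done, progress);
  });
}

const result = runHandler((payload, done, progress) => {
  progress({ step: 1 });
  progress({ step: 2 });
  done(null, { id: payload.id, name: 'demo-device' });
}, { id: 5 });

result.then(({ response, progressEvents }) => {
  console.log(response.name, progressEvents.length);
});
```

An adapter like this can be handy for unit-testing handlers before wiring them to a real queue.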

AmqpKit.Queue.prototype.bind(exchange, pattern) -> Promise

Asserts a routing pattern from an exchange to this queue: the given exchange will relay messages to the queue according to the type of the exchange and the pattern given.

Params

Param|Type|Description
-----|----|-----------
exchange|String|Name of exchange on RabbitMQ.
pattern|String|Binding pattern.
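How a binding pattern is interpreted depends on the exchange type: a fanout exchange ignores it, a direct exchange requires an exact match with the routing key, and a topic exchange treats * as exactly one dot-separated word and # as zero or more words. The topic case can be sketched as below; topicMatches is a hypothetical helper for reasoning about patterns, since the real matching happens inside RabbitMQ.

```javascript
// Matches an AMQP topic binding pattern against a routing key.
// "*" stands for exactly one dot-separated word, "#" for zero or more words.
function topicMatches(pattern, routingKey) {
  const match = (p, k) => {
    if (p.length === 0) return k.length === 0;
    const [head, ...rest] = p;
    if (head === '#') {
      // "#" may consume zero words, or one word and then keep trying.
      return match(rest, k) || (k.length > 0 && match(p, k.slice(1)));
    }
    if (k.length === 0) return false;
    return (head === '*' || head === k[0]) && match(rest, k.slice(1));
  };
  return match(pattern.split('.'), routingKey.split('.'));
}

console.log(topicMatches('device.*.updated', 'device.42.updated')); // true
console.log(topicMatches('device.#', 'device.42.status.changed')); // true
console.log(topicMatches('device.*', 'device.42.status')); // false
```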

AmqpKit.Queue.prototype.unbind(exchange, pattern) -> Promise

Removes the routing path between this queue and the given exchange for the given pattern.

Param|Type|Description
-----|----|-----------
exchange|String|Name of exchange on RabbitMQ.
pattern|String|Binding pattern.

AmqpKit.Queue.prototype.getUniqueName() -> String

Returns the real queue name on RabbitMQ.

AmqpKit.Queue.prototype.sendEvent(eventName, [payload={}], [options={}]) -> Promise

Sends an event with payload to the queue.

Params

Param|Type|Description
-----|----|-----------
eventName|String|Event name.
[payload]|Object|Payload data.
[options]|Object|See the official amqplib docs.
[options.dontExpectRpc=false]|Boolean|In addition to the amqplib options, AmqpKit provides a couple of its own. Set this to true if you don't expect a callback for this message. Default false.
[options.timeout=30000]|Number|Timeout duration. This check happens entirely on the producer side: if the job finishes after the timeout, its RPC message is ignored. Pass 0 if you don't want a timeout. If you set dontExpectRpc to true, this option is ignored.
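A producer-side timeout of this kind can be sketched with Promise.race. The helper withTimeout is hypothetical and not the library's implementation; it also assumes the timeout value is in milliseconds, which the table does not state. As with options.timeout, passing 0 disables the timer, and a reply that arrives after the timeout is simply ignored.

```javascript
// Hypothetical helper: rejects if `promise` has not settled within `ms`; 0 disables the timer.
function withTimeout(promise, ms = 30000) {
  if (ms === 0) return promise;
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Whichever settles first wins; the timer is cleared so it cannot keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// A reply that takes 50 ms is too slow for a 10 ms timeout.
const slowJob = new Promise((resolve) => setTimeout(() => resolve('late reply'), 50));
const timedOut = withTimeout(slowJob, 10).catch((err) => err.message);
timedOut.then((message) => console.log(message)); // Timed out after 10 ms
```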

Sample
const coreQueue = microserviceKit.amqpKit.getQueue('core');

coreQueue
  .sendEvent('get-device', {id: 5}, {persistent: true})
  .progress((payload) => {
    console.log('The job is processing...', payload);
  })
  .success((payload) => {
    console.log('Device: ', payload);
  })
  .catch((err) => {
    console.log('Cannot get device', err);
  })

Notice the .progress() handler? It's just an additional handler that AmqpKit adds for you. Apart from this, the return value of this method is a Promise.

Class AmqpKit.Exchange

This class is not exposed to the user. When you call amqpKit.getExchange() or amqpKit.createExchange(), what you get is an instance of this class.

AmqpKit.Exchange.prototype.publishEvent(routingKey, eventName, [payload], [options]) -> Promise

Sends an event with payload to the exchange.

Params

Param|Type|Description
-----|----|-----------
routingKey|String|Routing pattern for the event.
eventName|String|Event name.
[payload]|Object|Payload data.
[options]|Object|See the official amqplib docs.
[options.dontExpectRpc=false]|Boolean|In addition to the amqplib options, AmqpKit provides a couple of its own. Set this to true if you don't expect a callback for this message. Default false.
[options.timeout=30000]|Number|Timeout duration. This check happens entirely on the producer side: if the job finishes after the timeout, its RPC message is ignored. Pass 0 if you don't want a timeout. If you set dontExpectRpc to true, this option is ignored.

Sample
const broadcastExchange = microserviceKit.amqpKit.getExchange('socket-broadcast');
broadcastExchange.publishEvent('', 'channel-updated', {channel: 'data'}, {dontExpectRpc: true});

Class ShutdownKit

This class catches interrupt signals and uncaught exceptions, and tries to run the registered jobs so the process can shut down gracefully. This class is a singleton.

// Direct access
const shutdownKit = require('microservice-kit-atg').ShutdownKit;

// Or from microservice-kit-atg instance
const microserviceKit = new MicroserviceKit({...});
console.log(microserviceKit.shutdownKit);

As you can see above, you can access the ShutdownKit singleton instance in multiple ways.

ShutdownKit.prototype.addJob(job)

Adds a job to the graceful shutdown process. When ShutdownKit tries to shut down gracefully, it runs all the jobs in parallel.

Params

Param|Type|Description
-----|----|-----------
job|Function|This function takes a done callback as its single parameter. Call done when the job is completed. It is a node-style callback: done(err).

Sample
shutdownKit.addJob((done) => {
    debug('Closing connection...');
    this.connection
      .close()
      .then(() => {
          done();
      })
      .catch(done);
});
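Running node-style jobs in parallel can be sketched as follows. runJobsInParallel is a hypothetical function, not ShutdownKit's code; in particular, reporting the first error and ignoring later results is a choice made for this sketch, and the library may handle failing jobs differently.

```javascript
// Hypothetical runner: starts every job immediately and calls `callback` once all have finished.
function runJobsInParallel(jobs, callback) {
  if (jobs.length === 0) return callback(null);
  let remaining = jobs.length;
  let failed = false;

  jobs.forEach((job) => {
    job((err) => {
      if (failed) return; // an earlier job already reported an error
      if (err) {
        failed = true;
        return callback(err);
      }
      remaining -= 1;
      if (remaining === 0) callback(null);
    });
  });
}

const order = [];
runJobsInParallel([
  (done) => setTimeout(() => { order.push('slow'); done(); }, 20),
  (done) => setTimeout(() => { order.push('fast'); done(); }, 5),
], (err) => {
  console.log(err ? 'failed' : 'all jobs finished', order);
});
```

Because both jobs start at once, the faster job completes first even though it was added second, and the total time is that of the slowest job rather than the sum.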

ShutdownKit.prototype.gracefulShutdown()

This method gracefully shuts down the current node process.

ShutdownKit.prototype.setLogger(logger)

Sets a custom logger to print out shutdown process logs to console.

Params

Param|Type|Description
-----|----|-----------
logger|Function|Function that receives the log arguments, as in the sample below.

Sample
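shutdownKit.setLogger(function() {
  // A regular function is needed here: arrow functions have no `arguments` object of their own.
  var args = Array.prototype.slice.call(arguments);
  args.unshift('[shutdownkit]');
  console.log.apply(console, args);
});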

As you can see, we convert all arguments to a native array and prepend the [shutdownkit] prefix. Then we apply these arguments to the standard console.log method.