@pestras/micro-rabbitmq

v1.2.1

Pestras Micro RabbitMQ

Pestras microservice plugin for rabbitmq broker support.

Install

npm i @pestras/micro @pestras/micro-rabbitmq

Plug In

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {}

Micro.start(Test);

The MicroMQ class accepts two arguments.

Name | Type | Default | Description
---- | ---- | ------- | -----------
connectOptions | string \| Options.Connect | required | see RabbitMQ Docs
socketConnection | any | null | see RabbitMQ Docs
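
The connectOptions argument can be a URL string or an options object. Assuming the plugin passes the value through to amqplib's connect (this README does not state it explicitly), both forms carry the same information. The sketch below shows how the two relate; the ConnectLike interface and the toAmqpUrl helper are hypothetical names used for illustration and are not part of the package.

```typescript
// Hypothetical subset of the fields found in amqplib's Options.Connect.
interface ConnectLike {
  protocol?: "amqp" | "amqps";
  hostname?: string;
  port?: number;
  username?: string;
  password?: string;
  vhost?: string;
}

// Hypothetical helper: builds the URL form from the object form.
function toAmqpUrl(o: ConnectLike): string {
  const protocol = o.protocol ?? "amqp";
  const host = o.hostname ?? "localhost";
  // 5672 and 5671 are the standard AMQP and AMQPS ports.
  const port = o.port ?? (protocol === "amqps" ? 5671 : 5672);
  const auth =
    o.username !== undefined
      ? `${encodeURIComponent(o.username)}:${encodeURIComponent(o.password ?? "")}@`
      : "";
  // The vhost is percent-encoded, so the default vhost "/" becomes "%2F".
  const vhost = o.vhost !== undefined ? `/${encodeURIComponent(o.vhost)}` : "";
  return `${protocol}://${auth}${host}:${port}${vhost}`;
}
```

Either value could then be given as the first argument to new MicroMQ(...), for example toAmqpUrl({ hostname: "localhost" }) or the object itself.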

Decorators:

MicroMQ provides several decorators to organize our code.

QUEUE:

As the name suggests, it helps to consume queue messages.

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, QUEUE, ConsumeMessage, Channel } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {

  @QUEUE("hello", { durable: false }, { noAck: false })
  handler(msg: ConsumeMessage, channel: Channel) {
    console.log(msg.content.toString());
  }
}

Micro.start(Test);

The QUEUE decorator accepts the name of the queue and the optional AssertQueue options; the example above also passes consume options ({ noAck: false }) as a third argument.
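
With noAck set to false, RabbitMQ expects each delivery to be acknowledged. This README does not say whether the plugin acknowledges messages for us, so the sketch below assumes the handler is responsible for it. MessageLike, ChannelLike and handleWithAck are hypothetical stand-ins written for illustration; ack and nack mirror the amqplib channel methods of the same names.

```typescript
// Minimal stand-ins for the message and channel types (assumptions for illustration).
interface MessageLike {
  content: { toString(): string };
}

interface ChannelLike {
  ack(msg: MessageLike): void;
  nack(msg: MessageLike, allUpTo?: boolean, requeue?: boolean): void;
}

// Hypothetical wrapper: acknowledge on success, reject without requeue on failure.
function handleWithAck(
  msg: MessageLike,
  channel: ChannelLike,
  work: (body: string) => void
): void {
  try {
    work(msg.content.toString());
    channel.ack(msg);
  } catch {
    // requeue = false avoids redelivering a message that keeps failing
    channel.nack(msg, false, false);
  }
}
```

Inside a QUEUE handler this could be called as handleWithAck(msg, channel, body => console.log(body)).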

FANOUT:

Helps to consume published messages, as in the Publish/Subscribe pattern.

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, FANOUT, ConsumeMessage, Channel } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {

  @FANOUT("hello", { durable: false }, { noAck: false })
  handler(msg: ConsumeMessage, channel: Channel) {
    console.log(msg.content.toString());
  }
}

Micro.start(Test);

DIRECT:

Helps to consume published messages with specific routing keys.

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, DIRECT, ConsumeMessage, Channel } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {

  @DIRECT("logs", ["error", "warn"], { durable: false }, { noAck: false })
  handler(msg: ConsumeMessage, channel: Channel) {
    console.log(msg.content.toString());
  }
}

Micro.start(Test);
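
To contrast the two exchange types covered so far: a fanout exchange copies every message to all bound queues, while a direct exchange delivers only to queues bound with a key equal to the message's routing key. The following is a small in-memory model of that rule, not the plugin's or the broker's implementation; Binding, routeFanout and routeDirect are hypothetical names.

```typescript
// A queue together with the routing keys it is bound with (illustrative model).
type Binding = { queue: string; keys: string[] };

// Fanout: the routing key is ignored, every bound queue receives the message.
function routeFanout(bindings: Binding[]): string[] {
  return bindings.map(b => b.queue);
}

// Direct: only queues bound with exactly this routing key receive the message.
function routeDirect(bindings: Binding[], routingKey: string): string[] {
  return bindings
    .filter(b => b.keys.some(k => k === routingKey))
    .map(b => b.queue);
}

const bindings: Binding[] = [
  { queue: "errors-only", keys: ["error"] },
  { queue: "errors-and-warnings", keys: ["error", "warn"] },
];

console.log(routeFanout(bindings));
console.log(routeDirect(bindings, "warn"));
```

In this model a handler bound with ["error", "warn"], as in the DIRECT example, would see messages published with either key and nothing else.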

TOPIC:

Helps to consume published messages with specific patterns.

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, TOPIC, ConsumeMessage, Channel } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {

  @TOPIC("logs", ["kern.*", "*.critical"], { durable: false }, { noAck: false })
  handler(msg: ConsumeMessage, channel: Channel) {
    console.log(msg.content.toString());
  }
}

Micro.start(Test);
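
Topic patterns are dot-separated words in which * stands for exactly one word and # stands for zero or more words. The matching happens inside the RabbitMQ broker; the sketch below only reproduces the rule locally so we can check which routing keys a pattern would accept. topicMatches is a hypothetical helper, not part of the package.

```typescript
// Returns true when a routing key satisfies a topic pattern.
// "*" matches exactly one word, "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (i === p.length) return j === k.length;
    if (p[i] === "#") {
      // Try letting "#" absorb 0, 1, 2, ... of the remaining words.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    // Any other pattern word needs a key word to compare against.
    if (j === k.length) return false;
    return (p[i] === "*" || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}
```

With the patterns from the example, "kern.info" and "cron.critical" are accepted, while "kern.info.extra" is not, because * never spans more than one word.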

Producing Messages:

We can produce messages using the channel instance passed as the second argument to message handler methods.

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, QUEUE, ConsumeMessage, Channel } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {

  @QUEUE("hello", { durable: false })
  handler(msg: ConsumeMessage, channel: Channel) {
    console.log(msg.content.toString());

    // produce a message using the same queue channel;
    // queueName and content are placeholders for a target queue name and a Buffer
    channel.sendToQueue(queueName, content);
  }
}

Micro.start(Test);

However, what if we want to produce messages without consuming any?

MicroMQ provides several ways to produce messages as follows:

Send To A Queue:

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, Queue } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {

  async someMethod() {
    // second argument is optional
    let queue = new Queue("hello", { durable: false });

    // second argument is optional
    await queue.send(Buffer.from("Hello World!"), { expiration: 60 * 1000 });
    // or
    await queue.channel.sendToQueue("hello", Buffer.from("Hello World!"), { expiration: 60 * 1000 });
  }
}

Micro.start(Test);

Publish fanout:

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, FanoutEx } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {

  async someMethod() {
    // second argument is optional
    let fanout = new FanoutEx("hello", { durable: false });

    // second argument is optional
    await fanout.publish(Buffer.from("Hello World!"), { expiration: 60 * 1000 });
    // or
    await fanout.channel.publish("hello", '', Buffer.from("Hello World!"), { expiration: 60 * 1000 });
  }
}

Micro.start(Test);

Publish direct:

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, DirectEx } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {

  async someMethod() {
    // second argument is optional
    let direct = new DirectEx("hello", { durable: false });

    // third argument is optional
    await direct.publish(Buffer.from("Hello World!"), "greetings", { expiration: 60 * 1000 });
    // or
    await direct.channel.publish("hello", 'greetings', Buffer.from("Hello World!"), { expiration: 60 * 1000 });
  }
}

Micro.start(Test);

Publish topic:

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, TopicEx } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {

  async someMethod() {
    // second argument is optional
    let topic = new TopicEx("hello", { durable: false });

    // third argument is optional
    await topic.publish(Buffer.from("Hello World!"), "greetings.all", { expiration: 60 * 1000 });
    // or
    await topic.channel.publish("hello", 'greetings.all', Buffer.from("Hello World!"), { expiration: 60 * 1000 });
  }
}

Micro.start(Test);

RPC

RabbitMQ supports the Request/Reply pattern, and we can achieve that in our service.

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class Test {

  async getUserInfo(token: string) {
    try {
      // timeout defaults to 30000
      let msg = await MicroMQ.Request("auth", Buffer.from(token), { timeout: 10000, noAck: true });
      console.log(msg.content.toString());
    } catch (e) {
      console.error(e.message);
    }
  }
}

Micro.start(Test);

We can send the reply from any QUEUE handler.

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, QUEUE, ConsumeMessage, Channel } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class AuthService {

  @QUEUE("auth", { durable: true })
  handler(msg: ConsumeMessage, channel: Channel) {
    let token = msg.content.toString();
    let user: any;
    // fetch user somehow

    // reply to the requester's queue, echoing the correlationId so the reply can be matched
    channel.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(user)), { correlationId: msg.properties.correlationId });
  }
}

Micro.start(AuthService);
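
The reply handler echoes correlationId back so the requesting side can pair each reply with the request that caused it. How MicroMQ.Request does this internally is not documented here; the following is a generic sketch of the pairing logic with a timeout, using a hypothetical ReplyMatcher class and plain strings in place of message buffers.

```typescript
// Bookkeeping for one outstanding request (illustrative, not the plugin's code).
type Pending = {
  resolve: (content: string) => void;
  timer: ReturnType<typeof setTimeout>;
};

class ReplyMatcher {
  private pending = new Map<string, Pending>();

  // Register a request and get a promise that settles with its reply or a timeout error.
  waitFor(correlationId: string, timeoutMs = 30000): Promise<string> {
    return new Promise<string>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId);
        reject(new Error(`request ${correlationId} timed out`));
      }, timeoutMs);
      this.pending.set(correlationId, { resolve, timer });
    });
  }

  // Deliver a reply; returns false when nothing is waiting for that id.
  deliver(correlationId: string, content: string): boolean {
    const entry = this.pending.get(correlationId);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(correlationId);
    entry.resolve(content);
    return true;
  }

  // Number of requests still waiting for a reply.
  get size(): number {
    return this.pending.size;
  }
}
```

A reply whose correlationId is unknown (late, duplicated, or meant for another requester) is simply ignored, which is why deliver reports whether a match was found.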

MicroMQ Events

MicroMQ provides a single event, triggered when a connection to the RabbitMQ broker is made successfully.

import { SERVICE, Micro } from '@pestras/micro';
import { MicroMQ, MicroMQEvent } from '@pestras/micro-rabbitmq';

Micro.plugin(new MicroMQ(connectOptions, socketOptions));

@SERVICE()
class AuthService implements MicroMQEvent {

  onConnection() {

  }
  
}

Micro.start(AuthService);

SubServices:

MicroMQ supports @pestras/micro subservices, so we can distribute our consumer decorators across them as well.

The onConnection event is also triggered in any subservice that implements it.