rvl-pipe-rabbitmq

v0.3.0

rvl-pipe style wrappers for rabbitmq

A very small set of boilerplate functions for common RabbitMQ uses, built on rvl-pipe async-style functions.

It includes a task queue producer/consumer based on Work Queues at RabbitMQ Tutorial, and topic publishing/consuming based on Topics at RabbitMQ Tutorial.

It abstracts some quirks of building queue producers and consumers by providing a deliberately small set of functions that cover only these few features.

API

The API consists of only 8 functions: 2 for opening and closing AMQP connections, 4 for task queue producing, consuming and acknowledgement, and 2 for publishing and consuming topic messages.

  • connectAMQP(amqp_url: String):AsyncPipeFunction: Returns an async-pipe function that connects to the AMQP server specified in the url param and adds the connection to the context as the amqpConnection prop.

const producer = each(
  connectAMQP(process.env.RABBITMQ_URL),

  // Do more things here
)

producer()
  .then(ctx => {
    // ctx.amqpConnection exists
  })

  • closeAMQP():AsyncPipeFunction: Returns an async-pipe function that closes the AMQP connection stored on the context under the amqpConnection property.

const producer = each(
  connectAMQP(process.env.RABBITMQ_URL),

  // Do some things here

  closeAMQP()
)

producer()
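
To make the context-passing idea concrete, here is a minimal sketch that runs without RabbitMQ or rvl-pipe installed. The `each` below is a hypothetical stand-in written for this example, not rvl-pipe's implementation, and `connectFake`, `closeFake` and `fakeConnect` are invented names that only mimic the shape of connectAMQP and closeAMQP as described above.

```javascript
// Hypothetical stand-in for rvl-pipe's `each`: run the steps in order,
// passing the context returned by one step into the next.
const each = (...steps) => (ctx = {}) =>
  steps.reduce((promise, step) => promise.then(step), Promise.resolve(ctx))

// Fake connection so the sketch needs no RabbitMQ server (assumption).
const fakeConnect = url => Promise.resolve({ url, open: true })

// Shaped like connectAMQP: returns a step that adds `amqpConnection` to the context.
const connectFake = url => ctx =>
  fakeConnect(url).then(amqpConnection => ({ ...ctx, amqpConnection }))

// Shaped like closeAMQP: returns a step that closes whatever is on the context.
const closeFake = () => async ctx => {
  ctx.amqpConnection.open = false
  return ctx
}

const pipeline = each(
  connectFake('amqp://localhost'),
  // Any step placed between connect and close can see the connection.
  ctx => ({ ...ctx, sawOpenConnection: ctx.amqpConnection.open }),
  closeFake()
)

const result = pipeline({ requestId: 1 })
result.then(ctx => console.log(ctx.sawOpenConnection, ctx.amqpConnection.open))
// logs: true false
```

Whether the real library copies or mutates the context between steps is not stated in this README; the sketch copies it only to keep each step easy to follow.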

Task Queues

We added 4 basic functions to help with creating task queue consumers and producers and with acknowledging messages.

  • sendTaskMessage(queueQuery: AsyncPipeQuery, messageQuery: AsyncPipeQuery):AsyncPipeFunction: Creates an async-pipe function that, once called, sends a task message to the specified queue. Both the queue and message parameters are async-pipe queries, meaning they are simple functions that take the context as a parameter and return a value.

To send a simple task queue message we only need to:

const emailPayload = JSON.stringify({
  dest: '[email protected]',
  type: 'welcome'
})

const produceEmailTask = each(
  connectAMQP(process.env.RABBITMQ_URL),
  sendTaskMessage(always('emails'), always(emailPayload)),
  closeAMQP()
)

produceEmailTask()
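
Since queries are just functions of the context, the queue name and the payload can also be computed at call time. The sketch below illustrates that idea under stated assumptions: `always` and `prop` are hypothetical one-line stand-ins that mirror how those helpers are used in this README, and `sendFake` is an invented step that records what it would send instead of talking to RabbitMQ.

```javascript
// Hypothetical stand-ins, not the rvl-pipe implementations.
const always = value => () => value       // ignores the context
const prop = name => ctx => ctx[name]     // reads one property from the context

// Shaped like sendTaskMessage: resolves both queries against the context
// when the step runs, then records the result instead of publishing it.
const sendFake = (queueQuery, messageQuery) => async ctx => ({
  ...ctx,
  sent: { queue: queueQuery(ctx), message: messageQuery(ctx) }
})

// Fixed queue and payload, as in the example above.
const fixedStep = sendFake(always('emails'), always('{"type":"welcome"}'))

// Queue and payload derived from the context at call time.
const dynamicStep = sendFake(
  prop('queueName'),
  ctx => JSON.stringify({ dest: ctx.user, type: 'welcome' })
)

const demo = Promise.all([
  fixedStep({}),
  dynamicStep({ queueName: 'emails-high', user: 'alice' })
]).then(([fixed, dynamic]) => {
  console.log(fixed.sent.queue, dynamic.sent.queue, dynamic.sent.message)
  return { fixed, dynamic }
})
// logs: emails emails-high {"dest":"alice","type":"welcome"}
```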

  • consumeTaskQueue(queueQuery: AsyncPipeQuery, consumerFn: AsyncPipeFunction):AsyncPipeFunction: Creates an async-pipe function that consumes task messages using a consumer function. The consumer function gets access to the context that was created, so it can use any resources that exist on it.

const consumeEmailTask = each(
  ctx => {
    console.log('MSG: ', ctx.msg.content.toString())
    return ctx
  },
  createDocument(               // MongoDB connection is available here
    'email-payloads',
    props({
      id: cuid,
      msg: ctx => ctx.msg.content.toString()
    }),
    'email-payload'
  ),
  messageAck()
)

const consume = each(
  connectMongoDB(process.env.MONGODB_URL, process.env.MONGODB_NAME),
  connectAMQP(process.env.AMQP_URL),
  consumeTaskQueue(always('emails'), consumeEmailTask)
)

consume()
  .then(ctx => {
    console.log('Consumer started')
  })

  • messageAck():AsyncPipeFunction: Creates an async-pipe function that sends an acknowledgement back to the AMQP server, signalling that we processed the message correctly.
  • messageNoAck():AsyncPipeFunction: Same as messageAck, but signals that we could not process the message.

const consumeEmailTask = each(
  ctx => {
    console.log('MSG: ', ctx.msg.content.toString())
    return ctx
  },
  ensure(
    each(
      createDocument(
        'email-payloads',
        props({
          id: cuid,
          msg: ctx => ctx.msg.content.toString()
        }),
        'email-payload'
      ),
      messageAck()
    ),
    messageNoAck()            // If previous function fails, then send noAck
  )
)

const consume = each(
  connectMongoDB(process.env.MONGODB_URL, process.env.MONGODB_NAME),
  connectAMQP(process.env.AMQP_URL),
  consumeTaskQueue(always('emails'), consumeEmailTask)
)

consume()
  .then(ctx => {
    console.log('Consumer started')
  })
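
The acknowledge-on-success, reject-on-failure flow can be sketched without a broker. This is an illustration of the pattern only, not the library's internals: `makeStubChannel`, `handleMessage`, `storePayload` and the `id` field on messages are all invented for this example. The stub's `ack` and `nack` methods mirror the method names on an amqplib channel.

```javascript
// Stub channel that records calls instead of talking to a broker (assumption).
const makeStubChannel = () => {
  const calls = []
  return {
    calls,
    ack: msg => calls.push(['ack', msg.id]),
    nack: msg => calls.push(['nack', msg.id])
  }
}

// Acknowledge only when processing succeeds; otherwise signal failure.
const handleMessage = async (channel, msg, processFn) => {
  try {
    await processFn(msg)
    channel.ack(msg)
  } catch (err) {
    channel.nack(msg)
  }
}

const channel = makeStubChannel()

// Hypothetical processing step: rejects messages with an empty payload.
const storePayload = async msg => {
  if (!msg.content) throw new Error('empty payload')
}

const run = (async () => {
  await handleMessage(channel, { id: 1, content: 'welcome' }, storePayload)
  await handleMessage(channel, { id: 2, content: '' }, storePayload)
  return channel.calls
})()

run.then(calls => console.log(JSON.stringify(calls)))
// logs: [["ack",1],["nack",2]]
```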

Topics

For a simpler pub/sub scheme, we provide 2 functions: one to send messages to a named exchange using a topic key, and one to subscribe to those messages.

  • sendTopicMessage(exchangeQuery: AsyncPipeQuery, keyQuery: AsyncPipeQuery, messageQuery: AsyncPipeQuery):AsyncPipeFunction: Returns a function that sends a topic message to an exchange using a key.

const notify = each(
  connectAMQP(process.env.AMQP_URL),
  sendTopicMessage(
    always('notifications'),
    always('email.sent'),
    prop('message')
  ),
  closeAMQP()
)

notify({ message: 'Emailed user, welcome' })
  .then(ctx => {
    console.log('Notification sent')
  })

  • consumeTopicsMessage(exchangeQuery: AsyncPipeQuery, keyQuery: AsyncPipeQuery, consumerFn: AsyncPipeFunction):AsyncPipeFunction: Creates an async-pipe function that will consume messages sent to the defined exchange and key pattern.

const consumeNotification = each(
  ctx => {
    console.log(
      " [x] %s:'%s'",
      ctx.msg.fields.routingKey,
      ctx.msg.content.toString()
    )
    return ctx
  }
)

const consumeEmailNotifications = each(
  connectAMQP(process.env.AMQP_URL),
  consumeTopicsMessages(
    always('notifications'),
    always('email.*'),
    consumeNotification
  )
)

consumeEmailNotifications()
  .then(ctx => {
    console.log('Subscribed to email notifications')
  })
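
The `email.*` key pattern used by the subscriber follows RabbitMQ's topic exchange rules: routing keys are dot-separated words, `*` stands for exactly one word and `#` for zero or more words. The matching itself happens inside the broker. The function below, with the invented name `matchesTopic`, is only a sketch for reasoning about which keys a given pattern would receive.

```javascript
// Sketch of topic-pattern matching: '*' = exactly one word, '#' = zero or more words.
function matchesTopic (pattern, routingKey) {
  const p = pattern.split('.')
  const k = routingKey.split('.')

  // i indexes the pattern words, j indexes the routing-key words.
  const match = (i, j) => {
    if (i === p.length) return j === k.length
    if (p[i] === '#') {
      // Let '#' absorb 0..n of the remaining words and try each option.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true
      }
      return false
    }
    if (j === k.length) return false
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1)
  }

  return match(0, 0)
}

console.log(matchesTopic('email.*', 'email.sent'))       // true
console.log(matchesTopic('email.*', 'email.sent.retry')) // false
console.log(matchesTopic('email.#', 'email.sent.retry')) // true
console.log(matchesTopic('email.*', 'sms.sent'))         // false
```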

As with consumeTaskQueue, the function that processes messages receives the context, so you can initialize resources first and then start processing messages.

Note that topic subscriptions don't need to acknowledge messages. It is possible to lose messages if no subscriber is available when notifications are fired.
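
The message-loss caveat can be seen in a toy model. `TinyExchange` below is an invented in-memory class, not RabbitMQ or this library. It assumes, as a simplification, that a published message goes only to subscribers registered at that moment and is otherwise dropped.

```javascript
// Toy in-memory exchange: delivers to current subscribers and keeps nothing.
class TinyExchange {
  constructor () {
    this.subscribers = []
  }

  subscribe (fn) {
    this.subscribers.push(fn)
  }

  // Returns how many subscribers received the message.
  publish (message) {
    this.subscribers.forEach(fn => fn(message))
    return this.subscribers.length
  }
}

const exchange = new TinyExchange()
const received = []

// Published before anyone subscribed: nobody receives it.
const deliveredEarly = exchange.publish('email.sent #1')

exchange.subscribe(msg => received.push(msg))

// Published after subscribing: delivered.
const deliveredLate = exchange.publish('email.sent #2')

console.log(deliveredEarly, deliveredLate, received)
// logs: 0 1 [ 'email.sent #2' ]
```

If every message must eventually be processed, the task queue functions with acknowledgements are the better fit.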