cb-tortoise

v1.0.1

Client library for AMQP

Tortoise

npm install tortoise

A client library for interacting with AMQP.

Basic Example

var Tortoise = require('tortoise')
  , tortoise = new Tortoise('amqp://localhost');

tortoise
  .queue('my-queue')
  .prefetch(1)
  .subscribe(function(msg, ack) {
    console.log(msg);
    ack();
  });

setInterval(function() {
  tortoise
    .queue('my-queue')
    .publish({ Hello: 'World' });
}, 1000);

Basic Setup

var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost');

Advanced Setup

var Tortoise = require('tortoise');
var options = {
  connectRetries: -1,
  connectRetryInterval: 1000
};
var tortoise = new Tortoise('amqp://localhost', options);

options is optional. Current options are:

  • connectRetries: Number value greater than or equal to -1. Defaults to -1. Tortoise will attempt to connect up to this many times. When set to -1, tortoise will attempt to connect forever. Note: This does not cover connections that were already established and then lost. See Handling connection or channel closure for more information on that.
  • connectRetryInterval: Number value greater than or equal to 0. Defaults to 1000. This is the amount of time, in ms, that tortoise will wait before attempting to connect again.
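
The retry behavior described by these two options can be sketched as a standalone loop. This is only an illustration of the documented semantics, not tortoise's internal implementation: `retryAllowed`, `connectWithRetry` and the `connect` callback are hypothetical names, and the sketch assumes connectRetries counts retries made after the first failed attempt.

```javascript
// Hypothetical helpers, not part of tortoise's API.

// True when another retry is allowed; -1 means "retry forever".
function retryAllowed(retriesUsed, connectRetries) {
  return connectRetries === -1 || retriesUsed < connectRetries;
}

// Calls `connect` (a function returning a promise) until it succeeds
// or the retry budget is spent.
function connectWithRetry(connect, options) {
  options = options || {};
  var retries = options.connectRetries === undefined ? -1 : options.connectRetries;
  var interval = options.connectRetryInterval === undefined ? 1000 : options.connectRetryInterval;
  var retriesUsed = 0;

  function attempt() {
    return connect().catch(function(err) {
      if (!retryAllowed(retriesUsed, retries)) {
        throw err; // budget exhausted: surface the last connection error
      }
      retriesUsed++;
      // Wait connectRetryInterval ms, then try again
      return new Promise(function(resolve) {
        setTimeout(resolve, interval);
      }).then(attempt);
    });
  }

  return attempt();
}
```

With the defaults (-1 and 1000), this loop never gives up and waits one second between attempts.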

Publishing to a queue

tortoise
  .queue('my-queue', { durable:false })
  .publish({ Hello: 'World' });

Publishing to an exchange

tortoise
  .exchange('my-exchange', 'direct', { durable:false })
  .publish('routing.key', { Hello: 'World' });

Subscribing to a queue

tortoise
  .queue('my-queue', { durable: false })
  .prefetch(1)
  .subscribe(function(msg, ack, nack) {
    // Handle
    ack(); // or nack();
  });

tortoise
  .queue('my-queue', { durable: false })
  // Add as many bindings as needed
  .exchange('my-exchange', 'direct', 'routing.key', { durable: false })
  .prefetch(1)
  .subscribe(function(msg, ack, nack) {
    // Handle
    ack(); // or nack();
  });

Automatically parsing JSON

The optional .json() setting automatically attempts to parse each message as JSON (using JSON.parse). If the content is invalid, the message is nacked with requeue=false. To capture this event each time it occurs, subscribe to Tortoise.EVENTS.PARSEERROR on your tortoise instance:

var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost');

tortoise
  .queue('my-queue', { durable: false })
  .prefetch(1)
  .json()
  .subscribe(function(msg, ack, nack) {
    // Will be called if the msg content is valid JSON and can be parsed
    ack(); // or nack();
  });
  
tortoise.on(Tortoise.EVENTS.PARSEERROR, function(err, msg) {
  // err is the error
  // msg is the message object returned from AMQP.
  // msg.content is the Buffer of the message
  console.log('An error occurred parsing the msg content');
});
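
To make the parse-or-nack flow concrete, here is a minimal sketch of what happens to a single message. It is not tortoise's source: `handleJsonMessage` is a hypothetical helper, a plain Node EventEmitter stands in for the tortoise instance, and it assumes the subscriber receives the parsed object as its first argument.

```javascript
var EventEmitter = require('events');

// Event string as listed under Tortoise.EVENTS
var PARSEERROR = 'TORTOISE.PARSEERROR';

// Hypothetical helper: mimics the documented .json() behavior for one message.
function handleJsonMessage(emitter, msg, handler, ack, nack) {
  var parsed;
  try {
    parsed = JSON.parse(msg.content.toString());
  } catch (err) {
    emitter.emit(PARSEERROR, err, msg); // listeners receive (err, msg)
    nack(false);                        // requeue=false, so the bad message is not redelivered
    return;
  }
  // Valid JSON: hand the parsed value to the subscriber, scoped to the message
  handler.call(msg, parsed, ack, nack);
}
```

The subscriber is never called for invalid content, so the PARSEERROR listener is the only place such messages can be observed.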

Accessing message data

The callback function provided to the subscribe method will be scoped to the message, i.e. the this object will contain the properties of the message. The object would look similar to this:

{
  fields: {
    deliveryTag: <int>,
    redelivered: <bool>,
    routingKey: <string>,
    ...
  },
  properties: {
    contentType: <string>,
    headers: {
      ...
    },
    ...
  }
}

So, if I wanted to access the routingKey that was provided, I would access it by:

tortoise
  .queue('my-queue', { durable: false })
  .exchange('my-exchange', 'topic', 'event.*', { durable: false })
  .subscribe(function(msg, ack, nack) {
    var routingKey = this.fields.routingKey;
    // Handle
    ack(); // or nack();
  });

This is useful if you subscribe to wildcard topics on an exchange but want to know what the actual topic (routingKey) was.
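
Because the message is exposed through `this`, the callback needs to be a regular function expression. An arrow function takes `this` from its enclosing scope and would not see the message. The sketch below shows the difference using a hypothetical `deliver` helper that only mimics how a library could invoke the callback with Function.prototype.call; it is not tortoise code.

```javascript
// Hypothetical stand-in for how a subscriber callback could be invoked
function deliver(message, callback) {
  return callback.call(message, message.content.toString());
}

var message = {
  fields: { routingKey: 'event.created' },
  content: Buffer.from('hello')
};

// Regular function: `this` is the message
var fromFunction = deliver(message, function(msg) {
  return this.fields.routingKey;
});

// Arrow function: `this` is whatever it was outside, never the message,
// so guard before reading fields
var fromArrow = deliver(message, (msg) => {
  return this && this.fields ? this.fields.routingKey : undefined;
});
```

If you prefer arrow functions, the routing key has to come from somewhere other than `this`.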

Handling Errors and Events

Tortoise will emit events when certain things occur. The following events are emitted:

{
    PARSEERROR: 'TORTOISE.PARSEERROR',
    CONNECTIONCLOSED: 'TORTOISE.CONNECTIONCLOSED',
    CONNECTIONDISCONNECTED: 'TORTOISE.CONNECTIONDISCONNECTED'
}

These event strings are accessed by the EVENTS property on the Tortoise library, and can be subscribed to on an individual tortoise instance. Here is an example of being notified when a parse error occurred:

var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost');
// Do your tortoise configuration

tortoise.on(Tortoise.EVENTS.PARSEERROR, function() {
    // Called on parse error
});
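
The same pattern works for the connection events. As a sketch, the hypothetical helper below attaches one logging listener per event name; the test setup uses a plain Node EventEmitter in place of a tortoise instance, and `EVENTS` is copied from the list above rather than read from the library.

```javascript
var EventEmitter = require('events');

// Event strings copied from the list above
var EVENTS = {
  PARSEERROR: 'TORTOISE.PARSEERROR',
  CONNECTIONCLOSED: 'TORTOISE.CONNECTIONCLOSED',
  CONNECTIONDISCONNECTED: 'TORTOISE.CONNECTIONDISCONNECTED'
};

// Hypothetical helper: calls log(name) whenever any of the given events fires
function logAllEvents(emitter, events, log) {
  Object.keys(events).forEach(function(name) {
    emitter.on(events[name], function() {
      log(name);
    });
  });
}

// With a real instance this would be:
//   logAllEvents(tortoise, Tortoise.EVENTS, console.log);
```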

Auto retrying and throttling

There are a few optional methods for controlling continuous failures. failSpan and retryTimeout do nothing if failThreshold is not set.

By default (failThreshold not set), there is no failure handling.

var Tortoise = require('tortoise')
  , tortoise = new Tortoise('amqp://localhost');

tortoise
  .queue('simple-queue', { durable: true })
  .failThreshold(3) // 3 immediate attempts
  .failSpan(1000 * 60 * 10) // 10 minutes, defaults to 1 minute
  .retryTimeout(1000 * 10) // 10 second timeout on each retry, defaults to 5 seconds
  .subscribe(function(msg, ack, nack) {
    console.log(msg);
    nack();
  });

Automatic setup of dead letter exchange and queue

If you want to set up your (subscribe) queue to automatically set a dead letter exchange:

var Tortoise = require('tortoise')
  , tortoise = new Tortoise('amqp://localhost');

tortoise
  .queue('simple-queue')
  .dead('exchange.dead', 'queue.dead')
  .subscribe(function(msg, ack, nack) {
    // Do not requeue, instead shove to dead letter exchange
    nack(false);
  });

Declaring the queue to bind to the exchange is optional. It is perfectly acceptable to set it up like this:

var Tortoise = require('tortoise')
  , tortoise = new Tortoise('amqp://localhost');

tortoise
  .queue('simple-queue')
  .dead('exchange.dead')
  .subscribe(function(msg, ack, nack) {
    // Do not requeue, instead shove to dead letter exchange
    nack(false);
  });
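
For background, RabbitMQ attaches a dead letter exchange to a queue through the `x-dead-letter-exchange` queue argument, set when the queue is declared. The sketch below builds that argument map. `deadLetterArguments` is a hypothetical helper for illustration only, and whether .dead() produces exactly these arguments internally is an assumption.

```javascript
// Hypothetical helper: builds the RabbitMQ queue arguments for dead-lettering.
// deadRoutingKey is optional; without it RabbitMQ reuses the message's own routing key.
function deadLetterArguments(deadExchange, deadRoutingKey) {
  var args = { 'x-dead-letter-exchange': deadExchange };
  if (deadRoutingKey !== undefined) {
    args['x-dead-letter-routing-key'] = deadRoutingKey;
  }
  return args;
}

var args = deadLetterArguments('exchange.dead');
```

A message rejected with nack(false) on a queue declared with these arguments is routed to exchange.dead instead of being discarded.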

Configuring without the need to subscribe or publish

The .setup() method will call all asserts and bindings, then close the channel:

tortoise
  .queue('myQueue')
  .exchange('myExchange', 'topic', '#')
  .dead('myDeadExchange')
  .setup();

tortoise
  .exchange('myExchange', 'topic')
  .setup();

Handling connection or channel closure

Automatic Method

There is a helper method, .reestablish(), to re-establish connections that were lost (when subscribing). It will attempt to re-establish the connection and, when successful, the new connection will be configured with the same settings as before (queue, exchanges, etc). One caveat with this method is that the .then() resolution from the .subscribe() method will no longer function after the connection is lost. In most cases that is not a problem.

It should be noted that this will begin consuming the connectRetries limit. See Advanced Setup for more information.

Here is an example:

var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost', { connectRetries: -1 });

tortoise
  .queue('myQueue')
  .reestablish()
  .subscribe(function(msg, ack, nack) {
    console.log('message received', msg);
    ack();
  })
  .then(function(ch) {
    // This will only be called once the original channel closes, not for any new channels created
    ch.on('close', function() {
      console.log('channel closed');
    });
  });

If you would still like to know (for logging, etc) when a connection is closed, see the Handling Errors and Events section for subscribing to connection events.

Manual Method

When subscribing, the promise returned from .subscribe() resolves with a channel object that can be listened on.

The following is an example of listening for close events and resubscribing.

// Wrap subscription inside function
var subscribe = function() {
  tortoise
    .queue('myQueue')
    .subscribe(function(msg, ack, nack) {
      ack();
    })
    .then(function(ch) {
      // Once connection is closed, immediately attempt to subscribe again
      ch.on('close', subscribe);
    });
};

// Start subscribing
subscribe();
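
Resubscribing immediately can spin in a tight loop while the broker is down, so a variation is to wait before trying again. The following is a minimal sketch of that idea, not something tortoise provides: `resubscribeOnClose` is a hypothetical wrapper, and the injectable `schedule` parameter (defaulting to setTimeout) exists only to make the delay easy to test.

```javascript
// Hypothetical wrapper: resubscribes after delayMs whenever the channel closes.
// subscribeFn must return a promise resolving with a channel that emits 'close'.
function resubscribeOnClose(subscribeFn, delayMs, schedule) {
  schedule = schedule || setTimeout;

  function run() {
    return subscribeFn().then(function(ch) {
      ch.on('close', function() {
        // Wait before subscribing again instead of retrying immediately
        schedule(run, delayMs);
      });
      return ch;
    });
  }

  return run();
}

// With a real instance this would be:
//   resubscribeOnClose(function() {
//     return tortoise.queue('myQueue').subscribe(function(msg, ack, nack) {
//       ack();
//     });
//   }, 1000);
```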