npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@pieropatron/amqp-client

v1.0.0

Published

AMQP0-9-1 streamable client for nodejs

Downloads

4

Readme

amqp-client

tiny amqp 0-9-1 streamable client

NPM version NPM downloads

Idea of this project is to provide easy of use client for work with rabbit-mq, which should allow to post and read message bodies as streams.

Introduction

Starting to work with Rabbit MQ it is often a bit complex to understand its philosophy: how to correctly make connection, then using of channels, publish and consume messages and so on. Even if we need to do only something simple, like just read messages from one Queue of some 3rd party app, we'll had to study a lot of things about Rabbit MQ, AMQP and so on. This client (in theoury) should to allow of easy start to work with Rabbit MQ. Another problem is that inspite of ability of Rabbit-MQ to transmit big-size messages (splitting them to chunks of liimited size), I didn't found in popular libraries for nodejs to process this correct. As I understand, they trying to cache all chunks in memory while message will not completely received, which is really not safe for nodejs and it's memory restrictions. Thats why I also added to this client ability to use message bodies as streams. In result, it should allow to save infromation, for example, to files directly after receive message chunks.

NB: This client is not well-tested yet, so, please, be careful using it!

Install

npm install @pieropatron/amqp-client

OR

npm install https://github.com/pieropatron/amqp-client

API

Create client

import {Client} from '@pieropatron/amqp-client';

// create client with default values of options
const client = new Client({
	host: '127.0.0.1',
	port: 5672,
	username: 'guest',
	password: 'guest',
	auth_mechanism: ['AMQPLAIN', 'PLAIN'],
	protocol: 'amqp:',
	channel_max: 0,
	frame_max: 0,
	heartbeat: 0,
	locale: 'en_US',
	vhost: '/',
	connection_timeout: 60000
});

Work with exchanges

// get helper to work with exchanges
const exchange = client.exchange("test");

// Create exchange if not exists
await exchange.declare({
	storage_type: 'durable',
	type: 'direct',
	alternate_exchange: '',
	internal: false
});

// Check if exchange exists
const exists: boolean = await exchange.exists();

// Bind exchange to "destination" exchange with "routing key".
const bound: boolean = await exchange.bind({
	destination: "",
	routing_key: ""
});

// Unbind exchange from "destination" exchange with "routing key".
const unbound: boolean = await exchange.unbind({
	destination: "",
	routing_key: ""
});

// Delete exchange
const deleted = await exchange.delete();

Work with queues

// get helper to work with queues
const queue = client.queue("test");

// Create queue if not exists
await queue.declare({
	storage_type: 'durable',
	queue_type: 'classic',
	private: false
});

// Check if queue exists
const exists: boolean = await queue.exists();

// Get current queue stat
const stat: {queue: string, message_count: number, consumer_count: number} = await queue.stat();

// Bind queue to exchange with "routing key".
const bound: boolean = await queue.bind({
	exchange: "",
	routing_key: ""
});

// Unbind queue from exchange with "routing key".
const unbound: boolean = await queue.unbind({
	exchange: "",
	routing_key: ""
});

// remove all messages from queue
const purged: {message_count: number} = await queue.purge();

// Delete queue
const deleted: {message_count: number} = await queue.delete();

Publish messages

Structure of "Published message":

const message = {
	properties?: {
		// MIME content type
		content_type?: string,
		// MIME content encoding
		content_encoding?: string,
		// For applications, and for header exchange routing
		headers?: object,
		/**
		 * For queues that implement persistence:
		 * non-persistent (1) or persistent (2)
		 */
		delivery_mode?: number,
		// message priority, 0 to 9
		priority?: number,
		// For application use, correlation identifier
		correlation_id?: string,
		// address to reply to
		reply_to?: string,
		// message expiration specification
		expiration?: string,
		// application message identifier
		message_id?: string,
		// message timestamp
		timestamp?: Date,
		// For application use, message type name
		type?: string,
		// creating user id
		user_id?: string,
		// application id
		app_id?: string
	},
	// name of exchange to publish
	exchange?: string,
	// routing key to publish
	routing_key?: string,
	// callback which is called after current message will published
	callback?: (error?: Error | null, result?: {delivery_tag: bigint, multiple: boolean}) => void,
	// optional body to publish
	body?: Readable | Buffer,
	// size of body, required for case when body is stream
	body_size
}

There are 2 ways for publishing messages in the client:

  1. Using writable publisher stream
  2. Using publisher helper

Examples:

import {ReadableAsync, pipeline} from '@pieropatron/amqp-client';
import {createReadStream} from 'fs';
import {Client, PublishMessage} from '@pieropatron/amqp-client';

// publish using streams:
const rs = new ReadableAsync<PublishMessage>;
await rs.pushAsync({
	body: createReadStream(__dirname + '/big.avi'),
	callback: (error, result)=>{
		console.log(error, result);
		rs.push(null);
	}
});

const publish_writable = await client.publish_writable();
await pipeline(rs, publish_writable);

// publish using helper:
const publisher = await client.create_publisher();
await publisher.publish({
	body: createReadStream(__dirname + '/big.avi'),
	callback: (error, result)=>{
		console.log(error, result);
	}
});

Consume messages

Structure of "Consume message":

const message = {
	// tag of consumer
    	consumer_tag: string,
    	// unique (for consumer) delivery number
    	delivery_tag: bigint,
	// is message redeliveried
    	redelivered: boolean,
	// name of source exchange
    	exchange: string,
	// delivery routing key
    	routing_key: string,
	// same properties structure as for Publish message
	properties?: {},
	// Readable stream, with additional method toBuffer (for easy get content, if required)
	body?: ConsumerBodyReadable,
	// size of body
	body_size?: number,
	// Method to call if message was processed well
	ack: ()=>Promise<void>,
	// Method to call if message was processed bad
	nack: (requeue: boolean)=>Promise<void>
}

NB: for message it is mandatory to call ack or nack after the process!

Example:

import {ConsumeMessage} from '@pieropatron/amqp-client';
import {WritableAsync, pipeline} from '@pieropatron/amqp-client';

const consumer_transform = await client.consumer_transform({
	/** Queue for consume */
	queue: string,
	/** Prefetch count of messages */
	prefetch_count: number,
	/** Do not receive messages from same connection */
	no_local: boolean,
	/** Identifier for the consumer */
	consumer_tag: string,
	/** In case of set, only this consumer can access the queue. */
	exclusive: boolean,
	/** Priority of consumer. Can be positive or negative. */
	priority: number,
	// Any other supported consume arguments
	custom: Record<string, any>
});

await pipeline(
	consumer_transform,
	new WritableAsync<ConsumeMessage>({
		async write(chunk: ConsumeMessage){
			if (chunk.body){
				const buffer = await chunk.body.toBuffer();
				console.log(buffer.toString());
				await chunk.ack();
			} else {
				await chunk.nack(false);
			}
		}
	})
);

That's all for this client. Hope, it will be usefull for you.