npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

nstarter-rabbitmq

v0.9.0

Published

## 使用说明

Downloads

19

Readme

RabbitMQ 队列

使用说明

AMQP 链接

import AmqpConnector from 'nstarter-rabbitmq';
const amqp = AmqpConnector({
    user: 'user',
    password: 'password',
    brokers: [
        {
            host: '127.0.0.1',
            port: 5672
        }
    ],
    heartbeatIntervalInSeconds: 60,
    reconnectTimeInSeconds: 1
});

队列启动

import { IQueueConfig, queueFactory, IRabbitMqMessage } from 'nstarter-rabbitmq';

export interface IDemoMessage extends IRabbitMqMessage {
    value: string;
}

// 队列配置
const queueConfig: IQueueConfig = {
    name: 'demo:normal',
    prefetch: 2,
    maxLength: 10000,
};

export const demo_queue = queueFactory<IDemoMessage>(amqp, queueConfig);

生产者,向队列发消息

import { IProduceOptions, queueProducerFactory } from 'nstarter-rabbitmq';
import { demo_queue, IDemoMessage } from './queue';

/**
 * 增量同步延迟队列 生产者
 */
const produceOption: Partial<IProduceOptions> = {};

export const producer = queueProducerFactory<IDemoMessage>(demo_queue, produceOption);

// 启动生产者
producer.setup().then();
// 发送消息
producer
    .publish({ value: 'demo:normal' }, { mandatory: true, deliveryMode: true, persistent: true })
    .then(_.noop)
    .catch((err: Error) => console.log(err));

消费者,向队列订阅消息

import { AckPolicy, queueConsumerFactory, RetryMethod, IConsumerConfig, startQueueConsumers } from 'nstarter-rabbitmq';
import { queue, IDemoMessage } from'./queue';

const consumerConfig: IConsumerConfig<IDemoMessage> = {
    retryMethod: RetryMethod.republish,
    ackPolicy: AckPolicy.after,
    consumeTimeout: 10000, // 10s
    run(message): Promise<void> {
        const demoMessage: IDemoMessage = message.content;
        console.log(demoMessage);            
    }
};

export const consumer = queueConsumerFactory<IDemoMessage>(queue, consumerConfig);

// 注册队列消费者
consumer.register();
// 启动队列消费者
startQueueConsumers().then();

RabbitMqQueue

| 参数名 | 类型 | 参数说明 | | :-- | :-- | :-- | | amqp | AmqpConnectManager | RabbitMQ 链接管理 | | queue | IQueueConfig | 队列配置 |

RabbitMqQueue#waitForSetup(): Promise

等待链接初始化完成。

RabbitMqQueue#close(): Promise

关闭链接。

RabbitMqQueue#subscribe(messageHandler: IMessageHandler, options: Consume): Promise

Push 模式,客户端订阅队列消息,消息由服务端“推送”给客户端。

| 参数名 | 类型 | 参数说明 | | :-- | :-- | :-- | | messageHandler | IMessageHandler<T> | 消息处理逻辑 | | options | object | 参数配置 | | options.exclusive | boolean | 是否启用匿名队列订阅,服务端分配一个匿名队列,断开链接后自动删除 |

RabbitMqQueue#publish(content: IQueuePayload, options: Publish): Promise

Confirm 模式,将消息内容发送到 RabbitMQ 中的 Exchange,确保消息准确被添加到队列,且持久化保存后返回。消息分发规则由 routingKeyexchange 规则确定。

| 参数名 | 类型 | 参数说明 | | :-- | :-- | :-- | | content | any | 消息内容 | | options | IProducerConfig<T> | 消息参数, 参考 RabbitMqProducer 说明 |

RabbitMqQueue#ack(message: IQueueMessage, allUpTo?: boolean): Promise

确认消息消费,RabbitMQ 会将对应的消息删除。allUpTotrue,会将该消息之前的所有消息均 ack 掉。

RabbitMqQueue#nack(message: IQueueMessage, allUpTo?: boolean, requeue?: boolean): Promise

RabbitMQ 会“拿回”该消息的。requeuetrue 会重新将该消息放回队列,否则丢弃该消息。

RabbitMqProducer

| 参数名 | 类型 | 参数说明 | | :-- | :-- | :-- | | queue | RabbitMqQueue<T> | 队列对象 | | options | IProducerConfig<T> | 消息参数 | | options.headers | IProduceHeaders | 消息生产者 headers | | options.priority | Priority | 消息优先级,高优先级先分发消费 | | options.pushRetryTimes | number | 消息发送时,本地重试次数 |

RabbitMqProducer#setup(): Promise

队列生产者启动方法。

RabbitMqProducer#publish(content: IQueuePayload, options: Publish): Promise

此方法带本地重试机制。参数内容同 RabbitMqQueue#publish(content, options)

RabbitMqConsumer

| 参数名 | 类型 | 参数说明 | | :-- | :-- | :-- | | queue | RabbitMqQueue<T> | 队列对象 | | options | IConsumerConfig<T> | 消费者参数 | | options.retryTimes | number | 重试次数 | | options.retryDelay | DelayLevel | 重试延时等级 | | options.retryMethod | RetryMethod | 重试策略,RetryMethod.retry 本地重试,RetryMethod.republish 重新发布到队列 | | options.timeout | number | 消息消费超时时间,从消息生产开始算,republish 会刷新时间 | | options.run() | (message: IQueueMessage<T>): Promise<void> | 消息消费逻辑 |

RabbitMqConsumer#start(): Promise

启动消费者, 执行任务订阅。

RabbitMqConsumer#stop(): Promise

停止消费者执行。