@laqus/laqus.mensageriajs
v1.0.15
Published
Esse Projeto de Biblioteca, serve para gerenciamento de mensageria com suporte a diferentes provedores, como RabbitMQ, SQS e futuramente outros; o intuito principal é abstrair de forma simples como gerenciamos esses `Providers`, como para configurar Filas
Downloads
298
Readme
Laqus Mensageria JS/TS
Esse Projeto de Biblioteca, serve para gerenciamento de mensageria com suporte a diferentes provedores, como RabbitMQ, SQS e futuramente outros; o intuito principal é abstrair de forma simples como gerenciamos esses Providers
, como para configurar Filas, Exchanges/Tópicos, Pub/Sub no geral.
Diagrama de Classes
classDiagram
class IMessageProvider {
+Connect(): Promise<void>;
+AddConsumer(queue: string, cb: (message: ConsumeMessage | SQS.Types.Message | null) => Promise<void>, options?: DLQOptions | null | number): Promise<void>;
+SendMessage(queue: string, message: Buffer, messageType?: string | null, options?: DLQOptions | null | number): Promise<void>
+CreateTopic(exchange: string, type: string, config?: Options.AssertExchange | null): Promise<void | string>;
+PublishMessage(exchange: string, message: Buffer, routingKey?: string | null, messageType?: string | null): Promise<void>;
+AssignTopic(queue: string, exchange: string, routingKey?: string | null, options?: DLQOptions | null | number): Promise<void>;
}
class MessageBrokerFactory {
-provider: MessageProvider
+constructor(options: IProviderOptions)
}
IMessageProvider <-- RabbitMQProvider
IMessageProvider <-- SQSProvider
MessageBrokerFactory ..> IMessageProvider
Súmario
- Laqus Mensageria JS/TS
Primeiros passos na Biblioteca
Podemos gerenciar facilmente um Consumers e Publishers, com poucas linhas de código, abstraindo boa parte da Lógica de Validação, Instanciamento de Filas e Exchanges/Tópicos, a ideia como um todo é que o Desenvolvedor que irá utilizar a Biblioteca não precise conhecer a fundo os Providers e apenas utilize a Biblioteca.
A Biblioteca visa ser totalmente Agnostica, ou seja, o Desenvolvedor não precisa alterar o Fluxo de Funcionamento quando quiser utilizar outro provider, apenas alterar as configurações da Biblioteca e tudo continuará funcionando corretamente.
Instalar Laqus.MensageriaJS em um Projeto
Execute o comando:
joe@doe:~$ npm install laqus.mensageriajs
Configurando váriaveis da Biblioteca
Para a configuração inicial dos Providers que serão instânciados, as configurações podem ser passadas via Construtor da Classe principal, como por exemplo:
const { MessageBrokerFactory } = require('laqus.mensageriajs');
/* RabbitMQ Example */
const RabbitMQ = new MessageBrokerFactory({
BrokerType: 'RabbitMQ'
ConnectionURI: 'amqp://127.0.0.1'
}); // -> required semicolon
/* SQS Example */
const SQS = new MessageBrokerFactory({
BrokerType: 'SQS'
Region: 'us-east-1',
AccessKey: '123456',
SecretKey: '78910',
SessionToken: 'optional' // Optional Parameter (Development-only)
}); // -> required semicolon
Broker Types
- [X] RabbitMQ
- [X] SQS
- [ ] Kafka (Em Breve)
Códigos de Exemplo
Os códigos a seguir irão funcionar para qualquer tipo de Provider
existente na Biblioteca, pois ela irá abstrair de forma sucinta a maneira com que as ações são realizadas pelos Providers
.
Conectando a minha Instância
(async () => {
try {
await Provider.Connect() /* Create Session of Connection */
} catch (error) {
console.log(error)
}
})();
Gerenciando um Consumer
(async () => {
try {
await Provider.Connect()
/* CONSUMER WITH DEAD LETTER */
await Provider.AddConsumer('Laqus-lib-teste', (msg => /* ... */), {
DLQ: true // CREATE DEAD LETTER QUEUE,
MaxRetries?: 3 // MAX RANGE OF RETRIES (DEFAULT 3),
WaitTimeSeconds?: 10 // SECONDS TO WAIT MESSAGE RECEIVE (DEFAULT 10)
})
/* COMMON CONSUMER */
await Provider.AddConsumer('Laqus-lib-teste', Controller.function.bind(Controller)) /* Bind Strategy per Message */
} catch (error) {
console.log(error)
}
})();
Gerenciando um Publisher
(async () => {
try {
await Provider.Connect()
/* SEND TO A QUEUE WITH A DEAD LETTER ASSIGNED */
await Provider.Send('Laqus-lib-teste', Buffer.from(JSON.stringify({abc: 123})), { DLQ: true }) /* Single Queued Strategy */
/* SEND TO A COMMON QUEUE */
await Provider.Send('Laqus-lib-teste', Buffer.from(JSON.stringify({abc: 123})), ) /* Single Queued Strategy */
await Provider.PublishMessage('myTopic', Buffer.from(JSON.stringify({abc: 123}))) /* Broadcast Strategy */
await Provider.PublishMessage('myTopic', Buffer.from(JSON.stringify({abc: 123}))) /* Broadcast Strategy */
await Provider.PublishMessage('myTopic', Buffer.from(JSON.stringify({abc: 123})), 'routingKey') /* Broadcast Strategy (Routing) */
} catch (error) {
console.log(error)
}
})();
Criando uma Exchange/Topic
(async () => {
try {
await Provider.Connect()
await Provider.CreateTopic('myTopic', 'direct')
} catch (error) {
console.log(error)
}
})();
Assinando uma Exchange/Topic
Neste caso existem dois cenários que podem ser utilizados de maneira estrátegica, o Cenário com Roteamente e sem Roteamento.
(async () => {
try {
await Provider.Connect()
/* ASSIGN TOPIC WITH A DEAD LETTER QUEUE STRATEGY */
await Provider.AssignTopic('queue', 'topic', {
DLQ: true
}) /* Sem Roteamento */
/* ASSIGN TOPIC COMMON */
await Provider.AssignTopic('queue', 'topic') /* Sem Roteamento */
} catch (error) {
console.log(error)
}
})();
(async () => {
try {
await Provider.Connect()
await Provider.AssignTopic('queue', 'topic', 'routingKey') /* Com Roteamento */
} catch (error) {
console.log(error)
}
})();
A distinção entre as duas está na forma de receber mensagens, realizando a assinatura de uma Exchange/Tópico com uma Chave de Roteamento, você está dizendo que as Mensagens que chegarem na sua Exchange/Tópico serão direcionados a fila com base nessa chave, caso a Exchange/Tópico não tenha esse Chaveamento, ele irá se comportar normalmente recebendo mensagens.
Tipagem dos Métodos e Interfaces
A interface da MessageBrokerFactory, responsável por realizar os direcionamentos necessários para os Providers criados do RabbitMQ
e SQS
, essa inteface Herda alguns tipos dos Providers Originais, de modo que não seja alterado o funcionamento como um todo, mas sim, apenas facilitar o uso.
export interface IMessageProvider {
Connect(): Promise<void>;
AddConsumer(queue: string, cb: (message: ConsumeMessage | SQS.Types.Message | null) => Promise<void>, options?: DLQOptions | null | number): Promise<void>;
SendMessage(queue: string, message: Buffer, messageType?: string | null, options?: DLQOptions | null | number): Promise<void>
CreateTopic(exchange: string, type: string, config?: Options.AssertExchange | null): Promise<void | string>;
PublishMessage(exchange: string, message: Buffer, routingKey?: string | null, messageType?: string | null): Promise<void>;
AssignTopic(queue: string, exchange: string, routingKey?: string | null, options?: DLQOptions | null | number): Promise<void>;
}
Connect()
Conecta-se ao provedor de mensageria. Deve ser chamado antes de usar qualquer outro método.
sequenceDiagram
actor Application
Application->>+Laqus.MensageriaJS: First Configuration
Laqus.MensageriaJS->>+Provider: Provider Instance
Note over Laqus.MensageriaJS,Provider: Choose Provider
alt SQS
Provider-->>+SQS: Choosed Instance
else RabbitMQ
Provider-->>+RabbitMQ: Choosed Instance
end
SQS-->>-Application: Open connection
RabbitMQ-->>-Application: Open connection
AddConsumer(queue, cb, options?)
Registra um consumidor para processar mensagens em uma fila específica.
queue
: O nome da fila a ser consumida.cb
: Uma função de retorno que será chamada para cada mensagem recebida na fila.options
(opcional): Opções adicionais para configurar o consumo, como o tempo de espera e visibilidade.DLQOptions
: Opções para o Provider RabbitMQnull
: Irá utilizar as configurações padrão de cada Providernumber
: Define o tempo de espera em segundos para o Pooling de mensagens, 0 ~ 20 segundos (0 - Short) e (>= 1 - Long)
sequenceDiagram
actor Application
Laqus.MensageriaJS->>+Provider: Provider Instance
Note over Laqus.MensageriaJS,Provider: Previous choosed Provider
Application->>+Laqus.MensageriaJS: Add Consumer
Provider-->Provider: Pooling messages
Provider->>+Application: Consumed Message
Provider-->Provider: Remove message from Queue
SendMessage(queue, message)
Envia uma mensagem para uma fila específica.
queue
: O nome da fila de destino.message
: O conteúdo da mensagem a ser enviada (JSON).messageType
: Nome do URN para Associação com o Masstransit. (Exemplo:Laqus:Exemplo
)options
: Options para a Fila com uma DeadLetter Strategy vinculada
sequenceDiagram
actor Application
Laqus.MensageriaJS->>+Provider: Provider Instance
Note over Laqus.MensageriaJS,Provider: Previous choosed Provider
Application->>+Laqus.MensageriaJS: Send Message
Laqus.MensageriaJS-)+Provider: Send Buffered Message
Provider--)Application: End Process
CreateTopic(exchange, type?, config?)
Cria um tópico para troca de mensagens entre várias filas.
exchange
: O nome do tópico (exchange).type
(opcional): O tipo do tópico (exchange), como "fanout", "direct", etc.config
(opcional): Configurações adicionais para o tópico.
sequenceDiagram
actor Application
Laqus.MensageriaJS->>+Provider: Provider Instance
Note over Laqus.MensageriaJS,Provider: Previous choosed Provider
Application->>+Laqus.MensageriaJS: Create Topic
alt SQS
Laqus.MensageriaJS-)+Provider: Create an SNS Topic
else RabbitMQ
Laqus.MensageriaJS-)+Provider: Create an Exchange
end
Provider--)Application: End Process
PublishMessage(exchange, message, routingKey?)
Publica uma mensagem em um tópico específico.
exchange
: O nome do tópico (exchange) de destino.message
: O conteúdo da mensagem a ser publicada.routingKey
(opcional): A chave de roteamento da mensagem.messageType
: Nome do URN para Associação com o Masstransit. (Exemplo:Laqus:Exemplo
)
sequenceDiagram
actor Application
Laqus.MensageriaJS->>+Provider: Provider Instance
Note over Laqus.MensageriaJS,Provider: Previous choosed Provider
Application->>+Laqus.MensageriaJS: Send Message
Laqus.MensageriaJS-)+Provider: Send Buffered Message
Provider->>Exchange/Topic: Receive
alt Routing Strategy
Exchange/Topic->>Queue(s): Dispatch Message to One Queue
else Broadcast Strategy
Exchange/Topic->>Queue(s): Dispatch Message to Assigned Queues
end
Queue(s)--)Application: End Process
AssignTopic(queue, exchange, routingKey)
Associa uma fila a um tópico específico para receber mensagens.
queue
: O nome da fila a ser associada.exchange
: O nome do tópico (exchange) ao qual a fila será associada.routingKey
: A chave de roteamento para a associação.options
: Options para a Fila com uma DeadLetter Strategy vinculada
sequenceDiagram
actor Application
Laqus.MensageriaJS->>+Provider: Provider Instance
Note over Laqus.MensageriaJS,Provider: Previous choosed Provider
Application->>+Laqus.MensageriaJS: Assign Topic
alt SQS
Laqus.MensageriaJS-)+Provider: Assign to an SNS Topic
Provider-)+Queue: Assign
alt Routing Strategy
Queue->>Exchange/Topic: Routing Key
else Broadcast Strategy
Queue->>Exchange/Topic: Empty Routing Key
end
else RabbitMQ
Laqus.MensageriaJS-)+Provider: Assign to an Exchange
Provider-)+Queue: Assign
alt Routing Strategy
Queue->>Exchange/Topic: Routing Key
else Broadcast Strategy
Queue->>Exchange/Topic: Empty Routing Key
end
end
Exchange/Topic--)Application: End Process
Interface que define as opções de configuração para a biblioteca de mensageria.
interface IProviderOptions {
BrokerType: BrokerType,
ConnectionURI?: string,
Region?: string,
AccessKey?: string,
SecretKey?: string,
SessionToken?: string
}
interface DLQOptions {
DLQ?: boolean,
MaxRetries?: number,
WaitTimeSeconds?: number
}
enum BrokerType {
RabbitMQ = 'RabbitMQ',
SQS = 'SQS'
}
BrokerType
: O tipo de provedor de mensageria a ser utilizado. Deve ser um dos valores do enumBrokerType
.ConnectionURI
(opcional): A URI de conexão para o provedor de mensageria.Region
(opcional): A região para o provedor de mensageria (apenas para AWS).AccessKey
(opcional): A chave de acesso para autenticação (apenas para AWS).SecretKey
(opcional): A chave secreta para autenticação (apenas para AWS).SessionToken
(opcional): O token de sessão para autenticação (apenas para AWS).
Enum BrokerType
Uma enumeração que define os tipos de provedores de mensageria suportados.
SQS
: Provedor de mensageria Amazon SQS.RabbitMQ
: Provedor de mensageria RabbitMQ.