npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@sectester/bus

v0.28.0

Published

The package includes a simplified implementation of the `EventBus`, one based on `RabbitMQ`, to establish synchronous and asynchronous communication between services and agents.

Downloads

313

Readme

@sectester/bus

Maintainability Test Coverage Build Status NPM Downloads

The package includes a simplified implementation of the EventBus, one based on RabbitMQ, to establish synchronous and asynchronous communication between services and agents.

Setup

npm i -s @sectester/bus

Usage

Overview

To use the RabbitMQ Event Bus, pass the following options object to the constructor method:

import { Configuration } from '@sectester/core';
import { RMQEventBus, ExponentialBackoffRetryStrategy } from '@sectester/bus';

const config = new Configuration({
  hostname: 'app.neuralegion.com'
});

const repeaterId = 'your Repeater ID';

const bus = new RMQEventBus(
  config.container,
  new ExponentialBackoffRetryStrategy({ maxDepth: 5 }),
  {
    url: config.bus,
    exchange: 'EventBus',
    clientQueue: `agent:${repeaterId}`,
    appQueue: 'app',
    credentials: {
      username: 'bot',
      password: config.credentials!.token
    }
  }
);

The options are specific to the chosen transporter. The RabbitMQ implementation exposes the properties described below:

| Option | Description | | :------------------ | ------------------------------------------------------------------------------------ | | url | EventBus address. | | exchange | Exchange name which routes a message to a particular queue. | | clientQueue | Queue name which your bus will listen to. | | appQueue | Queue name which application will listen to. | | prefetchCount | Sets the prefetch count for the channel. By default, 1 | | connectTimeout | Time to wait for initial connect. If not specified, defaults to heartbeatInterval. | | reconnectTime | The time to wait before trying to reconnect. By default, 20 seconds. | | heartbeatInterval | The interval, in seconds, to send heartbeats. By default, 30 seconds. | | credentials | The username and password to perform authentication. |

Finally, to establish a connection with RabbitMQ, you have to the init() method.

await bus.init();

In case of unrecoverable or operational errors, you will get an exception while initial connecting.

Subscribing to events

To subscribe an event handler to the particular event, you should use the @bind() decorator as follows:

import { bind, Event, EventHandler } from '@sectester/core';
import { injectable } from 'tsyringe';

interface Issue {
  name: string;
  details: string;
  type: string;
  cvss?: string;
  cwe?: string;
}

class IssueDetected extends Event<Issue> {
  constructor(payload: Issue) {
    super(payload);
  }
}

@bind(IssueDetected)
@injectable()
class IssueDetectedHandler implements EventHandler<Issue> {
  public handle(payload: Issue): Promise<void> {
    // implementation
  }
}

⚡ Make sure that you use @injectable() decorator to register the corresponding provider in the IoC. Otherwise, you get an error while trying to register a handler in the EventBus.

Then you just need to register the handler in the EventBus:

await bus.register(IssueDetectedHandler);

Now the IssueDetectedHandler event handler listens for the IssueDetected event. As soon as the IssueDetected event appers, the EventBus will call the handle() method with the payload passed from the application.

To remove subscription, and removes the event handler, you have to call the unregister() method:

await bus.unregister(IssueDetectedHandler);

Publishing events through the event bus

The EventBus exposes a publish() method. This method publishes an event to the message broker.

interface Payload {
  status: 'connected' | 'disconnected';
}

class StatusChanged extends Event<Payload> {
  constructor(payload: Payload) {
    super(payload);
  }
}

const event = new StatusChanged({ status: 'connected' });

await bus.publish(event);

The publish() method takes just a single argument, an instance of the derived class of the Event.

⚡ The class name should match one defined event in the application. Otherwise, you should override it by passing the expected name via the constructor.

For more information, please see @sectester/core.

Executing RPC methods

The EventBus exposes a execute() method. This method is intended to perform a command to the application and returns an Promise with its response.

interface Payload {
  version: string;
}

interface Response {
  lastVersion: string;
}

class CheckVersion extends Command<Payload, Response> {
  constructor(payload: Payload) {
    super(payload);
  }
}

const command = new CheckVersion({ version: '0.0.1' });

const response = await bus.execute(command);

This method returns a Promise which will eventually be resolved as a response message.

For instance, if you do not expect any response, you can easily make the EventBus resolve a Promise immediately to undefined:

class Record extends Command<Payload> {
  public readonly expectReply = false;

  constructor(payload: Payload) {
    super(payload);
  }
}

const command = new Record({ version: '0.0.1' });

await bus.execute(command);

The HttpCommandDispatcher is an alternative way to execute the commands over HTTP. To start, you should create an HttpCommandDispatcher instance by passing the following options to the constructor:

import {
  HttpCommandDispatcher,
  HttpCommandDispatcherConfig
} from '@sectester/bus';
import { container } from 'tsyringe';

const options: HttpCommandDispatcherConfig = {
  baseUrl: 'https://app.neuralegion.com',
  token: 'weobbz5.nexa.vennegtzr2h7urpxgtksetz2kwppdgj0'
};
const logger = container.resolve(Logger);
const retryStrategy = container.resolve(RetryStrategy);

const httpDispatcher = new HttpCommandDispatcher(
  logger,
  retryStrategy,
  options
);

The command dispatcher can be customized using the following options:

| Option | Description | | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | baseUrl | Base URL for your application instance, e.g. https://app.neuralegion.com | | token | API key to access the API. Find out how to obtain personal and organization API keys in the knowledgebase | | timeout | Time to wait for a server to send response headers (and start the response body) before aborting the request. Default 10000 ms | | rate | Set how many requests per interval should perform immediately, others will be delayed automatically. By default, 10 requests per 1 minute |

Then you have to create an instance of HttpRequest instead of a custom command, specifying the url and method in addition to the payload that a command accepts by default:

const command = new HttpCommand({
  url: '/api/v1/repeaters',
  method: 'POST',
  payload: { name: 'test' }
});

Once it is done, you can perform a request using HttpComandDispatcher as follows:

const response: { id: string } = await httpDispatcher.execute(command);

Below you will find a list of parameters that can be used to configure a command:

| Option | Description | | --------------- | ------------------------------------------------------------------------------------------ | | url | Absolute URL or path that will be used for the request. By default, / | | method | HTTP method that is going to be used when making the request. By default, GET | | params | Use to set query parameters. | | payload | Message that we want to transmit to the remote service. | | expectReply | Indicates whether to wait for a reply. By default true. | | ttl | Period of time that command should be handled before being discarded. By default 10000 ms. | | type | The name of a command. By default, it is the name of specific class. | | correlationId | Used to ensure atomicity while working with EventBus. By default, random UUID. | | createdAt | The exact date and time the command was created. |

For more information, please see @sectester/core.

Retry Strategy

For some noncritical operations, it is better to fail as soon as possible rather than retry a coupe of times. For example, it is better to fail right after a smaller number of retries with only a short delay between retry attempts, and display a message to the user.

By default, you can use the Exponential backoff retry strategy to retry an action when errors like ETIMEDOUT appear.

You can implement your own to match the business requirements and the nature of the failure:

export class CustomRetryStrategy implements RetryStrategy {
  public async acquire<T extends (...args: unknown[]) => unknown>(
    task: T
  ): Promise<ReturnType<T>> {
    let times = 0;

    for (;;) {
      try {
        return await task();
      } catch {
        times++;

        if (times === 3) {
          throw e;
        }
      }
    }
  }
}

Once a retry strategy is implemented, you can use it like that:

const retryStrategy = new CustomRetryStrategy();

const bus = new RMQEventBus(container, retryStrategy, options);

License

Copyright © 2022 Bright Security.

This project is licensed under the MIT License - see the LICENSE file for details.