@synonymdev/blocktank-worker2

v0.2.8

blocktank-worker2

Microservice module based on Grenache DHT and amqplib RabbitMQ messages. Written in TypeScript, supports JavaScript.

Usage

Run DHT for service discovery

npm i -g grenache-grape   # Only once
grape --dp 20001 --aph 30001 --bn '127.0.0.1:20002'
grape --dp 20002 --aph 40001 --bn '127.0.0.1:20001'

Run RabbitMQ for events

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 heidiks/rabbitmq-delayed-message-exchange:3.10.2-management

Open the dashboard at http://localhost:15672/ and log in with guest/guest.

Worker

A Worker consists of

  • a server that listens for method calls.
  • a RabbitMQ event publisher (fanout exchange).

import { Worker, WorkerImplementation, waitOnSigint, GrenacheClient } from '@synonymdev/blocktank-worker2';

const client = new GrenacheClient()

class HelloWorldWorkerImplementation extends WorkerImplementation {
    /**
     * Every method defined in here can be called by other workers/clients.
     */
    async helloWorld(name1: string, name2: string) {
        return `Hello ${name1} and ${name2}`;
    }

    async callOtherWorkerUsdToBtc(usd: number) {
        const exchangeRate = client.encapsulateWorker('exchange_rate') // Get exchangeRate worker
        const btcUsd = await exchangeRate.getRate("BTCUSD") // Call method on exchangeRate worker.
        console.log('Current BTCUSD price is', btcUsd) 
        // Current BTCUSD price is $30,000
        return usd/btcUsd
    }
}

const runner = new Worker(new HelloWorldWorkerImplementation(), {
    name: 'HelloWorldService', // Name of the worker.
})
try {
    await runner.start();
    await waitOnSigint() // Wait on Ctrl+C
} finally {
    await runner.stop()
}

Class WorkerImplementation

  • Supports async, sync, and callback functions.
    • If callback functions are used, initialize the Worker with callbackSupport: true.
  • Automatically returns Errors.
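
As a rough illustration of what handling async, sync, and callback functions side by side involves, the sketch below normalizes all three styles into a Promise. The names invokeMethod and NodeCallback are hypothetical and not part of the package, and the library's actual dispatch logic may differ.

```typescript
// Hypothetical helper for illustration; not part of @synonymdev/blocktank-worker2.
type NodeCallback = (err: Error | null, result?: unknown) => void;
type AnyMethod = (...args: any[]) => unknown;

function invokeMethod(fn: AnyMethod, args: unknown[], callbackSupport = false): Promise<unknown> {
    if (!callbackSupport) {
        // Sync and async methods: the async wrapper turns a returned value,
        // a returned Promise, or a thrown error into one settled Promise.
        return (async () => fn(...args))();
    }
    return new Promise((resolve, reject) => {
        // Callback methods receive a Node-style callback as an extra last argument.
        const callback: NodeCallback = (err, result) => (err ? reject(err) : resolve(result));
        try {
            const returned = fn(...args, callback);
            // Methods that return a value or a Promise instead of calling back still settle.
            if (returned !== undefined) Promise.resolve(returned).then(resolve, reject);
        } catch (err) {
            reject(err); // A synchronous throw becomes a rejection.
        }
    });
}
```

In this sketch a thrown error always surfaces as a rejected Promise, which is one way a server could return errors to the caller automatically.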

Class Worker

constructor(worker, config?)

  • worker: WorkerImplementation
  • config? GrenacheServerConfig
    • name? string Name of the worker. Announced on DHT. Used to name RabbitMQ queues. Default: Random name.
    • grapeUrl? string URL to the grape DHT. Default: http://127.0.0.1:30001.
    • port? integer Server port. Default: Random port between 10,000 and 40,000.
    • callbackSupport? boolean Allows WorkerImplementation functions to be written with callbacks. Disables the method argument count check. Default: false
    • connection? amp.Connection RabbitMQ connection. Mutually exclusive with amqpUrl.
    • amqpUrl string RabbitMQ connection URL. Mutually exclusive with connection. Default: amqp://localhost:5672.
    • namespace string RabbitMQ namespace. All objects, such as exchanges and queues, have names that start with {namespace} followed by a dot. Default: blocktank.
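
The defaults and the mutual exclusivity rule listed above can be pictured with a small sketch. SketchWorkerConfig and resolveConfig are hypothetical names that only mirror the documented options; this is not the package's own type or code, and the random default name format is made up.

```typescript
// Hypothetical shape mirroring the documented options; not the library's own type.
interface SketchWorkerConfig {
    name?: string;
    grapeUrl?: string;
    port?: number;
    callbackSupport?: boolean;
    connection?: object; // stands in for an existing RabbitMQ connection
    amqpUrl?: string;
    namespace?: string;
}

function randomInt(min: number, max: number): number {
    return Math.floor(Math.random() * (max - min + 1)) + min;
}

function resolveConfig(config: SketchWorkerConfig = {}) {
    if (config.connection !== undefined && config.amqpUrl !== undefined) {
        throw new Error('connection and amqpUrl are mutually exclusive');
    }
    return {
        name: config.name ?? `worker-${randomInt(0, 999999)}`, // made-up random name format
        grapeUrl: config.grapeUrl ?? 'http://127.0.0.1:30001',
        port: config.port ?? randomInt(10000, 40000),
        callbackSupport: config.callbackSupport ?? false,
        connection: config.connection,
        // The default URL only applies when no connection object was supplied.
        amqpUrl: config.connection !== undefined ? undefined : (config.amqpUrl ?? 'amqp://localhost:5672'),
        namespace: config.namespace ?? 'blocktank',
    };
}
```

Calling resolveConfig() with no arguments yields the documented defaults, while passing both connection and amqpUrl throws.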

async start() Starts the worker. Listens on given port.

async stop(options?) Stops the worker. Graceful shutdown.

  • options? WorkerStopOptions
    • cleanupRabbitMq? boolean Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.

Class GrenacheClient

GrenacheClient lets you call other workers without exposing your own server.

constructor(grapeUrl?)

  • grapeUrl? string URL to the DHT. Default: http://127.0.0.1:30001.

import { GrenacheClient } from '@synonymdev/blocktank-worker2'

const client = new GrenacheClient()

// Method 1 - Call function
const method = 'helloWorld';
const args = ['Sepp', 'Pirmin'];
const response1 = await client.call('HelloWorldService', method, args)
console.log(response1) // Hello Sepp and Pirmin

async call(workerName, method, args?, opts?) Calls a method of another worker. Returns the worker response.

  • workerName string Name of the worker you want to call.
  • method string Method name you want to call.
  • args? any[] List of arguments. Default: [].
  • opts? Partial<GrenacheClientCallOptions>
    • timeoutMs? number Request timeout in milliseconds. Default: 60,000.
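
A request timeout like timeoutMs is commonly built by racing the request against a timer. The sketch below shows that general technique; withTimeout is a hypothetical helper and the error message is invented, so this is not necessarily how the package enforces its timeout.

```typescript
// Hypothetical helper illustrating a request timeout; not the library's implementation.
function withTimeout<T>(promise: Promise<T>, timeoutMs = 60000): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_resolve, reject) => {
        timer = setTimeout(
            () => reject(new Error(`Request timed out after ${timeoutMs} ms`)),
            timeoutMs,
        );
    });
    // Whichever settles first wins; the timer is cleared so it does not keep the process alive.
    return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```

A caller would wrap the pending request, for example withTimeout(someRequest, 5000), and handle the rejection like any other error.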

encapsulateWorker(workerName) Convenience wrapper. Returns a worker object that can be called with any worker method.

// Example
const helloWorldService = client.encapsulateWorker('HelloWorldService')
const response = await helloWorldService.helloWorld('Sepp', 'Pirmin')
// Hello Sepp and Pirmin
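
One way such a wrapper can accept any method name is a JavaScript Proxy that forwards every property access to a generic call function. The sketch below assumes that approach; encapsulate, CallFn, and fakeCall are hypothetical names, and the package may implement encapsulateWorker differently.

```typescript
// Hypothetical sketch; not the library's implementation.
type CallFn = (workerName: string, method: string, args: unknown[]) => Promise<unknown>;
type RemoteWorker = Record<string, (...args: unknown[]) => Promise<unknown>>;

function encapsulate(call: CallFn, workerName: string): RemoteWorker {
    return new Proxy({} as RemoteWorker, {
        get(_target, prop) {
            // Returning undefined for "then" keeps the proxy from looking like a Promise.
            if (prop === 'then') return undefined;
            // Every other property becomes a function that forwards to call().
            return (...args: unknown[]) => call(workerName, String(prop), args);
        },
    });
}

// Stand-in for the network call so the sketch runs without a DHT.
const recordedCalls: string[] = [];
const fakeCall: CallFn = async (workerName, method, args) => {
    recordedCalls.push(`${workerName}.${method}`);
    return `Hello ${args.join(' and ')}`;
};

const wrappedService = encapsulate(fakeCall, 'HelloWorldService');
const pendingGreeting = wrappedService.helloWorld('Sepp', 'Pirmin'); // resolves to "Hello Sepp and Pirmin"
```

The trade-off of this design is that method names are not checked at compile time; a typo only shows up when the remote worker rejects the call.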

RabbitMQ / Events

RabbitPublisher and RabbitConsumer manage all events around the worker.

Events work on an "at least once" delivery basis. If an error is thrown, the event is retried with an exponential backoff.

Check out the RabbitMQ docs to get an overview of the exchange/queue structure.
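
Because delivery is at least once, a handler may see the same event more than once, so it helps to make handlers idempotent. The sketch below deduplicates on an identifier carried in the event data. InvoicePaidData, its fields, and InvoicePaidHandler are hypothetical; the package does not prescribe a payload shape, and a real service would keep the processed ids in a database rather than in memory.

```typescript
// Hypothetical payload shape; the fields of the real event data are defined by the publisher.
interface InvoicePaidData {
    invoiceId: string; // assumed to be unique per invoice
    amountSat: number;
}

class InvoicePaidHandler {
    private readonly processed = new Set<string>();
    totalSat = 0;

    // Returns true if the event was applied, false if it was a duplicate delivery.
    handle(data: InvoicePaidData): boolean {
        if (this.processed.has(data.invoiceId)) return false;
        this.processed.add(data.invoiceId);
        this.totalSat += data.amountSat;
        return true;
    }
}

const invoiceHandler = new InvoicePaidHandler();
invoiceHandler.handle({ invoiceId: 'inv-1', amountSat: 1000 });
invoiceHandler.handle({ invoiceId: 'inv-1', amountSat: 1000 }); // redelivery, ignored
invoiceHandler.handle({ invoiceId: 'inv-2', amountSat: 500 });
// invoiceHandler.totalSat is now 1500
```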

Class RabbitConsumer

Consume events from RabbitMQ.

import { RabbitConsumer, waitOnSigint } from '@synonymdev/blocktank-worker2';

const myServiceName = 'MyServiceName'
const consumer = new RabbitConsumer(myServiceName)
await consumer.init()

try {
    await consumer.onMessage('HelloWorldService', 'invoicePaid', async event => {
        console.log('HelloWorldService.invoicePaid event:', event)
    })
    await waitOnSigint() // Wait on Ctrl+C
} finally {
    await consumer.stop() // Graceful shutdown
}

async init() Initializes the consumer. Creates the RabbitMQ exchanges and queues.

async stop(cleanupRabbitMq?, timeoutMs?) Stops consuming messages. Graceful shutdown.

  • cleanupRabbitMq? boolean Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
  • timeoutMs? number Timeout in milliseconds to wait on currently consumed messages to finish. Default: 20,000.

async onMessage(sourceWorkerName, eventName, callback, options?)

  • sourceWorkerName string Name of the worker that emits the event.
  • eventName string Name of the event.
  • callback function Callback function that is called when the event is received.
    • Type: (msg: RabbitEventMessage) => any
    • May be async or sync.
  • options? RabbitConsumeOptions options for this specific event type.
    • backoffFunction: (attempt: number) => number Function that returns the backoff time in milliseconds. Default: exponential backoff.

Important properties of onMessage

  • At-Least-Once-Delivery: Messages can be delivered multiple times and potentially in a different order.
  • Retries: If an error is thrown, the event is retried with an exponential backoff. The backoff function can be customized.
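
A custom backoff only needs to match the documented signature (attempt: number) => number and return milliseconds. The sketch below is a capped exponential backoff; the base delay, the cap, and the assumption that attempts are counted from 0 are illustrative choices, not the library's defaults.

```typescript
// Matches the documented signature: (attempt: number) => number, result in milliseconds.
// Base delay, cap, and zero-based attempt counting are assumptions for this sketch.
function cappedExponentialBackoff(attempt: number): number {
    const baseMs = 1000;
    const maxMs = 60000;
    const safeAttempt = Math.max(0, attempt);
    return Math.min(maxMs, baseMs * 2 ** safeAttempt);
}
```

Such a function could be passed as backoffFunction in the options argument of onMessage. Capping the delay keeps a long-failing event from being postponed indefinitely.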

Class RabbitPublisher

Publish events without a consumer.

constructor(myWorkerName, options?)

  • myWorkerName string Name of the worker that emits the event.
  • options? RabbitConnectionOptions
    • connection? amp.Connection RabbitMQ connection. Mutually exclusive with amqpUrl.
    • amqpUrl string RabbitMQ connection URL. Mutually exclusive with connection. Default: amqp://localhost:5672.
    • namespace string RabbitMQ namespace. All objects, such as exchanges and queues, have names that start with {namespace} followed by a dot. Default: blocktank.

async init() Initializes the publisher. Creates the main RabbitMQ exchange.

async stop(cleanupRabbitMq?) Stops the connection.

  • cleanupRabbitMq? boolean Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.

async publish(eventName: string, data: any) Publishes an event.

  • eventName string Name of the event.
  • data any Any JSON-serializable data that is sent with the event.
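
To see what JSON-serializable means for event data, it can help to round-trip a payload through JSON before publishing. The sketch assumes the data is encoded with JSON.stringify, which the description suggests but does not state; jsonRoundTrip and the sample payload are hypothetical.

```typescript
// Simulates what a consumer would receive if the payload travels as JSON text.
function jsonRoundTrip(data: unknown): unknown {
    return JSON.parse(JSON.stringify(data));
}

const sentPayload = { invoiceId: 'abc', paidAt: new Date(0), note: undefined, amountSat: 1000 };
const receivedPayload = jsonRoundTrip(sentPayload) as Record<string, unknown>;
// receivedPayload.paidAt is the string "1970-01-01T00:00:00.000Z", not a Date.
// receivedPayload has no "note" key, because undefined properties are dropped.
```

Values such as BigInt make JSON.stringify throw, so they are best converted to strings or numbers before publishing.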

Logging

The Worker and RabbitConsumer take a logger option.

logger pino.Logger | boolean Default: false. If set to true, a default logger is used. If set to false, no logging is done. If set to a pino logger, this logger is used.
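
The three accepted values can be thought of as resolving to a single logger object. The sketch below shows that resolution with a minimal stand-in interface, so it runs without pino; SketchLogger and resolveLogger are hypothetical, and the package's default logger is a pino instance rather than the console wrapper used here.

```typescript
// Minimal stand-in for the parts of a logger this sketch needs; the package itself accepts a pino.Logger.
interface SketchLogger {
    info(message: string): void;
    error(message: string): void;
}

const silentLogger: SketchLogger = { info: () => {}, error: () => {} };
const consoleLogger: SketchLogger = {
    info: (message) => console.log(message),
    error: (message) => console.error(message),
};

function resolveLogger(option: SketchLogger | boolean = false): SketchLogger {
    if (option === true) return consoleLogger; // true: use a default logger
    if (option === false) return silentLogger; // false: no logging
    return option; // a caller-supplied logger is used as is
}
```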

MongoDatabase

Experimental: The goal of MongoDatabase is to provide convenient methods for testing. This class is not mature yet, so it might change in the future.

Run mongo locally:

docker run -it --rm -p 27017:27017 --name ma-mongo-db mongo:latest

Check out MongoDB Compass if you need a UI.

Define entities in their own folder:

import { Entity, PrimaryKey, Property } from "@mikro-orm/core";
import { randomUUID } from 'crypto';

@Entity()
export class SampleAuthor {
    @PrimaryKey({ name: "_id" })
    id: string = randomUUID();

    @Property()
    name!: string;
}

Create a mikro-orm.config.ts file to configure your database connection.

import { MikroORMOptions, ReflectMetadataProvider } from '@mikro-orm/core';
import { MongoDriver } from '@mikro-orm/mongodb';
import entities from './1_database/entities';
import { AppConfig } from './0_config/AppConfig';

const appConfig = AppConfig.get()

const config: Partial<MikroORMOptions<MongoDriver>> = {
  entities: entities,
  clientUrl: appConfig.dbUrl,
  metadataProvider: ReflectMetadataProvider,
  debug: false,
  type: 'mongo',
  migrations: {
    path: 'dist/1_database/migrations',
    pathTs: 'src/1_database/migrations',
    transactional: false
  }
};

export default config;

  • See this mikro-orm.config.ts for an example config.
  • Check out the mikro-orm docs for more info on setting up the ORM.
  • You may choose to use another ORM. In that case, make sure you manage test integrations yourself.
  • Check out this example entity, SampleAuthor.ts.

import { MongoDatabase } from '@synonymdev/blocktank-worker2';
import config from './mikro-orm.config';
// Assumed location of the entity defined above; adjust the path to your project layout.
import { SampleAuthor } from './1_database/entities/SampleAuthor';

try {
    await MongoDatabase.connect(config)
    await MongoDatabase.migrateUp()
    const em = MongoDatabase.createEntityManager()
    const author = new SampleAuthor()
    author.name = 'Sepp'
    await em.persistAndFlush(author)
} finally {
    await MongoDatabase.close()
}

MongoDatabase provides an in-memory database for testing. Check out the example MongoDatabase.test.ts for details on using the in-memory database to run independent tests.

CLI & Migrations

MikroORM comes with a CLI. To use the CLI, add this config to your package.json:

  "mikro-orm": {
    "useTsNode": true,
    "configPaths": [
      "./src/mikro-orm.config.ts",
      "./dist/mikro-orm.config.js"
    ]
  },

  • Use npx mikro-orm migration:create to create a new migration.
  • Use npx mikro-orm migration:up to run migrations.

Development

Testing

  • Test: npm run test. Check out the vscode-jest extension to selectively run tests.

Make tests independent + cleanup RabbitMQ:

import { Worker } from "@synonymdev/blocktank-worker2";

// Use a random RabbitMQ namespace to avoid conflicts between tests.
// `worker` is an instance of your WorkerImplementation subclass.
const runner = new Worker(worker, {
    namespace: Worker.randomNamespace()
});

try {
    await runner.start()
    // Do your tests here
} finally {
    // Clean up all RabbitMQ objects created by this worker
    await runner.stop({cleanupRabbitMq: true})
}
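
To make the idea of a per-test namespace concrete, the sketch below generates a random namespace and prefixes object names with it, following the "{namespace}." naming described in the config options. Both function names and the "test-" prefix are hypothetical; in real tests, Worker.randomNamespace() from the package is the thing to use.

```typescript
// Hypothetical stand-ins for illustration; the package provides Worker.randomNamespace().
function sketchRandomNamespace(prefix = 'test'): string {
    // A short random base-36 suffix makes collisions between test runs unlikely.
    return `${prefix}-${Math.random().toString(36).slice(2, 10)}`;
}

function namespacedName(namespace: string, objectName: string): string {
    // Exchanges and queues are named with the namespace followed by a dot.
    return `${namespace}.${objectName}`;
}

const testNamespace = sketchRandomNamespace();
const queueName = namespacedName(testNamespace, 'HelloWorldService');
```

Since every test run gets its own prefix, queues and exchanges left over from an aborted run do not interfere with later runs.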

Versioning

  1. Increase version in package.json.
  2. Add changes to CHANGELOG.md.
  3. Commit changes.
  4. Tag the new version: git tag v0.1.0.
  5. Push the tag: git push origin v0.1.0.
  6. Publish to npm: npm publish.