@synonymdev/blocktank-worker2
v0.2.8
Published
Downloads
394
Keywords
Readme
blocktank-worker2
Microservice module based on Grenache DHT and AMPQlib RabbitMQ messages. Written in Typescript, supports Javascript.
Usage
Run DHT for service discovery
npm i -g grenache-grape # Only once
grape --dp 20001 --aph 30001 --bn '127.0.0.1:20002'
grape --dp 20002 --aph 40001 --bn '127.0.0.1:20001'
Run RabbitMQ for events
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 heidiks/rabbitmq-delayed-message-exchange:3.10.2-management
Open the dashboard http://localhost:15672/ and login with guest/guest.
Worker
A Worker consists of
- a server that listens on method calls.
- RabbitMQ event publisher (fanout exchange).
import { Worker, WorkerImplementation, waitOnSigint, GrenacheClient } from '@synonymdev/blocktank-worker2';
const client = new GrenacheClient()
class HelloWorldWorkerImplementation extends WorkerImplementation {
/**
* Every method defined in here can be called by other workers/clients.
*/
async helloWorld(name1: string, name2: string) {
return `Hello ${name1} and ${name2}`;
}
async callOtherWorkerUsdToBtc(usd: number) {
const exchangeRate = client.encapsulateWorker('exchange_rate') // Get exchangeRate worker
const btcUsd = await exchangeRate.getRate("BTCUSD") // Call method on exchangeRate worker.
console.log('Current BTCUSD price is', btcUsd)
// Current BTCUSD price is $30,000
return usd/btcUsd
}
}
const runner = new Worker(new HelloWorldWorkerImplementation(), {
name: 'HelloWorldService', // Name of the worker.
})
try {
await runner.start();
await waitOnSigint() // Wait on Ctrl+C
} finally {
await runner.stop()
}
Class WorkerImplementation
- Supports, async and sync and callback functions.
- If callback functions are used, initialize the Worker with
callbackSupport: true
.
- If callback functions are used, initialize the Worker with
- Automatically returns
Error
s.
Class Worker
constructor(worker, config?)
worker
: WorkerImplementationconfig?
GrenacheServerConfigname?
string Name of the worker. Announced on DHT. Used to name RabbitMQ queues. Default: Random name.grapeUrl?
string URL to the grape DHT. Default:http://127.0.0.1:30001
.port?
integer Server port. Default: Random port between 10,000 and 40,000.callbackSupport?
boolean Allows WorkerImplementation functions to be written with callbacks. Disables the method argument count check. Default: falseconnection?
amp.Connection RabbitMQ connection. Mutually exclusive withamqpUrl
.amqpUrl
string RabbitMQ connection URL. Mutually exclusive withconnection
. Default:amqp://localhost:5672
.namespace
string RabbitMQ namespace. All objects like exchanges, queues will start with{namespace}.
. Default:blocktank
async start() Starts the worker. Listens on given port.
async stop() Stops the worker. Graceful shutdown.
options?
WorkerStopOptionscleanupRabbitMq?
boolean Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
Class GrenacheClient
GrenacheClient
allows to call other workers without exposing your own server.
constructor(grapeUrl?)
grapeUrl
string URL to the DHT. Default:http://127.0.0.1:30001
.
import { GrenacheClient } from '@synonymdev/blocktank-worker2'
const client = new GrenaceClient()
// Method 1 - Call function
const method = 'helloWorld';
const args = ['Sepp', 'Pirmin'];
const response1 = await client.call('HelloWorldService', method, args)
console.log(response1) // Hello Sepp and Pirmin
async call(workerName, method, args?, opts?) call method of another worker. Returns the worker response.
workerName
string Name of the worker you want to call.method
string Method name you want to call.args?
any[] List of arguments. Default: [].opts?
: Partial GrenacheClientCallOptionstimeoutMs?
Request timeout in milliseconds. Default: 60,000.
encapsulateWorker(workerName) Conveninence wrapper. Returns a worker object that can be called with any worker method.
// Example
const helloWorldService = client.encapsulateWorker('HelloWorldService')
const response = await helloWorldService.helloWorld('Sepp', 'Pirmin')
// Hello Sepp and Pirmin
RabbitMQ / Events
RabbitPublisher
and RabbitConsumer
manage all events around the worker.
Events work on a "at least once" delivery basis. If an error is thrown, the even is retried with an exponential backoff.
Checkout RabbitMQ docs to get an overview on the exchange/queue structure.
Class RabbitConsumer
Consume events from RabbitMQ.
const myServiceName = 'MyServiceName'
const consumer = new RabbitConsumer(myServiceName)
await consumer.init()
try {
await consumer.onMessage('HelloWorldService', 'invoicePaid', async event => {
console.log('HelloWorldService.invoicePaid event:', event)
})
await waitOnCtrlC()
} finally {
await consumer.stop() // Graceful shutdown
}
async init() Initializes the consumer. Creates the RabbitMQ exchanges and queues.
async stop(cleanupRabbitMq?, timeoutMs?) Stops consuming messages. Graceful shutdown.
cleanupRabbitMq?
boolean Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.timeoutMs?
number Timeout in milliseconds to wait on currently consumed messages to finish. Default: 20,000.
async onMessage(sourceWorkerName, eventName, callback, options?)
sourceWorkerName
string Name of the worker that emits the event.eventName
string Name of the event.callback
function Callback function that is called when the event is received.- Type: (msg: RabbitEventMessage) => any
- May be async or sync.
options?
RabbitConsumeOptions options for this specific event type.backoffFunction
: (attempt: number) => number Function that returns the backoff time in milliseconds. Default: exponential backoff.
Important properties of onMessage
- At-Least-Once-Delivery: Messages can be delivered multiple times and potentially in a different order.
- Retries: If an error is thrown, the event is retried with an exponential backoff. The backoff function can be customized.
Class RabbitPublisher
Publish events without a consumer.
constructor(myWorkerName, options?)
myWorkerName
string Name of the worker that emits the event.options?
RabbitConnectionOptionsconnection?
amp.Connection RabbitMQ connection. Mutually exclusive withamqpUrl
.amqpUrl
string RabbitMQ connection URL. Mutually exclusive withconnection
. Default:amqp://localhost:5672
.namespace
string RabbitMQ namespace. All objects like exchanges, queues will start with{namespace}.
. Default:blocktank
async init() Initializes the producer. Creates the main RabbitMq Exchange.
async stop(cleanupRabbitMq?) Stops the connection.
cleanupRabbitMq?
boolean Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
async publish(eventName: string, data: any) Publishes an event.
eventName
string Name of the event.data
any Any json serializable data that is sent with the event.
Logging
The Worker
and RabbitConsumer
take a logger
option.
logger
pino.Logger | boolean Default: false. If set to true, a default logger is used. If set to false, no logging is done. If set to a pino logger, this logger is used.
MongoDatabase
Experimental The goal of
MongoDatabase
is to provide convenient methods for testing. This class is not mature though so it might change in the future.
Run mongo locally:
docker run -it --rm -p 27017:27017 --name ma-mongo-db mongo:latest
Checkout the MongoDB Compass if you need a UI.
Define entities in its own folder:
import { Entity, PrimaryKey, Property, SerializedPrimaryKey } from "@mikro-orm/core";
import {randomUUID} from 'crypto'
import { ObjectId } from "@mikro-orm/mongodb";
@Entity()
export class SampleAuthor {
@PrimaryKey({ name: "_id" })
id: string = randomUUID();
@Property()
name!: string;
}
Create a mikro-orm.config.ts
file to configure your database connection.
import { MikroORMOptions, ReflectMetadataProvider } from '@mikro-orm/core';
import { MongoDriver } from '@mikro-orm/mongodb';
import entities from './1_database/entities';
import { AppConfig } from './0_config/AppConfig';
const appConfig = AppConfig.get()
const config: Partial<MikroORMOptions<MongoDriver>> = {
entities: entities,
clientUrl: appConfig.dbUrl,
metadataProvider: ReflectMetadataProvider,
debug: false,
type: 'mongo',
migrations: {
path: 'dist/1_database/migrations',
pathTs: 'src/1_database/migrations',
transactional: false
}
};
export default config;
- See this mikro-orm.config.ts for an example config.
- Checkout the mikro-orm docs for more info to set up the ORM.
- You may choose to use another ORM. In that case, make sure you manage test integrations yourself.
- Checkout this example Entity SampleAuthor.ts.
import {MongoDatabase} from '@synonymdev/blocktank-worker2';
import config from './mikro-orm.config.ts'
try {
await MongoDatabase.connect(config)
await MongoDatabase.migrateUp()
const em = MongoDatabase.createEntityManager()
const author = new SampleAuthor()
author.name = 'Sepp'
await em.persistAndFlush(author)
} finally {
await MongoDatabase.close()
}
MongoDatabase provides a InMemory database for testing. Checkout the example MongoDatabase.test.ts for more details on how to use the inMemory database to run independent tests.
CLI & Migrations
MikroORM comes with a cli. To use the cli, add this config to your package.json:
"mikro-orm": {
"useTsNode": true,
"configPaths": [
"./src/mikro-orm.config.ts",
"./dist/mikro-orm.config.js"
]
},
- Use
npx mikro-orm migration:create
to create a new migration. - Use
npx mikro-orm migration:up
to run migrations.
Development
Testing
- Test:
npm run test
. Checkout vscode jest to selectively run tests.
Make tests independent + cleanup RabbitMQ:
import { Worker } from "@synonymdev/blocktank-worker2";
// Use a random RabbitMQ namespace to avoid any conflicts between tests:
const runner = new Worker(worker, {
namespace: Worker.randomNamespace()
});
try {
await runner.start()
// Do your tests here
} finally {
// Cleanup all existing rabbitMQ objects
await runner.stop({cleanupRabbitMq: true})
}
Versioning
- Increase version in
package.json
. - Add changes to
CHANGELOG.md
. - Commit changes.
- Tag new version:
git tag v0.1.0
. - Push tag
git push origin v0.1.0
. - Publish to npm:
npm publish
.