amq-rpc
v0.5.1
Published
Abstraction over MQ for RPC services
Downloads
3
Maintainers
Readme
AMQ RPC
Attention: this module is currently in active development ⚠️ Soon to be released, maybe around 30 February 2019 🖖
Samples
Client:
import { RpcClient } from 'amq-rpc';
// Sample client: create an RpcClient, ensure the connection, issue two requests,
// then destroy the client.
(async () => {
const client = new RpcClient({
service: 'my-awesome-service',
version: '1.2',
connectParams: {
url: 'amqp://guest:guest@localhost:5672/?vhost=/',
heartbeat: 30,
},
waitResponseTimeout: 30 * 1000, // timeout for waiting for a result from the service
defaultRetryLimit: 10 // retry limit; the default is 1 (retries disabled)
});
await client.ensureConnection(); // accepts, as its first param, the same object as connectParams in the constructor
// equivalent to calling the 'default' handler
const result = await client.send({ foo: 'bar' }, {
correlationId: 'e.g. nginx req id',
retryLimit: 5, // overrides the default from the constructor
});
// same kind of request, but addressed to the 'myAction' action by name
const result2 = await client.call('myAction', { foo: 'bar' }, {
correlationId: 'e.g. nginx req id',
retryLimit: 5, // overrides the default from the constructor
});
await client.destroy();
// console.error() returns undefined, so the `||` always falls through to process.exit(1)
})().catch(err => console.error(err) || process.exit(1));
Service:
import { RpcService, RpcServiceHandler } from 'amq-rpc';
// Sample service: create an RpcService, register an error callback and two handlers,
// ensure the connection, then install the stop-signal interceptor.
(async () => {
const service = new RpcService({
service: 'my-awesome-service',
version: '1.2',
connectParams: {
url: 'amqp://guest:guest@localhost:5672/?vhost=/',
heartbeat: 30,
},
queue: {
prefetch: 1,
durable: true,
maxPriority: 100,
},
});
service.setErrorHandler((error) => {
// All errors that can't be passed to the reject operation (e.g. an error in the
// subscriber function, outside of the user handler) are passed to this callback.
});
await service.addHandler(class extends RpcServiceHandler {
// If the message's "type" property is not set (sent without special options),
// the service looks for the handler with action 'default'
get action() {
// in the base class, RpcServiceHandler, action equals 'default'
return 'myAction2';
}
async beforeHandle() {
// called right before the handle method
// use it to prepare data, init resources or log
// all thrown errors, as in the handle method, are passed to the handleFail method
}
// ⚠️ you must redefine this method from the RpcServiceHandler class
async handle() {
// this.payload - the payload that was sent
// this.context - special object shared between methods. Equal to {} by default.
// the returned data is passed to the client as the reply payload
return { bar: 'foo' };
}
// ⚠️ redefine this method only if you know what you are doing
async handleFail(error: Error) {
/*
In the base class, RpcServiceHandler:
- if retry is disabled or the retry limit is exceeded
- reject the message in the queue
- reply to the client with the error, messageId and correlationId
- else
- ack the current message
- resend the message to the source queue with a decremented retry limit header
*/
// you can redefine this to customize the error handling behavior
}
// ⚠️ redefine this method only if you know what you are doing
async handleSuccess(replyPayload: Object) {
/*
In the base class, RpcServiceHandler:
- ack the message in the queue
- reply to the client with the payload and error: null
*/
// you can redefine this to customize the success handling behavior
}
async onFail(error: Error) {
// hook for logging
}
async onSuccess(replyPayload: Object) {
// hook for logging
}
async afterHandle(error: ?Error, replyPayload: ?Object) {
// if the current handler failed, the error is passed as the first argument
// if handling succeeded, replyPayload is passed as the second argument
// use it for logging or for releasing resources
// wrap this code in a try..catch block, because all errors from the afterHandle method
// are just passed to the error handler callback
}
});
// Minimal handler
await service.addHandler(class extends RpcServiceHandler {
async handle() {
return { bar: `${this.payload.foo} 42` };
}
});
await service.ensureConnection();
// If the process receives SIGINT, the service will be stopped gracefully
// (it waits for handlers to finish until the timeout is exceeded, then calls process.exit())
await service.interventSignalInterceptors({ stopSignal: 'SIGINT', gracefulStopTimeout: 10 * 1000 });
// console.error() returns undefined, so the `||` always falls through to process.exit(1)
})().catch(err => console.error(err) || process.exit(1));