rxjs-grpc-minimal
v0.2.6
Published
grpc node callback and streams wrapped in observables
Downloads
10
Maintainers
Readme
rxjs-grpc-minimal
Based on the great work of rxjs-grpc. However, this library intends to do very little except let you wrap your server or client gRPC implementation the way you want it.
There is no CLI, as this library tries to stay out of the way and let grpc or protobufjs do the amazing things they already do.
Install
> yarn add rxjs-grpc-minimal
Usage
Client
const path = require('path');
const { loadSync } = require('protobufjs');
const { loadObject: toGrpc, credentials } = require('grpc');
const { Subject, ReplaySubject } = require('rxjs');
const {
  toRxClient, // used most often
  toRxServer, // used most often
  utils,
  errors
} = require('rxjs-grpc-minimal');

// Load the example proto file and look up its `helloworld` namespace.
const pbAPI = loadSync(
  path.join(__dirname, '../examples/helloworld/helloworld.proto'))
  .lookup('helloworld');

/*
  Wraps all service.prototype methods with RXJS implementations.
  Each method is appended to the prototype as `method${RX}` by default.
  Thus allowing you access to both RX and nonRx grpc implementations.
*/
const grpcAPI = toRxClient(toGrpc(pbAPI));

/*
  Wraps all service.prototype methods with RXJS implementations. However,
  this overrides / overwrites all original prototype methods with the RX impl.
*/
const grpcApiOverride = toRxClient(toGrpc(pbAPI), '');

// The client instance is named `conn` because the rest of this example
// calls its methods (sayHelloRx, sayMultiHelloRx, close, ...) on `conn`.
const conn = new grpcAPI.Greeter('localhost:56001', credentials.createInsecure());
// non stream
// The request expression must not end in `;` so that `.forEach` chains
// onto the value returned by sayHelloRx.
conn.sayHelloRx({ name: 'Bob' })
  .forEach(resp => {
    console.log(grpcAPI.cancelCache.size); // 0
    console.log(resp); // { message: 'Hello Bob' } // depends on server
  });
let calls = 0;
// STREAMING REPLY FROM SERVER
// Request asking the server for two greetings and then completion.
const multiHelloRequest = {
  name: 'Brody',
  numGreetings: 2,
  doComplete: true
};
const whenAllGreeted = conn.sayMultiHelloRx(multiHelloRequest).forEach(resp => {
  calls += 1;
  const size = grpcAPI.cancelCache.size;
  console.log({ size });
  console.log(resp);
});
whenAllGreeted.then(() => {
  const size = grpcAPI.cancelCache.size;
  console.log({ size });
  console.log({ calls });
});
calls = 0;
/* console out
{ size: 1 }
{ message: 'Hello Brody' }
{ size: 1 }
{ message: 'Hello Brody' }
{ size: 0 }
{ calls: 2 }
*/
// streaming reply from server
// Keep a reference to the observable itself: chaining `.forEach().then()`
// directly onto the declaration would bind `multiHelloStream` to the
// resulting promise, which has no `grpcCancel`.
const multiHelloStream = conn.sayMultiHelloRx({
  name: 'Brody',
  numGreetings: 2,
  doComplete: true
});
multiHelloStream
  .forEach(resp => {
    calls++;
    console.log({ size: grpcAPI.cancelCache.size });
    console.log(resp);
    // imagine you need to cancel this stream in between and abort early
    multiHelloStream.grpcCancel();
  })
  .then(() => {
    console.log({ size: grpcAPI.cancelCache.size });
    console.log({ calls });
  });
calls = 0;
/* console out
{ size: 1 }
{ message: 'Hello Brody' }
{ size: 0 }
{ calls: 1 }
*/
// STREAMING REQUEST | client streaming to server
// `writer` is the Subject whose emissions are passed to streamSayHelloRx
// as the outgoing request stream.
const writer = new Subject();
const observable = conn.streamSayHelloRx(writer);
observable
  .forEach(resp => {
    calls++;
    console.log(resp);
    console.log({ calls });
  })
  .then(() => {
    writer.unsubscribe();
  });
console.log(grpcAPI.cancelCache.size); // 1
// ok we're now subscribed
writer.next({ name: 'Al' });
writer.next({ name: 'Bundy' });
writer.complete();
console.log(grpcAPI.cancelCache.size); // 0
/* console out
1
{ message: 'Hello Al' }
{ calls: 1 }
{ message: 'Hello Bundy' }
{ calls: 2 }
0
*/
// CONNECTION CLEANUP
/*
  Suppose we abort part-way through, or crash but catch the problem.
  Before calling conn.close we can invoke every pending cancel function.
  This guarantees that the observer on the server side is cleaned up and released.
  It also lets you truly close your connection without dangling a channel/subchannel.
*/
grpcAPI.cancelCache.forEach(cancel => {
  cancel();
});
conn.close();
Server
See serverRx.js
API
toRxClient(grpcObject, methodExt)
grpcObject
Type:
Object
- initially created by `grpc.loadObject`, with or without protobufjs `load` | `loadSync`
methodExt
Type:
String
- defaults to `'Rx'`
This is the method naming extension: each wrapped method is added under the original method name with the extension appended, i.e. `something${RX}`.

greeter.sayMultiHelloRx // RX function
greeter.sayMultiHello // node stream function, and other functions could be callback, callback and streams

toRxClient(grpcAPI, '');
// ...
greeter.sayMultiHello // RX function; all node stream, callback etc. hidden / wrapped
// NOTE: all RX functions will always return observables (consistent!)
toRxServer(service, rxImpl, serviceName)
service
Type:
Object
- GrpcService definition `grpcAPI[serviceName]`
rxImpl
Type:
Object
- Your RxJS server implementation, which matches the service method handles to be implemented.
serviceName (optional)
Type:
String
- aids in debug via debug-fabulous logging.