rxjs-grpc-minimal

v0.2.6

grpc node callback and streams wrapped in observables

Based off the great work of rxjs-grpc. However, this library intends to do very little beyond letting you wrap your server or client gRPC implementation the way you want.

There is no CLI, as this library tries to stay out of the way and let grpc or protobufjs do the amazing things they already do.

Install

> yarn add rxjs-grpc-minimal

Usage

Client

const path = require('path');
const { loadSync } = require('protobufjs');
const { loadObject: toGrpc, credentials } = require('grpc');
const { Subject, ReplaySubject } = require('rxjs');

const {
  toRxClient, // used most often
  toRxServer, // used most often
  utils,
  errors
} = require('rxjs-grpc-minimal');

const pbAPI = loadSync(
    path.join(__dirname,'../examples/helloworld/helloworld.proto'))
  .lookup('helloworld');

/*
Wraps all service.prototype methods with RXJS implementations.
Each method is appended to the prototype as `method${RX}` by default.
Thus allowing you access to both RX and nonRx grpc implementations.
*/
const grpcAPI = toRxClient(toGrpc(pbAPI));
/*
Wraps all service.prototype methods with RXJS implementations. However,
this overrides / overwrites all original prototype methods with the RX impl.
*/
const grpcApiOverride = toRxClient(toGrpc(pbAPI), '');

// Client instance; referred to as `conn` in the calls below.
const conn = new grpcAPI.Greeter('localhost:56001', credentials.createInsecure());

// non stream
conn.sayHelloRx({ name: 'Bob' })
.forEach(resp => {
    console.log(grpcAPI.cancelCache.size); // 0
    console.log(resp); // { message: 'Hello Bob' } // depends on server
  });

let calls = 0;

// STREAMING REPLY FROM SERVER
conn.sayMultiHelloRx({
  name: 'Brody',
  numGreetings: 2,
  doComplete: true
})
.forEach(resp => {
  calls++;
  console.log({ size: grpcAPI.cancelCache.size})
  console.log(resp)
})
.then(() => {
  console.log({ size: grpcAPI.cancelCache.size})
  console.log({ calls })
});

calls = 0;

/* console out

{ size: 1 }
{ message: 'Hello Brody' }
{ size: 1 }
{ message: 'Hello Brody' }
{ size: 0 }
{ calls: 2 }
*/

// streaming reply from server, cancelled early
// Keep a reference to the observable itself: the value returned by
// .forEach(...).then(...) is a promise and has no grpcCancel().
const multiHelloStream = conn.sayMultiHelloRx({
  name: 'Brody',
  numGreetings: 2,
  doComplete: true
});

multiHelloStream
.forEach(resp => {
  calls++;
  console.log({ size: grpcAPI.cancelCache.size})
  console.log(resp)
  // imagine you need to cancel this stream in between and abort early
  multiHelloStream.grpcCancel();
})
.then(() => {
  console.log({ size: grpcAPI.cancelCache.size})
  console.log({ calls })
});
calls = 0;
/* console out

{ size: 1 }
{ message: 'Hello Brody' }
{ size: 0 }
{ calls: 1 }
*/

// STREAMING REQUEST | client streaming to server
const writer = new Subject();
const observable = conn.streamSayHelloRx(writer);

observable
.forEach(resp => {
  calls++;
  console.log(resp);
  console.log({ calls });
})
.then(() => {
  writer.unsubscribe();
});

console.log(grpcAPI.cancelCache.size) // 1
// ok we're now subscribed
writer.next({ name: 'Al' });
writer.next({ name: 'Bundy' });
writer.complete();

console.log(grpcAPI.cancelCache.size) // 0

/* console out

1
{ message: 'Hello Al' }
{ calls: 1 }
{ message: 'Hello Bundy' }
{ calls: 2 }
0
*/

// CONNECTION CLEANUP
/*
Imagine we abort midway, or crash but catch the problem.
Before calling conn.close() we can cancel every call that is still open.

This guarantees that the observer on the server side is cleaned up and released.
It also lets you truly close your connection without leaving a dangling channel/subchannel.
*/
grpcAPI.cancelCache.forEach((cancel) => cancel());
conn.close();
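
The cleanup step above only shows that cancelCache has a size and can be iterated with forEach. As a rough illustration of that idea, here is a minimal, self-contained sketch that assumes the cache is a Set of cancel functions, one per in-flight call. The names createCancelCache, track and release are hypothetical and are not part of rxjs-grpc-minimal; the library's real bookkeeping may differ.

```javascript
// Hypothetical sketch, NOT the library's implementation.
// Assumption: the cache is a Set holding one cancel function per open call.
function createCancelCache() {
  const cache = new Set();

  // Register an in-flight call. `onCancel` is whatever aborts that call.
  function track(onCancel) {
    let cancel;
    // Called when the call finishes on its own: just forget it.
    const release = () => cache.delete(cancel);
    // Called to abort: runs onCancel at most once, then forgets the call.
    cancel = () => {
      if (cache.delete(cancel)) onCancel();
    };
    cache.add(cancel);
    return { cancel, release };
  }

  return { cache, track };
}

const { cache, track } = createCancelCache();
const cancelled = [];

const callA = track(() => cancelled.push('A'));
const callB = track(() => cancelled.push('B'));
console.log(cache.size); // 2

callA.release(); // call A completed normally
console.log(cache.size); // 1

// Cleanup before closing: cancel whatever is still in flight.
cache.forEach((cancel) => cancel());
console.log(cache.size); // 0
console.log(cancelled); // [ 'B' ]
```

In this sketch a completed call removes itself from the cache, so the final forEach only touches calls that were never finished, which mirrors the sizes printed in the client examples.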

Server

See serverRx.js

API

toRxClient(grpcObject, methodExt)

  • grpcObject

    Type: Object - initially created by grpc.loadObject, with or without protobufjs load | loadSync

  • methodExt

    Type: String - defaults to 'Rx'

    This is the naming extension appended to each original method name, so every wrapped method is exposed as method{RX}. Passing an empty string overwrites the original methods instead.

    greeter.sayMultiHelloRx // RX function
    greeter.sayMultiHello // node stream function; other functions could be callback, or callback and streams

    toRxClient(grpcAPI, '');
    // ...
    greeter.sayMultiHello // RX function; all node stream, callback etc. hidden / wrapped
    // NOTE: all RX functions will always return observables (consistent!).
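
To make the naming scheme concrete, here is a small, self-contained sketch of how a suffix such as 'Rx' (or an empty string) could be applied to prototype methods. It is an illustration only: wrapMethods and promisify are hypothetical helpers, the two Greeter classes are stand-ins for a real gRPC client, and promises are used in place of observables so the example runs without rxjs. It does not show how rxjs-grpc-minimal is implemented.

```javascript
// Hypothetical sketch of the naming scheme only; not the library's code.
// Each prototype method `name` gets a wrapped twin stored as `name + ext`.
function wrapMethods(Ctor, wrap, ext = 'Rx') {
  const proto = Ctor.prototype;
  for (const name of Object.getOwnPropertyNames(proto)) {
    const original = proto[name];
    if (name === 'constructor' || typeof original !== 'function') continue;
    // With ext === '' the key equals `name`, so the original is overwritten.
    proto[name + ext] = wrap(original);
  }
  return Ctor;
}

// Stand-in for the Rx wrapper: turns a node-callback method into a promise.
const promisify = (fn) => function (arg) {
  return new Promise((resolve, reject) =>
    fn.call(this, arg, (err, res) => (err ? reject(err) : resolve(res))));
};

// Stand-in clients with a callback-style method.
class Greeter {
  sayHello({ name }, cb) { cb(null, { message: `Hello ${name}` }); }
}
class GreeterOverride {
  sayHello({ name }, cb) { cb(null, { message: `Hello ${name}` }); }
}

wrapMethods(Greeter, promisify); // adds sayHelloRx, keeps sayHello
wrapMethods(GreeterOverride, promisify, ''); // replaces sayHello

const side = new Greeter();
const over = new GreeterOverride();

console.log(typeof side.sayHello, typeof side.sayHelloRx); // function function
side.sayHelloRx({ name: 'Bob' }).then(console.log); // { message: 'Hello Bob' }
over.sayHello({ name: 'Bob' }).then(console.log); // { message: 'Hello Bob' }
```

With the default suffix both flavours stay reachable side by side; with the empty suffix callers only ever see the wrapped version, which is the trade-off the two toRxClient calls in the client example describe.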

toRxServer(service, rxImpl, serviceName)

  • service

    Type: Object - GrpcService definition grpcAPI[serviceName]

  • rxImpl

    Type: Object - Your RxJS server implementation, with one handler for each service method to be implemented.

  • serviceName (optional)

    Type: String - aids debugging via debug-fabulous logging.