apollo-gateway-subscription

v0.5.6

Sidecar for subscriptions that work with Apollo Federation/Supergraphs

Using Subscriptions with a Federated Data Graph

In brief, the utilities contained within this library will allow you to:

  • Create a decoupled, independently scalable subscriptions service to run alongside a unified data graph
  • Use types defined in your unified data graph as return types within the subscriptions service's type definitions (without manually redefining those types in the subscriptions service)
  • Publish messages to a shared pub/sub implementation from any subgraph service
  • Allow clients to write subscription operations just as they would if the Subscription fields were defined directly within the unified data graph itself

Example Usage

If you don't have Docker, get it here: https://docs.docker.com/get-docker/

git clone https://github.com/reduxdj/example-gateway-subscriptions
cd example-gateway-subscriptions
docker compose up

Diagram

The architecture of the provided example may be visualized as follows:

[Figure: architectural diagram of a federated data graph with a subscriptions service and a React client app]

Make an Executable Schema from Federated and Subscription Type Definitions

The subscriptions service should only contain a definition for the Subscription object type. The fields on this type may return any of the types defined in the federated data graph's schema:

// typeDefs.js (subscriptions service)

import gql from "graphql-tag";

export const typeDefs = gql`
  type Subscription {
    postAdded: Post
  }
`;

To make the federated data graph's types available to the subscription service, instantiate an ApolloGateway and register a callback with the gateway's onSchemaLoadOrUpdate method. In that callback, call the makeSubscriptionSchema function to combine the gateway's schema with the subscription service's type definitions and resolvers into the complete executable schema.

Managed federation option:

// index.js (subscriptions service)
let schema;
const gateway = new ApolloGateway();

gateway.onSchemaLoadOrUpdate((schemaContext) => {
  schema = makeSubscriptionSchema({
    gatewaySchema: schemaContext.apiSchema,
    typeDefs,
    resolvers,
  });
});

await gateway.load({ apollo: getGatewayApolloConfig(apolloKey, graphVariant) });

Unmanaged federation option:

// index.js (subscriptions service)
let schema;
const gateway = new ApolloGateway({
  serviceList: [
    /* Provide your service list here... */
  ],
  // Poll interval in milliseconds; object properties use `:` and `,`
  experimental_pollInterval: 36000,
});

gateway.onSchemaLoadOrUpdate(schemaContext => {
  schema = makeSubscriptionSchema({
    gatewaySchema: schemaContext.apiSchema,
    typeDefs,
    resolvers
  });
});

await gateway.load();

Note that for unmanaged federation, we must set a poll interval so the gateway queries the subgraph services for their schemas and detects schema changes. Polling the running endpoints for these SDLs is a fairly blunt approach, so in production a more computationally efficient approach (or managed federation) would be preferable.

Use an Apollo Data Source to Fetch Non-Payload Fields

The subscription service can resolve fields that are included in a published message's payload, but it will need to reach out to the federated data graph to resolve additional non-payload fields. Using an Apollo data source subclassed from the provided GatewayDataSource, specific methods can be defined that fetch the non-payload fields by diffing the payload fields with the overall selection set. Optionally, headers (etc.) may be attached to the request to the federated data graph by providing a willSendRequest method:

// DataSourceExample/index.js (subscriptions service)

import { GatewayDataSource } from "federation-subscription-tools";
import gql from "graphql-tag";

export class LiveBlogDataSource extends GatewayDataSource {
  constructor(gatewayUrl) {
    super(gatewayUrl);
  }

  willSendRequest(request) {
    if (!request.headers) {
      request.headers = {};
    }

    request.headers["apollographql-client-name"] = "Subscriptions Service";
    request.headers["apollographql-client-version"] = "0.1.0";

    // Forwards the encoded token extracted from the `connectionParams` with
    // the request to the gateway
    request.headers.authorization = `Bearer ${this.context.token}`;
  }

  async fetchAndMergeNonPayloadPostData(postID, payload, info) {
    const selections = this.buildNonPayloadSelections(payload, info);
    const payloadData = Object.values(payload)[0];

    if (!selections) {
      return payloadData;
    }

    const Subscription_GetPost = gql`
      query Subscription_GetPost($id: ID!) {
        post(id: $id) {
          ${selections}
        }
      }
    `;

    try {
      const response = await this.query(Subscription_GetPost, {
        variables: { id: postID },
      });
      return this.mergeFieldData(payloadData, response.data.post);
    } catch (error) {
      console.error(error);
    }
  }
}
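
To make the diffing idea concrete, here is a minimal, dependency-free sketch. It is not the implementation of buildNonPayloadSelections: the function name diffSelections and the plain-object representation of a selection set (a field name mapped to true for a leaf, or to a nested object for a sub-selection) are assumptions made purely for illustration.

```javascript
// Hypothetical helper, for illustration only: returns a GraphQL selection
// string containing the requested fields that are missing from the payload.
// `requested` maps field names to `true` (leaf) or a nested object (sub-selection).
function diffSelections(payloadData, requested) {
  const parts = [];

  for (const [field, sub] of Object.entries(requested)) {
    const value = payloadData == null ? undefined : payloadData[field];

    if (sub === true) {
      // Leaf field: fetch it only if the payload does not already carry it
      if (value === undefined) parts.push(field);
    } else if (value !== null && typeof value === "object" && !Array.isArray(value)) {
      // Nested object present in the payload: recurse and keep only the gaps
      const nested = diffSelections(value, sub);
      if (nested) parts.push(`${field} { ${nested} }`);
    } else {
      // Nested object missing from the payload: request the full sub-selection
      parts.push(`${field} { ${diffSelections({}, sub)} }`);
    }
  }

  return parts.join(" ");
}

const payloadData = { id: "1", title: "My Next Post" };
const requested = { id: true, title: true, content: true, author: { name: true } };

console.log(diffSelections(payloadData, requested));
// → content author { name }
```

When the payload already covers every requested field, this sketch returns an empty string, which is falsy and so lines up with the `if (!selections)` early return in the data source method above.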

In the resolvers for the subscription field, the fetchAndMergeNonPayloadPostData method may be called to resolve all requested field data:

// resolvers.js (subscriptions service)

const resolvers = {
  Subscription: {
    postAdded: {
      resolve(payload, args, { dataSources: { gatewayApi } }, info) {
        return gatewayApi.fetchAndMergeNonPayloadPostData(
          payload.postAdded.id,
          payload, // known field values
          info // contains the complete field selection set to diff
        );
      },
      subscribe(_, args) {
        return pubsub.asyncIterator(["POST_ADDED"]);
      },
    },
  },
};

In effect, this means that as long as the resource used as the output type for a subscription field can be queried from the federated data graph, that node may be used as an entry point to the data graph to resolve non-payload fields.

For the gateway data source to be accessible in Subscription field resolvers, we must manually add it to the request context using the addGatewayDataSourceToSubscriptionContext function. Note that this example uses graphql-ws to serve the WebSocket-enabled endpoint for subscription operations. A sample implementation may be structured as follows:

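// index.js (subscriptions service)

import http from "http";

import { addGatewayDataSourceToSubscriptionContext } from "federation-subscription-tools";
import { execute, getOperationAST, GraphQLError, parse, subscribe, validate } from "graphql";
import { useServer } from "graphql-ws/lib/use/ws"; // import path as of graphql-ws v5
import ws from "ws";

import { LiveBlogDataSource } from "./DataSourceExample/index.js";

// `schema`, `gatewayEndpoint`, and `port` are assumed to be defined elsewhere in this file
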
const httpServer = http.createServer(function weServeSocketsOnly(_, res) {
  res.writeHead(404);
  res.end();
});

const wsServer = new ws.Server({
  server: httpServer,
  path: "/graphql",
});

useServer(
  {
    execute,
    subscribe,
    context: (ctx) => {
      // If a token was sent for auth purposes, retrieve it here
      // (connectionParams is undefined when the client sends none)
      const { token } = ctx.connectionParams ?? {};

      // Instantiate and initialize the GatewayDataSource subclass
      // (data source methods will be accessible on the `gatewayApi` key)
      const liveBlogDataSource = new LiveBlogDataSource(gatewayEndpoint);
      const dataSourceContext = addGatewayDataSourceToSubscriptionContext(
        ctx,
        liveBlogDataSource
      );

      // Return the complete context for the request
      return { token: token || null, ...dataSourceContext };
    },
    onSubscribe: (_ctx, msg) => {
      // Construct the execution arguments
      const args = {
        schema,
        operationName: msg.payload.operationName,
        document: parse(msg.payload.query),
        variableValues: msg.payload.variables,
      };

      const operationAST = getOperationAST(args.document, args.operationName);

      // Stops the subscription and sends an error message
      if (!operationAST) {
        return [new GraphQLError("Unable to identify operation")];
      }

      // Reject mutation and query requests
      if (operationAST.operation !== "subscription") {
        return [new GraphQLError("Only subscription operations are supported")];
      }

      // Validate the operation document
      const errors = validate(args.schema, args.document);

      if (errors.length > 0) {
        return errors;
      }

      // Ready execution arguments
      return args;
    },
  },
  wsServer
);

httpServer.listen({ port }, () => {
  console.log(
    `🚀 Subscriptions ready at ws://localhost:${port}${wsServer.options.path}`
  );
});
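
The path the auth token takes, from the WebSocket connectionParams into the resolver context and then into the headers sent to the gateway, can be traced with two small pure functions. Both function names are hypothetical and exist only for this sketch. Skipping the authorization header when no token was sent is a design choice of the sketch, not something the library is known to do.

```javascript
// Hypothetical helper: derive the per-connection context from connectionParams.
// Tolerates clients that connect without sending any connectionParams.
function buildContext(connectionParams) {
  const { token } = connectionParams ?? {};
  return { token: token || null };
}

// Hypothetical helper: build the headers forwarded to the gateway.
// The authorization header is only attached when a token is present,
// which avoids sending a literal "Bearer null".
function buildGatewayHeaders(context) {
  const headers = { "apollographql-client-name": "Subscriptions Service" };
  if (context.token) {
    headers.authorization = `Bearer ${context.token}`;
  }
  return headers;
}

console.log(buildGatewayHeaders(buildContext({ token: "abc" })).authorization);
// → Bearer abc
console.log("authorization" in buildGatewayHeaders(buildContext(undefined)));
// → false
```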

Try the Demo

Installation & Set-up

mutation AddPost {
  addPost(authorID: 1, content: "Hello, world!", title: "My Next Post") {
    id
    author {
      name
    }
    content
    publishedAt
    title
  }
}

Rationale

The architecture demonstrated in this project seeks to provide a bridge to native Subscription operation support in Apollo Federation. This approach to subscriptions has the advantage of allowing the Apollo Gateway API to remain as the "stateless execution engine" of a federated data graph while offloading all subscription requests to a separate service, thus allowing the subscription service to be scaled independently of the gateway.

To allow the Subscription fields to specify return types that are defined only in the gateway API, the federated data graph's type definitions are merged with the subscription service's type definitions and resolvers in the gateway's onSchemaLoadOrUpdate callback. This avoids re-declaring these types explicitly in the subscription service.

Architectural Details

Components

Docker will start the following services with docker compose up:

1. Gateway Server + Subgraph Services

This service contains the federated data graph. For simplicity's sake, two implementing services (for authors and posts) have been bundled with the gateway API in this service. Each implementing service connects to Redis as needed so it can publish events from mutations (the "pub" end of subscriptions). For example:

import { pubsub } from "./redis";

export const resolvers = {
  // ...
  Mutation: {
    addPost(root, args, context, info) {
      const post = newPost();
      pubsub.publish("POST_ADDED", { postAdded: post });
      return post;
    },
  },
};

2. Subscriptions Server

This service also connects to Redis to facilitate the "sub" end of the subscriptions. This service is where the Subscription type and related fields are defined. As a best practice, only define a Subscription type and applicable resolvers in this service.

When sending subscription data to clients, the subscription service can't automatically resolve any data beyond what's provided in the published payload from the implementing service. This means that to resolve nested types (or any other fields that aren't immediately available in the payload object), resolvers must be defined in the subscription service to fetch this data on a field-by-field basis.

There are a number of possible approaches that could be taken here. One recommended approach is to provide an Apollo data source with methods that automatically compare the fields included in the payload against the fields requested in the operation, then selectively query the necessary field data in a single request to the gateway, and finally combine the returned data with the original payload data to fully resolve the request. For example:

import { pubsub } from "./redis";

export const resolvers = {
  Subscription: {
    postAdded: {
      resolve(payload, args, { dataSources: { gatewayApi } }, info) {
        return gatewayApi.fetchAndMergeNonPayloadPostData(
          payload.postAdded.id,
          payload,
          info
        );
      },
      subscribe(_, args) {
        return pubsub.asyncIterator(["POST_ADDED"]);
      },
    },
  },
};
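
The final step described above, combining the gateway's response with the original payload, can be sketched as a small recursive merge. This is not the library's mergeFieldData: the names mergeFields and isPlainObject are hypothetical, and the rule that a fetched value wins over a payload value for non-object fields is an assumption of this sketch.

```javascript
// Hypothetical helpers, for illustration only: recursively combine the
// fields carried by the payload with the fields fetched from the gateway.
function isPlainObject(value) {
  return value !== null && typeof value === "object" && !Array.isArray(value);
}

function mergeFields(payloadData, fetchedData) {
  const merged = { ...payloadData };

  for (const [field, fetched] of Object.entries(fetchedData ?? {})) {
    const existing = merged[field];
    merged[field] =
      isPlainObject(existing) && isPlainObject(fetched)
        ? mergeFields(existing, fetched) // merge nested objects field by field
        : fetched; // otherwise the fetched value is used
  }

  return merged;
}

const payloadData = { id: "1", title: "My Next Post", author: { id: "7" } };
const fetchedData = { content: "Hello, world!", author: { name: "Ada" } };

console.log(JSON.stringify(mergeFields(payloadData, fetchedData)));
// → {"id":"1","title":"My Next Post","author":{"id":"7","name":"Ada"},"content":"Hello, world!"}
```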

3. Redis

A shared Redis instance is used to capture publications from the services behind the federated data graph as well as the subscriptions initiated in the subscriptions service, though other PubSub implementations could easily be supported. Note that an in-memory pub/sub implementation will not work because it cannot be shared between the separate gateway and subscription services.
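
The limitation noted above can be illustrated with a toy in-memory pub/sub. The InMemoryPubSub class below is hypothetical and far simpler than any real implementation. Each instance stands in for the private memory of one process, so an event published through one instance never reaches a subscriber registered on the other.

```javascript
// Toy pub/sub for illustration only: listeners live in this instance's memory.
class InMemoryPubSub {
  constructor() {
    this.listeners = new Map(); // event label -> Set of callbacks
  }

  subscribe(label, callback) {
    if (!this.listeners.has(label)) this.listeners.set(label, new Set());
    this.listeners.get(label).add(callback);
  }

  publish(label, payload) {
    for (const callback of this.listeners.get(label) ?? []) callback(payload);
  }
}

// Each instance stands in for a separate process (gateway vs. subscriptions).
const gatewayPubSub = new InMemoryPubSub();
const subscriptionsPubSub = new InMemoryPubSub();

const received = [];
subscriptionsPubSub.subscribe("POST_ADDED", (payload) => received.push(payload));

// Published in the "gateway process": the subscriber never sees it.
gatewayPubSub.publish("POST_ADDED", { postAdded: { id: "1" } });
console.log(received.length); // → 0

// Delivery only happens when publisher and subscriber share one broker,
// simulated here by publishing through the subscriber's own instance.
subscriptionsPubSub.publish("POST_ADDED", { postAdded: { id: "1" } });
console.log(received.length); // → 1
```

A shared broker such as Redis plays the role of that single common instance across processes.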

4. React App

The React app contains a homepage with a list of posts as well as a form to add new posts. When a new post is added, the feed of posts on the homepage will be automatically updated.

Important Considerations

Subscriptions Must be Defined in a Single Service:

This solution requires all Subscription fields to be defined in a single, decoupled subscription service. This requirement may necessitate that ownership of this service is shared amongst teams that otherwise manage independent portions of the schema applicable to queries and mutations.

Synchronizing Event Labels:

Some level of coordination would be necessary to ensure that event labels (e.g. POST_ADDED) are synchronized between the implementing services that publish events and the subscription service that calls the asyncIterator method with these labels as arguments. Breaking changes may occur without such coordination.
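
One possible way to coordinate labels is to keep them in a single shared module that both the publishing services and the subscription service depend on. The sketch below is only an assumption about how that could look: the EVENTS object and the eventLabel guard are hypothetical names, not part of this library.

```javascript
// Hypothetical shared module (for example, an internal package) that the
// publishing services and the subscription service would both load.
const EVENTS = Object.freeze({
  POST_ADDED: "POST_ADDED",
});

// Hypothetical guard: fail fast on an unknown label instead of silently
// publishing to (or listening on) a channel that nobody else uses.
function eventLabel(name) {
  if (!Object.prototype.hasOwnProperty.call(EVENTS, name)) {
    throw new Error(`Unknown event label: ${name}`);
  }
  return EVENTS[name];
}

console.log(eventLabel("POST_ADDED")); // → POST_ADDED
```

A publisher would then call pubsub.publish(eventLabel("POST_ADDED"), ...) and the subscription service would call pubsub.asyncIterator([eventLabel("POST_ADDED")]), so a misspelled label throws immediately rather than producing a subscription that never fires.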