appsync-subscription-observable

v1.2.2

AppSync subscription implementation with RxJs observable

AppSync subscription implemented as an RxJs observable

This library provides an optimized way to subscribe to AppSync updates and get a stream of events. It hides the connection management and behaves as if each subscription had its own connection.

It implements the AppSync WebSocket protocol, as documented by AWS, with RxJS as its only dependency.

Features:

  • Reuses the same connection for multiple subscriptions
  • Opens the connection with the authorization headers of the subscription that triggered it, which makes it easy to refresh authorization on reconnection
  • Provides an opened handler that is called when AppSync sends the start_ack message, so you can reliably know when events can be expected
  • Allows defining the retry parameters for both the connection and the subscription
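
For orientation, here is a minimal sketch of the connection URL used by the AppSync real-time WebSocket protocol, following the AWS documentation. It is not taken from this library's source: the function name realtimeUrl, the API id, and the API key are hypothetical, and only the default appsync-api domain is handled (custom domains are an assumption left out).

```javascript
// Sketch: derive the real-time WebSocket URL from the GraphQL URL.
// Assumption: the API uses the default AppSync domain (no custom domain).
function realtimeUrl(apiUrl, authorizationHeaders) {
  const {hostname, pathname} = new URL(apiUrl);
  // the real-time endpoint lives on a sibling host
  const realtimeHost = hostname.replace(".appsync-api.", ".appsync-realtime-api.");
  const url = new URL(`wss://${realtimeHost}${pathname}`);
  // the authorization headers travel base64-encoded in the query string
  url.searchParams.set("header", btoa(JSON.stringify(authorizationHeaders)));
  // the connection payload is an empty JSON object
  url.searchParams.set("payload", btoa("{}"));
  return url.toString();
}

// hypothetical API id and key, for illustration only
const host = "abc123.appsync-api.us-east-1.amazonaws.com";
const exampleUrl = realtimeUrl(`https://${host}/graphql`, {host, "x-api-key": "da2-example"});
console.log(exampleUrl);
```

The library takes care of this step, so the sketch is only meant to show where the authorization headers end up.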

Installation

npm install appsync-subscription-observable

Usage

Initialize the connection object:

import { appsyncRealtime } from "appsync-subscription-observable";

// APIURL is the AppSync GraphQL URL
const connection = appsyncRealtime({APIURL});

Subscribe to updates:

connection({getAuthorizationHeaders: () => ({...})})(query, variables)
  .subscribe({
    next: (e) => console.log("new item", e),
    error: (e) => console.log("error", e),
    complete: () => console.log("complete"),
  });

Options

appsyncRealtime

APIURL (required)

AppSync GraphQL endpoint.

connectionRetryConfig

Defines the retry parameters used when the connection cannot be established. The time to wait between attempts is calculated as Math.min(base ^ retryCount, cap), where ^ denotes exponentiation. The following values can be configured:

  • base: The base value for the exponential backoff
  • cap: The max time between two retries
  • maxAttempts: The maximum number of times to retry before erroring
  • timeout: How much time to wait in each try for the connection to be established
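
To make the formula concrete, the sketch below computes the wait times for a sample configuration. The values are hypothetical, and two things are assumptions rather than documented behavior: that retryCount starts at 0, and the time unit of the result. The helper name retryDelay is also made up for this example.

```javascript
// Sketch of the documented formula Math.min(base ^ retryCount, cap),
// reading ^ as exponentiation (in JavaScript itself, ^ is bitwise XOR).
function retryDelay({base, cap}, retryCount) {
  return Math.min(base ** retryCount, cap);
}

// Hypothetical values; the unit and the first retryCount are assumptions.
const retryConfig = {base: 2, cap: 30, maxAttempts: 6};

// one delay per attempt, counting retryCount from 0
const delays = Array.from({length: retryConfig.maxAttempts}, (_, retryCount) =>
  retryDelay(retryConfig, retryCount),
);
console.log(delays); // [1, 2, 4, 8, 16, 30]
```

The delays grow exponentially until they reach cap, after which every further attempt waits the same amount.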

closeDelay

An observable factory that can delay closing the connection after all subscriptions have been unsubscribed.

Example:

import {timer} from "rxjs";

// wait 6 seconds before closing the connection
const connection = appsyncRealtime({APIURL, closeDelay: () => timer(6000)});

This is useful because, without a delay, the connection is closed as soon as the last subscription is closed. A sequence of short-lived subscriptions then opens and closes the WebSocket connection repeatedly.

import {firstValueFrom} from "rxjs";

// subscribe to query1/variables1 and wait for the first event
await firstValueFrom(connection(config)(query1, variables1));

// without a closeDelay, the WebSocket connection is closed here

// subscribe to query2/variables2 and wait for the first event
await firstValueFrom(connection(config)(query2, variables2));
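
The effect of closeDelay can be illustrated with a conceptual sketch of a reference-counted connection with a delayed close. This is plain JavaScript written for this explanation, not the library's implementation: createSharedConnection, countOpens, and all option names are hypothetical, and a manual scheduler stands in for real timers so the example runs synchronously.

```javascript
// Conceptual sketch only: a shared resource that closes after its last user
// releases it, optionally after a delay.
function createSharedConnection({open, close, closeDelayMs = 0, schedule = setTimeout, cancel = clearTimeout}) {
  let refCount = 0;
  let resource = null;
  let pendingClose = null;
  return {
    acquire() {
      // a new user arrived in time: abort the pending close
      if (pendingClose !== null) {
        cancel(pendingClose);
        pendingClose = null;
      }
      if (resource === null) resource = open();
      refCount += 1;
      let released = false;
      return {
        resource,
        release() {
          if (released) return;
          released = true;
          refCount -= 1;
          if (refCount > 0) return;
          const doClose = () => {
            close(resource);
            resource = null;
            pendingClose = null;
          };
          if (closeDelayMs > 0) pendingClose = schedule(doClose, closeDelayMs);
          else doClose();
        },
      };
    },
  };
}

// Count how often the connection is opened for two short-lived users.
function countOpens(closeDelayMs) {
  let opens = 0;
  let queued = null;
  const shared = createSharedConnection({
    open: () => ({id: ++opens}),
    close: () => {},
    closeDelayMs,
    // manual scheduler: remember the callback instead of starting a timer
    schedule: (fn) => { queued = fn; return 1; },
    cancel: () => { queued = null; },
  });
  shared.acquire().release(); // first short-lived subscription
  shared.acquire().release(); // second short-lived subscription
  if (queued) queued(); // let the delayed close fire at the end
  return opens;
}

console.log(countOpens(0), countOpens(6000)); // 2 1
```

Without a delay the connection is opened twice; with a delay the second user reuses the connection that was about to close.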

WebSocketCtor

The WebSocket constructor to use. Useful if there is no global WebSocket object (such as in Node.js).

import WebSocket from "ws";

const connection = appsyncRealtime({APIURL, WebSocketCtor: WebSocket});

Subscription

getAuthorizationHeaders (required)

A function that receives an object with a connect (boolean) and a data ({query, variables}) property and returns an object with the authorization headers, or a Promise that resolves to one.

The values depend on the authorization mode and are documented in the AWS AppSync real-time WebSocket client documentation.

Example for API key authorization:

const subscription = connection({
  getAuthorizationHeaders: () => ({host: new URL(APIURL).host, "x-api-key": APIKEY})
});

Cognito User Pool JWT:

// getAccessToken() returns the Cognito Access Token
const subscription = connection({
  getAuthorizationHeaders: async () => ({host: new URL(APIURL).host, Authorization: await getAccessToken()}),
});

IAM:

import {SignatureV4} from "@aws-sdk/signature-v4";
import {HttpRequest} from "@aws-sdk/protocol-http";
import {defaultProvider} from "@aws-sdk/credential-provider-node";
import {URL} from "url";
import {Hash} from "@aws-sdk/hash-node";

// IAM authorization needs the region as well
const {APIURL, apiRegion} = process.env;

const subscription = connection({
  getAuthorizationHeaders: async ({connect, data}) => {
    const url = new URL(APIURL + (connect ? "/connect" : ""));
    const httpRequest = new HttpRequest({
      body: JSON.stringify(connect ? {} : data),
      headers: {
        "content-type": "application/json; charset=UTF-8",
        accept: "application/json, text/javascript",
        "content-encoding": "amz-1.0",
        host: url.hostname,
      },
      hostname: url.hostname,
      method: "POST",
      path: url.pathname,
      protocol: url.protocol,
      query: {},
    });
    
    const signer = new SignatureV4({
      credentials: defaultProvider(),
      service: "appsync",
      region: apiRegion,
      sha256: Hash.bind(null, "sha256"),
    });
    
    const req = await signer.sign(httpRequest);

    return req.headers;
  }
});

opened

A function that is called when the start_ack message is received from AppSync. Useful when you need to make sure the subscription is established before moving on, for example before fetching data from the backend, so that no events are lost.

import {Subject, firstValueFrom} from "rxjs";

const opened = new Subject();

connection({
  // ...
  opened: () => opened.next(),
})(query, variables).subscribe({
  next: (e) => console.log("Item!", e),
  error: (e) => console.error(e),
});

// wait for the subscription to open
await firstValueFrom(opened);
// the subscription is live here
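
One way to use opened without losing events is to subscribe first, buffer whatever arrives while the initial data is fetched, and replay the buffer afterwards. The sketch below shows only the buffering part in plain JavaScript. createEventBuffer is a hypothetical helper written for this explanation, not part of this library.

```javascript
// Hypothetical helper: holds back events until the initial data is in place.
function createEventBuffer(applyEvent) {
  let buffered = [];
  let live = false;
  return {
    // call this from the subscription's next handler
    push(event) {
      if (live) applyEvent(event);
      else buffered.push(event);
    },
    // call this once the initial data has been fetched and applied
    flush() {
      live = true;
      for (const event of buffered) applyEvent(event);
      buffered = [];
    },
  };
}

const applied = [];
const buffer = createEventBuffer((event) => applied.push(event));

buffer.push("event-1"); // arrives while the initial fetch is still running
buffer.push("event-2");
buffer.flush();         // the initial fetch finished: replay what was held back
buffer.push("event-3"); // from now on events are applied directly

console.log(applied); // ["event-1", "event-2", "event-3"]
```

In a real application, push would be wired to next, and flush would run after awaiting opened and the backend fetch. Whether replayed events need de-duplication against the fetched data depends on the application.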

subscriptionRetryConfig

Retry config for the subscription itself. It accepts the same parameters as connectionRetryConfig.

Arguments for the returned function

Calling connection with the subscription options returns a function that takes the query and the variables. These define the GraphQL subscription.

connection(connectionParams)(`subscription MySubscription {
  door {
    open
    last_updated
  }
}`, {});

The returned observable:

  • emits an event for every data event coming for the subscription
  • completes when the subscription is completed or the connection is closed
  • errors if the subscription or the connection receives an error event

Persistent subscription

The library also provides a persistentSubscription export that is designed to never terminate. It is useful for operations that need to run without termination.

To use it:

import { appsyncRealtime, persistentSubscription } from "appsync-subscription-observable";

const connection = appsyncRealtime({APIURL});

persistentSubscription(connection)({getAuthorizationHeaders})(query, variables)
  .subscribe((e) => console.log("new item", e));

In addition to the subscription options, it supports:

  • closed: a function that is called when the connection is offline. It is an approximation, but can be used to show an offline label
  • reopenTimeoutOnError: an Observable factory to define when to retry after an error
  • reopenTimeoutOnComplete: an Observable factory to define when to retry after a complete
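
A usage sketch with these options might look as follows. The option names come from the list above. The delay values, the query, and the environment variable names are placeholders, and calling closed without arguments is an assumption.

```javascript
import {timer} from "rxjs";
import {appsyncRealtime, persistentSubscription} from "appsync-subscription-observable";

// Placeholders: replace with your own endpoint, headers, and query
const APIURL = process.env.APIURL;
const getAuthorizationHeaders = () => ({
  host: new URL(APIURL).host,
  "x-api-key": process.env.APIKEY,
});
const query = `subscription MySubscription { door { open last_updated } }`;

const connection = appsyncRealtime({APIURL});

persistentSubscription(connection)({
  getAuthorizationHeaders,
  // assumption: closed is called without arguments
  closed: () => console.log("offline"),
  // wait 5 seconds before reopening after an error
  reopenTimeoutOnError: () => timer(5000),
  // wait 1 second before reopening after a complete
  reopenTimeoutOnComplete: () => timer(1000),
})(query, {}).subscribe((e) => console.log("new item", e));
```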

The observable never terminates; it only emits events.