npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

pg-real

v0.1.0

Published

Postgres publisher/subscriber with the different connectors

Downloads

4

Readme

pg-real

A simple module which helps to manage a pub/sub behavior with the PostgreSQL updates in the real-time through HTTP/SSE/WebSockets connection.

Note: this lib is currently compatible only with a Postgres client from node-postgres. If you use any other client this lib isn't suitable for you.

Table of Content

Installation

To install package, use: npm install pg-real.

Usage

Initialization

To establish a connection to the DB use SubClient class. It inherits Client class from node-postgres module with an additional functionality ti set Functions, Triggers, and Listeners.

Pass connection configuration to the class during instantiation to set desired settings. Review node-postgres Client documentation to get more information how to setup Postgres Client.

Example:

const { user, password, host, port, database } = config;
const connectionString = `postgres://${user}:${password}@${host}:${port}/${database}`;
const client = new SubClient(connectionString);
client.connect();

Set Functions and Triggers

To create Function, use one of the predefined functions from the list of functions or create your own.

Note: If you create your owm function, use a name of a notification channel to subscribe on the Postgres updates.

Example:

const customFunction = Functions.custom('test_channel_name');
const afterAllFunction = Functions.afterAll('public', 'test');

const own_channel_name = 'own_channel_name';
const ownFunction = `
   CREATE OR REPLACE FUNCTION ${own_channel_name}_notifier() RETURNS TRIGGER AS $$
      BEGIN
         IF TG_OP = 'UPDATE' THEN
            PERFORM pg_notify(CAST('${own_channel_name}' AS text), row_to_json(NEW)::text);
         END IF;
         RETURN NEW;
      END;
   $$ LANGUAGE "plpgsql";`;

To create Trigger, use one of the predefined triggers from the list of triggers or create your own.

Note: If you create your owm trigger, use a name of a corresponding function to set it on event.

Note: For custom trigger triggerName should be equal to the channel name.

Example:

const customTrigger = Triggers.custom({schema: 'public', table: 'test', triggerName: 'own_channel_name' }, {before: false, update: true, when: "OLD.id = '12345'"});
const afterAllTrigger = Triggers.afterAll({schema: 'public', table: 'test'}, { when: "OLD.id = '12345'" });

const own_trigger_name = 'own_channel_name';
const ownFunction = `
   DROP TRIGGER IF EXISTS ${own_trigger_name} ON public.test;
   CREATE TRIGGER ${own_trigger_name} AFTER INSERT OR UPDATE OR DELETE ON public.test FOR EACH ROW WHEN OLD.id = '12345' EXECUTE PROCEDURE ${own_channel_name}_notifier();`;   

After creation set function to the client using setFunction and setTrigger methods of the SubClient instance.

Example:

await client.setFunctions(customFunction.function);
await client.setTriggers(customTrigger.trigger);

Connection

You can initialize any kind of a connection using pg-real module: HTTP(S), SSE, and WebSockets. Each connection class takes response context as an input parameters during instantiation.

It's not required to use pg-real connectors. You can set all the headers and statuses by yourself, just ensure to implement a correct subscription callback.

Example:

const connection = new SSEConnector(ctx.response).send('start');
connection.send(channel, payload);

Subscription

To subscribe on the Postgres events use Subscriber class. Pass SubClient or node-postgres Client during instantiation of the class.

Example:

const subscriber = new Subscriber(client);

To kick listening on a trigger use startListen method. Postgres starts consume events for a corresponding trigger and call notifier.

Example:

subscriber.startListen(customTrigger.name);

After that you can subscribe on the events from notifier.

Note: We are highly recommend you to store subscription id for the farther ability to unsubscribe.

Example:

  const afterAllSubscribtionId = await subscriber.subscribe(afterAllFunction.channel, connection.send.bind(connection));
  const customSubscribtionId = await subscriber.subscribe(customFunction.channel, (channel, payload) => {
      /**
      * Do what you need here
      * For instance, connection.send(channel, payload)
      * */
  });

Client side

From client side, make request to API. Specify any information which could help to implement a correct subscription.

Example:

const source = new EventSource(`http://${host}:${port}/${endpoint}?id=12345`);

source.addEventListener('message', function (event) {
    console.log(JSON.parse(event.data));
}, false);

source.addEventListener('own_channel_name', function (event) {
    console.log(JSON.parse(event.data));
}, false);

Unsubscription

Try always unsubscribe from the triggers: on the end of connection, after client unsubscribe, etc.

Example:

ctx.request.on('close', () => {
    subscriber.unsubscribe(customFunction.channel, customSubscribtionId);
});

router.post('/unsubscribe', (ctx) => {
    const { channel, id } = ctx.request.body;
    subscriber.unsubscribe(channel, id);
});

Subscriber

To manage subscriptions use Subscriber class. This class takes SubClient instance as an input parameter and use it to connect to the PostgreSQL.

const subscriber = new Subscriber(client);

SubClient includes several useful methods:

  • subscribe: Function | Stream | Connector.send - subscribes to the events, links channel to a corresponding receiver. The corresponding receiver will receive a message based on the channel name. If a list of the channels don't specified, the receiver will be linked to the all of them. Receiver could be:
    • function;
    • stream;
    • send method of the connector.
    const fn = (channel, payload) => { console.log(channel, payload); };
    subscriber.subscribe("channel_1", fn);
    subscriber.subscribe("channel_2", new PassThrough());
    subscriber.subscribe(["channel_1", "channel_2"], connector.send.bind(connector));
  • startListen: string | string[] - starts listening on the event:
    subscriber.startListen("channel_1");
    subscriber.startListen(["channel_1", "channel_2"]);
  • stopListen - stops listening on the event (if no channels are specified, all listeners are unsubscribed):
    subscriber.stopListen("channel_1");
    subscriber.stoptListen(["channel_1", "channel_2"]);
    subscriber.stoptListen();

SubClient

A PostgreSQL client is a main thing we need for notifier. We recommend to use our SubClient from this lib but you also can use your own client.

const connectionString = `postgres://${user}:${password}@${host}:${port}}/${database}`;
const client = new SubClient(connectionString);
this.client.connect().then(console.log(`Connected to ${connectionString}`));

SubClient includes several the next methods:

  • setFunctions:string | string[] - takes query to create a function (go to Functions section to get more info);
    const { function } = Functions.afterAll('public', 'test');
    client.setFunctions(function);
  • dropFunctions:string | string[] - takes function name and drop it;
    const { name } = Functions.afterAll('public', 'test');
    client.dropFunctions(name);
  • setTrigger:string | string[] - takes query to create a trigger (go to Triggers section to get more info);
    const { trigger } = Triggers.afterAll({
      schema: 'public', 
      table: 'test' 
    });
    client.setTrigger(trigger);
  • removeTriggers:string | string[] - takes trigger name and drop it, Require schema and table name;
    const { name } = Triggers.afterAll({
      schema: 'public', 
      table: 'test' 
    });
    client.removeTriggers(name, 'public', 'test');
  • setListeners:string | string[] - take channel name and activate listening;
    const { channel } = Functions.afterAll('public', 'test');
    client.setListeners(channel);
  • removeListeners:string | string[] - take channel name and stop listening;
    const { channel } = Functions.afterAll('public', 'test');
    client.removeListeners(channel);

###Functions

As per Postgres documentation:

PostgreSQL provides these helper functions to retrieve information from event triggers.

For more information reference an official documentation.

The functions handle events from Postgres based on the preconfitions

Triggers

As per Postgres documentation:

PL/pgSQL can be used to define trigger functions on data changes or database events. A trigger function is created with the CREATE FUNCTION command, declaring it as a function with no arguments and a return type of trigger (for data change triggers) or event_trigger (for database event triggers).

For more information reference an official documentation.

pg-real uses triggers to subscribe to the database changes. User can both specify own functions or use predefined general methods.

For predefined triggers database table name is required. You also can set schema path, 'public' schema is used by default. Columns names are optional and could be used for more accuracy. Trigger generators also take options parameters.

The list of trigger generators:

  • common: path: ITriggerPath, options: ITriggerOptions - creates trigger based on the options;
  • afterAll: path: ITriggerPath, options: ITriggerOptions - emits after any change on the table or column if defined;
  • beforeAll: path: ITriggerPath, options: ITriggerOptions - emits before any change on the table or column if defined;
  • afterInsert: path: ITriggerPath, options: ITriggerOptions - emits after insert to the table;
  • beforeInsert: path: ITriggerPath], options: ITriggerOptions - emits before insert to the table;
  • afterUpdate: path: ITriggerPath, options: ITriggerOptions - emits after update on the table or column if defined;
  • beforeUpdate: path: ITriggerPath, options: ITriggerOptions - emits before any change on the table or column if defined;
  • afterDelete: path: ITriggerPath, options: ITriggerOptions - emits after delete on the table or column if defined;
  • beforeDelete: path: ITriggerPath, options: ITriggerOptions - emits before delete on the table or column if defined;

where path includes:

  • schema: string - schema name, default: public;
  • table: string - table name, required;
  • columns: string[] - list of columns, used for the update events;
  • triggerName: string - custom trigger name, used only for custom method;

and options could be:

  • unique: boolean - creates unique trigger name;
  • when: string - adds where close for the update events;
  • insert, update, delete: boolean - used only in the common trigger, indicates a type of the event;
  • before: boolean - if true, creates before trigger; if false, creates after trigger.

Why uniques is important?

When we want to subscribe a client to a specific event based on the client specific data (id, filter, etc.) and we want to be triggered only for him, but we've already have a trigger on the same table we need to specify a unique trigger to not override an existent one.

Connectors

Notifications from Postgres could be sent with HTTP/HTTPS, SSE or WebSockets. You can create your own connection or use one of the supplied classes.

HTTP/HTTPS

Use HttpConnector to create new HTTP connection. Response object should be supplied as an input parameter.

Values:

  • res - response object;

Methods:

  • send: <payload: string, channel: string> - sends message to the user. Channel is an optional.

Example:

1) new HttpConnector(ctx.response).send('start');
2) new HttpConnector(ctx.response).send('start', 'after_insert_users');

SSE

Use SSEConnector to create new SSE connection. Response object should be supplied as an input parameter.

Note: Supports Express and Koa.

Values:

  • res - response object;

Methods:

  • send: <payload: string, channel: string> - sends message to the stream. Channel is an optional.

Example:

1) new SSEConnector(ctx.response).send('start');
2) new SSEConnector(ctx.response).send('start', 'after_insert_users');

WebSockets

Use SocketConnector to create new WebSocket connection. Socket object should be supplied as an input parameter.

Note: Supports Express and Koa.

Values:

  • socket - socket object;

Methods:

  • send: <payload: string, channel: string> - sends message to the socket. Channel is an optional.

Example:

1) new SocketConnector(socket).send('start');
2) new SocketConnector(socket).send('start', 'after_insert_users');