pg-events
v1.0.2
Published
Implements publish/subscribe or event emitting mechanism with PostgreSQL Notifications
Downloads
4
Maintainers
Readme
TypeSafe PubSub with PostgreSQL
Install
npm i --save pg-events
Usage
import { Client } from 'pg';
import { PubSub, ClientProvider, Contract, encoder, decoder } from 'pg-events';
// 1. Define channels
enum Channel {
myChannel = 'myChannel',
}
// 2. Define protocol
interface Protocol {
[Channel.myChannel]: { message: string };
}
// 3. Define conection options
const options = {
user: 'postgres',
password: 'postgres',
database: 'postgres',
};
// 4. Initialize connection provider
const provider = new ClientProvider();
// 5. Initialize pubsub instance
const pubSub = new PubSub<Contract<Protocol>>({
decoder, // default object->JSON decoder
encoder, // default JSON->object encoder
provider, // connection emitter
});
// 6. Usage
(async () => {
// 7. Initialize PG connection
const client = new Client(options);
// 8. Connect to PostgreSQL server
await client.connect();
// 9. Emit connection to PubSub
provider.next(client);
// 10. Add event listener
pubSub.on(Channel.myChannel, async ({ message }) => {
console.warn('Received ' + message);
});
// 11. Subscribe to channel
await pubSub.subscribe(Channel.myChannel);
// 12. Emit message
await pubSub.publish(Channel.myChannel, { message: 'Hello!' });
// 13. Close PubSub
await pubSub.end();
await client.end();
})();
Strict mode
Implement AsyncEncoder to validate payload before broadcasting.
Implement AsyncDecoder to ensure valid payloads are received
import { Client } from 'pg';
import { PubSub, ClientProvider, Contract, encoder, decoder } from 'pg-events';
// 1. Define channels
enum Channel {
myChannel = 'myChannel',
}
// 2. Define protocol
interface Protocol {
[Channel.myChannel]: { value: number };
}
// 3. Define conection options
const options = {
user: 'postgres',
password: 'postgres',
database: 'postgres',
};
// 4. Initialize connection provider
const provider = new ClientProvider();
// 4.1 Define validators
function isMyChannelMessage(obj: any): obj is Protocol['myChannel'] {
return obj && typeof obj === 'object' && typeof obj.value === 'number' && obj.value >= 0;
}
function checkMyChannelPayload(payload: any) {
if (!isMyChannelMessage(payload)) {
throw new Error('myChannel.value must be a positive number');
}
}
// 5. Initialize strict pubsub instance
const strictPubSub = new PubSub<Contract<Protocol>>({
// Custom async encoder which validates payload before publishing
encoder: {
async encode(payload: object): Promise<string> {
checkMyChannelPayload(payload);
return encoder.encode(payload);
},
},
// Custom async decoder which validates payload before emitting
decoder: {
async decode(data: string): Promise<object> {
const payload = await decoder.decode(data);
checkMyChannelPayload(payload);
return payload;
},
},
provider, // connection emitter
});
// 6. Usage
(async () => {
// 7. Initialize PG connection
const client = new Client(options);
// 8. Connect to PostgreSQL server
await client.connect();
// 9. Emit connection to PubSub
provider.next(client);
// 10. Add event listener
strictPubSub.on(Channel.myChannel, async ({ value }) => {
console.warn('Received ' + value);
});
strictPubSub.on('unprocessed', async (error) => {
console.warn(error);
});
// 11. Subscribe to channel
await strictPubSub.subscribe(Channel.myChannel);
// 12. Emit message
console.warn('First call sends 5');
await strictPubSub.publish(Channel.myChannel, { value: 5 });
// 13. Emit message with invalid payload
console.warn('Second call sends -1');
try {
await strictPubSub.publish(Channel.myChannel, { value: -1 });
} catch (error) {
console.warn('It fails because -1 is not a negative number');
}
// 14. Close PubSub
await strictPubSub.end();
await client.end();
})();