pg-iterator

v0.3.0

TypeScript wrapper for pg-query-stream

Downloads

3,153

TypeScript wrapper for pg-query-stream, which adds the following:

  • Produces a safe AsyncIterable, for row-by-row processing, with for await or a library (like RxJS).
  • The library is strongly-typed throughout: you can optionally specify row entity types for your queries.
  • Auto-connects Pool on the first-row iteration (disconnects on last) - internal connection management.
  • Unifies error handling for queries and connections - initial, interrupted or lost / broken.
  • Offers one protocol for working with Client or Pool objects.
  • Provides workarounds for some issues in pg-query-stream - missing fields, issue #2870, etc.

Installation

$ npm i pg-iterator

Usage

You can use this module with a Pool or a Client, or with a dynamically-determined type, via the createQueryIterable function.

Each of the interfaces (QueryIterablePool, QueryIterableClient and createQueryIterable) supports strong-type parametrization, for typed row iteration.
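As a rough illustration of what typed row iteration gives you, the sketch below iterates rows of a declared type. The `User` shape and the `fakeQuery` generator are hypothetical stand-ins written for this example, not part of pg-iterator; with the real library the row type would presumably be supplied as the type parameter of the iterable (the README's own `QueryIterable<T>` usage suggests this, but the exact form is an assumption).

```typescript
// Hypothetical row type for a "users" table.
interface User {
    id: number;
    name: string;
}

// Stand-in for a typed query; NOT part of pg-iterator. It only mimics
// the shape of the result: an async sequence of rows of type T.
async function* fakeQuery<T>(rows: T[]): AsyncGenerator<T> {
    for (const row of rows) {
        yield row;
    }
}

// Accepts any AsyncIterable of User rows and collects the names.
async function collectNames(source: AsyncIterable<User>): Promise<string[]> {
    const names: string[] = [];
    for await (const u of source) {
        names.push(u.name); // "u" is typed as User, so "name" is checked at compile time
    }
    return names;
}
```

Because the loop variable is typed, a misspelled column name such as `u.nmae` would be rejected by the compiler rather than yielding `undefined` at run-time.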

See complete examples.

Using Pool

When using an existing Pool object, this library will automatically acquire a connection, create an AsyncIterable from the query, and release the connection once the stream has finished.

The QueryIterablePool class implements this functionality:

import {Pool} from 'pg';
import {QueryIterablePool} from 'pg-iterator';

const pool = new Pool(/* connection config */);

const q = new QueryIterablePool(pool); // creating our Pool container

const i = q.query('SELECT * FROM users WHERE id = $1', [123]);

for await(const u of i) {
    console.log(u); // output each row
}

Using Client

This library can use a connected Client object directly, via the QueryIterableClient class:

import {Pool, Client} from 'pg';
import {QueryIterableClient} from 'pg-iterator';

const pool = new Pool(/* connection config */);
const client: Client = await pool.connect();

const q = new QueryIterableClient(client); // creating our Client container

const i = q.query('SELECT * FROM users WHERE id = $1', [123]);

for await(const u of i) {
    console.log(u); // output each row
}

// it is up to you to release the client and close the pool when done:
// client.release(), pool.end()

Using dynamic driver

When you do not know whether the source is a Pool or a Client, you can use the createQueryIterable function instead. It checks the type at run-time and returns either a QueryIterablePool or a QueryIterableClient, both of which share the generic QueryIterable protocol.
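The run-time check described above can be pictured with a small sketch. Everything in it (`FakePool`, `FakeClient`, `RowSource`, `createSource`) is a hypothetical stand-in written for illustration, not pg-iterator's implementation or API. With the real library you would pass your Pool or Client to createQueryIterable and work with the result through the shared QueryIterable protocol; its exact signature is not shown in this README.

```typescript
// Hypothetical stand-ins for pg's Pool and Client; NOT the real classes.
class FakePool {
    readonly kind = 'pool';
}

class FakeClient {
    readonly kind = 'client';
}

// One shared protocol, so calling code does not care which driver it got.
interface RowSource {
    describe(): string;
}

class PoolSource implements RowSource {
    private readonly pool: FakePool;

    constructor(pool: FakePool) {
        this.pool = pool;
    }

    describe(): string {
        return `backed by a ${this.pool.kind}`;
    }
}

class ClientSource implements RowSource {
    private readonly client: FakeClient;

    constructor(client: FakeClient) {
        this.client = client;
    }

    describe(): string {
        return `backed by a ${this.client.kind}`;
    }
}

// Picks the container by inspecting the driver at run-time.
function createSource(driver: FakePool | FakeClient): RowSource {
    return driver instanceof FakePool ? new PoolSource(driver) : new ClientSource(driver);
}
```

The caller only ever sees `RowSource`, which is the point of a shared protocol: code that consumes rows can be written once and reused for both driver types.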

Fields information

In every usage scenario, you end up with the QueryIterable base interface, which exposes information about columns.

  • You can either access it after reading the very first row:

const q = new QueryIterablePool(pool);

const i = q.query('SELECT * FROM users WHERE id = $1', [123]);

// q.fields is empty at this point

for await(const u of i) {
    const {fields} = q; // fields details are available at this point

    console.log(u); // output each row
}

  • Or you can use the fields notification event instead:

const q = new QueryIterablePool(pool);

q.on('fields', fields => {
    // sent with complete list of fields here,
    // before the first row in the loop below
});

const i = q.query('SELECT * FROM users WHERE id = $1', [123]);

for await(const u of i) {
    console.log(u); // output each row
}

Events

The QueryIterable base interface can emit the following events:

  • fields - fields details, as explained above;
  • stream - notification of a new stream created;
  • complete - notification of completing the current query.

Error handling

This library manages the connection and runs the query inside the row iteration itself, so the iteration loop is the only place that can throw errors:

const q = new QueryIterablePool(pool);

const i = q.query('SELECT * FROM users WHERE id = $1', [123]);

try {
    for await(const u of i) {
        console.log(u); // output each row
    }
} catch (err) {
    // all connection and query errors arrive here
}
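To see why a single try/catch around the loop is enough, here is a minimal sketch that does not touch the database at all. The `failingRows` generator is a hypothetical stand-in that fails either before the first row (like a connection error) or after it (like an interrupted query); it illustrates the general behaviour of for await and is not pg-iterator code.

```typescript
type FailAt = 'connect' | 'query' | 'none';

// Hypothetical stand-in for a query stream; NOT part of pg-iterator.
async function* failingRows(failAt: FailAt): AsyncGenerator<number> {
    if (failAt === 'connect') {
        throw new Error('connection failed'); // fails before any row is produced
    }
    yield 1;
    if (failAt === 'query') {
        throw new Error('query interrupted'); // fails after the first row
    }
}

async function run(failAt: FailAt): Promise<string> {
    const seen: number[] = [];
    try {
        for await (const row of failingRows(failAt)) {
            seen.push(row);
        }
        return `ok after ${seen.length} row(s)`;
    } catch (err) {
        // both kinds of failure surface here, from the loop itself
        return `${(err as Error).message} after ${seen.length} row(s)`;
    }
}
```

Both the early and the late failure are reported by the same catch block, which mirrors the single error path described above.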

Integration

Most libraries that are based on node-postgres expose Pool and Client interfaces.

For example, pg-promise exposes Pool via Database.$pool, so you can do:

const q = new QueryIterablePool(db.$pool); // creating Pool container from Database object

As for data consumption, since the result is an AsyncIterable, many libraries can consume and process it, such as RxJS or iter-ops:

import {from, take} from 'rxjs';

const i = q.query('SELECT * FROM users WHERE id = $1', [123]);

from(i).pipe(take(10)).subscribe(row => {
    console.log(row); // up to 10 rows
});

import {pipe, take} from 'iter-ops';

const i = q.query('SELECT * FROM users WHERE id = $1', [123]);

const r = pipe(i, take(10));

for await (const a of r) {
    console.log(a); // up to 10 rows
}

Note that if the iteration is incomplete, because you interrupted the loop or used a limiting operator (like take above), the connection will remain open indefinitely. In such cases you may want to force-release the connection by calling the release method of QueryIterable manually:

import {from, take} from 'rxjs';

const q = new QueryIterablePool(pool);

const i = q.query('SELECT * FROM users WHERE id = $1', [123]);

from(i).pipe(take(10)).subscribe({
    next(row) {
        console.log(row);
    },
    complete() {
        // since we use "take(10)" above, the iteration may be incomplete,
        // and the connection will be stuck, so we have to force-release it: 
        q.release();
    }
});
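The same concern applies to a plain for await loop that exits early with break. One way to handle it is a try/finally around the loop, sketched below with a hypothetical `FakeQueryIterable` that only records whether release() was called; it is not the real class. The sketch assumes that calling release() after a fully completed iteration is harmless, which the finalize-based wrapper in this README also relies on.

```typescript
// Hypothetical stand-in for QueryIterable; NOT the real class. It only
// tracks whether release() was called, to show the control flow.
class FakeQueryIterable {
    released = false;

    async *query(): AsyncGenerator<number> {
        for (let i = 1; i <= 100; i++) {
            yield i;
        }
    }

    release(): void {
        this.released = true;
    }
}

// Reads at most "limit" rows (limit >= 1) and always releases afterwards.
async function firstRows(q: FakeQueryIterable, limit: number): Promise<number[]> {
    const rows: number[] = [];
    try {
        for await (const row of q.query()) {
            rows.push(row);
            if (rows.length >= limit) {
                break; // iteration ends early, before the last row
            }
        }
    } finally {
        q.release(); // runs on normal completion, early exit and errors alike
    }
    return rows;
}
```

Putting the call in finally keeps the release next to the loop that may be cut short, so it cannot be forgotten on an error path.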

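Alternatively, you can wrap QueryIterable and its query call into a safe Observable creator:

import {from, finalize, Observable} from 'rxjs';
import {QueryIterable} from 'pg-iterator';

// releases the connection when the Observable completes, errors or is unsubscribed
function fromQuery<T>(qi: QueryIterable<T>, text: string, params?: any[]): Observable<T> {
    return from(qi.query(text, params)).pipe(finalize(() => {
        qi.release();
    }));
}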

See also: complete examples.