npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

mongres

v0.2.6

Published

Mongres ETL System

Downloads

7

Readme

Mongres ETL System for Node.js

Synchronize PostgreSQL and MongoDB using Node.js

Mongres uses configurable modules containing database and operation definitions to perform arbitrary ETL (extract, transform, load) procedures. This project is still in development and so core fundamentals could be changed without notice. Contributions, comments, and issue reports are always welcome.

Installation

npm install mongres

Command Line Usage

Usage: node mongres.js [options] <file1 [file2 ...]>

  -h, --help           Display usage help
  -v, --version        Display version number
  -d, --debug          Enable debug mode
  -q, --quiet          Disable verbose output
  -p, --period         Specify periodic execution ( in sec )
  -f, --file           Specify module file to load ( default )

Database Definitions

Databases can be defined and used almost interchangably with very minor differences. The database abstraction layer provides the following methods:

  • queryArray (query, [options], callback)
    • Execute query using options if given and pass the result set as an array to callback function.
    • query format for MongoDB is {collectionName: {field: value}}
    • query format for PostgreSQL is {text: 'select foo from bar where foo between $1 and $2', values: [1,10]}
    • callback signature is function (error, array)
    • Caution: all records will be loaded into memory. Do not use for very large data sets.
  • queryStream (query, [options], callback)
    • Execute query using options if given and pass a readable result stream to the callback function.
    • query format for MongoDB is {collectionName: {field: value}}
    • query format for PostgreSQL is {text: 'select foo from bar where foo between $1 and $2', values: [1,10]}
    • callback signature is function (error, stream)
  • insert (insert, [options], callback)
    • Inserts insert into database using options if given.
    • insert format is {collectionOrTableName: {field: value}}
    • callback signature is function (error, result) where result is the number of affected records.
  • update (query, update, [options], callback)
    • Applies update to records matching query using options if given.
    • query format is {collectionOrTableName: {field: value}}
    • update format is {collectionOrTableName: {field: value}}
    • callback signature is function (error, result) where result is the number of affected records.
  • upsert (query, update, [options], callback)
    • Applies update to records matching query using options if given.
    • A new record will be inserted if no matching records are found.
    • query format is {collectionOrTableName: {field: value}}
    • update format is {collectionOrTableName: {field: value}}
    • callback signature is function (error, result) where result is the number of affected records.

Operation Definitions

The db parameter will be a Database instance configured for the specified database. The registry parameter is a storage object that is shared among all functions for the entire operation. The operation functions are executed in this order:

  • init (db, registry, callback) (optional)
    • Executed once and must call callback method to proceed with operation.
    • Useful for getting delta starting points or populating initial registry values.
  • extract (db, registry, process, done)
    • Executed once, used to extract data from source database.
    • process method should be called once for each record
    • done method should be called after all records have been processed
  • transform (db, registry, data) (optional)
    • Executed once for each record emitted by extract functions.
    • Used to reshape data before insertion into target database.
    • Called synchronously, so just return the transformed data.
  • load (db, registry, data, callback)
    • Executed once for each record emitted by extract and transform functions.
    • Can be used to populate registry with aggregated or incremental data. interval (db, registry, callback) (optional)
    • Executed at regular intervals after load function
    • Useful for recording incremental progress in case of failures.
  • exit (db, registry, callback) (optional)
    • Executed once after all other functions have finished.
    • Useful for cleaning up after operation, and for recording summary data.

Modules

A Mongres module must export a JSON object like this:

module.exports = {

  db: {

    mongo: {
      type: 'mongodb',
      name: 'test',
      host: 'localhost',
      port: 27017,
      user: 'username',
      pass: 'password'
    },

    postgres: {
      type: 'postgresql',
      name: 'tnt2',
      host: 'localhost',
      port: 5432,
      user: 'username',
      pass: 'password'
    }

  },

  op: {

    name: 'Sample Operation',

    init: { // init is optional
      // read data from "mongo" database into registry
      mongo: [
        function (db, registry, cb) {
          // truncate the collection
          db.remove(
            { // query
              test: {} // matches all documents
            },
            cb // callback
          );
        },

        function (db, registry, cb) {
          db.queryArray(
            { // query
              mongres: {
                _id: "test"
              }
            },
            { // options
              limit: 1,
              fields: {
                _id: 0,
                lastDate: 1
              }
            },
            function (err, docs) { // callback
              if (err) return cb(err);

              var doc = docs && docs.shift() || {};
              registry.lastDate = doc.lastDate || new Date(0);

              return cb(); // continue to "extract" step
            }
          );
        }
      ]
    },

    extract: { // extract is required
      // stream data from "postgres" database to load function(s)
      postgres: function (db, registry, load, cb) {
        db.queryStream(
          { // query
            text: "                                                            \
              SELECT                                                           \
                GENERATE_SERIES($1::int, $2::int) AS series, $3::timestamp +   \
                (RANDOM()::NUMERIC(3,2) || ' days')::INTERVAL AS date,         \
                ARRAY['a','b','c'] AS arr,'{\"a\":{\"b\":\"c\"}}'::json AS obj \
              ORDER BY date                                                    \
            ",
            values: [1, 1000, registry.lastDate]
          },
          // options parameter is not required
          function (err, stream) { // callback
            if (err) return cb(err);

            var error = null, procs = 0, limit = 10;

            // call "load" method for each record
            stream.on('data', function (data) {
              procs++; // increment process counter

              // pause stream if procs over limit
              if (procs >= limit) stream.pause();

              load(data, function (err) {
                procs--; // decrement process counter

                if (err) { // pass the error
                  stream.emit('error', err);
                  return stream.emit('end');
                }

                // resume stream when procs are under limit
                if (!err && procs < limit) stream.resume();
              });
            });

            // record errors for 'end' event
            stream.on('error', function (err) {
              error = err;
            });

            // execute callback function when ended
            return stream.on('end', cb.bind(this, error));
          }
        );
      }
    },

    transform: { // transform is optional
      // transform data from "postgres" database into a different format
      postgres: function (db, registry, data) {
        return { // transformed data
          _id: data.series,
          date: data.date,
          arr: data.arr,
          obj: data.obj
        };
      }
    },

    load: { // load is required
      // load data from all sources into "mongo" database
      mongo: function (db, registry, data, cb) {
        if (!data) return cb('No data was given.');

        db.upsert(
          { // query
            test: {
              _id: data._id
            }
          },
          { // update
            test: data
          },
          { // options
            w: 1,
            journal: true
          },
          function (err, result) { // callback
            if (err) return cb(err);

            // determine most recent changed date
            if (!registry.lastDate || registry.lastDate < data.date) {
              registry.lastDate = data.date;
            }

            // proceed to next load function(s) or exit
            return cb(null, result);
          }
        );
      }
    },

    interval: { // intervals are optional
      100: { // execute every 100 records
        // write progress data to "mongo" database
        mongo: function (db, registry, cb) {
          db.upsert(
            { // query
              mongres: {
                _id: 'test'
              }
            },
            { // update
              mongres: {
                $set: {
                  lastDate: registry.lastDate
                }
              }
            },
            { // options
              w: 1,
              journal: true
            },
            cb // callback
          );
        }
      }
    },

    exit: { // exit is optional
      // write result data to "mongo" database
      mongo: function (db, registry, cb) {
        db.upsert(
          { // query
            mongres: {
              _id: 'test'
            }
          },
          { // update
            mongres: {
              $set: {
                lastDate: registry.lastDate
              }
            }
          },
          { // options
            w: 1,
            journal: true
          },
          cb // callback
        );
      }
    }

  }

};

Contributing

Contributions, comments, and issue reports are always welcome. Send pull requests to the master branch with your proposed changes or create issues to report bugs.

Testing

Copy tests\db.sample.js to tests\db.js and modify the credentials to match your development environment. You will need one MongoDB and one PostgreSQL database set up and running to complete the tests. To start tests, run npm test in your Mongres directory.