datapumps

v0.5.1

Node.js ETL (Extract, Transform, Load) toolkit for easy data import, export or transfer between systems.

Datapumps: Simple ETL for node.js

Overview

Use pumps to import, export, transform or transfer data. A data pump will read from its input stream, array or datapumps Buffer and will write to its output buffers. A pump will finish when all data is consumed from its output buffers. Make a group of pumps to handle complex ETL tasks.

Installation

$ npm install datapumps --save

Usage example: export mongodb to excel

var
  datapumps = require('datapumps'),
  Pump = datapumps.Pump,
  MongodbMixin = datapumps.mixin.MongodbMixin,
  ExcelWriterMixin = datapumps.mixin.ExcelWriterMixin,
  pump = new Pump();

pump
  .mixin(MongodbMixin('mongodb://localhost/marketing'))
  .useCollection('Contact')
  .from(pump.find({ country: "US" }))

  .mixin(ExcelWriterMixin())
  .createWorkbook('/tmp/ContactsInUs.xlsx')
  .createWorksheet('Contacts')
  .writeHeaders(['Name', 'Email'])

  .process(function(contact) {
    return pump.writeRow([ contact.name, contact.email ]);
  })
  .logErrorsToConsole()
  .run()
    .then(function() {
      console.log("Done writing contacts to file");
    });

Usage example with more details:

  • First, we create a pump and set up reading from mongodb:

    var pump = new Pump();
    pump
      .mixin(MongodbMixin('mongodb://localhost/marketing'))
      .useCollection('Contact')
      .from(pump.find({ country: "US" }))

    Mixins extend the functionality of a pump. The MongodbMixin adds a .find() method, which executes a query on the collection specified with the .useCollection() method. The pump reads the query results and controls data flow, i.e. it pauses reading when it cannot write excel rows.

  • Write data to excel with ExcelWriterMixin:

    pump
      .mixin(ExcelWriterMixin())
      .createWorkbook('/tmp/ContactsInUs.xlsx')
      .createWorksheet('Contacts')
      .writeHeaders(['Name', 'Email'])
    
      .process(function(contact) {
        return pump.writeRow([ contact.name, contact.email ]);
      })

    The excel workbook, worksheet and header row are created after adding ExcelWriterMixin to the pump. Each pump has a .process() callback that may transform or filter data. The callback is called for every data item of the buffer and should return a promise (we use the bluebird library) that fulfills when the data is processed. In this example, the default processing callback (which copies data to the output buffer) is overridden with one that writes rows to the excel worksheet.

  • Finally, start the pump and write to console when it's done.

    pump
      .logErrorsToConsole()
      .run()
        .then(function() {
          console.log("Done writing contacts to file");
        });

    The .logErrorsToConsole() method will, unsurprisingly, log any error to the console. The pump starts when .run() is called. .run() returns a promise that resolves when the pump has finished.
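
The flow control mentioned in the steps above (reading pauses while writing cannot keep up) can be pictured with a small self-contained model. This is only a sketch and not the datapumps Buffer implementation: BoundedQueue, its fields and its read() method are hypothetical names. It shows how a write that returns a promise can keep a producer waiting until there is room.

```javascript
// Illustrative model only: BoundedQueue is a hypothetical name,
// not a class exported by datapumps.
function BoundedQueue(size) {
  this.size = size;   // maximum number of stored items
  this.items = [];    // items waiting to be read
  this.waiting = [];  // writers that are paused until space frees up
}

// Stores the item right away when there is room; otherwise the
// returned promise stays pending until a read frees a slot.
BoundedQueue.prototype.writeAsync = function (item) {
  var self = this;
  if (self.items.length < self.size) {
    self.items.push(item);
    return Promise.resolve();
  }
  return new Promise(function (resolve) {
    self.waiting.push(function () {
      self.items.push(item);
      resolve();
    });
  });
};

// Removes the oldest item and lets one paused writer continue.
BoundedQueue.prototype.read = function () {
  var item = this.items.shift();
  var resume = this.waiting.shift();
  if (resume) resume();
  return item;
};

var queue = new BoundedQueue(1);
queue.writeAsync('first');     // stored immediately
queue.writeAsync('second');    // paused, because the queue is full
var firstRead = queue.read();  // frees a slot, so 'second' is stored now
```

A producer that waits for each returned promise before reading its next item never gets ahead of the consumer, which is the behaviour the text describes for the mongodb reader and the excel writer.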

Pump

A pump reads data from its input buffer or stream and copies it to the output buffer by default:

datapumps = require('datapumps');
(pump = new datapumps.Pump())
  .from(<put a nodejs stream or datapumps buffer here>)
  .run()

To access the output buffer, use the .buffer() method, which returns a Buffer instance:

buffer = pump.buffer('output');
buffer = pump.buffer(); // equivalent to the previous call, as the default
                        // buffer of the pump is called 'output'

Use the .buffers() method when you need to write data into multiple output buffers:

ticketsPump
  .buffers({
    openTickets: ticketsPump.createBuffer(),
    closedTickets: ticketsPump.createBuffer(),
  });

reminderMailer = new datapumps.Pump()
reminderMailer
  .from(ticketsPump.buffer('openTickets'))
  ...

Note that the ticketsPump pump has two output buffers: openTickets and closedTickets. The reminderMailer pump reads data from the openTickets buffer of the tickets pump.

Transforming data

Use the .process() method to set the function which processes data:

ticketsPump
  .process(function(ticket) {
    ticket.title = 'URGENT: ' + ticket.title;
    return this.buffer('openTickets').writeAsync(ticket);
  });

The argument of .process() is a function that will be executed after the pump reads a data item. The function is executed in the context of the pump object, i.e. this refers to the pump itself. The function should return a Promise that fulfills when the data is processed (i.e. written into a buffer or stored elsewhere).
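
To make the calling convention concrete, here is a minimal sketch that does not use datapumps at all. FakePump, write() and feed() are hypothetical names invented for this illustration. The only point is that the callback is invoked with the pump as this and returns a promise.

```javascript
// Hypothetical stand-in for a pump. It only demonstrates the calling
// convention described above and is not part of datapumps.
function FakePump() {
  this.written = [];
  this.callback = null;
}

// Pretends to write into an output buffer and returns a promise.
FakePump.prototype.write = function (item) {
  this.written.push(item);
  return Promise.resolve();
};

// Registers the processing callback and allows chaining.
FakePump.prototype.process = function (callback) {
  this.callback = callback;
  return this;
};

// Invokes the callback with the pump as `this`, once per data item.
FakePump.prototype.feed = function (item) {
  return this.callback.call(this, item);
};

var fakePump = new FakePump().process(function (ticket) {
  ticket.title = 'URGENT: ' + ticket.title;
  return this.write(ticket); // `this` is fakePump here
});

var result = fakePump.feed({ title: 'Printer is down' });
```

An arrow function would not work as the callback in this model, because arrow functions ignore the this value supplied by .call().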

Start and end of pumping

A pump is started by calling the .start() method. The end event is emitted when the input stream or buffer has ended and all output buffers have become empty.

pump.on('end', function() {
  console.log('Pumped everything, and all my output buffers are empty. Bye.')
})

Pump group

You often need multiple pumps to complete an ETL task. Pump groups help you start multiple pumps in one step and handle the event when every pump has ended:

sendMails = datapumps.group();
sendMails.addPump('tickets')
  ...;
sendMails.addPump('reminderMailer')
  ...;
sendMails
  .start()
  .whenFinished().then(function() {
    console.log('Tickets processed.');
  });

The .addPump() method creates a new pump with the given name and returns it for configuration. .start() starts all pumps in the group, while .whenFinished() returns a Promise that fulfills when every pump has ended (note: the end event is also emitted).
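
The idea of a promise that fulfills only after every pump has emitted end can be sketched with plain Node.js event emitters. This is an assumption-laden illustration, not how datapumps implements .whenFinished(): whenAllEnded is a hypothetical helper, and the two emitters merely stand in for pumps.

```javascript
var EventEmitter = require('events');

// Hypothetical helper, not a datapumps function: returns a promise
// that fulfills once every emitter has emitted 'end'.
function whenAllEnded(emitters) {
  return Promise.all(emitters.map(function (emitter) {
    return new Promise(function (resolve) {
      emitter.once('end', resolve);
    });
  }));
}

// Two plain emitters stand in for the pumps of a group.
var tickets = new EventEmitter();
var reminderMailer = new EventEmitter();
var finished = false;

whenAllEnded([tickets, reminderMailer]).then(function () {
  finished = true;
  console.log('Tickets processed.');
});

tickets.emit('end');         // one pump is done, the promise is still pending
reminderMailer.emit('end');  // the last pump is done, the promise can fulfill
```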

Encapsulation

Sometimes you wish to encapsulate a part of an ETL process and also use it elsewhere. It is possible to set an input pump and expose buffers from the group, so it will provide the same interface as a simple pump (i.e. it has .from(), .start() and .buffer() methods and emits the end event).

Most likely, you want to extend datapumps.Group class (example is written in CoffeeScript):

{ Group, mixin: { MysqlMixin } } = require 'datapumps'

class Notifier extends Group
  constructor: ->
    super()
    @addPump 'emailLookup'
      .mixin MysqlMixin connection
      .process (data) ->
        @query('SELECT email FROM user where username = ?', [ data.username ])
          .then (result) =>
            data.emailAddress = result.email
            @buffer().writeAsync data
    @addPump 'sendMail'
      .from @pump 'emailLookup'
      .process (data) ->
        ... # send email to data.emailAddress
        @buffer().writeAsync
          recipient:
            name: data.name
            email: data.emailAddress

    @setInputPump 'emailLookup'
    @expose 'output', 'sendMail/output'

The Notifier behaves like a pump, but on the inside it does an email address lookup using mysql and sends mail to those addresses. The output buffer of the sendMail pump is filled with recipient data.

Use the created class like this:

etlProcess = datapumps.group()
etlProcess
  .addPump 'notifier', new Notifier
    .from <node stream or datapumps buffer>

etlProcess
  .addPump 'logger'
    .from etlProcess.pump('notifier').buffer()
    .process (data) ->
      console.log "Email sent to #{data.name} (#{data.email})"

Please note that you cannot use the .process() method on a group.

Error handling

Errors may occur while data is transferred between systems. Most of the time, you don't want to stop on the first error but complete the transfer and re-run after fixing the problems. Therefore, the pump group has an error buffer (.errorBuffer()) which can hold ten error messages by default. When the error buffer fills up, the error event is triggered and the .whenFinished() promise is rejected:

group
  .start()
  .whenFinished()
    .then(function() {
      if (!group.errorBuffer().isEmpty()) {
        console.log("Transfer finished, but with errors.");
        // errors list will be at group.errorBuffer().getContent()
      }
    })
    .catch(function() {
      console.log("Pump group failed with errors");
      // errors list will be at group.errorBuffer().getContent()
    });
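
The "keep going until too many errors have piled up" policy can be modelled without datapumps. The sketch below is illustrative only: ErrorCollector and transferAll are hypothetical names, and treating the collector as failed exactly when the limit is reached is an assumption about what "fills up" means.

```javascript
// Illustrative model of a bounded error list; not datapumps' own code.
function ErrorCollector(limit) {
  this.limit = limit || 10; // ten mirrors the default described above
  this.errors = [];
  this.failed = false;
}

// Records an error and reports whether processing may continue.
ErrorCollector.prototype.add = function (error) {
  if (this.failed) return false;
  this.errors.push(error);
  if (this.errors.length >= this.limit) this.failed = true;
  return !this.failed;
};

// Handles items one by one, skipping failures until the collector is full.
// Returns the number of items handled successfully.
function transferAll(items, handler, collector) {
  var handled = 0;
  for (var i = 0; i < items.length; i++) {
    try {
      handler(items[i]);
      handled++;
    } catch (error) {
      if (!collector.add(error)) break; // too many errors, give up
    }
  }
  return handled;
}

// Hypothetical handler that rejects every even number.
function rejectEven(n) {
  if (n % 2 === 0) throw new Error('cannot transfer ' + n);
}

var tolerant = new ErrorCollector();  // default limit of ten
var strict = new ErrorCollector(2);   // gives up at the second error
var handledTolerant = transferAll([1, 2, 3, 4, 5, 6], rejectEven, tolerant);
var handledStrict = transferAll([1, 2, 3, 4, 5, 6], rejectEven, strict);
```

With the default limit, the run completes and leaves three errors to inspect afterwards. With a limit of two, the run stops at the second failure, which corresponds to the rejected promise in the example above.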

The .logErrorsToConsole() helper method configures the pump or group to print errors when processing has finished:

group
  .logErrorsToConsole()
  .start();

The .logErrorsToLogger() helper method configures the pump or group to print errors to a logger when processing has finished:

group
  .logErrorsToLogger(logger)
  .start();

This is useful for running the ETL on a server. The logger can be any logger object that has an .error() method, such as Winston or Log4js.
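
Since the only stated requirement is an .error() method, a hand-written object should be enough when no logging library is available. The logger below is a hypothetical minimal example. What datapumps passes to .error() is not documented here, so the sketch simply converts whatever it receives to a string.

```javascript
// Hypothetical minimal logger: the only requirement stated above is
// that the object has an .error() method.
var lines = [];
var logger = {
  error: function (message) {
    // Collect timestamped lines; a real logger might append to a file.
    lines.push(new Date().toISOString() + ' ERROR ' + String(message));
  }
};

// Direct call for demonstration; in the example above the group would
// call logger.error() itself.
logger.error(new Error('row 12 failed'));
```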

Debugging

The following example shows fingers-crossed logging, i.e. debug logging is turned on after the first error has occurred:

{ group } = require('datapumps')

(d = group())
  .addPump 'test'
    .from d.createBuffer
      sealed: true,
      content: [ 'first', 'second', 'third', 'fourth' ]
    .process (data) ->
      throw new Error 'Start debugging', data if data == 'second'
      @copy data

d.errorBuffer().on 'write', (data) ->
  console.log data
  d.buffer('test/output').on 'write', (data) ->
    console.log "#{data} was written to test/output buffer"

d.start()

The output:

{ message: [Error: Start debugging], pump: 'test' }
third was written to test/output buffer
fourth was written to test/output buffer

Mixins

The core components of datapumps are only responsible for passing data in a flow-controlled manner. The features required for import, export or transfer are provided by mixins, such as the MongodbMixin, ExcelWriterMixin and MysqlMixin used in the examples above.

When you implement new mixins, please fork datapumps and make a pull request.