etl-gun
v2.2.23
ETL toolkit which supports RxJS streams, error handling, business rules and many more
ETL-Gun is a platform that employs promises, generators, IxJs iterables, RxJs observables and nodejs streams, allowing developers to build stream-based ETL (Extract, Transform, Load) pipelines complete with buffering, bulk operations, error handling and many useful features.
Table of Contents
- Why / when would I need this?
- Installation
- Usage
- Concept
- Features
- GUI
- Examples (how to)
- API Reference
- Core classes
- BaseCollection - base class for all collections
- Errors - queues to store and process errors which occurred in other endpoints
- Endpoints
- Auxiliary
- Filesystems
- File formats
- Databases
- Knex
- CockroachDB
- MariaDB
- MS SQL Server
- MySQL - mysql and mysql2 drivers both are supported
- Oracle DB
- PostgreSQL
- Amazon Redshift
- SQLite
- Messengers
- GeneralEmail - any smtp + imap email server can be used
- Gmail
- sms.ru
- Telegram
- CMS
- Task tracking systems
- Operators
- run
- log
- expect - like expect() in unit test frameworks, used for data validation
- where - similar to the rxjs filter() operator, but better suited to data processing
- collect - analogous to the rxjs buffer operators, but with improvements for data processing
- push
- rools - integration with business rules engine
- move
- copy
- numerate
- addField
- addColumn
- join
- mapAsync - like the rxjs map() operator, but works with an async callback handler
- Misc
Why / when would I need this?
ETL-Gun is a simple ETL glue library implemented as an extension to the RxJs library. Typically, you'd use ETL-Gun to help with ETL processes. It can extract data from one or more sources, transform it, and load it into one or more destinations in the order you need.
You can use it with both JavaScript and TypeScript.
ETL-Gun will NOT help you with "big data": it runs on a single computer and does not support clustering out of the box.
Here are some ways to use it:
- Read data from a database and export it to a .csv file, and vice versa
- Create file converters
- Filter or sort the contents of files
- Run queries against a database
- Scan a remote ftp folder and download files from it
You can find many examples of using ETL-Gun in the API Reference section of this file.
Installation
npm install etl-gun
or
yarn add etl-gun
Usage
Info: A ready-to-use blank example project is available in the example-project folder of this repository.
Info: You can convert your ETL project to a systemd service and set up a schedule for it with the etl-gun-service tool.
Warning: Since version 2.0.4 this library is native ESM and no longer provides a CommonJS export. If your project uses CommonJS, you will have to convert it to ESM or use the dynamic import() function.
Introductory example of using the library: postgresql -> .csv
import { map } from "rxjs";
import { Csv, GuiManager, Header, Postgres, log, push, run } from "etl-gun";
// If you want to view GUI, uncomment the next line of code
// new GuiManager();
// Step 1: endpoint creation
const postgres = new Postgres.Endpoint("postgres://user:password@127.0.0.1:5432/database");
const source = postgres.getTable('users');
const csv = new Csv.Endpoint('./dest-folder');
const dest = csv.getFile('users.csv');
const header = new Header("id", "name", "login", "email");
// Step 2: transformation streams creation
const sourceToDest$ = source.selectRx().pipe(
log(),
map(v => header.objToArr(v)),
push(dest)
);
// Step 3: running transformations (and waiting until they finish, if necessary)
await run(sourceToDest$);
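In the example above, `header.objToArr(v)` presumably turns each row object into an array of values ordered by the header's column names, which is the shape a csv record needs. The sketch below only illustrates that idea: the standalone `objToArr` function is a hypothetical stand-in written for this explanation, not ETL-Gun's `Header` implementation.

```typescript
// Hypothetical stand-in for the idea behind header.objToArr(); not ETL-Gun's implementation.
function objToArr(columns: readonly string[], row: Record<string, unknown>): unknown[] {
  // Pick each column's value in header order; fields missing from the row become undefined.
  return columns.map((column) => row[column]);
}

const columns = ["id", "name", "login", "email"];
const user = { login: "ann", id: 1, email: "ann@example.com", name: "Ann" };
const csvRow = objToArr(columns, user);
// csvRow is [1, "Ann", "ann", "ann@example.com"]
```

The property order of the source object does not matter here; only the column order decides where each value lands.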
Concept
ETL-Gun contains several main concepts:
- Endpoints - sources and destinations of data. An endpoint holds the connection to one system instance along with other parameters of that system, and groups the methods for getting collections related to that system
- Collections - the types of data objects that exist in the endpoint system
- Pipelines (or streams) - routes of data transformation and delivery, based on RxJs streams
Using this library consists of 3 steps:
- Define your endpoints and collections for sources and destinations
- Define data transformation pipelines using the pipe() method of the input streams of your source endpoints
- Run the transformation pipelines in order and wait for completion
ETL process:
- Extract: Data is extracted from the source collection with the selectRx() method, which returns an RxJs stream
- Transform: Use any RxJs and ETL-Gun operators inside the pipe() method of the input stream to transform the input data. For complex data transformations you can use the Memory.Endpoint class, which can store data and whose collections have forEach() and some other methods to manipulate the data in them
- Load: Data is loaded into the destination endpoint with the push() collection operator
Chaining:
Data transformations are chained with the pipe() method of the input data stream. Several streams are chained by using await with the run() procedure.
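To make the extract → transform → load shape concrete without any dependencies, here is a minimal sketch that uses plain generators in place of selectRx(), pipe() and push(). All names in it (`extract`, `transform`, `load`, `User`) are made up for illustration and are not part of ETL-Gun or RxJs.

```typescript
// Illustrative only: plain generators standing in for selectRx(), pipe() and push().
type User = { id: number; name: string };

// Extract: yield rows one at a time from some source.
function* extract(rows: readonly User[]): Generator<User> {
  for (const row of rows) yield row;
}

// Transform: turn each row into an array of strings, as a csv record would be.
function* transform(source: Iterable<User>): Generator<string[]> {
  for (const user of source) yield [String(user.id), user.name.toUpperCase()];
}

// Load: append every record to the destination and report how many were written.
function load(source: Iterable<string[]>, destination: string[][]): number {
  let count = 0;
  for (const record of source) {
    destination.push(record);
    count++;
  }
  return count;
}

const destination: string[][] = [];
const written = load(transform(extract([{ id: 1, name: "ann" }, { id: 2, name: "bob" }])), destination);
// written is 2; destination is [["1", "ANN"], ["2", "BOB"]]
```

As in an RxJs pipeline, nothing flows until the final stage pulls data, and each row passes through every stage before the next row is read.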
Features
- Simple to use! It takes only 3 steps:
- Create endpoints and get all the collections you need
- Create pipelines to process collection data (via the select methods of the source collections)
- Run the pipelines in the order you want (with the run operator)
- Supports all the main data processing paradigms:
- Simple async/await - with the select() method, which returns a promise
- Generators - with the selectGen() method, which returns a generator of the AsyncGenerator generic class
- IxJs - with selectIx(), which returns an AsyncIterable generic class fully compatible with the IxJs library
- RxJs - with selectRx(), which returns an Observable generic class fully compatible with the RxJs library, its observables, operators, etc.
- Node ReadableStream - with selectStream(), which returns a ReadableStream generic class
- The library contains an embedded debug console. It is built as a console application and works in any terminal. It supports step-by-step debugging with watching of the processed values. To use this GUI, simply create an instance of the GuiManager class before creating any endpoints and collections (see GUI)
- The library is written in TypeScript, contains type information for the end systems and fully supports type checking. You can use it in JavaScript applications too
- Contains many kinds of sources and destinations, for example many relational databases (PostgreSQL, MySQL, ...), file formats (csv, json, xml), business applications (Magento, Trello, ZenDesk, ...), etc.
- Works with any type of input/output data, including arrays and hierarchical data structures (json, xml)
- With the endpoint events mechanism you can handle different stream events, for example stream start/end, errors and others (see Endpoint)
- Supports validation and an error handling mechanism:
- Data validation with the expect operator
- A special endpoint type for errors, which is based on a queue
- Every collection has an errors property with an errors collection that gathers all errors that occurred while the collection was being processed. It is created automatically when the collection is created, but you can change its value to collect errors in a place of your choice
- The console GUI displays all error collections, their statistics and their errors
- Contains some ready-to-use helpers and integrations, for example you can translate data to another language with GoogleTranslateHelper
- Contains a business rules integration that lets you extract analysis and transformation logic from the ETL program sources and then change it at runtime without changing and redeploying the application (see rools)
GUI
- Simple to use: you only need to create an instance of the GuiManager class before any endpoint is created (at the beginning of the program)
- You can pause the ETL process and resume it with 'space' on the keyboard
- With 'enter' you can execute the ETL process step-by-step in pause mode
- With 'esc' you can quit the program
- The GUI displays the full list of created endpoints and collections, their statuses and the last values received from (or pushed to) them
- Logs are displayed in the footer part of the console window
- You can select the log window with 'tab' and scroll it with the up/down arrows
Examples (how to)
Export rows from Postgres table to csv-file (postgresql -> .csv)
import { Postgres, Csv, Header, log, push, run } from "etl-gun";
import { map } from "rxjs";
const postgres = new Postgres.Endpoint("postgres://user:password@127.0.0.1:5432/database");
const source = postgres.getTable('users');
const csv = new Csv.Endpoint('./dest-folder');
const dest = csv.getFile('users.csv');
const header = new Header("id", "name", "login", "email");
const sourceToDest$ = source.selectRx().pipe(
log(),
map(v => header.objToArr(v)),
push(dest)
);
await run(sourceToDest$);
Sort rows in csv-file by the first column (.csv -> .csv)
import * as etl from "etl-gun";
const csvEndpoint = new etl.Csv.Endpoint();
const csv = csvEndpoint.getFile('users.csv');
const memory = new etl.Memory.Endpoint();
const buffer = memory.getBuffer('buffer 1');
// Read every row of the csv file into the in-memory buffer
const csvToBuffer$ = csv.selectRx().pipe(
etl.push(buffer)
);
// Write the buffer contents back to the csv file
const bufferToCsv$ = buffer.selectRx().pipe(
etl.push(csv)
);
await etl.run(csvToBuffer$);
buffer.sort((row1, row2) => row1[0] > row2[0]);
await csv.delete();
await etl.run(bufferToCsv$);
Create telegram bot with translation functionality
import * as etl from "etl-gun";
const telegram = new etl.Telegram.Endpoint();
const bot = telegram.startBot('bot 1', process.env.TELEGRAM_BOT_TOKEN!);
const translator = new etl.GoogleTranslateHelper(process.env.GOOGLE_CLOUD_API_KEY!, 'en', 'ru');
const startTelegramBot$ = bot.selectRx().pipe(
etl.log(), // log user messages to the console
translator.operator([], ['message']), // translate 'message' field
etl.push(bot) // echo input message back to the user
);
etl.run(startTelegramBot$);
API Reference
Core
BaseCollection
Base class for all collections. Declares public interface of collection and implements event mechanism.
Methods:
// Read elements from the collection and return them
// where: condition for selecting elements
// Read elements as a promise
async select(where?: any): Promise<T[]>;
// Read elements from the collection and return generator to process it
async* selectGen(where?: any): AsyncGenerator<T>;
// Read elements from the collection and return IxJS iterable to process it
selectIx(where?: any): AsyncIterable<T>;
// Read elements from the collection and create RxJS observable to process it
selectRx(where?: any): Observable<T>;
// Read elements from the collection and create data stream to process it
selectStream(where?: any): ReadableStream<T>;
// Add value to the collection (usually to the end of stream)
// value: what will be added to the collection
async insert(value: any);
// Update collection elements
// where: condition of the element selection
// value: new value for the selected elements
async update(where: any, value: any);
// Clear data of the collection
// where: condition of the element selection
async delete(where?: any);
// Add listener of specified event
// event: which event we want to listen, see below
// listener: callback function to handle events
on(event: CollectionEvent, listener: (...data: any[]) => void);
// Readable/writable property which contains the errors collection instance for this collection
errors: Errors.ErrorsQueue;
Types:
export type CollectionEvent =
"select.start" | // fires at the start of stream
"select.end" | // at the end of stream
"select.error" | // on error
"select.skip" | // when the collection skips some data
"select.up" | // when the collection goes up to the parent element while processing tree data
"select.down" | // when the collection goes down to a child element while processing tree data
"recive" | // for every data value in the stream
"pipe.start" | // when processing of any collection element was started
"pipe.end" | // when processing of one collection element was ended
"insert" | // when data is inserted to the collection
"update" | // when data is updated in the collection
"delete"; // when data is deleted from the collection
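The `on(event, listener)` method together with the `CollectionEvent` union gives a typed subscription interface. As a rough sketch of how such a mechanism can work, the class below keeps a list of listeners per event name. `EventHub` and its `emit` method are hypothetical names invented for this illustration; the README does not show how BaseCollection fires events internally.

```typescript
// The event names are copied from the CollectionEvent type above.
type CollectionEvent =
  | "select.start" | "select.end" | "select.error" | "select.skip"
  | "select.up" | "select.down" | "recive"
  | "pipe.start" | "pipe.end" | "insert" | "update" | "delete";

type Listener = (...data: unknown[]) => void;

// Hypothetical minimal event hub; not ETL-Gun's BaseCollection.
class EventHub {
  private listeners = new Map<CollectionEvent, Listener[]>();

  // Register a listener for one event name.
  on(event: CollectionEvent, listener: Listener): void {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
  }

  // Call every listener registered for the event, in registration order.
  emit(event: CollectionEvent, ...data: unknown[]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(...data);
  }
}

const hub = new EventHub();
const inserted: unknown[] = [];
hub.on("insert", (value) => { inserted.push(value); });
hub.emit("insert", { id: 1 });
hub.emit("delete"); // no listener registered for "delete", so nothing happens
// inserted now holds one element: { id: 1 }
```

Because the event parameter is a string-literal union, a misspelled event name such as "inserted" is rejected by the TypeScript compiler.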
Endpoints and their collections
Errors
Stores and processes ETL errors. By default, every collection has an errors property which contains a collection of errors from that collection's ETL process. You can cancel the creation of the default errors collection for any collection and specify your own manually created errors collection.
Endpoint
// Creates a new errors collection
// collectionName: identifier of the collection object being created
// options: some options controlling how this endpoint is displayed
getCollection(collectionName: string, options: CollectionOptions<EtlError> = {}): ErrorsQueue;
// Release the errors collection object
// collectionName: identifier of the collection object being released
releaseCollection(collectionName: string);
ErrorsQueue
In-memory queue to store and process ETL errors. Should be created with the getCollection method of Errors.Endpoint.
Methods:
// Create the observable object and send error data from the queue to it
// stopOnEmpty: whether error processing stops when the queue is empty
selectRx(stopOnEmpty: boolean = false): BaseObservable<EtlError>;
// Pushes the error to the queue
// error: what will be added to the queue
async insert(error: EtlError);
// Clear queue
async delete();
Memory
Create and manipulate collections of objects in memory.
Endpoint
// Creates a new memory buffer. This is a generic method, so you can specify the type of data which will be stored in it
// collectionName: identifier of the collection object being created
// values: initial data
// guiOptions: some options controlling how this endpoint is displayed
getBuffer<T>(collectionName: string, values: T[] = [], guiOptions: CollectionGuiOptions<T> = {}): BufferCollection;
// Release buffer data
// collectionName: identifier of the collection object being released
releaseBuffer(collectionName: string);
// Creates a new memory queue; the parameters are the same as for getBuffer
getQueue<T>(collectionName: string, values: T[] = [], guiOptions: CollectionGuiOptions<T> = {}): QueueCollection;
// Release queue data
releaseQueue(collectionName: string);
BufferCollection
Buffer to store values in memory and perform complex operations on them. Should be created with the getBuffer method of Memory.Endpoint.
Methods:
// Create the observable object and send data from the buffer to it
selectRx(): Observable<T>;
// Pushes the value to the buffer
// value: what will be added to the buffer
async insert(value: T);
// Clear endpoint data buffer
async delete();
// Sort buffer data
// compareFn: You can specify a comparison function which returns a number
// (for example (v1, v2) => v1 - v2, which behaves the same as in Array.sort())
// or which returns a boolean (for example (v1, v2) => v1 > v2)
sort(compareFn: (v1: T, v2: T) => number | boolean);
// This function is equivalent to Array.forEach
forEach(callbackfn: (value: T, index: number, array: T[]) => void);
Example:
import * as etl from "etl-gun";
const csvEndpoint = new etl.Csv.Endpoint();
const csv = csvEndpoint.getFile('users.csv');
const memory = new etl.Memory.Endpoint();
const buffer = memory.getBuffer('buffer 1');
// Read every row of the csv file into the in-memory buffer
const csvToBuffer$ = csv.selectRx().pipe(
etl.push(buffer)
);
// Write the buffer contents back to the csv file
const bufferToCsv$ = buffer.selectRx().pipe(
etl.push(csv)
);
await etl.run(csvToBuffer$);
buffer.sort((row1, row2) => row1[0] > row2[0]);
await csv.delete();
await etl.run(bufferToCsv$);
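The sort() method described above accepts a comparison function that returns either a number or a boolean. Array.prototype.sort only understands numeric results, so a boolean comparator has to be adapted somehow. The sketch below shows one way to do that; `toNumericComparator` is a hypothetical helper written for this explanation, and the library's own approach may differ.

```typescript
type CompareFn<T> = (v1: T, v2: T) => number | boolean;

// Hypothetical helper: wraps a number-or-boolean comparator into a purely numeric one.
function toNumericComparator<T>(compareFn: CompareFn<T>): (a: T, b: T) => number {
  return (a, b) => {
    const result = compareFn(a, b);
    if (typeof result === "number") return result;
    // Boolean result: true means "a goes after b".
    if (result) return 1;
    // Otherwise ask the reverse question to tell "a goes before b" from "equal".
    return compareFn(b, a) ? -1 : 0;
  };
}

const ascending = [3, 1, 2].sort(toNumericComparator<number>((a, b) => a > b));
// ascending is [1, 2, 3]
const descending = [3, 1, 2].sort(toNumericComparator<number>((a, b) => b - a));
// descending is [3, 2, 1]
```

Passing a boolean comparator straight to Array.prototype.sort is unreliable, because true and false coerce to 1 and 0 and the comparator can then never report "a goes before b".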
QueueCollection
Queue to store values in memory and process them in order. Should be created with the getQueue method of Memory.Endpoint.
Methods:
// Create the observable object which processes queue elements one by one and removes each processed element from the queue
// dontStopOnEmpty: if true, queue processing does not stop (unsubscribe) when the queue becomes empty
// interval: pause between processing of elements, in milliseconds
selectRx(dontStopOnEmpty: boolean = false, interval: number = 0): Observable<T>;
// Pushes the value to the queue
// value: what will be added to the queue
async insert(value: T);
// Clear queue
async delete();
Local Filesystem
Search for files and folders with standard unix shell wildcards; see the glob documentation for details.
Endpoint
Methods:
// rootFolder: full or relative path to the folder of interest
constructor(rootFolder: string);
// Creates a new FilesystemCollection
// folderName: subfolder of the root folder and identifier of the collection object being created
// guiOptions: some options controlling how this endpoint is displayed
getFolder(folderName: string = '.', guiOptions: CollectionGuiOptions<PathDetails> = {}): Collection;
// Release the FilesystemCollection
// folderName: identifier of the collection object being released
releaseFolder(folderName: string);
Collection
Methods:
// Create the observable object and send files and folders information to it
// mask: search path mask in glob format (see glob documentation)
// for example:
// *.js - all js files in root folder
// **/*.png - all png files in root folder and subfolders
// options: Search options, see below
selectRx(mask: string = '*', options?: ReadOptions): BaseObservable<PathDetails>;
// Create folder or file
// pathDetails: Information about path, which returns from selectRx() method
// filePath: File or folder path
// isFolder: Is it file or folder
// data: What will be added to the file, if it is a file, ignore for folders
async insert(pathDetails: PathDetails, data?: string | NodeJS.ArrayBufferView | Iterable<string | NodeJS.ArrayBufferView> | AsyncIterable<string | NodeJS.ArrayBufferView> | internal.Stream);
async insert(filePath: string, data?: string | NodeJS.ArrayBufferView | Iterable<string | NodeJS.ArrayBufferView> | AsyncIterable<string | NodeJS.ArrayBufferView> | internal.Stream, isFolder?: boolean);
// Clear the root folder by mask
// mask: Which files and folders we need to delete
// options: Search options, see below
// IMPORTANT! Be careful with option includeRootDir because if it is true, and the objectsToSearch is not 'filesOnly',
// then the root folder will be deleted with all its content! Including folder itself.
async delete(mask: string = '*', options?: ReadOptions);
Types:
type ReadOptions = {
includeRootDir?: boolean; // Whether the root folder itself is included in the search results
// false by default
objectsToSearch?: // Which object types are included in the search results
'filesOnly' | // Only files
'foldersOnly' | // Only folders
'all'; // Both files and folders
// 'all' is the default option
}
type PathDetails = {
isFolder: boolean;
name: string;
relativePath: string; // Empty for root folder
fullPath: string;
parentFolderRelativePath: string; // '..' for root folder
parentFolderFullPath: string;
}
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
const fs = new etl.Filesystem.Endpoint('~');
const scripts = fs.getFolder('scripts');
const printAllJsFileNames$ = scripts.selectRx('**/*.js').pipe(
rx.map(v => v.name),
etl.log()
);
etl.run(printAllJsFileNames$);
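The two ReadOptions fields interact: includeRootDir decides whether the root folder entry is a candidate at all, and objectsToSearch then filters by entry type. The sketch below is one plausible reading of those rules, applied to an in-memory list of entries. `applyReadOptions` and the reduced `Entry` type are assumptions made for illustration; they are not the library's code.

```typescript
type ReadOptions = {
  includeRootDir?: boolean;
  objectsToSearch?: "filesOnly" | "foldersOnly" | "all";
};

// Reduced version of PathDetails with only the fields this sketch needs.
type Entry = { isFolder: boolean; relativePath: string };

// Hypothetical helper showing how the options could narrow a list of found entries.
function applyReadOptions(entries: readonly Entry[], options: ReadOptions = {}): Entry[] {
  const { includeRootDir = false, objectsToSearch = "all" } = options;
  return entries.filter((entry) => {
    // The root folder has an empty relative path and is skipped unless explicitly included.
    if (entry.relativePath === "" && !includeRootDir) return false;
    if (objectsToSearch === "filesOnly") return !entry.isFolder;
    if (objectsToSearch === "foldersOnly") return entry.isFolder;
    return true;
  });
}

const found: Entry[] = [
  { isFolder: true, relativePath: "" },         // the root folder itself
  { isFolder: true, relativePath: "lib" },
  { isFolder: false, relativePath: "lib/a.js" },
];
const defaults = applyReadOptions(found);                             // lib, lib/a.js
const filesOnly = applyReadOptions(found, { objectsToSearch: "filesOnly" }); // lib/a.js
const withRoot = applyReadOptions(found, { includeRootDir: true });   // root, lib, lib/a.js
```

Under this reading, the warning on delete() follows directly: with includeRootDir set to true and objectsToSearch not equal to 'filesOnly', the root folder is part of the result set and would be deleted too.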
FTP
Endpoint to access files on ftp and ftps servers. The implementation is based on the basic-ftp package.
Endpoint
Methods:
// options: specify connection parameters
constructor(options: AccessOptions, verbose: boolean = false);
// Creates new Collection object to get remote folder contents
// folderPath: remote path to the ftp folder and identificator of the creating collection object
// options: Some options how to display this endpoint
getFolder(folderPath: string = '.', options: CollectionOptions<FileInfo> = {}): Collection;
// Release FilesystemCollection
// folderPath: identificator of the releasing collection object
releaseFolder(folderPath: string);
Collection
Methods:
// Create the observable object and send files and folders information to it
selectRx(): BaseObservable<FileInfo>;
// Create folder or file.
// remoteFolderPath, remoteFilePath, remotePath: remote path to be created
// localFilePath: Local source file path
// sourceStream: Source stream
// fileContents: String as file contents
async insertFolder(remoteFolderPath: string);
async insertFile(remoteFilePath: string, localFilePath: string);
async insertFile(remoteFilePath: string, sourceStream: Readable);
async insertFileWithContents(remoteFilePath: string, fileContents: string);
// isFolder: flag indicating whether we want to add a folder or a file
// Only one of localFilePath, sourceStream, contents can be specified here
async insert(remotePath: string, contents: { isFolder: boolean, localFilePath?: string, sourceStream?: Readable, contents?: string });
// Delete file or folder with all its contents
// remoteFolderPath, remoteFilePath, remotePath: Remote path to file or folder we want to delete
async deleteFolder(remoteFolderPath: string);
async deleteEmptyFolder(remoteFolderPath: string); // raise the exception if the specified folder is not empty
async deleteFile(remoteFilePath: string);
async delete(remotePath: string);
Example:
import * as etl from "etl-gun";
import * as rxjs from "rxjs";
const ftp = new etl.filesystems.Ftp.Endpoint({host: process.env.FTP_HOST, user: process.env.FTP_USER, password: process.env.FTP_PASSWORD});
const folder = ftp.getFolder('/var/logs');
const PrintFolderContents$ = folder.selectRx().pipe(
etl.log()
)
await etl.run(PrintFolderContents$);
SFTP
Endpoint to access files by sftp. Implementation based on ssh2-sftp-client package.
Endpoint
Collection
WebDAV
Endpoint to access remote filesystem via WebDAV protocol. Implementation based on webdav package.
Endpoint
Collection
Csv
Parses a source csv file into individual records, or writes records to the end of a destination csv file. Every record is one csv line, represented as an array of values.
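To illustrate what "one csv line represented as an array of values" means, here is a small standalone parser for a single line with a configurable delimiter and double-quoted fields. `parseCsvLine` is a hypothetical helper for this explanation only; the endpoint's actual parsing rules are not described in this README.

```typescript
// Hypothetical sketch: split one csv line into values, honouring double-quoted fields.
function parseCsvLine(line: string, delimiter: string = ","): string[] {
  const values: string[] = [];
  let current = "";
  let inQuotes = false;
  for (let i = 0; i < line.length; i++) {
    const ch = line[i];
    if (inQuotes) {
      if (ch === '"' && line[i + 1] === '"') {
        current += '"'; // a doubled quote inside a quoted field is a literal quote
        i++;
      } else if (ch === '"') {
        inQuotes = false; // closing quote
      } else {
        current += ch;
      }
    } else if (ch === '"') {
      inQuotes = true; // opening quote
    } else if (ch === delimiter) {
      values.push(current); // end of the current value
      current = "";
    } else {
      current += ch;
    }
  }
  values.push(current); // the last value has no trailing delimiter
  return values;
}

const record = parseCsvLine('1,"Smith, John",admin');
// record is ["1", "Smith, John", "admin"]
```

A plain `line.split(delimiter)` would break the quoted name into two values, which is why the quote state has to be tracked.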
Endpoint
Methods:
// Create a collection object for the specified file
// filename: full or relative name of the csv file and identifier of the collection object being created
// delimiter: delimiter of values in one line of file data, ',' by default
// guiOptions: some options controlling how this endpoint is displayed
getFile(filename: string, delimiter: string = ",", guiOptions: CollectionGuiOptions<string[]> = {}): Collection;
// Release the collection object
// filename: identifier of the collection object being released
releaseFile(filename: string);
Collection
Methods:
// Create the observable object and send file data to it line by line
// skipFirstLine: skip the first line in the file, useful for skipping the header
// skipEmptyLines: skip all empty lines in the file
selectRx(skipFirstLine: boolean = false, skipEmptyLines = false): Observable<string[]>;
// Add row to the end of file with specified value
// value: what will be added to the file
async insert(value: string[]);
// Clear the csv file
async delete();
Example:
import * as etl from "etl-gun";
const csv = new etl.Csv.Endpoint('~');
const testFile = csv.getFile('test.csv')
const logTestFileRows$ = testFile.selectRx().pipe(
etl.log()
);
etl.run(logTestFileRows$)
Json
Read and write a json file, buffering it in memory. You can get objects from the json by a path specified in JSONPath format or in lodash simple path manner (see the lodash 'get' function documentation).
Endpoint
Methods:
// Create collection object for the specified file
// filename: full or relative name of the json file and identifier of the collection object being created
// autosave: save json from memory to the file after every change
// autoload: load json from the file to memory before every get or search operation
// encoding: file encoding
// guiOptions: Some options how to display this endpoint
getFile(filename: string, autosave: boolean = true, autoload: boolean = false, encoding?: BufferEncoding, guiOptions: CollectionGuiOptions<number> = {}): Collection;
// Release collection object
// filename: identificator of the releasing collection object
releaseFile(filename: string);
Collection
Methods:
// Find and send to observable child objects by specified path
// path: search path in lodash simple path manner
// jsonPath: search path in JSONPath format
// options: see below
selectRx(path: string, options?: ReadOptions): Observable<any>;
selectByJsonPath(jsonPath: string | string[], options?: ReadOptions): Observable<any>;
// Find and return child object by specified path
// path: search path in lodash simple path manner
// jsonPath: search path in JSONPath format
get(path: string): any;
getByJsonPath(jsonPath: string): any;
// If fieldname is specified, the function finds the object by path and adds value as its field
// If fieldname is not specified, the function finds the array by path and pushes value to it
// value: what will be added to the json
// path: where value will be added as a child, specified in lodash simple path manner
// fieldname: name of the field to which the value will be added;
// it also acts as a flag: whether we add the value to an array or to an object
async insert(value: any, path?: string, fieldname?: string);
// Clear the json file and write an empty object to it
async delete();
// Reload the json to the memory from the file
load();
// Save the json from the memory to the file
save();
Types:
type JsonReadOptions = {
searchReturns?: 'foundedOnly' // Default value; only the objects found by the search are sent to the observable
| 'foundedImmediateChildrenOnly' // Only the immediate children of the found objects are sent to the observable
| 'foundedWithDescendants'; // Recursively sends all objects from the object tree of every search result, including the search result object itself
addRelativePathAsField?: string; // If specified, the relative path is added to each sent object as a field with this name
}
Example:
import * as etl from "etl-gun";
import { tap } from "rxjs";
const json = new etl.Json.Endpoint('~');
const testFile = json.getFile('test.json');
const printJsonBookNames$ = testFile.selectRx('store.book').pipe(
tap(book => console.log(book.name))
);
const printJsonAuthors$ = testFile.selectByJsonPath('$.store.book[*].author', {searchReturns: 'foundedOnly', addRelativePathAsField: "path"}).pipe(
etl.log()
);
await etl.run(printJsonAuthors$, printJsonBookNames$);
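A path in "lodash simple path manner", such as 'store.book', is a dot-separated chain of property names, optionally with array indexes in brackets. The sketch below shows how such a path can be resolved against a parsed json object. `getByPath` is a hypothetical simplification written for this explanation; it is neither lodash's `get` nor the collection's own `get()` method.

```typescript
// Hypothetical sketch: resolve a simple dotted path like "store.book[1].name".
function getByPath(root: unknown, path: string): unknown {
  // Turn "[1]" into ".1" so that array indexes become ordinary path segments.
  const keys = path
    .replace(/\[(\d+)\]/g, ".$1")
    .split(".")
    .filter((key) => key.length > 0);
  let current: unknown = root;
  for (const key of keys) {
    // Stop as soon as the path leads through something that is not an object or array.
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

const doc = { store: { book: [{ name: "First" }, { name: "Second" }] } };
const books = getByPath(doc, "store.book");          // the array of both books
const second = getByPath(doc, "store.book[1].name"); // "Second"
const missing = getByPath(doc, "store.cd.title");    // undefined
```

JSONPath expressions such as '$.store.book[*].author' are more expressive (wildcards, filters) and need a dedicated parser, which is why the collection offers separate methods for them.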
Xml
# etl.XmlEndpoint(filename, autosave?, autoload?, encoding?)
Read and write an XML document, buffering it in memory. You can get nodes from the XML by a path specified in XPath format.
Endpoint
Methods:
// Create collection object for the specified file
// filename: full or relative name of the xml file and identifier of the collection object being created
// autosave: save xml from memory to the file after every change
// autoload: load xml from the file to memory before every get or search operation
// encoding: file encoding
// guiOptions: Some options how to display this endpoint
getFile(filename: string, autosave: boolean = true, autoload: boolean = false, encoding?: BufferEncoding, guiOptions: CollectionGuiOptions<string[]> = {}): Collection;
// Release collection object
// filename: identificator of the releasing collection object
releaseFile(filename: string);
Collection
Methods:
// Find and send to observable child objects by specified xpath
// xpath: xpath to search
// options: see below
selectRx(xpath: string = '', options: XmlReadOptions = {}): EtlObservable<Node>;
// Find and return the child node by the specified path
// xpath: search path
get(xpath: string = ''): XPath.SelectedValue;
// If attribute is specified, the function finds the node by xpath and adds value as its attribute
// If attribute is not specified, the function finds the node by xpath and adds value as its child node
// value: what will be added to the xml
// xpath: where value will be added, specified in XPath format
// attribute: name of the attribute whose value will be set;
// it also acts as a flag: whether we add the value as an attribute or as a node
async insert(value: any, xpath: string = '', attribute: string = '');
// Clear the xml file and write an empty object to it
async delete();
// Reload the xml to the memory from the file
load();
// Save the xml from the memory to the file
save();
Types:
export type XmlReadOptions = {
searchReturns?: 'foundedOnly' // Default value; only the nodes found by the search are sent to the observable
| 'foundedImmediateChildrenOnly' // Only the immediate children of the found nodes are sent to the observable
| 'foundedWithDescendants'; // Recursively sends all nodes from the tree of every search result, including the search result node itself
addRelativePathAsAttribute?: string; // If specified, the relative path is added to each sent node as an attribute with this name
}
Example:
import * as etl from "etl-gun";
import { map } from "rxjs";
const xml = new etl.Xml.Endpoint('/tmp');
const testFile = xml.getFile('test.xml');
const printXmlAuthors$ = testFile.selectRx('/store/book/author').pipe(
map(v => v.firstChild.nodeValue),
etl.log()
);
await etl.run(printXmlAuthors$);
Knex
Represents a generic database accessed through Knex. Based on the knex engine.
KnexEndpoint
Methods:
constructor(client: ClientType, connectionString: string, pool?: PoolConfig);
constructor(client: ClientType, connectionConfig: ConnectionConfig, pool?: PoolConfig);
constructor(knexConfig: pkg.Knex.Config);
// Create collection object for the specified database table
// table: name of database table and identificator of the creating collection object
// options: Some options how to display this endpoint
getTable<T = Record<string, any>>(table: string, options: CollectionOptions<string[]> = {}): KnexTableCollection<T>;
// Create collection object for the specified sql query result
// collectionName: identificator of the creating collection object
// query: sql query
// options: Some options how to display this endpoint
getQuery<T = Record<string, any>>(collectionName: string, query: string, options: CollectionOptions<string[]> = {}): KnexQueryCollection<T>;
// Release collection object
// collectionName: identifier of the collection object being released
releaseCollection(collectionName: string);
// Release all collection objects, endpoint object and release connections to database.
async releaseEndpoint();
KnexTableCollection
Presents the table from the database.
Methods:
// Create the observable object and send data from the database table to it
// where: you can filter incoming data with this parameter
// it can be an SQL where clause
// or an object with fields as column names
// and their values as the required column values
selectRx(where: SqlCondition<T>, fields?: string[]): BaseObservable<T>;
selectRx(whereSql?: string, whereParams?: any[], fields?: string[]): BaseObservable<T>;
// Insert value to the database table
// value: what will be added to the database
async insert(value: T): Promise<number[]>;
async insert(values: T[]): Promise<number[]>;
// Update all rows in the database table which match the specified condition
// where: you can select the table rows to update with this parameter
// it can be an SQL where clause
// or an object with fields as column names
// and their values as the required column values
// value: what will be set as the new value for the updated rows
async update(value: T, where: SqlCondition<T>): Promise<number>;
async update(value: T, whereSql?: string, whereParams?: any[]): Promise<number>;
// Insert the row or, if a matching row already exists, update it
// value: the row to insert or update
async upsert(value: T): Promise<number[]>;
// Delete rows from the database table by condition
// where: you can select the table rows to delete with this parameter
// it can be an SQL where clause
// or an object with fields as column names
// and their values as the required column values
async delete(where: SqlCondition<T>): Promise<number>;
async delete(whereSql?: string, whereParams?: any[]): Promise<number>;
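The selectRx, update and delete methods each accept a condition either as an object (column names mapped to required values) or as an SQL where clause with parameters. The sketch below shows how the object form can be translated into the clause-plus-parameters form. `buildWhere` and the `Condition` type are hypothetical and written only for this explanation; the real `SqlCondition<T>` type and Knex's own query building are not shown in this README.

```typescript
// Hypothetical simplified condition: column name -> required value.
type Condition = Record<string, string | number | boolean | null>;

// Hypothetical helper: turn a condition object into a parameterized where clause.
function buildWhere(condition: Condition): { sql: string; params: unknown[] } {
  const parts: string[] = [];
  const params: unknown[] = [];
  for (const [column, value] of Object.entries(condition)) {
    if (value === null) {
      parts.push(`${column} IS NULL`); // NULL cannot be compared with "="
    } else {
      parts.push(`${column} = ?`);     // the value is passed separately as a parameter
      params.push(value);
    }
  }
  return { sql: parts.join(" AND "), params };
}

const where = buildWhere({ role: "admin", age: 30, deleted_at: null });
// where.sql is "role = ? AND age = ? AND deleted_at IS NULL"
// where.params is ["admin", 30]
```

Values go into the parameter list rather than into the SQL string, which is what protects against SQL injection. This sketch interpolates column names directly, so it should only be fed trusted column names.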
KnexQueryCollection
Readonly collection of sql query results.
Methods:
// Create the observable object and send the query results to it
// params: values for the parameters of the sql query
selectRx(params?: any[]): BaseObservable<T>;
// Execute the query with the specified parameters and return the first found record or null
async get(params?: any[]): Promise<T>;
// Execute the query with the specified parameters and return the found records
async find(params?: any[]): Promise<T[]>;
CockroachDB
Represents a CockroachDB database. The endpoint implementation is based on KnexEndpoint. You should install the node-postgres (aka 'pg') package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.CockroachDb.Endpoint('postgres://user:password@localhost:5432/database');
const table = pg.getTable('users');
const logUsers$ = table.selectRx().pipe(
etl.log()
);
etl.run(logUsers$)
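The database endpoint constructors accept either a connection string or a connection config object. To show how the parts of a URL-style connection string map onto separate fields, here is a minimal sketch built on the standard `URL` class. The `ExampleConnectionConfig` interface and the `parseConnectionString` function are hypothetical names used for illustration; the real `ConnectionConfig` type comes from the underlying driver and may contain other fields.

```typescript
// Hypothetical shape for this example only; the real ConnectionConfig
// type comes from the underlying driver and may have other fields.
interface ExampleConnectionConfig {
    host: string;
    port?: number;
    user: string;
    password: string;
    database: string;
}

// Split a URL-style connection string into its parts using the standard URL class.
function parseConnectionString(connectionString: string): ExampleConnectionConfig {
    const url = new URL(connectionString);
    return {
        host: url.hostname,
        port: url.port ? Number(url.port) : undefined,
        // Credentials may be percent-encoded inside a URL, so decode them
        user: decodeURIComponent(url.username),
        password: decodeURIComponent(url.password),
        // The database name is the path without the leading slash
        database: url.pathname.replace(/^\//, ''),
    };
}

const exampleConfig = parseConnectionString('postgres://user:password@localhost:5432/database');
console.log(exampleConfig);
```

A config object is often the more convenient form when the password contains characters such as `@` or `/`, which would otherwise have to be percent-encoded in the connection string.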
MariaDB
Represents a MariaDB database. The endpoint implementation is based on KnexEndpoint. You should install the mysql package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.MariaDb.Endpoint('mysql://user:password@localhost:3306/database');
const table = pg.getTable('users');
const logUsers$ = table.selectRx().pipe(
etl.log()
);
etl.run(logUsers$)
MS SQL Server
Represents an MS SQL Server database. The endpoint implementation is based on KnexEndpoint. You should install the tedious package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.SqlServer.Endpoint('mssql://user:password@localhost:1433/database');
const table = pg.getTable('users');
const logUsers$ = table.selectRx().pipe(
etl.log()
);
etl.run(logUsers$)
MySQL
Represents a MySQL database. The endpoint implementation is based on KnexEndpoint. You should install the mysql package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.MySql.Endpoint('mysql://user:password@localhost:3306/database');
const table = pg.getTable('users');
const logUsers$ = table.selectRx().pipe(
etl.log()
);
etl.run(logUsers$)
Oracle DB
Represents an Oracle database. The endpoint implementation is based on KnexEndpoint. You should install the oracledb package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.OracleDb.Endpoint({
host: config.oracle.host,
user: config.oracle.user,
password: config.oracle.password,
database: config.oracle.database,
});
const table = pg.getTable('users');
const logUsers$ = table.selectRx().pipe(
etl.log()
);
etl.run(logUsers$)
Postgres
Represents a PostgreSQL database. The endpoint implementation is based on KnexEndpoint. You should install the node-postgres (aka 'pg') package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.Postgres.Endpoint('postgres://user:password@localhost:5432/database');
const table = pg.getTable('users');
const logUsers$ = table.selectRx().pipe(
etl.log()
);
etl.run(logUsers$)
Amazon Redshift
Represents an Amazon Redshift database. The endpoint implementation is based on KnexEndpoint. You should install the node-postgres (aka 'pg') package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.Redshift.Endpoint('postgres://user:password@localhost:5439/database');
const table = pg.getTable('users');
const logUsers$ = table.selectRx().pipe(
etl.log()
);
etl.run(logUsers$)
SQLite
Represents an SQLite3 database. The endpoint implementation is based on KnexEndpoint. You should install the sqlite3 package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.SqlLite.Endpoint({
filename: "./mydb.sqlite"
});
const table = pg.getTable('users');
const logUsers$ = table.selectRx().pipe(
etl.log()
);
etl.run(logUsers$)
GeneralEmail
Endpoint to send and receive emails. Implements the SMTP protocol and is based on the nodemailer library.
Endpoint
Methods:
constructor(connectionOptions: ConnectionOptions);
async releaseEndpoint();
async send(to: string[] | string, subject: string, body: string, cc?: string[] | string, bcc?: string[] | string, from?: string): Promise<SendError | undefined>;
async send(value: EMail): Promise<SendError | undefined>;
getInbox(options: CollectionOptions<EMail> = {}): Collection;
releaseInbox();
getMailbox(mailBox: string, options: CollectionOptions<EMail> = {}): Collection;
releaseMailbox(mailBox: string);
Collection
Represents a mailbox with emails.
Methods:
// Create the observable object and send emails from the mailbox to it
selectRx(): BaseObservable<EMail>;
selectRx(searchOptions: SearchOptions, markSeen?: boolean): BaseObservable<EMail>;
selectRx(range: string, markSeen?: boolean): BaseObservable<EMail>;
selectRx(searchCriteria: any[], markSeen?: boolean ): BaseObservable<EMail>;
async get(UID: string | number, markSeen: boolean = false): Promise<EMail>;
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
const gmail = new etl.messangers.Gmail.Endpoint(process.env.GMAIL_USER!, process.env.GMAIL_PASSWORD!);
const inbox = gmail.getInbox();
const PrintMails$ = inbox.selectRx({seen: false}).pipe(
rx.take(10),
etl.log()
)
await etl.run(PrintMails$);
const mail = await inbox.get(1463);
console.log(mail);
Gmail
Endpoint to work with Gmail service. Based on GeneralEmail endpoint.
You need to get an application password from the Gmail service to use this endpoint. Follow these steps to get it:
- Log in to your Gmail account
- Open this link https://myaccount.google.com/security
- Enable 2 factor authentication
- Go to https://myaccount.google.com/apppasswords
- In the Select App options choose Other and enter your app name (it can be any name, like mycustomapp)
- A password will be generated for you - copy it from the popup
- Use the copied password as the application password parameter in the Gmail endpoint constructor.
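Once the application password has been generated, it is usually kept out of the source code, for example in environment variables. The following is a small sketch of reading such settings and failing early when one is missing. The `requireSetting` helper is hypothetical and not part of ETL-Gun; the `GMAIL_USER` and `GMAIL_PASSWORD` names follow the example in this section, and the values shown are placeholders.

```typescript
// Hypothetical helper: read a required setting from an environment-like map
// and fail early with a clear message when it is missing or empty.
function requireSetting(env: Record<string, string | undefined>, key: string): string {
    const value = env[key];
    if (value === undefined || value.trim() === '') {
        throw new Error(`Missing required setting: ${key}`);
    }
    return value;
}

// Placeholder values; in a real script you would pass process.env instead of this literal.
const exampleEnv = { GMAIL_USER: 'someone@example.com', GMAIL_PASSWORD: 'app-password-here' };
const gmailUser = requireSetting(exampleEnv, 'GMAIL_USER');
const gmailAppPassword = requireSetting(exampleEnv, 'GMAIL_PASSWORD');
console.log(`Credentials loaded for ${gmailUser} (password length: ${gmailAppPassword.length})`);
```

The two returned strings are what would then be passed as the `userEmail` and `appPassword` arguments of the Gmail endpoint constructor.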
Endpoint
Methods:
constructor(userEmail: string, appPassword: string);
async releaseEndpoint();
async send(to: string[] | string, subject: string, body: string, cc?: string[] | string, bcc?: string[] | string, from?: string): Promise<SendError | undefined>;
async send(value: EMail): Promise<SendError | undefined>;
getInbox(options: CollectionOptions<EMail> = {}): Collection;
releaseInbox();
getMailbox(mailBox: string, options: CollectionOptions<EMail> = {}): Collection;
releaseMailbox(mailBox: string);
Collection
Represents a mailbox with emails.
Methods:
// Create the observable object and send emails from the mailbox to it
selectRx(): BaseObservable<EMail>;
selectRx(searchOptions: SearchOptions, markSeen?: boolean): BaseObservable<EMail>;
selectRx(range: string, markSeen?: boolean): BaseObservable<EMail>;
selectRx(searchCriteria: any[], markSeen?: boolean ): BaseObservable<EMail>;
async get(UID: string | number, markSeen: boolean = false): Promise<EMail>;
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
const gmail = new etl.messangers.Gmail.Endpoint(process.env.GMAIL_USER!, process.env.GMAIL_PASSWORD!);
const inbox = gmail.getInbox();
const PrintMails$ = inbox.selectRx({seen: false}).pipe(
rx.take(10),
etl.log()
)
await etl.run(PrintMails$);
const mail = await inbox.get(1463);
console.log(mail);
SMS.RU
Endpoint to work with http://sms.ru service.
You should have the account on this service to use it.
Endpoint
Methods:
constructor(apiId: string);
async send(message: string, toPhone: string, from?: string): Promise<SendError | undefined>;
async send(message: string, toPhones: string[], from?: string): Promise<SendError | undefined>;
Example:
import * as etl from "etl-gun";
const sms = new etl.messangers.SmsRu.Endpoint(process.env.SMS_RU_API_ID!);
const err = await sms.send('hello', '123-45-67');
if (err) console.log(err); // log error if any
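The `send` methods report a failure through their return value (`SendError | undefined`) rather than by throwing, so the caller has to check the result. The sketch below shows one possible way to send a message to several phones one by one and collect the failures per recipient. The `SmsSender` interface mirrors only the documented `send(message, toPhone)` signature; `SendErrorLike`, `SendFailure`, `sendToAll` and the fake sender are hypothetical names introduced for this example, and the real `SendError` type may look different.

```typescript
// Stand-in for the real SendError type, whose shape is not described here.
type SendErrorLike = { message: string };

// Minimal shape mirroring the documented send(message, toPhone) signature.
interface SmsSender {
    send(message: string, toPhone: string): Promise<SendErrorLike | undefined>;
}

interface SendFailure {
    phone: string;
    error: SendErrorLike;
}

// Send the same message to several phones one by one and collect failures
// instead of stopping at the first one.
async function sendToAll(sender: SmsSender, message: string, phones: string[]): Promise<SendFailure[]> {
    const failures: SendFailure[] = [];
    for (const phone of phones) {
        const error = await sender.send(message, phone);
        if (error) failures.push({ phone, error });
    }
    return failures;
}

// Fake sender used only to make the sketch runnable without network access:
// it rejects every number that begins with "000".
const fakeSender: SmsSender = {
    async send(_message: string, toPhone: string): Promise<SendErrorLike | undefined> {
        return toPhone.indexOf('000') === 0 ? { message: 'invalid number' } : undefined;
    },
};

sendToAll(fakeSender, 'hello', ['123-45-67', '000-00-00']).then(failures => {
    failures.forEach(failure => console.log(`${failure.phone}: ${failure.error.message}`));
});
```

The documented overload that takes an array of phones returns a single result for the whole batch; sending per phone, as sketched here, trades extra requests for knowing which recipient failed.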
Magento
Represents Magento CMS objects via the Magento REST API.
It uses token-based authentication to access the API. You must register a web service in the Admin. Use the following general steps to set up Magento to enable web services:
- Create a web services user on Admin by selecting System > Permission > All Users > Add New User. (If you are using session-based or OAuth authentication, you do not need to create the new user in the Admin.)
- Create a new integration on Admin. To create an integration, click System > Extensions > Integration > Add New Integration. Be sure to restrict which resources the integration can access.
See the official guide for more information, or go to this article for details on how to configure a Magento integration to get access to its API.
Endpoint
Methods:
// magentoUrl: Url of Magento
// login: admin login
// password: admin password
// rejectUnauthorized: You can set it to false to ignore SSL certificate problems during development.
constructor(magentoUrl: string, login: string, password: string, rejectUnauthorized: boolean = true);
// Create collection object for the Magento products
// guiOptions: Some options how to display this endpoint
getProducts(guiOptions: CollectionGuiOptions<Partial<Product>> = {}): ProductsCollection;
// Release products collection object
releaseProducts();
ProductsCollection
Represents Magento CMS products.
Methods:
// Create the observable object and send product data from the Magento to it
// where: you can filter products by specifying an object with fields as column names and its values as field values
// fields: you can select which products fields will be returned (null means 'all fields')
selectRx(where: Partial<Product> = {}, fields: (keyof Product)[] = null): BaseObservable<Partial<Product>> ;
// Add new product to the Magento
// value: product fields values
async insert(value: NewProductAttributes);
// Upload an image to Magento, set it as an image of the specified product and return the total count of images for this product
// product: product sku
// imageContents: binary form of the image file
// filename: name of the file under which Magento will store the image
// label: label of the product image
// type: mime type of the image
async uploadImage(product: {sku: string} | string, imageContents: Blob, filename: string, label: string, type: "image/png" | "image/jpeg" | string): Promise<number>;
// Operator to upload product image from the pipe
uploadImageOperator<T>(func: (value: T) => {product: {sku: string} | string, imageContents: Blob, filename: string, label: string, type: "image/png" | "image/jpeg" | string}): OperatorFunction<T, T>;
// Utility static function to get products as array
static async getProducts(endpoint: Endpoint, where: Partial<Product> = {}, fields: (keyof Product)[] = null): Promise<Partial<Product>[]>;
Example:
import * as etl from "etl-gun";
const magento = new etl.Magento.Endpoint('https://magento.test', process.env.MAGENTO_LOGIN!, process.env.MAGENTO_PASSWORD!);
const products = magento.getProducts();
const logProductsWithPrice100$ = products.selectRx({price: 100}).pipe(
etl.log()
);
etl.run(logProductsWithPrice100$)
StockCollection
Represents Magento CMS stock items. Stock items are products in stock.
Methods:
// Create the observable object and send stock items data from the Magento to it
// sku, product: you can filter stock items by product attributes
selectRx(sku: string): BaseObservable<StockItem>;
selectRx(product: Partial<Product>): BaseObservable<StockItem>;
// Get stock item for specified product
// sku, product: the product whose stock item we need to get
public async getStockItem(sku: string): Promise<StockItem>;
public async getStockItem(product: {sku: string}): Promise<StockItem>;
// Update product stock quantity
public async updateStockQuantity(sku: string, quantity: number);
public async updateStockQuantity(product: {sku: string}, quantity: number);
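Several Magento methods accept a product either as a plain sku string or as an object with a `sku` field. A minimal sketch of how both forms can be reduced to the same value is shown below; the `ProductRef` type and the `toSku` helper are hypothetical names used for illustration and are not part of the ETL-Gun API, and the sku value is a placeholder.

```typescript
// The documented overloads accept either a sku string or an object with a sku field.
type ProductRef = string | { sku: string };

// Hypothetical helper: reduce both forms to the plain sku string.
function toSku(product: ProductRef): string {
    return typeof product === 'string' ? product : product.sku;
}

// Both calls refer to the same product.
console.log(toSku('EXAMPLE-SKU-1'));
console.log(toSku({ sku: 'EXAMPLE-SKU-1' }));
```

Accepting the object form means a product received from `selectRx` can be passed on directly, without pulling the sku out by hand first.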
Trello
Represents Trello task tracking system objects. For details on how to get an API key and authorization token, please read the Trello documentation.
Endpoint
Methods:
// url: Trello web url
// apiKey: Trello API key
// authToken: Trello authorization token
// rejectUnauthorized: You can set it to false to ignore SSL certificate problems during development.
constructor(apiKey: string, authToken: string, url: string = "https://trello.com", rejectUnauthorized: boolean = true);
// Create collection object for the Trello user boards
// username: the user whose boards we need to get, by default it is the owner of the Trello authorization token
// collectionName: identifier of the collection object being created
// guiOptions: Some options how to display this endpoint
getUserBoards(username: string = 'me', collectionName: string = 'Boards', guiOptions: CollectionGuiOptions<Partial<Board>> = {}): BoardsCollection;
// Create collection object for the Trello board lists
// boardId: board id
// collectionName: identifier of the collection object being created
// guiOptions: Some options how to display this endpoint
getBoardLists(boardId: string, collectionName: string = 'Lists', guiOptions: CollectionGuiOptions<Partial<List>> = {}): ListsCollection;
// Create collection object for the Trello list cards
// listId: list id
// collectionName: identifier of the collection object being created
// guiOptions: Some options how to display this endpoint
getListCards(listId: string, collectionName: string = 'Cards', guiOptions: CollectionGuiOptions<Partial<Card>> = {}): CardsCollection;
// Create collection object for the Trello card comments
// cardId: card id
// collectionName: identifier of the collection object being created
// guiOptions: Some options how to display this endpoint
getCardComments(cardId: string, collectionName: string = 'Comments', guiOptions: CollectionGuiOptions<Partial<Comment>> = {}): CommentsCollection;
// Release collection data
// collectionName: identifier of the collection object being released
releaseCollection(collectionName: string);
BoardsCollection
Represents the Trello boards accessible to the user who was specified when the collection was created.
Methods:
// Create the observable object and send boards data from the Trello to it
// where (not working yet!): you can filter boards by specifying an object with fields as column names and its values as field values
// fields (not working yet!): you can select which board fields will be returned (null means 'all fields')
selectRx(where: Partial<Board> = {}, fields: (keyof Board)[] = null): EtlObservable<Partial<Board>>;
// Add new board to the Trello
// value: board fields values
async insert(value: Omit<Partial<Board>, 'id'>);
// Update board fields values by board id
// boardId: board id
// value: new board fields values as hash object
async update(boardId: string, value: Omit<Partial<Board>, 'id'>);
// Get all user boards
async get(): Promise<Board[]>;
// Get board by id
// boardId: board id
async get(boardId?: string): Promise<Board>;
// Get board by url from browser
async getByBrowserUrl(url: string): Promise<Board>;
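`getByBrowserUrl` looks a board up by the address copied from the browser. How the library resolves that address is not described here; the sketch below only illustrates the idea, under the assumption that board URLs have the form `https://trello.com/b/<shortLink>/<board-name>`. The `extractBoardShortLink` function is a hypothetical helper, not part of ETL-Gun, and the URL in the example is a placeholder.

```typescript
// Assumption: board URLs copied from the browser look like
// https://trello.com/b/<shortLink>/<board-name>
function extractBoardShortLink(browserUrl: string): string | null {
    const url = new URL(browserUrl);
    const parts = url.pathname.split('/').filter(part => part.length > 0);
    // Expected path segments: ['b', '<shortLink>', '<board-name>']
    if (parts.length >= 2 && parts[0] === 'b') {
        return parts[1];
    }
    // Not a board URL (for example a card or a profile page)
    return null;
}

console.log(extractBoardShortLink('https://trello.com/b/AbCdEfGh/my-board'));
```

Returning `null` for anything that is not a board address lets the caller decide whether to skip the item or report an error.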
ListsCollection
Represents the Trello lists on the board which was specified when the collection was created.
Methods:
// Create the observable object and send lists data from the Trello to it
// where (not working yet!): you can filter lists by specifying an object with fields as column names and its values as field values
// fields (not working yet!): you can select which list fields will be returned (null means 'all fields')
selectRx(where: Partial<List> = {}, fields: (keyof List)[] = null): EtlObservable<Partial<List>>;
// Add new list to the Trello
// value: list fields values
async insert(value: Omit<Partial<List>, 'id'>);
// Update list fields values by list id
// listId: list id
//