@ashnazg/squirrelnado
v0.0.35
Published
a utility toolbelt for handling data
Downloads
47
Readme
title: "@ashnazg/squirrelnado" sidebar_label: Squirrelnado
a refactor of squirrelnado to resolve issues pinning it to node v15.11.0, and increase unixy reusability of its innards.
Usage:
const {read} = require('@ashnazg/squirrelnado2'); // https://ashnazg.com/docs/lib/squirrelnado
// migrate data from one place to another:
await read(src).write(dst);
await read('mysql://server/db/schema/table').write('table.csv');
// load data to memory:
const json_rows = await read(src);
mutate the data in flight
mutate with .map(transformFunc)
: any time before a .write()
, you can alter rows:
const fixed_rows = await read(src).map(row => {
row.id = 'mysrc-' + row.id;
return row;
});
Warning: For simple performance, the system is using pass by ref, so if you're consuming the data in multiple places from the same memory-copy, you'll see cross-contamination. You can either:
- do a shallow fork:
return {...row}
- persist the common starting point to disk first, and run both consumers off of that
filtering out rows
While you can use .filter(test)
, .map()
can drop rows as well:
const fixed_rows = await read(src).map(row => {
if (row.bad) return []; // use the 'multiple rows response' format to tell sqrl that none are needed.
// I like this row, alter it and send it on:
row.id = 'mysrc-' + row.id;
return row;
});
I used to use undefined
instead of []
, but I accidentially forgot to return row;
in so many trivial map steps that I now throw an error instead.
returning more rows
If you return an array of rows from .map()
, those will be streamed to the next step. (dropping, 1:1, and 1:many can all happen in the same step.)
const fixed_rows = await read(src).map(row => {
if (row.bad) return;
if (row.children) {
return [
{id: row.id + 'child-1', parent: row.id},
{id: row.id + 'child-2', parent: row.id}
];
}
row.id = 'mysrc-' + row.id;
return row;
});
visiting rows without using up memory
If you want to iterate over a terabyte of data, but have no reason to write a stream of rows anywhere, you probably don't want the final .map()
step resolving to all the
input-rows. .forEach(visitor)
is shorthand for .map(async row => {await visitor(row); return [];})
, so await read('data').forEach(row => {...})
resolves to a harmless
empty array.
visiting rows asynchronously
While promises are always in play at the per-file and per-sequence level, async functions at the row level drastically slower than old fashioned callbacks -- 20x slower than disk I/O in one 7gig job I've done.
While I've got a plan to build async-friendliness into .map
later, for now, no.
DB control flags
read('foo.csv').write('mysql://server/db/schema/table?existing=truncate');
// which is the same as:
read('foo.csv').write('mysql://server', {existing: 'truncate', table:'table', schema:'schema', database:'db'});
DB table schemas
The above sample has to autodetect the columns in foo.csv in order to create INSERT foo (col1, col2) VALUES (...)
statements -- it'll make guesses based on the first row, which
can't work if there are nulls, or the first row has less than 100% of the fields in the whole set.
There are two ways to do this right:
- if the source system is a real DB (and the driver's parsing the metadata) and there are no
.map()
steps, the same column traits (type, length, nullability) will propagate. - For the other 99% of the time, the write step needs a columns definition:
read('foo.csv').write('mysql://server/db/schema/table?existing=truncate', {columns: {
src_id: 'INT NOT NULL',
name: 'VARCHAR(32) NULL'
}});
.mapColumns({dst_column: 'src_column'})
is a specialized version of .map()
that propagates original traits, if present.
Since mapColumns
tosses out any fields not mentioned, sometimes .renameColumns('src_col', 'dst_col')
is more convenient, since it leaves other columns alone. .deleteColumn('src')
is
the same as .renameColumns('src', null)
.
these three treat SINGLE param mode func('single,csv,string')
the same as func('var', 'args', '...')
.renameColumns('from1', 'to1', 'from2', 'to2')
takes an even number of strings and an optional config object..dropColumns('unwanted1', 'unwanted2')
is built with the same flavor..keepColumns('field1,field2')
is like mapColumns with no renaming, just include/exclude behavior.
keepColumns also accepts a ColSpec and just ignores everything but the map keys:
columns = {a: 'INT', b: 'REAL'};
step.keepColumns(columns).write('db://', {columns});
existing
|value|behavior
|-|-
|undefined|check if table exists, run DDL if not (if the driver supports CREATE IF NOT EXISTS
, this is just a prelude on the first batch of rows instead of a separate query.)
|true|assume table exists, don't waste a trip to the server checking
|'truncate'|remove existing rows
|'replace'|uses 'CREATE OR REPLACE' (or 'DROP' if the driver doesn't support OR REPLACE.)
errors/stats
To make CLI and live-service error management more symmetrical, it uses @ashnazg/pubsub as an internal and app-facing message bus.
prefiguring endpoint details
While the basic plan is to use URLs for most src/dst use cases, separating out passwords is important for leak prevention, and you can use that functionality for other connection settings.
The optional config object that read() and write() take as a 2nd param allows you to selectively separate dynamic or high risk configs:
let {ENDPOINT = 'mysql://server/db/schema/table', PASSWORD} = process.env;
if (!PASSWORD) throw new Error('set PASSWORD');
await read('/tmp/tickets/page-*.csv').write(ENDPOINT, {pass: PASSWORD});
Instead of providing configs on each read/write, you can set defaults in the servers{}
map:
const {read, servers} = require('@ashnazg/squirrelnado2'); // https://ashnazg.com/docs/lib/squirrelnado
servers.mytickets = {
url: 'https://tickets.ashnazg.com/api/tickets',
user: 'app',
pass: process.env.APP_PASS
};
servers['etl@upstream'] = {
pass: process.env.APP_PASS
};
servers.report_server = {
protocol: 'mysql',
host: 'reports.ashnazg.com',
user: 'auditor',
pass: process.env.AUDITOR_PASS,
database: 'bi',
schema: 'daily',
table: 'open_tickets',
columns: {
id: 'INT NOT NULL',
src_id: 'VARCHAR(16) NULL'
},
existing: 'truncate'
};
You can exercise the above defaults by using either the bare nickname (like report_server
) as the URL, or by using it as the server name in a full URL:
mysql://report_server/db1/schema_foo/table_bar
.
// the user+hostname+port given in the URL must exactly match the above; it's never going to look up 'upstream' in the following use case:
let record_count = 0; // bad example in that just SQL'ing count(*) would be better, but pretend like we're doing real work here...
await read('etl@upstream').forEach(record => {
++record_count;
});
let last_id = 0;
await read('tickets.csv').map(ticket => {
return {
id: ++last_id,
src_id: 'ticket-' + ticket.id,
meta: "this field is visible to later .map steps, but is not written to mysql as columns{} didn't include it"
};
}).write('report_server');
// as long as the URL has a protocol://, you can give read/write more than just the nickname:
// this will let everything default (the empty slashes are how you can skip the DB/SCHEMA parts) except the table name and column listing.
...write('mysql://report_server///other_table', {columns: {name: 'VARCHAR(32) NOT NULL'}});
redacting passwords and authorization headers
To mitigate the risk when you have the password in the URL, the "preserved" URL stored in the sequence's config has the password redacted, as the URL ends up in the logs. (This is
also why step.server.pass
is a non-enumerable, and not a plain literal password.) If you have secrets elsewhere, like https://server/?auth_token=foo
, it's up to you to redact the
URL. You can alter or delete it without affecting this tool, as the URL is only kept there for debugging convenience.
While src/dst URLs can cover most use cases
misc utils
installing node CLI error traps
sqrl.install
internals
- load all plugins for dependency injection
- parse URLs and confs into coarse steps
- split coarse into true steps
- when launched, create streams for each step, link them together, and run them
- collect results if in RAM mode
parse URL
const parseURL = require('@ashnazg/squirrelnado2/parse-url'); // ~/projects/ashnazg-npm/squirrelnado/newcore/parse-url.js