roadiejs-import
v0.0.8
Published
RoadieJS plug-in providing bulk data import functionality
A plugin for RoadieJS
A configurable ETL pipeline, based on Node.js streams.
API
createImportStream
Registers a new import stream, and goes on to start importing.
Request
POST /streams
{
  "namespace": "roadietest",
  "blueprintName": "importPlanets",
  "blueprintVersion": 1,
  "localVersion": 0,
  "importStream": "planetCsv",
  "missingAction": "error",
  "source": {
    "type": "file",
    "options": {
      "paths": "./test/planets/import_files/advanced.csv"
    }
  }
}
| Name | Notes
| ---- | ----
| `namespace` | Namespace of the blueprint which contains the import element.
| `blueprintName` | Name of the blueprint which contains the import element.
| `blueprintVersion` | Version number of the blueprint which contains the import element.
| `localVersion` | Local version number of the blueprint which contains the import element.
| `importStream` | The `id` of an `importStream` element that's defined in the identified blueprint.
| `missingAction` | Controls the behaviour when trying to update/delete a document that doesn't exist. Valid values are `warning` and `error`.
| `source` | An object to configure the source of the import.
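
As a rough illustration of the request described above, the following sketch assembles the URL, method, headers and body for a `createImportStream` call. The `baseUrl` value, the `Content-Type` header and the helper name `buildCreateImportStreamRequest` are assumptions made for this example; they are not taken from the plugin.

```javascript
// Sketch only: assembles the parts of a createImportStream request.
// `baseUrl` is hypothetical; the README does not say where the routes are mounted.
const VALID_MISSING_ACTIONS = ['warning', 'error'];

function buildCreateImportStreamRequest(baseUrl, options) {
  // Only validate missingAction when supplied (whether it is mandatory is not documented).
  if (options.missingAction !== undefined &&
      !VALID_MISSING_ACTIONS.includes(options.missingAction)) {
    throw new Error('missingAction must be "warning" or "error"');
  }
  // The source object is mandatory and needs a type.
  if (!options.source || typeof options.source.type !== 'string') {
    throw new Error('source.type is required');
  }
  return {
    url: baseUrl.replace(/\/+$/, '') + '/streams',
    method: 'POST',
    // Assumption: the body is sent as JSON.
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      namespace: options.namespace,
      blueprintName: options.blueprintName,
      blueprintVersion: options.blueprintVersion,
      localVersion: options.localVersion,
      importStream: options.importStream,
      missingAction: options.missingAction,
      source: options.source
    })
  };
}

const request = buildCreateImportStreamRequest('http://localhost:3000', {
  namespace: 'roadietest',
  blueprintName: 'importPlanets',
  blueprintVersion: 1,
  localVersion: 0,
  importStream: 'planetCsv',
  missingAction: 'error',
  source: {
    type: 'file',
    options: { paths: './test/planets/import_files/advanced.csv' }
  }
});
```

The returned object can be handed to whichever HTTP client is in use; keeping the request assembly separate from the transport makes it easy to check the payload before sending it.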
Sources
Data can be streamed from multiple sources. The `source` object is therefore mandatory, and has two common keys:
| Name | Notes
| ---- | ----
| `type` | The type of import (e.g. `file`).
| `options` | An object containing config specific to the type of import (see below).
file
Imports data from files stored on the local file system.
"source": {
  "type": "file",
  "options": {
    "paths": "./test/school/import_files/full_school_dump.csv"
  }
}
| Name | Notes
| ---- | ----
| `paths` | Defines which files should be loaded. Supports paths to single files, `*` wildcards, glob-style `**` (for directory recursion) and arrays of multiple strings.
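
Because `paths` accepts either a single string or an array of strings, code that prepares a `source` object may find it convenient to normalise the value first. The helper below is a hypothetical sketch of that step (the name `toPathPatterns` and the wildcard paths in the second call are invented for illustration); it does not expand wildcards, which is left to the plugin.

```javascript
// Sketch: coerce a `paths` option into an array of pattern strings.
// Wildcards such as * and ** are passed through untouched.
function toPathPatterns(paths) {
  const list = Array.isArray(paths) ? paths : [paths];
  for (const entry of list) {
    if (typeof entry !== 'string' || entry.length === 0) {
      throw new TypeError('each path must be a non-empty string');
    }
  }
  return list;
}

// A single file, as in the example above.
const single = toPathPatterns('./test/school/import_files/full_school_dump.csv');

// Hypothetical wildcard patterns supplied as an array.
const many = toPathPatterns(['./imports/*.csv', './archive/**/*.csv']);
```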
Response
Status 201
{
  "_id": "557c3834f0f1c14e25220e8b",
  "_created": "2015-06-13T14:03:32.571Z",
  "namespace": "roadietest",
  "blueprintName": "importPlanets",
  "blueprintVersion": 1,
  "blueprintLocalVersion": 0,
  "importStream": "planetCsv",
  "totalSize": 1147,
  "status": "starting",
  "processedSize": 0,
  "count": 0,
  "warnings": 0,
  "failures": 0,
  "notDealtWith": 0
}
| Name | Notes
| ---- | ----
| `_id` | The unique database-generated `id` for the import process.
| `_created` | A timestamp of when the import was registered.
| `namespace` | Namespace of the blueprint, as supplied in the request.
| `blueprintName` | Name of the blueprint, as supplied in the request.
| `blueprintVersion` | Version of the blueprint, as supplied in the request.
| `blueprintLocalVersion` | Local version of the blueprint, as supplied in the request.
| `importStream` | The `id` of an `importStream` element, as supplied in the request.
| `totalSize` | The number of units the import is estimated to be. Most likely the number of bytes.
| `status` | Current status of the import (expect `starting`).
| `processedSize` | How many units have been imported so far (expect 0 at this point).
| `count` | Total count of documents which have been processed (regardless of whether they succeeded or failed).
| `warnings` | Number of documents, within the overall `count`, that have raised a warning.
| `failures` | Number of documents, within the overall `count`, that have failed.
| `notDealtWith` | Number of documents, within the overall `count`, that did not match any record pattern.
getImportStreamStatus
Get the latest status of a flow.
Request
GET /streams/:id
| Name | Notes
| ---- | ----
| `id` | The `id` that uniquely identifies an import (e.g. the `_id` value returned from `createImportStream`).
Response
Status 200
{
  "_id": "557c6f1487a62fff374fa2ed",
  "_created": "2015-06-13T17:57:40.707Z",
  "namespace": "roadietest",
  "blueprintName": "importSchools",
  "blueprintVersion": 1,
  "blueprintLocalVersion": 0,
  "importStream": "studentCsv",
  "totalSize": 364,
  "status": "succeeded",
  "finished": "2015-06-13T17:57:40.820Z",
  "processedSize": 364,
  "count": 6,
  "warnings": 0,
  "failures": 0
}
| Name | Notes
| ---- | ----
| `_id` | The unique database-generated `id` for the import process (e.g. the `id` provided as a parameter as part of the request).
| `_created` | A timestamp of when the import was registered.
| `namespace` | Namespace of the blueprint, as supplied in the request.
| `blueprintName` | Name of the blueprint, as supplied in the request.
| `blueprintVersion` | Version of the blueprint, as supplied in the request.
| `blueprintLocalVersion` | Local version of the blueprint, as supplied in the request.
| `importStream` | The `id` of an `importStream` element, as supplied in the request.
| `totalSize` | The number of units the import is estimated to be. Most likely the number of bytes.
| `status` | Current status of the import. Valid values are `starting`, `succeeding`, `warning`, `failing`, `warned`, `failed` and `succeeded`.
| `finished` | Timestamp of when the flow finished (not present if it's still running).
| `processedSize` | How many units have been imported so far.
| `count` | Total count of documents which have been processed (regardless of whether they succeeded or failed).
| `warnings` | Number of documents, within the overall `count`, that have raised a warning.
| `failures` | Number of documents, within the overall `count`, that have failed.
| `notDealtWith` | Number of documents, within the overall `count`, that did not match any record pattern.
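
A client will typically call this route repeatedly until the import completes. The sketch below shows one way to do that, relying only on the documented fact that `finished` is absent while the flow is still running. The function names and the `fetchStatus` callback (a caller-supplied function that performs `GET /streams/:id` and resolves to the parsed response) are assumptions for illustration, not part of the plugin.

```javascript
// Sketch: helpers for interpreting a getImportStreamStatus response.

// `finished` is only present once the flow has stopped running.
function isFinished(statusDoc) {
  return statusDoc.finished !== undefined;
}

// Fraction of the import processed so far, between 0 and 1.
function progress(statusDoc) {
  if (!statusDoc.totalSize) {
    return 0;
  }
  return Math.min(1, statusDoc.processedSize / statusDoc.totalSize);
}

// Polls until the import has finished, then resolves to the final status.
// `fetchStatus` is a hypothetical caller-supplied async function.
async function waitForImport(fetchStatus, intervalMs = 500) {
  for (;;) {
    const statusDoc = await fetchStatus();
    if (isFinished(statusDoc)) {
      return statusDoc;
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

Injecting `fetchStatus` keeps the polling logic independent of any particular HTTP client, and makes it straightforward to exercise with canned responses.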
getImportStreamMessages
Returns an array of messages that have been generated by the specified flow, ordered by creation timestamp (ascending).
Request
GET /streams/:id/messages
| Name | Notes
| ---- | ----
| `id` | The `id` that uniquely identifies an import (e.g. the `_id` value returned from `createImportStream`).
Response
Status 200
[
  {
    "_id": "557c7680f09749d93b88619f",
    "transactionId": "557c7680f09749d93b88619a",
    "schemaName": "students",
    "namespace": "roadietest",
    "blueprintName": "importSchools",
    "blueprintVersion": 1,
    "blueprintLocalVersion": 0,
    "type": "warning",
    "name": "noDoc",
    "message": "Unable to find document"
  }
]
| Name | Notes
| ---- | ----
| `_id` | A unique value to identify the message.
| `transactionId` | The unique database-generated `id` for the import process (e.g. the `id` provided as a parameter as part of the request).
| `schemaName` | The `id` of a schema related to the message.
| `namespace` | Namespace of the blueprint responsible for the import.
| `blueprintName` | Name of the blueprint responsible for the import.
| `blueprintVersion` | Version of the blueprint responsible for the import.
| `blueprintLocalVersion` | Local version of the blueprint responsible for the import.
| `type` | Type of message: a value from `info`, `warning`, `error` or `exception`.
| `name` | Name (e.g. code) of the message.
| `message` | Short message content.
| `body` | Data to support the message (content specific to the type/name of message).
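
Once the message array has been retrieved, it is often useful to summarise it by `type`. The following is a minimal sketch of such a summary; `countMessagesByType` is a hypothetical helper written for this example, and the sample array is a trimmed copy of the response shown above.

```javascript
// Sketch: tally messages by the four documented types.
function countMessagesByType(messages) {
  const counts = { info: 0, warning: 0, error: 0, exception: 0 };
  for (const message of messages) {
    // Ignore any type that is not one of the documented values.
    if (Object.prototype.hasOwnProperty.call(counts, message.type)) {
      counts[message.type] += 1;
    }
  }
  return counts;
}

// Trimmed version of the example response.
const sampleMessages = [
  {
    _id: '557c7680f09749d93b88619f',
    transactionId: '557c7680f09749d93b88619a',
    schemaName: 'students',
    type: 'warning',
    name: 'noDoc',
    message: 'Unable to find document'
  }
];

const summary = countMessagesByType(sampleMessages);
```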
Elements
importStream
Registers a new import (e.g. a way of importing data into schemas within the blueprint).
Example
{
  "id": "planetCsv",
  "element": "importStream",
  "config": {
    "parser": {
      "type": "csv",
      "options": {
        "delimiter": ",",
        "qualifier": "\\"
      }
    },
    "target": {
      "type": "data"
    }
  }
}
Config
| Name | Type | Notes
| ----------- | -------| -----------
| `parser` | `object` | An object that should contain a `type` string (e.g. `csv`) for identifying a parser, and an `options` object for configuring the parser.
| `target` | `object` | An object that configures a supported target for the import. The object must include a `type` value to identify a target.
Parsers
A parser takes the raw data stream from a source (configured via `createImportStream`) and turns it into a usable object for passing on to an adaptor.
csv
The csv parser expects a source that can provide individual chunks of data (typically a line from a file).
- Internally, parsing is handled via the `csv-parse` package.
- The `options` defined for the parser are passed through to a `csv-parse` parser. See the `csv-parse` documentation for more information.
Adaptors
An adaptor takes the output of a parser and maps it to fields in a schema.
- There's no need to explicitly define an adaptor.
- If an adaptor hasn't been defined, then an adaptor with the same name as the parser is used.
- The behaviour of an adaptor depends on its type.
csv
The `csv` adaptor expects one or more `csvRecord` elements to be defined as child elements of the `importStream` element.
Targets
A target is the final destination in the import pipeline, and does something with the output of an adaptor.
data
- Hooks into `roadiejs-data` so the object produced by the adaptor can be persisted.
console
- Outputs the object produced out of the adaptor to the console.
csvRecord
If an `importStream` element has a parser of type `csv`, then one or more `csvRecord` child elements should be defined for it.
- The purpose of a `csvRecord` is to transform the output of a `csv` parser to a schema/field structure.
- Multiple `csvRecord` elements can be configured under an `importStream` element, as it's possible to 'identify' a suitable schema from the available CSV columns.
- A special `csv` array will be accessible when evaluating expressions; this reflects the parsed columns from the underlying CSV data.
Example
{
  "id": "craterRecord",
  "element": "csvRecord",
  "parent": "importStream.planetCsv",
  "config": {
    "schemaId": "planets",
    "recordIdentification": "csv[0]=='crater'",
    "actionIdentification": {
      "post": "csv[1]=='I'",
      "put": "csv[1]=='U'",
      "upsert": "csv[1]=='M'",
      "del": "csv[1]=='D'"
    },
    "paramMap": [
      "csv[5]",
      "moons",
      "csv[6]",
      "craters",
      "csv[2]"
    ],
    "data": {
      "title": "csv[3]",
      "diameter": "csv[4]"
    }
  }
}
Config
| Name | Type | Notes
| ------------ | -------- | -----------
| `schemaId` | `string` | The `id` of a schema, defined within the blueprint, to which the CSV data will ultimately be persisted.
| `recordIdentification` | `string` | Optional. An expression. If it evaluates to `true` then the config of this `csvRecord` element will be used to transform the CSV data into a field structure.
| `actionIdentification` | `object` | Optional. Maps an action (e.g. `post`, `put`, `upsert` or `del`) to an expression. If it evaluates to `true` then that action will be used to persist/delete the transformed data.
| `paramMap` | `[string]` | Optional. An array of strings. Maps parameters (starting at `docId`) of a `/data` route to the contents of the CSV record. It is therefore possible to target sub-docs.
| `data` | `object` | Maps a field name to an expression. The result of the expression will then be used as the value for that field.
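
To make the roles of `recordIdentification`, `actionIdentification` and `data` more concrete, here is a rough sketch of how a parsed row might be matched and transformed. It is not the plugin's implementation: the `evaluate` and `applyCsvRecord` functions are hypothetical, expressions are evaluated with plain JavaScript via `new Function` purely for illustration (unsuitable for untrusted input), `paramMap` is not handled, and the sample row values are invented.

```javascript
// Sketch: evaluate a README-style expression with `csv` in scope.
// Assumption: expressions are treated as plain JavaScript here.
function evaluate(expression, csv) {
  return new Function('csv', 'return (' + expression + ');')(csv);
}

// Returns null when the row does not match this csvRecord config,
// otherwise the chosen action and the mapped document.
function applyCsvRecord(config, csv) {
  if (config.recordIdentification && !evaluate(config.recordIdentification, csv)) {
    return null;
  }
  // First action whose expression is true wins; null if none match
  // (the README does not document a default action).
  let action = null;
  for (const [name, expression] of Object.entries(config.actionIdentification || {})) {
    if (evaluate(expression, csv)) {
      action = name;
      break;
    }
  }
  // Each field's value is the result of its expression.
  const doc = {};
  for (const [field, expression] of Object.entries(config.data)) {
    doc[field] = evaluate(expression, csv);
  }
  return { schemaId: config.schemaId, action: action, doc: doc };
}

const craterRecord = {
  schemaId: 'planets',
  recordIdentification: "csv[0]=='crater'",
  actionIdentification: {
    post: "csv[1]=='I'",
    put: "csv[1]=='U'",
    upsert: "csv[1]=='M'",
    del: "csv[1]=='D'"
  },
  data: { title: 'csv[3]', diameter: 'csv[4]' }
};

// Hypothetical parsed row: record type, action flag, id, title, diameter, ...
const row = ['crater', 'U', 'c1', 'Tycho', '85', 'earth', 'moon'];
const result = applyCsvRecord(craterRecord, row);
```

With this row, the record expression matches, the `put` expression is the first to evaluate to `true`, and the resulting document takes `title` from column 3 and `diameter` from column 4.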
populate
A simple way to populate with data - useful for supplying reference/lookup data from within a blueprint definition.
- Ensure a `populate` element is a child of the `schema` element you wish to populate.
- Schemas will only ever be populated once, and will not be re-asserted every time the blueprint is used.
Example
{
  "id": "statesPopulator",
  "element": "populate",
  "parent": "schema.states",
  "config": {
    "map": [
      "name",
      "abbreviation",
      "capitalCity",
      "mostPopulatedCity",
      "population",
      "squareMiles"
    ],
    "data": [
      ["ALABAMA", "AL", "Montgomery", "Birmingham", 4708708, 52423],
      ["ALASKA", "AK", "Juneau", "Anchorage", 698473, 656425],
      ["ARIZONA", "AZ", "Phoenix", "Phoenix", 6595778, 114006],
      ["ARKANSAS", "AR", "Little Rock", "Little Rock", 2889450, 53182],
      ["CALIFORNIA", "CA", "Sacramento", "Los Angeles", 36961664, 163707],
      ["COLORADO", "CO", "Denver", "Denver", 5024748, 104100],
      ["CONNECTICUT", "CT", "Hartford", "Bridgeport", 3518288, 5544]
    ]
  }
}
Config
| Name | Type | Notes
| ------------ | -------| -----------
| `map` | `[string]` | An array of strings, each a field name within the schema you wish to populate. The order is important...
| `data` | `[array]` | An array of arrays, mimicking a record/field structure. The values of each 'record' should be in the same order as defined in `map`.
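
The relationship between `map` and `data` can be pictured as zipping each inner array with the list of field names. The sketch below illustrates that idea with the first two rows of the example; `toRecords` is a hypothetical helper written for this README, not a function exposed by the plugin.

```javascript
// Sketch: pair each value in a data row with the field name at the
// same position in `map`, producing one object per row.
function toRecords(map, data) {
  return data.map((row, rowIndex) => {
    if (row.length !== map.length) {
      throw new Error('row ' + rowIndex + ' has ' + row.length +
        ' values but map defines ' + map.length + ' fields');
    }
    const record = {};
    map.forEach((field, index) => {
      record[field] = row[index];
    });
    return record;
  });
}

const records = toRecords(
  ['name', 'abbreviation', 'capitalCity', 'mostPopulatedCity', 'population', 'squareMiles'],
  [
    ['ALABAMA', 'AL', 'Montgomery', 'Birmingham', 4708708, 52423],
    ['ALASKA', 'AK', 'Juneau', 'Anchorage', 698473, 656425]
  ]
);
```

Checking the row length up front surfaces a mismatch between `map` and `data` early, which is the most likely mistake given that the order of values matters.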