@ezs/core
v3.10.10
Published
A wrapper to build Stream transformers with functional style
Downloads
1,057
Readme
core
Ce plugin propose une série d'instructions natives. Elles sont constamment disponibles car chargées automatiquement.
installation
npm install @ezs/core
détails
Plusieurs instructions permettent de créer des sous-flux (sub pipeline), à partir d'un fichier d’instructions ou d'instructions imbriquées. Si elles s'utilisent toutes de la même manière (avec les mêmes paramètres) certaines peuvent apparaître comme similaires mais leur fonctionnement est différent :
[delegate]
: 1 sous-flux pour tous les éléments[swing]
: 1 sous-flux pour tous les éléments filtrés selon une condition[spawn]
: 1 sous-flux par élément[loop]
: 1 sous-flux par élément[expand]
: 1 sous-flux pour N éléments (N = size), seul le champ sélectionné est envoyé dans le pipeline[combine]
: 1 sous-flux pour tous les éléments, seul le champ sélectionné est comparé avec le résultat du sous-flux[singleton]
: 1 sous-flux pour le premier élément
usage
Table of Contents
- assign
- combine
- concat
- debug
- dedupe
- delegate
- delegate
- dispatch
- dump
- env
- exchange
- expand
- extract
- fork
- group
- identify
- ignore
- keep
- loop
- map
- metrics
- overturn
- pack
- parallel
- pop
- remove
- replace
- shift
- shuffle
- singleton
- spawn
- swing
- throttle
- time
- tracer
- transit
- truncate
- ungroup
- unpack
- use
- validate
assign
- See: exchange
Affecte une valeur à un champ de l'objet courant. Si le champ existe déjà, sa valeur est écrasée, sinon il est créé
Entrée:
[{
"nom": "un",
"valeur": 1
},
{
"nom": "deux",
"valeur": 2
},
{
"nom": "trois",
"valeur": 3
},
{
"nom": "quatre",
"valeur": 4
}]
Script:
[assign]
path = valeur
value = get("valeur").multiply(2)
Output:
[{
"nom": "un",
"valeur": 2
},
{
"nom": "deux",
"valeur": 4
},
{
"nom": "trois",
"valeur": 6
},
{
"nom": "quatre",
"valeur": 8
}]
Le path
peut être le nom simple d'un champ présent à la racine de l'élément
traité, ou un chemin en notation
pointée, en utilisant
une syntaxe proche de celle de la fonction
get
de Lodash.
Parameters
Returns Object
combine
Takes an Object
and substitute a field with the corresponding value found in a external pipeline
the internal pipeline must produce a stream of special object (id, value)
[
{ year: 2000, dept: 54 },
{ year: 2001, dept: 55 },
{ year: 2003, dept: 54 },
]
Script:
[use]
plugin = analytics
[combine]
path = dept
file = ./departement.ini
Output:
[
{ year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
{ year: 2001, dept: { id: 55, value: 'Meuse' } },
{ year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
]
Parameters
path
String? the path to substitutedefault
String? value if no substitution (otherwise value stay unchanged)primer
String Data to send to the external pipeline (optional, defaultn/a
)file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in a objectcommand
String? the external pipeline is described in a URL-like commandlogger
String? A dedicaded pipeline described in a file to trap or log errorscacheName
String? Enable cache, with dedicated name
Returns Object
concat
Take all String
, concat them and throw just one.
[
"a",
"b",
"c"
]
Script:
[concat]
beginWith = <
joinWith = |
endWith = >
Output:
[
"<a|b|c>"
]
Parameters
beginWith
String? Add value at the beginjoinWith
String? use value to join 2 chunkendWith
String? Add value at the end
Returns String
debug
Take Object
, print it (with its index number), and throw the same object.
Parameters
level
String DEBUG ezs level (depends of DEBUG env variable, see cli parameters) (optional, defaultinfo
)text
String text before the dump (optional, defaultvalueOf
)path
String? path of field to print
Returns Object
dedupe
Take Object
, and check that the object identifier has not already been used previously
Parameters
data
feed
path
String path containing the object Identifier (optional, defaulturi
)ignore
Boolean Just ignore duplicate object (optional, defaultfalse
)
Returns Object
delegate
Break the stream if the control file cannot be checked
Parameters
fusible
String? file to check
Returns Object
delegate
Delegate processing to an external pipeline.
Note: works like spawn, but each chunk share the same external pipeline.
Parameters
file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in a objectcommand
String? the external pipeline is described in a URL-like commandlogger
String? A dedicaded pipeline described in a file to trap or log errors
Returns Object
dispatch
Dispatch processing to an external pipeline on one or more servers.
Parameters
file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in a objectcommand
String? the external pipeline is described in a URL-like command
Returns Object
dump
Take all Object
s and generate a JSON array
[
{ "a": 1 },
{ "a": 2 },
{ "a": 3 },
{ "a": 4 },
{ "a": 5 }
]
Script:
[dump]
indent = true
Output:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}
]
Parameters
indent
boolean indent JSON (optional, defaultfalse
)
Returns String
env
Crée une variable d'environnement globale à tout le script.
On l'utilise en général au début du script (après [use]
).
Pour utiliser la variable, il faut employer la fonction env()
.
Entrée:
[{
"nom": "un",
"valeur": 1
},
{
"nom": "deux",
"valeur": 2
}]
Script:
[use]
plugin = basics
[env]
path = nom
value = NOM GÉNÉRIQUE
[JSONParse]
[assign]
path = nom
value = env("nom")
[dump]
indent = true
Sortie:
[{
"nom": "NOM GÉNÉRIQUE",
"valeur": 1
},
{
"nom": "NOM GÉNÉRIQUE",
"valeur": 2
}]
Parameters
Returns Object
exchange
Remplace tout un objet par un autre (au sens JSON).
Entrée:
[{
"nom": "un",
"valeur": 1
},
{
"nom": "deux",
"valeur": 2
}]
Script:
```ini
[use] plugin = basics
[JSONParse]
[exchange] value = get("nom")
[dump]
Sortie:
["un","deux"]
Ici, 'objet {"nom":"un","valeur":1}
a été remplacé par l'« objet » (au sens
JSON, une chaîne de caractères, tout autant qu'un nombre, constitue un objet)
"un"
.
Note: assign
ne permet pas de remplacer tout l'objet, mais seulement une de
ses propriétés.
Entrée:
[{
"a": "abcdefg", "b": "1234567", "c": "XXXXXXX"
},
{
"a": "abcdefg", "b": "1234567", "c": "XXXXXXX"
}]
Script:
[exchange]
value = omit('c')
Output:
[{
"a": "abcdefg",
"b": "1234567"
},
{
"a": "abcdefg",
"b": "1234567"
}]
Ici, on a remplacé un objet avec trois propriétés par le même objet sans la
propriété c
(voir la function
omit
de Lodash).
Parameters
value
String? la valeur de remplacement de l'objet courant
Returns Object
expand
Takes an Object
and substitute a field with the corresponding value found in a external pipeline
the internal pipeline receive a special object { id, value } id is the item identifier & value is the item path value
The internal pipeline can expand value with another
[
{ year: 2000, dept: 54 },
{ year: 2001, dept: 55 },
{ year: 2003, dept: 54 },
]
Script:
[use]
plugin = analytics
[expand]
path = dept
file = ./departement.ini
Output:
[
{ year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
{ year: 2001, dept: { id: 55, value: 'Meuse' } },
{ year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
]
Parameters
path
String? the path to substitutesize
Number How many chunk for sending to the external pipeline (optional, default1
)file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in a objectcommand
String? the external pipeline is described in a URL-like commandlogger
String? A dedicaded pipeline described in a file to trap or log errorscacheName
String? Enable cache, with dedicated nametoken
String? add token values in the subpipeline (optional)
Returns Object
extract
Extrait de l'objet courant les valeurs de certains champs, et renvoie directement les valeurs dans le flux de sortie.
Note:
extract
ne peut pas fournir des valeursundefined
ounull
.
Entrée:
[{
"nom": "un",
"valeur": 1,
"important": false
},
{
"nom": "deux",
"valeur": 2,
"important": true
}]
Script:
[use]
plugin = basics
[JSONParse]
[extract]
path = valeur
path = nom
[dump]
Sortie:
[[1,"un"],[2,"deux"]]
Parameters
path
String? chemin d'un champ à extraire
Returns Object
fork
fork the current pipeline
Note: but each chunk is sent to the same external pipeline.
Parameters
standalone
Boolean The current pipeline will be able to end without waiting for the end of the external pipeline (optional, defaultfalse
)file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in a objectcommand
String? the external pipeline is described in a URL-like commandlogger
String? A dedicaded pipeline described in a file to trap or log errorstarget
String choose the key to set with the forked request identifier (optional, defaultx-request-id
)
Returns Object
group
Take all chunk
s, and throw them grouped by length
.
See also ungroup.
[
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h"
]
Script:
[group]
length = 3
Output:
[
[ "a", "b", "c" ],
[ "d", "e", "f" ],
[ "g", "h" ]
]
Parameters
length
Number? Size of each partition
Returns String
identify
Take Object
, and compute & add an identifier
Parameters
data
feed
scheme
String scheme to use (uid or sha) (optional, defaultuid
)path
String path containing the object Identifier (optional, defaulturi
)
Returns String
ignore
Takes all the chunks, and ignore the firtst N chunk
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[ignore]
length = 3
Output:
[{
"a": 4
},
{
"a": 5
}]
Parameters
length
Number? Length of the feed to ignore
Returns any
keep
Throw input Object
but keep only specific fields.
Input file:
[{
"a": "abcdefg",
"b": "1234567",
"c": "XXXXXXX"
},
{
"a": "abcdefg",
"b": "1234567",
"c": "XXXXXXX"
}]
Script:
[keep]
path = a
path = b
Output:
[{
"a": "abcdefg",
"b": "1234567"
},
{
"a": "abcdefg",
"b": "1234567"
}]
Parameters
path
String? path of field to keep
Returns Object
loop
Loop on external pipeline, until test will be true
Note: works like delegate, but each chunk use its own external pipeline
Parameters
test
String? if test is truereverse
Boolean to reverse the test (optional, defaultfalse
)maxDepth
Number to limit the number of loops (optional, default100000
)file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in an objectcommand
String? the external pipeline is described in an URL-like commandlogger
String? A dedicaded pipeline described in a file to trap or log errorsfusible
String? Can be set with the ezs server fusible see env('request.fusible')
Returns Object
map
From an array field delegate processing of each items to an external pipeline
Note: works like delegate, but each chunk use its own external pipeline
Parameters
path
String? the path to substitutefile
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in an objectcommand
String? the external pipeline is described in an URL-like commandlogger
String? A dedicaded pipeline described in a file to trap or log errors
Returns Object
metrics
- See: ../server/knownPipeline.js
Take Object
, and throw the same object.
This statement will only be used if :
- EZS_METRICS is enabled
- ezs is running in server mode
WARNING: avoid setting bucket to "input" or "output", as these labels are used by ezs. If you do, you risk distorting the associated metrics.
Parameters
pathName
String to identify the script (optional, defaultauto
)bucket
String to identify the moment of measurement (optional, defaultunknow
)
Returns Object
overturn
Takes an Object
and substitute twice a field with the corresponding value found in a external pipeline
the internal pipeline receive a special object { id, value, token } :
- id is the item identifier
- value is the item path value,
- token is an array containing stream id and an number (0 for first time, 1 for the second tme The internal pipeline can overturn value with another.
It's work like [expand] but the second call starts only when all the values of the stream have been sent once
[
{ year: 2000, dept: 'Meuse' },
{ year: 2001, dept: 'Moselle' },
{ year: 2003, dept: 'Vosges'},
]
Script #1:
[overturn]
path = dept
[overturn/assign]
path = value
value = get('value').split('').reverse().join('')
Output:
[
{ year: 2000, dept: 'Meuse' },
{ year: 2001, dept: 'Moselle' },
{ year: 2003, dept: 'Vosges' },
]
Script #2:
[overturn]
path = dept
[overturn/drop]
path = token.1
if = 0
[overturn/assign]
path = value
value = get('value').split('').reverse().join('')
Output:
[
{ year: 2000, dept: 'esueM' },
{ year: 2001, dept: 'ellesoM' },
{ year: 2003, dept: 'segsoV' },
]
Parameters
path
String? the path to overturnsize
Number How many chunk for sending to the external pipeline (optional, default1
)file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in a objectcommand
String? the external pipeline is described in a URL-like command
Returns Object
pack
Take all Object
, throw encoded String
Returns String
parallel
Takes an Object
delegate processing to X internal pipelines
Parameters
file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in a objectcommand
String? the external pipeline is described in a URL-like commandlogger
String? A dedicaded pipeline described in a file to trap or log errors
Returns Object
pop
- See: shift
Return the last Object
and close the feed
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[shift]
Output:
[{
"a": 5
}]
Returns Object
remove
Take Object
and remove it from the feed if test is true
Input file:
[{
a: "a"
},
{
a: 2
},
{
a: "b"
},
{
a: 4
},
{
a: "c"
}]
Script:
[remove]
test = get('a).isInteger()
reverse = true
Output:
[
{
a: 2
},
{
a: 4
}
]
Parameters
Returns Object
replace
Take Object
and replace it with a new object with some fields.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[replace]
path = b.c
value = 'X'
Output:
[{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
}]
Parameters
Returns Object
shift
Return the first Object
and close the feed
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[shift]
Output:
[{
"a": 1
}]
Returns Object
shuffle
Take Object
, shuffle data of the whole object or only some fields specified by path
Input file:
[{
"a": "abcdefg",
"b": "1234567"
},
{
"a": "abcdefg",
"b": "1234567"
}]
Script:
[shuffle]
path = a
Output:
[{
"a": "cadbefg",
"b": "1234567"
},
{
"a": "dcaegbf",
"b": "1234567"
}]
Parameters
path
String? path of field to shuffle
Returns Object
singleton
Takes only the first Object
delegate processing to a external pipeline
Parameters
file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in a objectcommand
String? the external pipeline is described in a URL-like commandlogger
String? A dedicaded pipeline described in a file to trap or log errors
Returns Object
spawn
Delegate processing to an external pipeline, throw each chunk from the result.
Note: works like delegate, but each chunk use its own external pipeline
Parameters
file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in an objectcommand
String? the external pipeline is described in an URL-like commandlogger
String? A dedicaded pipeline described in a file to trap or log errorscache
String? Use a specific ezs statement to run commands (advanced)
Returns Object
swing
Delegate processing to an external pipeline under specifics conditions
Note: works like spawn, but each chunk shares the same external pipeline.
Parameters
test
String? if test is truereverse
String reverse the test (optional, defaultfalse
)file
String? the external pipeline is described in a filescript
String? the external pipeline is described in a string of characterscommands
String? the external pipeline is described in an objectcommand
String? the external pipeline is described in an URL-likelogger
String? A dedicaded pipeline described in a file to trap or log errors command
Returns Object
throttle
Take Object
and return the same object
[{
{ id: 'x', value: 2 },
{ id: 't', value: 2 },
}]
Script:
[use]
plugin = analytics
[throttle]
bySecond = 2
Output:
[
{ id: 'x', value: 2 },
{ id: 't', value: 2 },
]
Parameters
bySecond
Number Number of object by second (optional, default1
)
Returns Object
time
Measure the execution time of a script, on each chunk of input.
Parameters
script
string?
Examples
Input
[1]
Program
const script = `
[transit]
`;
from([1])
.pipe(ezs('time', { script }))
Output
[{
data: 1,
time: 15 // milliseconds
}]
Returns object
tracer
Take Object
, print a character and throw the same object.
Useful to see the progress in the stream.
Parameters
print
String character to print at each object (optional, default.
)last
String character to print at last call (optional, default.
)first
String character to print at first call (optional, default.
)
Returns Object
transit
Take Object
and throw the same object again.
Input file:
[{
"a": 1
},
{
"a": 2
}]
Script:
[transit]
Output:
[{
"a": 1
},
{
"a": 2
}]
Returns Object
truncate
Takes all the chunks, and closes the feed when the total length is equal to the parameter.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[truncate]
length = 3
Output:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
}]
Parameters
length
Number? Length of the feed
Returns any
ungroup
Take all chunk
s, and throw one item for every chunk.
See also group.
[
[ "a", "b", "c" ],
[ "d", "e", "f" ],
[ "g", "h" ]
]
Script:
[ungroup]
Output:
[
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h"
]
Returns Array<any>
unpack
Take String
s or Buffer
s and throw Object
builded by JSON.parse on each line.
Returns object
use
Take all String
, concat them and throw just one.
Script:
[use]
plugin = basics
plugin = analytics
Parameters
beginWith
String? Add value at the beginjoinWith
String? use value to join 2 chunkendWith
String? Add value at the end
Returns String
validate
From an Object
, throw the same object if all rules pass
See
Input file:
[{
"a": 1,
"b": "titi"
},
{
"a": 2,
"b": "toto"
},
{
"a": false
},
]
Script:
[validate]
path = a
rule = required|number
path = a
rule = required|string
Output:
[{
"a": 1,
"b": "titi"
},
{
"a": 2,
"b": "toto"
}]
Parameters
Returns Object