rx-extra
v2.1.4
Published
rx-extra extends the library RxJS 5 with extra methods like 'fromNodeReadableStream' and 'splitOnChange'
Downloads
48
Readme
rx-extra
rx-extra
extends the RxJS 5 library with extra methods like fromNodeReadableStream
and splitOnChange
. If you're still using RxJS 4, you can find the compatible version of rx-extra
in the RxJS4 branch.
Install
npm install --save rx-extra
Methods
fromNodeReadableStream
import { Observable } from 'rxjs/Observable';
import 'rx-extra/add/observable/fromNodeReadableStream';
import 'rxjs/add/operator/map';
Observable.fromNodeReadableStream(myStream)
.map(x => objectMode ? x : x.toString())
.subscribe(console.log, console.error);
partitionNested
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rx-extra/add/operator/partitionNested';
let partitioned = Observable.range(1, 20)
.partitionNested(x => x % 2 === 0);
let [evenSource, oddSource] = partitioned;
let [multipleOfSixSource, notMultipleOfSixSource] = partitioned
.partitionNested(x => x % 3 === 0);
multipleOfSixSource
.subscribe(console.log, console.error);
splitOnChange
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/from';
import 'rx-extra/add/operator/splitOnChange';
Observable.from([{
value: 5,
}, {
value: 5,
}, {
value: 6,
}, {
value: 7,
}
}])
.splitOnChange(function(item) {
return item.value;
})
.subscribe(console.log, console.error);
then (Promise)
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rx-extra/add/operator/then';
Observable.range(1, 3)
.then(console.log, console.error);
throughNodeStream
import * as JSONStream from 'JSONStream';
import { Observable } from 'rxjs/Observable';
import 'rx-extra/add/operator/throughNodeStream';
Observable.from([
'[{"a": 1},',
'{"a": 2},{"a":',
'3},',
'{"a": 4}]'
])
.throughNodeStream(JSONStream.parse('..a'))
.subscribe(console.log, console.error);
If the objectMode
option for your transform stream is not true
,
you will need to handle any required conversion(s) between
String|Buffer
and Number|Boolean|Object
, e.g.:
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/operator/map';
import 'rx-extra/add/operator/throughNodeStream';
import * as through2 from 'through2';
Observable.range(1, 3)
.map(x => x.toString())
.throughNodeStream(through2(function(chunk, enc, callback) {
var that = this;
let x = parseInt(chunk.toString());
for (var i=0; i<x; i++) {
that.push(String(x + 1));
}
callback();
}))
.map(x => parseInt(x.toString()))
.subscribe(console.log, console.error);
toNodeCallback
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rx-extra/add/operator/toNodeCallback';
Observable.range(1, 3)
.toNodeCallback(function(err, result) {
if (err) {
throw err;
}
console.log(result);
});