Rx tools
Rx tools is a collection of pure functions that help you 'flow' with streams.
npm install --save @fogwarts/rx-tools
Quick Docs
CombineLatestTo
Use this when you need to map multiple streams to a single object, typically for input handling and similar cases.
// ES modules
import { CombineLatestTo } from '@fogwarts/rx-tools';
// CommonJS
const { CombineLatestTo } = require('@fogwarts/rx-tools');
- Sample
import { timer } from 'rxjs';
import { map, take } from 'rxjs/operators';
// Let's treat the timers as input events from the user
const name$ = timer(0, 100).pipe(
  take(5), map(idx => `name: ${idx}`) // For the sake of simplicity, let's limit ourselves to 5 events
);
const surname$ = timer(0, 150).pipe(
  take(5), map(idx => `surname: ${idx}`)
);
// We usually want to map such events to an object and do something with it
CombineLatestTo({
  name: name$,
  surname: surname$
}).subscribe(fullName => {
  console.log(fullName);
});
- Output
{ name: 'name: 0', surname: 'surname: 0' }
{ name: 'name: 1', surname: 'surname: 0' }
{ name: 'name: 1', surname: 'surname: 1' }
{ name: 'name: 2', surname: 'surname: 1' }
{ name: 'name: 3', surname: 'surname: 1' }
{ name: 'name: 3', surname: 'surname: 2' }
{ name: 'name: 4', surname: 'surname: 2' }
{ name: 'name: 4', surname: 'surname: 3' }
{ name: 'name: 4', surname: 'surname: 4' }
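To make the emission order above easier to follow, here is a minimal, dependency-free sketch of the "combine latest values into an object" idea. It is not the library's implementation: `createSource` and `combineLatestToObject` are hypothetical names introduced only for this illustration, and the sources are driven by hand instead of by timers.

```javascript
// Hypothetical stand-in for a Subject: subscribe() returns an unsubscribe function.
function createSource() {
  const listeners = new Set();
  return {
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener);
    },
    emit(value) {
      listeners.forEach((listener) => listener(value));
    },
  };
}

// Hypothetical helper: combine an object of sources into one source of objects.
// Nothing is emitted until every key has produced at least one value.
function combineLatestToObject(sources) {
  return {
    subscribe(listener) {
      const keys = Object.keys(sources);
      const latest = {};
      const unsubscribers = keys.map((key) =>
        sources[key].subscribe((value) => {
          latest[key] = value;
          if (Object.keys(latest).length === keys.length) {
            listener({ ...latest }); // emit a fresh snapshot each time
          }
        })
      );
      return () => unsubscribers.forEach((unsubscribe) => unsubscribe());
    },
  };
}

const nameSource = createSource();
const surnameSource = createSource();
const emitted = [];

combineLatestToObject({ name: nameSource, surname: surnameSource })
  .subscribe((fullName) => emitted.push(fullName));

nameSource.emit('name: 0');       // no emission yet: surname has no value
surnameSource.emit('surname: 0'); // first combined object
nameSource.emit('name: 1');       // only the changed key is updated

console.log(emitted);
```

Each emission is a copy of the latest values, so objects that were already delivered are never mutated when another source emits later.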
WithCleanupStream
Use this wherever you want to avoid memory leaks from subscriptions that outlive their owner. It encapsulates a cleanup stream and provides two functions: pipeWithCleanup, which pipes any stream with the cleanup one (a simple takeUntil), and triggerCleanup, which triggers the cleanup stream and takes a callback as a parameter so you can compose any logic there. It plays very nicely with useEffect.
// ES modules
import { WithCleanupStream } from '@fogwarts/rx-tools';
// CommonJS
const { WithCleanupStream } = require('@fogwarts/rx-tools');
- Sample
// Let's say that we are working inside a class component (Angular/React) and we want to capture some data
import { Subject } from 'rxjs';

class MyComponent {
  // Cache the helpers as class properties so they are visible from both init and destroy
  private pipeWithCleanup: IPipeWithCleanupStream;
  private triggerCleanup: ITriggerCleanupStream;

  componentDidMount() { // ngOnInit() in Angular
    const { pipeWithCleanup, triggerCleanup } = WithCleanupStream(); // Create the helper functions
    this.pipeWithCleanup = pipeWithCleanup; // Assign the properties
    this.triggerCleanup = triggerCleanup;
    const name$ = new Subject(); // The actual data
    this.pipeWithCleanup(name$).subscribe(name => {
      // Do whatever you want with name, without worrying about memory leaks
    });
  }

  componentWillUnmount() { // ngOnDestroy() in Angular
    this.triggerCleanup(); // And trigger cleanup
  }
}
// Let's say that we are working inside a React function component and we want to capture some data
// This looks much cleaner here
import { useEffect } from 'react';
import { Subject } from 'rxjs';

const MyComponent = () => {
  useEffect(() => {
    const name$ = new Subject(); // The data we want to capture
    const { pipeWithCleanup, triggerCleanup } = WithCleanupStream();
    pipeWithCleanup(name$).subscribe(name => {
      // Do something with name
    });
    return triggerCleanup(() => {
      // Cleanup was triggered
    });
  });
  return null; // Rendering omitted
};
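For readers who want to see the cleanup-stream idea in isolation, here is a small dependency-free sketch. It is not the library's source: `createSource` and `withCleanupSketch` are hypothetical names, the "stream" is a hand-rolled stand-in for a Subject, and the choice to run the teardown immediately inside `triggerCleanup` is an assumption made for this sketch only.

```javascript
// Hypothetical stand-in for a Subject: subscribe() returns an unsubscribe function.
function createSource() {
  const listeners = new Set();
  return {
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener);
    },
    emit(value) {
      listeners.forEach((listener) => listener(value));
    },
  };
}

// Hypothetical sketch of the pattern: one shared cleanup signal that
// tears down every subscription created through pipeWithCleanup.
function withCleanupSketch() {
  const teardowns = new Set();
  let cleanedUp = false;

  // Wrap a source so its subscriptions stop once cleanup fires (the takeUntil idea).
  const pipeWithCleanup = (source) => ({
    subscribe(listener) {
      if (cleanedUp) return () => {}; // already cleaned up: never subscribe
      const unsubscribe = source.subscribe(listener);
      teardowns.add(unsubscribe);
      return unsubscribe;
    },
  });

  // Assumption: cleanup runs immediately, then the optional callback is invoked.
  const triggerCleanup = (callback) => {
    cleanedUp = true;
    teardowns.forEach((unsubscribe) => unsubscribe());
    teardowns.clear();
    if (callback) callback();
  };

  return { pipeWithCleanup, triggerCleanup };
}

const name$ = createSource();
const { pipeWithCleanup, triggerCleanup } = withCleanupSketch();
const received = [];
let callbackRan = false;

pipeWithCleanup(name$).subscribe((name) => received.push(name));

name$.emit('Ada');                           // delivered
triggerCleanup(() => { callbackRan = true; });
name$.emit('Grace');                         // ignored: the subscription was torn down

console.log(received, callbackRan);
```

Keeping a single cleanup signal per component means every subscription made through `pipeWithCleanup` is released together, so nothing has to be tracked by hand.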