@tsaqib/trex
v0.9.31
Reactive Extension in TypeScript (TRex)
The computations that our online activity triggers in free applications such as Facebook and Google are very expensive. Even with billions of dollars, building an optimized and profitable service out of that money is far from trivial. Such large-scale applications are built on the reactive programming paradigm so that data is processed only when needed. Responding only to the relevant queries yields large cost savings, which lets internet-scale applications serve us instantly without charging us as much. Frameworks such as Angular, Vue, and React are also built on reactive programming principles.
This package helps you do functional reactive programming in both server-side and front-end apps. It lets you define and destroy data streams easily, so a stream can work as an event bus. You can subscribe to and unsubscribe from the streams. You can apply map, filter, and your own operators at the observable level, and process the results however you like in the observers.
This package has zero dependencies as everything was written from scratch.
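To make the event-bus idea concrete, here is a minimal standalone sketch of a stream that listeners can subscribe to and unsubscribe from. It does not use or reproduce this package's internals; `MiniStream` and `Listener` are hypothetical names chosen only for illustration.

```typescript
// Hypothetical stand-in for a stream; not the @tsaqib/trex implementation.
type Listener<T> = (item: T) => void;

class MiniStream<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  unsubscribe(listener: Listener<T>): void {
    this.listeners = this.listeners.filter((l) => l !== listener);
  }

  // Deliver the item to every listener that is currently subscribed.
  emit(item: T): void {
    this.listeners.slice().forEach((listener) => listener(item));
  }
}

const received: string[] = [];
const onMessage: Listener<string> = (m) => {
  received.push(m);
};

const bus = new MiniStream<string>();
bus.subscribe(onMessage);
bus.emit("deploy started");
bus.unsubscribe(onMessage);
bus.emit("deploy finished"); // no listeners remain, so nothing is delivered
```

Only the first message reaches `onMessage`, because the listener is removed before the second emit.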
Installation
npm i -S @tsaqib/trex
Examples:
See /tests directory for more examples
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
// Your custom data processor; used as a sample here
const workflowEngine = new WorkflowEngine();
const messageObservable = new tx.Observable();
const workflowQueue = new tx.Observer((message) => {
  const workflowObservable = new tx.Observable();
  const workflowObserver = new tx.Observer((m) => workflowEngine.process(m));
  // Subscribe before emitting so the engine actually receives the message
  workflowObservable.subscribe(workflowObserver);
  workflowObservable.emit(message);
});
const notifier = new tx.Observer((message) => {
  // TODO: notify office channel
});
const log = (message) => {
  // TODO: log all incoming messages
};
messageObservable
  .pipe(
    tx.tap(log),
    tx.take(10), // Take first 10 messages
    tx.filter((m) => validateJSON(m)), // validateJSON is your own helper, not part of this package
    tx.map((m) => JSON.parse(m)),
    tx.pluck('message')
  )
  .multicast(workflowQueue, notifier);
messageObservable.emit('{ "message": "I am unwell.", "to": "#office" }');
API documentation
Classes
Class: FilterOperator
This operator runs each data item through the predicate you pass to it. If the item satisfies the predicate, the operator passes it on, so the observers receive it on emit.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable
.pipe(
tx.map((num) => num * 3),
tx.filter((num) => num > 10)
)
.subscribe(observer);
observable.emit(10);
Hierarchy
↳ FilterOperator
Methods
emit
▸ emit(item: any): void
Overrides OperatorBase.emit
Defined in operators/FilterOperator.ts:32
Applies the specified predicate on the item and returns it when the predicate returns true.
Parameters:
| Name | Type | Description |
| ------ | ---- | ----------- |
| item | any | The item |
Returns: void
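The gating behaviour described for FilterOperator can be sketched without the package. This is an illustration under assumptions, not the operator's source; `makeFilter` and its `next` callback are hypothetical names.

```typescript
// Hypothetical sketch of predicate gating; not the package's FilterOperator source.
function makeFilter<T>(
  predicate: (item: T) => boolean,
  next: (item: T) => void
): (item: T) => void {
  return (item: T): void => {
    // Items that fail the predicate are silently dropped.
    if (predicate(item)) next(item);
  };
}

const passed: number[] = [];
const gate = makeFilter<number>(
  (n) => n > 10,
  (n) => {
    passed.push(n);
  }
);
[5, 30, 8, 12].forEach((n) => gate(n));
```

Here only 30 and 12 are forwarded, in the order they arrived.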
Class: MapOperator
Executes a standard 1:1 map function on an incoming item and returns the computed item.
The default behaviour of OperatorBase is that of MapOperator. Therefore, this class is a placeholder.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable
.pipe(
tx.map((num) => num * 2),
tx.map((num) => num * 3)
)
.subscribe(observer);
observable.emit(10);
Hierarchy
↳ MapOperator
Class: Observable
An Observable listens to streams of data and passes the items on to its observers.
Use the subscribe function to subscribe an observer and the emit function to add new data to the stream.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const observable = new tx.Observable();
Implements: IObservable
Hierarchy
Observable
Constructors
constructor
+ new Observable(): Observable
Defined in Observable.ts:26
Constructs an Observable, which is an implementation of IObservable.
Returns: Observable
Properties
observers
• observers: IObserver[]
Implementation of IObservable.observers
Defined in Observable.ts:25
Optional
pipeHead
• pipeHead? : LinkedList‹IObservable›
Implementation of IObservable.pipeHead
Defined in Observable.ts:26
Methods
destroy
▸ destroy(): void
Implementation of IObservable
Defined in Observable.ts:131
Returns: void
emit
▸ emit(items: any | any[]): void
Implementation of IObservable
Defined in Observable.ts:68
Parameters:
| Name | Type |
| ------- | ---------------- |
| items | any \| any[] |
Returns: void
multicast
▸ multicast(...observers: IObserver[]): void
Implementation of IObservable
Defined in Observable.ts:118
Parameters:
| Name | Type |
| -------------- | ------------- |
| ...observers | IObserver[] |
Returns: void
pipe
▸ pipe(...observables: IObservable[]): IObservable
Implementation of IObservable
Defined in Observable.ts:96
Parameters:
| Name | Type |
| ---------------- | --------------- |
| ...observables | IObservable[] |
Returns: IObservable
subscribe
▸ subscribe(observer: IObserver): void
Implementation of IObservable
Defined in Observable.ts:38
Parameters:
| Name | Type |
| ---------- | ----------- |
| observer | IObserver |
Returns: void
unsubscribe
▸ unsubscribe(observer: IObserver): void
Implementation of IObservable
Defined in Observable.ts:44
Parameters:
| Name | Type |
| ---------- | ----------- |
| observer | IObserver |
Returns: void
Class: Observer
The Observer gives you the basis for an observer. You can pass a function or a pipe to the constructor, or subclass Observer to make your own observer.
Implements: IObserver
Hierarchy
- Observer
Constructors
constructor
+ new Observer(next: function, error?: undefined | function): Observer
Defined in Observer.ts:12
Constructs an Observer.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
// The Observer is the implementation of IObserver
const observer1 = new tx.Observer(
  (item: string) => console.log(item),
  (err: any) => console.error(err)
);
const observer2 = new tx.Observer(console.log);
Parameters:
▪ next: function
The function to invoke on data arrival
▸ (item: any): void
Parameters:
| Name | Type |
| ------ | ---- |
| item | any |
▪ Optional error: undefined | function
The error handler function
Returns: Observer
Properties
Optional
error
• error? : undefined | function
Implementation of IObserver.error
Defined in Observer.ts:12
next
• next: function
Implementation of IObserver.next
Defined in Observer.ts:11
Type declaration:
▸ (item: any): void
Parameters:
| Name | Type |
| ------ | ---- |
| item | any |
Class: OperatorBase
This class provides the basis for your own operators and for the operators already included in this package. Operators inherit from Observable, so they have the same methods and properties.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
class Squarer extends tx.OperatorBase {
  emit(item: number) {
    this.observable.emit(item * item);
  }
}
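The relationship between a base operator that applies a function and a subclass that overrides emit can be sketched in isolation. The classes below are hypothetical and assume a simple forwarding callback; they are not the package's OperatorBase.

```typescript
// Hypothetical base class: applies `fn` to each item and forwards the result.
class SketchOperator {
  constructor(
    protected fn: (item: number) => number,
    protected forward: (item: number) => void
  ) {}

  emit(item: number): void {
    // Default behaviour is a 1:1 map over the incoming item.
    this.forward(this.fn(item));
  }
}

// A subclass changes what is forwarded by replacing emit.
class SketchSquarer extends SketchOperator {
  emit(item: number): void {
    this.forward(item * item);
  }
}

const outputs: number[] = [];
const collect = (n: number): void => {
  outputs.push(n);
};

new SketchOperator((n) => n + 1, collect).emit(4); // forwards 5
new SketchSquarer((n) => n, collect).emit(4); // forwards 16
```

Subclassing keeps the subscription and forwarding plumbing in one place, so a custom operator only has to decide what to forward.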
Hierarchy
↳ OperatorBase
Constructors
constructor
+ new OperatorBase(fn: function): OperatorBase
Overrides Observable.constructor
Defined in operators/OperatorBase.ts:30
Constructs an OperatorBase.
** Warning: You should use this only by subclassing.
Parameters:
▪ fn: function
The function to apply to the item
▸ (item: any): any
Parameters:
| Name | Type |
| ------ | ---- |
| item | any |
Returns: OperatorBase
Properties
fn
• fn: function
Defined in operators/OperatorBase.ts:28
Type declaration:
▸ (item: any): any
Parameters:
| Name | Type |
| ------ | ---- |
| item | any |
observable
• observable: IObservable
Defined in operators/OperatorBase.ts:29
Optional
pipeHead
• pipeHead? : LinkedList‹IObservable›
Implementation of IObservable.pipeHead
Overrides Observable.pipeHead
Defined in operators/OperatorBase.ts:30
Methods
emit
▸ emit(item: any | any[]): void
Implementation of IObservable
Overrides Observable.emit
Defined in operators/OperatorBase.ts:50
Parameters:
| Name | Type |
| ------ | ---------------- |
| item | any \| any[] |
Returns: void
pipe
▸ pipe(...observables: IObservable[]): never
Implementation of IObservable
Overrides Observable.pipe
Defined in operators/OperatorBase.ts:54
Parameters:
| Name | Type |
| ---------------- | --------------- |
| ...observables | IObservable[] |
Returns: never
subscribe
▸ subscribe(observer: IObserver): void
Implementation of IObservable
Overrides Observable.subscribe
Defined in operators/OperatorBase.ts:46
Parameters:
| Name | Type |
| ---------- | ----------- |
| observer | IObserver |
Returns: void
Class: PluckOperator
This operator returns the specified property of a value.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const observable = new tx.Observable();
observable
.pipe(tx.take(1), tx.pluck('email'))
.subscribe(new tx.Observer(console.log));
observable.emit({ name: 'King', email: 'email@kingdom' });
observable.emit({ name: 'Queen', email: 'email@queendom' });
// Output: email@kingdom
Hierarchy
↳ PluckOperator
Constructors
constructor
+ new PluckOperator(propName: string): PluckOperator
Overrides OperatorBase.constructor
Defined in operators/PluckOperator.ts:25
Constructs the PluckOperator.
Parameters:
| Name | Type | Default |
| ---------- | ------ | ------- |
| propName | string | "" |
Returns: PluckOperator
Properties
Private
propName
• propName: string
Defined in operators/PluckOperator.ts:30
Methods
emit
▸ emit(item: any): void
Overrides OperatorBase.emit
Defined in operators/PluckOperator.ts:43
Emits the property of an item as specified by the propName in the PluckOperator's constructor.
Parameters:
| Name | Type | Description |
| ------ | ---- | ----------- |
| item | any | The item |
Returns: void
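Plucking a property can be pictured as a small wrapper that looks up one key and forwards its value. The helper below is a hypothetical sketch, not the PluckOperator source, and `pluckProp` is an invented name.

```typescript
// Hypothetical sketch of property plucking; not the package's PluckOperator source.
function pluckProp(
  propName: string,
  next: (value: unknown) => void
): (item: Record<string, unknown>) => void {
  // Forward only the named property of each incoming item.
  return (item: Record<string, unknown>): void => next(item[propName]);
}

const emails: unknown[] = [];
const emitEmail = pluckProp("email", (value) => {
  emails.push(value);
});
emitEmail({ name: "King", email: "email@kingdom" });
```

If the property is missing, this sketch forwards `undefined`; how the package handles that case is not stated in this document.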
Class: TakeOperator
This operator counts the items it has encountered and allows them to pass through only while the count does not exceed a specified total.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.pipe(tx.take(3)).subscribe(observer);
observable.emit([10, 20, 30, 40, 50, 60]);
// Output:
// 10
// 20
// 30
Hierarchy
↳ TakeOperator
Constructors
constructor
+ new TakeOperator(count: number): TakeOperator
Overrides OperatorBase.constructor
Defined in operators/TakeOperator.ts:27
Constructs the TakeOperator.
Parameters:
| Name | Type | Default |
| ------- | ------ | ------- |
| count | number | 0 |
Returns: TakeOperator
Properties
Private
count
• count: number
Defined in operators/TakeOperator.ts:32
total
• total: number = 0
Defined in operators/TakeOperator.ts:27
Methods
emit
▸ emit(item: any): void
Overrides OperatorBase.emit
Defined in operators/TakeOperator.ts:43
Emits the item as long as the current count of items doesn't exceed the total allocated by count.
Parameters:
| Name | Type | Description |
| ------ | ---- | ----------- |
| item | any | The item |
Returns: void
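The counting logic behind a take-style operator fits in a few lines. This is a sketch under the assumption that a running total is compared against the allowed count; `makeTake` is a hypothetical helper, not the TakeOperator source.

```typescript
// Hypothetical sketch of a counting gate; not the package's TakeOperator source.
function makeTake<T>(count: number, next: (item: T) => void): (item: T) => void {
  let total = 0; // number of items forwarded so far
  return (item: T): void => {
    if (total < count) {
      total += 1;
      next(item);
    }
  };
}

const taken: number[] = [];
const takeThree = makeTake<number>(3, (n) => {
  taken.push(n);
});
[10, 20, 30, 40, 50, 60].forEach((n) => takeThree(n));
```

After the third item the gate stays closed, so 40, 50 and 60 never reach the callback.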
Class: TapOperator
This operator runs a data item through the function you pass on to it, but returns the original value.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const square = (num) => num * num;
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.pipe(tx.tap(square)).subscribe(observer);
observable.emit(10);
// Output: 10
Hierarchy
↳ TapOperator
Methods
emit
▸ emit(item: any): void
Overrides OperatorBase.emit
Defined in operators/TapOperator.ts:32
Applies the specified function on the item, but returns the original value.
Parameters:
| Name | Type | Description |
| ------ | ---- | ----------- |
| item | any | The item |
Returns: void
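A tap runs a function for its side effect and then forwards the untouched item. The sketch below illustrates that idea with a hypothetical `makeTap` helper; it is not the TapOperator source.

```typescript
// Hypothetical sketch of a tap; not the package's TapOperator source.
function makeTap<T>(
  sideEffect: (item: T) => unknown,
  next: (item: T) => void
): (item: T) => void {
  return (item: T): void => {
    sideEffect(item); // the return value is deliberately ignored
    next(item); // the original item continues downstream
  };
}

const squares: number[] = [];
const seen: number[] = [];
const tapped = makeTap<number>(
  (n) => squares.push(n * n),
  (n) => {
    seen.push(n);
  }
);
tapped(10);
```

The side effect records 100, while the downstream callback still receives the original 10.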
Class: TxContext
This is an internal class that maintains internal state and is not meant for public use.
** Warning: You should never use this class.
Hierarchy
- TxContext
Constructors
Private
constructor
+ new TxContext(): TxContext
Defined in TxContext.ts:19
Returns: TxContext
Properties
Static
maps
▪ maps: ObserverMap[] = []
Defined in TxContext.ts:22
Methods
Static
addMap
▸ addMap(observer: IObserver, observable: IObservable, chainHead?: LinkedList‹IObservable›): void
Defined in TxContext.ts:44
Adds a tuple of observer, observable and the head of the call's linked list.
Parameters:
| Name | Type | Description |
| ------------ | ------------------------- | -------------- |
| observer | IObserver | The observer |
| observable | IObservable | - |
| chainHead? | LinkedList‹IObservable› | - |
Returns: void
Static
getMap
▸ getMap(observer: IObserver): ObserverMap[] | undefined
Defined in TxContext.ts:60
Gets a tuple list of observer, observable and the head of the call's linked list for a given IObserver.
Parameters:
| Name | Type | Description |
| ---------- | ----------- | ----------------------- |
| observer | IObserver | The observer to look up |
Returns: ObserverMap[] | undefined
Static
print
▸ print(): void
Defined in TxContext.ts:31
Returns: void
Static
removeMap
▸ removeMap(map: ObserverMap): void
Defined in TxContext.ts:71
Removes a tuple for a given ObserverMap instance.
Parameters:
| Name | Type |
| ----- | ------------- |
| map | ObserverMap |
Returns: void
Type aliases
LinkedList
Ƭ LinkedList: object
Defined in Shorthands.ts:8
Type declaration:
next? : LinkedList‹T›
value: T
ObserverMap
Ƭ ObserverMap: object
Defined in TxContext.ts:5
Type declaration:
chainHead? : LinkedList‹IObservable›
observable: IObservable
observer: IObserver
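The LinkedList shape declared above (a value plus an optional next pointer) can be written as a TypeScript type and walked with a simple loop. The type name `LinkedListSketch` and the helpers `fromArray` and `toArray` are hypothetical and exist only for this illustration.

```typescript
// Mirrors the declared shape: a value and an optional pointer to the next node.
type LinkedListSketch<T> = { value: T; next?: LinkedListSketch<T> };

// Build a chain from an array, starting at the tail so each node can point forward.
function fromArray<T>(items: T[]): LinkedListSketch<T> | undefined {
  return items.reduceRight<LinkedListSketch<T> | undefined>(
    (next, value) => (next === undefined ? { value } : { value, next }),
    undefined
  );
}

// Walk the chain from the head and collect the values in order.
function toArray<T>(head: LinkedListSketch<T> | undefined): T[] {
  const out: T[] = [];
  for (let node = head; node !== undefined; node = node.next) {
    out.push(node.value);
  }
  return out;
}

const chain = fromArray(["map", "filter", "pluck"]);
const order = toArray(chain);
```

Keeping only a head pointer is enough to reach every node, which is presumably why ObserverMap stores just `chainHead`.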
Functions
Const
filter
▸ filter(fn: function): FilterOperator‹›
Defined in Shorthands.ts:65
Returns an item only when the specified predicate is true.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const observable = new tx.Observable();
const observer = new tx.Observer((num) => console.log(num * 4));
observable.pipe(tx.filter((num) => num < 15)).subscribe(observer);
observable.emit(10);
observable.emit(20);
// Output: 40
Parameters:
▪ fn: function
The predicate to check the item against
▸ (item: any): any
Parameters:
| Name | Type |
| ------ | ---- |
| item | any |
Returns: FilterOperator‹›
Const
map
▸ map(fn: function): MapOperator‹›
Defined in Shorthands.ts:37
Executes a standard 1:1 map function on an incoming item and returns the computed item.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const observer = new tx.Observer(console.log);
const observable = new tx.Observable();
observable
.pipe(
tx.map((num: number) => num * 2),
tx.map((num: number) => num * 3)
)
.subscribe(observer);
observable.emit(10);
// Output: 60
Parameters:
▪ fn: function
The function to apply on the item
▸ (item: any): any
Parameters:
| Name | Type |
| ------ | ---- |
| item | any |
Returns: MapOperator‹›
Const
pluck
▸ pluck(propName: string): PluckOperator‹›
Defined in Shorthands.ts:115
Returns the specified property of a value.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const observable = new tx.Observable();
observable
.pipe(tx.take(1), tx.pluck('email'))
.subscribe(new tx.Observer(console.log));
observable.emit({ name: 'King', email: 'email@kingdom' });
observable.emit({ name: 'Queen', email: 'email@queendom' });
// Output: email@kingdom
Parameters:
| Name | Type | Description |
| ---------- | ------ | ------------------------------------------------ |
| propName | string | The name of the property to return from the item |
Returns: PluckOperator‹›
Const
take
▸ take(count: number): TakeOperator‹›
Defined in Shorthands.ts:90
Returns up to a specified number of items.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.pipe(tx.take(3)).subscribe(observer);
observable.emit([10, 20, 30, 40, 50, 60]);
// Output:
// 10
// 20
// 30
Parameters:
| Name | Type | Description |
| ------- | ------ | ------------------------------------------------- |
| count | number | The total number of items allowed to pass through |
Returns: TakeOperator‹›
Const
tap
▸ tap(fn: function): TapOperator‹›
Defined in Shorthands.ts:139
Applies a function, but returns the original item
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const square = (num) => num * num;
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.pipe(tx.tap(square)).subscribe(observer);
observable.emit(10);
// Output: 10
Parameters:
▪ fn: function
The function to apply on the item.
▸ (item: any): any
Parameters:
| Name | Type |
| ------ | ---- |
| item | any |
Returns: TapOperator‹›
Interfaces
Interface: IObservable
The interface behind Observable; it defines the contract for all observables.
Hierarchy
- IObservable
Properties
observers
• observers: IObserver[]
Defined in IObservable.ts:18
Maintains a list of observers subscribed to the observable.
** Warning: Do not access or modify this directly.
Optional
pipeHead
• pipeHead? : LinkedList‹IObservable›
Defined in IObservable.ts:90
** Warning: Do not use this. This is an internal pointer for tracking and cleaning up subscriptions.
Methods
Optional
destroy
▸ destroy(): void
Defined in IObservable.ts:163
Destroys an Observable along with all its subscribers.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
// Observable is an implementation of IObservable
const observable = new tx.Observable();
observable.subscribe(new tx.Observer(console.log));
observable.emit(10);
observable.destroy();
Returns: void
emit
▸ emit(item: any | any[]): void
Defined in IObservable.ts:82
Emits an item to the stream.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
// Observable is an implementation of IObservable
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.subscribe(observer);
observable.emit(10);
Parameters:
| Name | Type | Description |
| ------ | ---------------- | ------------------------------------------- |
| item | any \| any[] | The item(s) to stream; can be an array, too |
Returns: void
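Accepting either a single item or an array usually means normalizing the argument before delivery. The sketch below assumes each array element is delivered as its own item, which matches the take example's output in this document; `emitAll` is a hypothetical helper, not the package's emit.

```typescript
// Hypothetical sketch: normalize the argument, then deliver items one by one.
function emitAll(items: number | number[], sink: (item: number) => void): void {
  const list: number[] = Array.isArray(items) ? items : [items];
  list.forEach((item) => sink(item));
}

const delivered: number[] = [];
const record = (n: number): void => {
  delivered.push(n);
};

emitAll(10, record); // a single item
emitAll([20, 30], record); // an array is delivered element by element
```

With this normalization, downstream code never has to distinguish between the two call styles.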
multicast
▸ multicast(...observers: IObserver[]): void
Defined in IObservable.ts:143
Subscribes an array of observers in one go, typically following a pipe.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
const observer1 = new tx.Observer(console.log);
const observer2 = new tx.Observer((x) => console.log(x * x));
// Observable is an implementation of IObservable
const observable = new tx.Observable();
observable
  .pipe(
    tx.map((x) => x * 2),
    tx.filter((x) => x > 5)
  )
  .multicast(observer1, observer2);
observable.emit(50);
Parameters:
| Name | Type | Description |
| -------------- | ------------- | ------------------------------ |
| ...observers | IObserver[] | An array of Observer to update |
Returns: void
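Multicasting amounts to subscribing several observers with one call, after which every emitted item reaches each of them. The class below is a hypothetical standalone sketch, not the package's Observable.

```typescript
type Sink = (item: number) => void;

// Hypothetical sketch of multicast; not the package's Observable source.
class MulticastSketch {
  private sinks: Sink[] = [];

  subscribe(sink: Sink): void {
    this.sinks.push(sink);
  }

  // Subscribe several observers with a single call.
  multicast(...sinks: Sink[]): void {
    sinks.forEach((sink) => this.subscribe(sink));
  }

  emit(item: number): void {
    this.sinks.forEach((sink) => sink(item));
  }
}

const plain: number[] = [];
const squared: number[] = [];
const source = new MulticastSketch();
source.multicast(
  (x) => {
    plain.push(x);
  },
  (x) => {
    squared.push(x * x);
  }
);
source.emit(50);
```

Both observers see the same item, and each applies its own processing to it.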
pipe
▸ pipe(...observables: IObservable[]): IObservable
Defined in IObservable.ts:115
Pipes a series of operations per item in the stream. All operators must be inside a pipe.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
// Observable is an implementation of IObservable
const observable = new tx.Observable();
observable
  .pipe(
    tx.map((x) => x * 2),
    tx.filter((x) => x > 5)
  )
  .subscribe(new tx.Observer(console.log));
observable.emit(50);
Parameters:
| Name | Type | Description |
| ---------------- | --------------- | --------------------------------------------- |
| ...observables | IObservable[] | The observables that form a chain of actions. |
Returns: IObservable
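One way to picture a pipe is as a chain of stages where each stage either forwards an item to the next stage or drops it. The sketch below composes stages as plain functions; the package itself describes a linked list of observables, so treat this as an analogy with hypothetical names (`Stage`, `mapStage`, `filterStage`, `pipeSketch`).

```typescript
// A stage either forwards a (possibly transformed) item or drops it.
type Stage = (item: number, next: (item: number) => void) => void;

const mapStage = (fn: (n: number) => number): Stage => (item, next) => next(fn(item));

const filterStage = (pred: (n: number) => boolean): Stage => (item, next) => {
  if (pred(item)) next(item);
};

// Fold from the right so that the first stage in the list runs first.
function pipeSketch(stages: Stage[], sink: (item: number) => void): (item: number) => void {
  return stages.reduceRight<(item: number) => void>(
    (next, stage) => (item) => stage(item, next),
    sink
  );
}

const results: number[] = [];
const run = pipeSketch(
  [mapStage((x) => x * 2), filterStage((x) => x > 5)],
  (x) => {
    results.push(x);
  }
);
run(50); // doubled to 100, which passes the filter
run(2); // doubled to 4, which the filter drops
```

Because each stage decides whether to call `next`, a filter can stop an item without the later stages needing to know.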
subscribe
▸ subscribe(observer: IObserver): void
Defined in IObservable.ts:39
Subscribes an Observer instance.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
// Observable is an implementation of IObservable
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.subscribe(observer);
observable.emit(10);
Parameters:
| Name | Type | Description |
| ---------- | ----------- | ---------------------------------------------------- |
| observer | IObserver | The Observer instance to be subscribed for updates |
Returns: void
unsubscribe
▸ unsubscribe(observer: IObserver): void
Defined in IObservable.ts:61
Unsubscribes an Observer instance.
Basic usage example:
import * as tx from '@tsaqib/trex';
// or CommonJS: const tx = require("@tsaqib/trex");
// Observable is an implementation of IObservable
const observable = new tx.Observable();
const observer = new tx.Observer(console.log);
observable.subscribe(observer);
observable.emit(10);
observable.unsubscribe(observer);
Parameters:
| Name | Type | Description |
| ---------- | ----------- | ------------------------------------------------------ |
| observer | IObserver | The Observer instance to be unsubscribed from updates |
Returns: void
Interface: IObserver
The interface behind Observer; it defines the contract for all observers.
Hierarchy
- IObserver
Properties
next
• next: function
Defined in IObserver.ts:13
Whenever a new item is available in the stream, the next function is called with that item.
Type declaration:
▸ (item: any): void
Parameters:
| Name | Type | Description |
| ------ | ---- | ------------------------------------- |
| item | any | The item newly arrived in the stream. |
Methods
Optional
error
▸ error(err: any): void
Defined in IObserver.ts:21
The error handler for any exception that occurs inside the next function.
Parameters:
| Name | Type | Description |
| ----- | ---- | ----------------- |
| err | any | The error object. |
Returns: void
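The next/error contract can be illustrated by wrapping the call to next in a try/catch and routing any exception to the optional handler. This is a sketch under assumptions: `ObserverSketch` and `deliverSafely` are hypothetical, and rethrowing when no handler is defined is a choice made here, not documented package behaviour.

```typescript
interface ObserverSketch {
  next: (item: number) => void;
  error?: (err: unknown) => void;
}

// Assumption: an exception thrown by next is routed to error when it is defined.
function deliverSafely(observer: ObserverSketch, item: number): void {
  try {
    observer.next(item);
  } catch (err) {
    if (observer.error) observer.error(err);
    else throw err; // assumption: without a handler the exception propagates
  }
}

const errors: string[] = [];
const strictObserver: ObserverSketch = {
  next: (item) => {
    if (item < 0) throw new Error("negative item");
  },
  error: (err) => {
    errors.push(err instanceof Error ? err.message : String(err));
  },
};

deliverSafely(strictObserver, 1); // next succeeds, error is not called
deliverSafely(strictObserver, -1); // next throws, error receives the exception
```

Keeping the handler optional lets simple observers pass only a next function, as in `new tx.Observer(console.log)`.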