RxPort is a library for baking web workers with RxJS
# RxPort
The RxPort library provides a simple and efficient way to create communication channels between different threads in your application using RxJS.
npm install rxport rxjs
To create an observable between threads, both threads need a shared description of your stream. The RxPort class provides a simple way to define one.
const myObservablePort = new RxPort<string, string>('MyObservable');
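One way to picture what the port shares between threads is a name that both sides use to tag and filter messages, together with the request and response types. The sketch below only illustrates that idea and is not RxPort's implementation; `ChannelDescriptor`, `Envelope`, `defineChannel`, `wrap`, and `unwrap` are hypothetical names.

```typescript
// Hypothetical illustration, not the RxPort implementation.
// A descriptor carries the channel name; the type parameters record
// which payload types travel in each direction.
interface ChannelDescriptor<Req, Res> {
  readonly name: string;
  // Phantom fields: never set at runtime, they only carry the types.
  readonly _req?: Req;
  readonly _res?: Res;
}

interface Envelope<T> {
  channel: string;
  payload: T;
}

function defineChannel<Req, Res>(name: string): ChannelDescriptor<Req, Res> {
  return { name };
}

// Tag an outgoing request with the channel name.
function wrap<Req, Res>(ch: ChannelDescriptor<Req, Res>, payload: Req): Envelope<Req> {
  return { channel: ch.name, payload };
}

// Accept an incoming message only if it belongs to this channel.
function unwrap<Req, Res>(
  ch: ChannelDescriptor<Req, Res>,
  msg: Envelope<unknown>,
): Req | undefined {
  return msg.channel === ch.name ? (msg.payload as Req) : undefined;
}

const greeting = defineChannel<string, string>('MyObservable');
const other = defineChannel<number, number>('Other');
const envelope = wrap(greeting, 'Hi Alis');
```

Because both threads build the descriptor from the same name, a message wrapped on one side is recognised on the other, while messages for a differently named channel are ignored.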
Now you can use this port to create an observable between threads.
In one thread, create the producer part of the observable:
// SharedWorker
const myProducer = myObservablePort.createProducer(self);
Now you can use this producer to respond to requests from other threads:
import {of} from "rxjs";
myProducer.subscribe((request: string) => {
  console.log(request); // 'Hi Alis'
  return of('Hi Bob');
});
In another thread, create the consumer part of the observable:
// Main thread
const request = myObservablePort.createRequest(new SharedWorker('shared-worker.js'));
Now you can use this request to create an observable that sends requests to the other thread:
request('Hi Alis').subscribe((response: string) => {
  console.log(response); // 'Hi Bob'
});
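A request/response exchange like the one above needs some way to match each response to the request that triggered it. The following sketch shows one common technique, a correlation id per request, over a synchronous in-memory port pair. It is an assumption about the general approach, not RxPort's actual wire protocol; `PortLike`, `createLoopbackPair`, `serve`, and `createRequester` are hypothetical names.

```typescript
// Hypothetical sketch of request/response matching; not RxPort's protocol.
type Listener = (data: unknown) => void;

interface PortLike {
  postMessage(data: unknown): void;
  onMessage(listener: Listener): void;
}

// Two connected ports that deliver messages synchronously (real ports are async).
function createLoopbackPair(): [PortLike, PortLike] {
  const listenersA: Listener[] = [];
  const listenersB: Listener[] = [];
  const a: PortLike = {
    postMessage: (data) => listenersB.forEach((l) => l(data)),
    onMessage: (l) => { listenersA.push(l); },
  };
  const b: PortLike = {
    postMessage: (data) => listenersA.forEach((l) => l(data)),
    onMessage: (l) => { listenersB.push(l); },
  };
  return [a, b];
}

interface RequestMsg { kind: 'request'; id: number; payload: string }
interface ResponseMsg { kind: 'response'; id: number; payload: string }

// Producer side: answer every request, echoing its id back.
function serve(port: PortLike, handler: (req: string) => string): void {
  port.onMessage((data) => {
    const msg = data as RequestMsg;
    if (msg.kind !== 'request') return;
    const reply: ResponseMsg = { kind: 'response', id: msg.id, payload: handler(msg.payload) };
    port.postMessage(reply);
  });
}

// Consumer side: each call gets a fresh id and only sees its own response.
function createRequester(port: PortLike) {
  let nextId = 0;
  const pending = new Map<number, (res: string) => void>();
  port.onMessage((data) => {
    const msg = data as ResponseMsg;
    if (msg.kind !== 'response') return;
    const cb = pending.get(msg.id);
    pending.delete(msg.id);
    if (cb) cb(msg.payload);
  });
  return (payload: string, onResponse: (res: string) => void): void => {
    const id = nextId++;
    pending.set(id, onResponse);
    const msg: RequestMsg = { kind: 'request', id, payload };
    port.postMessage(msg);
  };
}

const [workerSide, mainSide] = createLoopbackPair();
serve(workerSide, (req) => (req === 'Hi Alis' ? 'Hi Bob' : 'Who?'));
const send = createRequester(mainSide);

const received: string[] = [];
send('Hi Alis', (res) => received.push(res));
send('Hello', (res) => received.push(res));
```

The id lets several requests share one port without their responses getting mixed up, which is what a per-call observable needs.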
## Destroying producer
To destroy the producer, call the destroy method returned by subscribe:
import {interval} from "rxjs";
const destroy = myProducer.subscribe(() => interval(1000));
If you destroy the producer while there are active requests, they will receive an Abort Error:
import {AbortConnectionMessage} from "rxport";
request('Hi Alis').subscribe({
  error: (error) => {
    if (error.message === AbortConnectionMessage) {
      console.log('Producer was aborted');
    }
  },
});
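To make the abort behaviour concrete, here is a minimal sketch of a producer that tracks its active requests and fails each of them when it is destroyed. It only models the behaviour described above; `TinyProducer`, `ActiveRequest`, and the value of `ABORT_MESSAGE` are hypothetical stand-ins, not RxPort's internals or the real value of `AbortConnectionMessage`.

```typescript
// Hypothetical model of "destroy aborts active requests"; not RxPort internals.
const ABORT_MESSAGE = 'aborted'; // stand-in value, the real constant may differ

interface ActiveRequest {
  next: (value: number) => void;
  error: (err: Error) => void;
}

class TinyProducer {
  private active = new Set<ActiveRequest>();
  private destroyed = false;

  // Register a request; it keeps receiving values until the producer is destroyed.
  open(request: ActiveRequest): void {
    if (this.destroyed) {
      request.error(new Error(ABORT_MESSAGE));
      return;
    }
    this.active.add(request);
  }

  // Push a value to every active request.
  emit(value: number): void {
    this.active.forEach((r) => r.next(value));
  }

  // Fail every active request with the abort error, then forget them.
  destroy(): void {
    this.destroyed = true;
    this.active.forEach((r) => r.error(new Error(ABORT_MESSAGE)));
    this.active.clear();
  }
}

const producer = new TinyProducer();
const log: string[] = [];
producer.open({
  next: (v) => log.push(`value ${v}`),
  error: (e) => log.push(e.message === ABORT_MESSAGE ? 'Producer was aborted' : 'other error'),
});
producer.emit(1);
producer.destroy();
producer.emit(2); // ignored: no active requests remain
```

In this model the consumer sees its values up to the moment of destruction and then exactly one abort error, which matches the check on the error message shown above.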
## Lost connection
If the connection between threads is lost, active requests receive a Lose Connection Error:
import {LoseConnectionMessage} from "rxport";
request('Hi Alis').subscribe({
  error: (error) => {
    if (error.message === LoseConnectionMessage) {
      console.log('Connection was lost');
    }
  },
});
You can encounter this error if you create an observable from a SharedWorker to a Window (browser tab) and then close the window.
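Since both failure modes are reported through the error message, a single handler can tell them apart. The sketch below is one possible way to do that; `classifyFailure` is a hypothetical helper, and the two constants are stand-ins whose values are assumptions rather than the real values exported by rxport.

```typescript
// Hypothetical stand-ins for the constants exported by rxport; real values may differ.
const ABORT_CONNECTION = 'abort-connection';
const LOSE_CONNECTION = 'lose-connection';

type Failure = 'aborted' | 'connection-lost' | 'unknown';

// Map an error to a coarse category by comparing its message to the known constants.
function classifyFailure(error: unknown): Failure {
  if (!(error instanceof Error)) return 'unknown';
  if (error.message === ABORT_CONNECTION) return 'aborted';
  if (error.message === LOSE_CONNECTION) return 'connection-lost';
  return 'unknown';
}
```

In real code the two stand-ins would be replaced by the imported `AbortConnectionMessage` and `LoseConnectionMessage`, and the helper would be called from the `error` callback of the subscription.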
## Advanced usage
Observables are not limited to threads: you can create a stream over any object that implements the MessagePort interface.
const channel = new MessageChannel();
const rxPort = new RxPort<string, string>('...');
const producer = rxPort.createProducer(channel.port1);
const request = rxPort.createRequest(channel.port2);