comcat
v1.2.1
Published
Share single connection between multiple browser tabs/windows and more.
Downloads
8
Maintainers
Readme
:cat:Comcat
Share single connection between multiple browser tabs/windows and more.
Introduction
This library is currently aimed to solve a common problem:
I want to consume some messages pushed by the backend, but ...
- I don't want to create a new connection every time I open a new tab/window;
- Or, I want a single tab to receive the messages, and shared them between other tabs.
Comcat
offers some critical features around this topic, including:
- Broadcast messages to all tabs;
- Keep one and only one connection alive across tabs;
With the unique characteristics of SharedWorker
, Comcat
can automatically reconnect if the tab owns the connection is closed or even crashed. If you are keen on how it is accomplished, please refer to How it works.
Disclaimer
Comcat
guarantees eliminating duplicate connections or messages, but does not guarantee the integrity of all incoming messages. That means, messages may be lost in certain edge cases.
If it is a major concern over the message integrity, Comcat
may not be suit to your app. Relevant details are discussed in Caveats.
Get Started
Install
npm install comcat
OR
yarn add comcat
Usage
For demonstration purpose, here we implement a minimal example to share time information across tabs.
//examplePump.ts
import { ComcatPump } from 'comcat';
const POLLING_INTERVAL = 60 * 1000;
let intervalHandler = -1;
/**
* First we create a `pump` to pull messages from the backend.
*/
const pump = new ComcatPump({
category: 'example',
});
/**
* Define how we connect to the backend.
*/
pump.onConnect = () => {
intervalHandler = setInterval(() => {
fetch('http://worldtimeapi.org/api/ip')
.then((res) => {
return res.json();
})
.then((data) => {
/**
* When messages come, push them to the consumer...
* ...with a customized topic.
*/
pump.pump('Time', data.datetime);
pump.pump('Unix', data.unixtime);
});
}, POLLING_INTERVAL);
};
/**
* Define how we disconnect from it.
*/
pump.onDisconnect = () => {
clearInterval(intervalHandler);
};
/**
* Start the engine!
*/
pump.start();
// examplePipe.ts
import { ComcatPipe } from 'comcat';
/**
* Then we create a `pipe` to receive messages from `pump`s.
*/
const pipe = new ComcatPipe({
/**
* Choose the topic we care about...
*/
topic: 'Time',
});
pipe.onMessage = (topic, data) => {
/**
* ...And do something with the messages.
*/
console.log('The current time is: ', data);
};
/**
* Start rolling!
*/
pipe.start();
/**
* Don't forget to dispose the pipe when it is no longer used.
*/
// pipe.stop();
The Concepts
Overview
Pump
Pump
behaves like a "Message Provider". It is responsible for creating the connection to the backend and then feed the messages to the further consumer.
You have full control of how to connect, and disconnect, to the source of the message, often the server. Moreover, it is up to you to determine when and what to send. Comcat
only manages the timing of connection and disconnection for you.
If needed, you can create multiple Pump
s to deal with various sources. Comcat
uses category
to identify different groups of pumps. There is only one active connection in each group, thus the tabs own the active connections may not be the same.
Pipe
Pipe
is the "Message Receiver". It notifies the user when the message arrives, and is meant to be a intermediary between Comcat
and the consumer.
Pipe
provides basic filtering based on topic
, but you can choose whether to accept the incoming message, or even modify the content before
it is pushed to the consumer.
Recipes
The repository contains several examples covering the basic usage. Please see example/README.md
APIs
ComcatPump
The base class for constructing Comcat
pumps.
A typical customized pump looks like this:
// myPump.ts
import { ComcatPump } from 'comcat';
const pump = new ComcatPump({
category: 'MyCategory',
});
pump.onConnect = () => {
/**
* Do the connection here.
*
* ...and some other works maybe
*/
};
pump.onDisconnect = () => {
/**
* Do the disconnection here.
*/
};
new ComcatPump(options)
public constructor(options: ComcatPumpOptions);
interface ComcatPumpOptions {
category: string;
}
category
:
An identifier to classify different message sources.
Each category is coupled with only one type of connection, so you can not create multiple pumps with same category.
ComcatPump.start
public start: () => Promise<boolean>;
Register the pump and try to start the underlying connection.
Because the connection is managed by Comcat
, it may be postponed until scheduled.
Returns true if registry succeeds, or vice versa.
ComcatPump.stop
public stop: () => void;
Close the pump and the underlying connection.
In practice, Comcat
will close the pump when the current tab is closed, so usually you wont need to trigger this by hand.
If somehow you still want to do it yourself, please note that once the pump is closed, it is fully disposed and cannot be started again. In order to restarting a new pump with the same category, instantiate a new ComcatPump
.
ComcatPump.onConnect
public onConnect: () => Promise<boolean> | void;
:warning: The default method is only a placeholder. Always override with your own callback.
Invoked when Comcat
tries to connect to your backend. Basically your connection code goes here.
You can fine-tune the inner behavior by returning a flag indicates whether the connection is successful. If the return value is false
, or an error is raised, Comcat
will either retry the connection after a short period of time, or schedule another tab to do the job. If no value is returned, Comcat
will treat the result as successful anyway.
ComcatPump.onDisconnect
public onDisconnect: () => void;
:warning: The default method is only a placeholder. Always override with your own callback.
Invoked when Comcat
tries to disconnect to your backend. Basically your disconnection code goes here.
Don't permanently dispose anything here, because your pump may be rescheduled connecting again.
ComcatPump.pump
public pump: (topic: string, data: any) => Promise<void>;
Send the message with a specified topic.
topic
:
The category of the message. It is used to help filtering messages in different aspects.
data
:
The content of the message. Can be anything that SharedWorker
supports, but with some restrictions. Please see Transferring data to and from workers: further details.
ComcatPipe
The base class for constructing Comcat
pipes.
A typical customized pipe looks like this:
import { ComcatPipe } from 'comcat';
const pipe = new ComcatPipe({
topic: 'MyTopic',
});
pipe.onMessage = (topic, data) => {
/**
* Do some works with the data.
*/
};
new ComcatPipe(options)
public constructor(options?: ComcatPipeOptions);
interface ComcatPipeOptions {
topic?: string | RegExp;
}
topic
: [optional]
The expected category of the messages. It can be either string
or RegExp
. If applied, the incoming message is filtered unless its topic exactly matches the provided string, or passes the RegExp
test.
ComcatPipe.start
public start: () => Promise<boolean>;
Register the pipe and start listening for the messages from the upstream.
Returns true if registry succeeds, or vice versa.
ComcatPipe.stop
public stop: () => void;
Unregister the pipe and stop listening for the messages.
It is strongly recommended that to prevent potential memory leaks, pipes should be closed immediately when they are no longer in use.
ComcatPipe.onMessage
public onMessage: (topic: string, data: any) => void;
:warning: The default method is only a placeholder. Always override with your own callback.
Invoked when messages arrive.
Note that the messages arrives here have already been filtered by the topic
provided in construction options.
topic
:
The topic of the message;
data
:
The content of the message;
Comcat
Provides global settings that alter how Comcat
works.
Comcat.setMode
public setMode: (mode: 'default' | 'direct' = 'default') => void;
Specify the underlying implementation.
By default Comcat
uses SharedWebworker
to share connection and send messages across tabs/windows. If SharedWebworker
is not supported, Comcat
will fall back to the direct
mode.
When running in direct
Mode, all cross-tab features are disabled. The connection activated by pump
is created per tab. The messages sent by pump
are broadcasted back to the pipes on the same tab. Thus, it behaves just like a normal event bus.
Usually you should just leave it be.
import { Comcat } from 'comcat';
Comcat.setMode('direct');
Comcat.enableDebug
public enableDebug: (flag: boolean) => void;
Determines whether enabling the full debug logging, including inner status and transport information.
Comcat will log every message through the transport, and some basic status information to the console. By default, these output is suppressed.
You can enable the full log like following:
import { Comcat } from 'comcat';
Comcat.enableDebug(true);
Be careful, this may output enormous content. However, the logs from the worker is always complete and not affected by the setting.
How it works
Why SharedWorker
First let's revisit the two main goals we ought to achieve:
- Broadcast messages to all tabs;
- Keep one and only one connection alive across tabs;
Broadcasting alone is a relatively simple task. To send messages between tabs, a bunch of techniques can be adopted:
So the true challenge here is the second part. Ideally we need to implement a certain kind of scheduler that:
- Stay alive as long as at least one tab is running (same origin);
- Can be accessed from all opened tabs with same origin;
- Decide which tab has the right to connect and push messages;
- Recover from the situation where the tab currently owns the connection is closed/freezed;
The mechanism of SharedWorker
fits the first two requirements perfectly. It is running on a separate thread. All tabs with the same origin can access the worker. And the best part is, it will keep alive even if the original spawner of the worker is crashed, as long as at least one tab with the same origin is running.
Furthermore, SharedWorker
provides bi-directional, message-based communication to send data between main thread and the worker. We can easily turn it into a mediate proxy and broadcast the message from one tab to other tabs.
In conclusion, SharedWorker
is the best option makes maintaining and rerouting the connection possible.
The Consensus Problem
Consider it roughly, the situation we are facing is quite similar to distributed systems. For example, when managing a replicated log, the system is always trying to:
- Ensure a unique leader to accept log entries from clients;
- Re-elect if the leader is crashed;
The magic working behind the scene is known as "Consensus Algorithm". With the concept of such algorithm, we can convert our problem to:
- Ensure a unique leader(tab) to connect to a backend and broadcast messages;
- Re-elect if the leader(tab) is closed/freezed;
Among the various algorithms, Raft is relatively easy to understand and implement. However, it is designed for the purpose like building distributed log system, so it is still too complicated against our needs. After all, we don't aim for strict safety. Besides, browser tabs are more likely to be opened and closed frequently, which is quite opposite to a stable cluster.
Eventually, a tailored version of Raft is applied. The distributed part of the algorithm is ditched, since a scheduler working on SharedWorker
plays the role as a centralized coordinator, which can chop down all consensus procedure between tabs. The concept of term
is kept to detect stale leader, thus help quickly recovering from abnormal situations.
Overview
Actor
Actor
is the counterpart of Server
in the original Raft algorithm. It is either in one of the following state:
Leader
- Has full permission to broadcast message;
- Maintains heartbeat every 3 seconds to prevent election timeouts. If failed, convert to
Candidate
;
Candidate
- Initiate an election every 5 seconds; If succeed, convert to
Leader
;
Dealer
Dealer
, aka the scheduler, is responsible for...
- Checking if the election from
Candidate
is valid; - Checking if the heartbeat from
Leader
is valid; - Checking if
Actor
has the permission to broadcast messages;
...according to the certain rules. Details see below.
RPC
The communication between Actor
and Dealer
is proceeded using remote procedure call(RPC), built on top of SharedWorker
messaging mechanism. There are three types of RPCs:
Election RPC
Invoked by Candidate
to attempt to become leader.
interface RaftRequestElect {
// Actor's term
term: number;
// Actor's ID
candidateId: string;
}
interface RaftResponseElect {
// True if election succeed
isGranted: boolean;
// Dealer's term
term: number;
}
Heartbeat RPC
Invoked by Leader
to maintain its authority.
interface RaftRequestHeartbeat {
// Actor's term
term: number;
}
interface RaftResponseHeartbeat {
// True if there's another valid leader
isExpired: boolean;
// Dealer's term
term: number;
}
Message Request RPC
Invoked by Leader
to check permission for broadcasting.
interface RaftRequestMessaging<T> {
// Actor's ID
leaderId: number;
// The message to be sent
message: T;
}
type RaftResponseMessaging = void;
The Algorithm
Election
The election is initiated by Candidate
and issued every 5 seconds.
When the election stage starts:
- [
Actor
] Increase the local term by 1, and send it withElection RPC
toDealer
- [
Dealer
] Check the incoming term(a
) against local term(b
):- If
a > b
, election succeed- Override the local term with the incoming term
- Record the id of current leader
- Accept and reply with incoming term
- Else, election failed
- Reject and reply with the local term
- If
- [
Actor
] Check the reply:- If the election is granted,
- Convert to
Leader
- Start heartbeat loop
- Convert to
- Else,
- Override the local term with the incoming term
- Start election loop
- If the election is granted,
Heartbeat
The heartbeat is initiated by Leader
and issued every 3 seconds.
When the heartbeat stage starts:
- [
Actor
] Increase the local term by 1, and send it withHeartbeat RPC
toDealer
- [
Dealer
] Check the incoming term(a
) against local term(b
):- If
a > b
, heartbeat check succeed- Override the local term with the incoming term
- Accept and reply with incoming term
- Else, heartbeat check failed
- Reject and reply with the local term
- If
- [
Actor
] Check the reply:- If the heartbeat is expired,
- Override the local term with the incoming term
- Convert to
Candidate
- Start election loop
- Else,
- Restart heartbeat loop
- If the heartbeat is expired,
Message Request
The message request is initiated by Leader
and manually invoked from user land.
When the message request stage starts:
- [
Actor
] Send its own id alongMessage Request RPC
toDealer
- [
Dealer
] Check the incoming id(a
) against current leader id(b
):- If
a == b
, request check succeed- Send the message
- Else, request check failed
- If
Caveats
TL;DR: Messages may be lost during the shifting of Leader
s.
Consider the following cases:
Case 1
The tab owns the connection is closed. X seconds later another tab is elected as leader and establishes the connection instead.
Message lost after tab closing is unavoidable. One of our goals is to keep one and one connection alone, so there must be a vacancy period before a new Leader
is elected. A possible way to solve this is to keep multiple connections, but apparently it is off-target here.
Case 2
The tab owns the connection is freezed for a few seconds and then recovered. It is not the leader now, and it is waiting for confirmation from heartbeat request. During that exact period, one or more message requests are issued;
As previously mentioned, if a stale Leader
is tying to send messages, those requests will be instantly failed due to the leader id check. That means, those messages from the stale Leader
will be "ditched" right away. Please be advised that this is intentional.
Supposed that the actual connection we talked about is created by Websocket. While a stale Leader
is waiting for heartbeat request, its connection is being kept open. If the server pushes a message right now, both the stale Leader
and the current Leader
will receive the message, and trying to issue a broadcast at the same time. This causes the same message being sent twice, which breaks the no-duplicate guarantee.
Generating the embedded SharedWorker
Obviously, it is awkward for user to manually copy the distributed worker file to somewhere like public
folder, then provide the instantiated worker as part of startup procedure. Thus, the inlined worker is a much better solution.
If you searched for how to embed workers, such as MDN, it tells you something like this:
var blob = new Blob('Worker code goes here');
var worker = new Worker(window.URL.createObjectURL(blob));
This works well for Worker
(aka Dedicated worker
). But when it comes to SharedWorker
, the object URL will cause serious problem.
According to MDN:
Each time you call createObjectURL(), a new object URL is created, even if you've already created one for the same object.
...which means every tab will generate its unique object URL, even if the worker code is always the same. In conclusion, different tabs will point to different SharedWorker
if they are not instantiated by the same URL.
So what is next?
Again according to WHATWG:
Any same-origin URL (including blob: URLs) can be used. data: URLs can also be used, but they create a worker with an opaque origin.
This shed some light on what URLs can be provided as valid scriptURL
for SharedWorker
constructor. So the final solution is simple: Convert the code into a DataURL.
var url = `data:text/javascript,${code}`;
var worker = new Worker(url);
This technique meets all the desired needs:
- Easily generated and embedded;
- The result URL is always the same as long as the worker code maintains unchanged;
Now each tab can point to the same worker. Problem solved.
Browser Compatibility
Because the core functionality is heavily rely on SharedWorker
, the minimum requirements are aligned with SharedWorker
as follows:
Chrome
: 4Firefox
: 29
You can refer to "Can I Use" for more compatibility details.
Q&A
Any plans for IE suppo...
NO.
Roadmap
- [ ] (Maybe) Extend
Pump
concept to enable sending messages toLeader Pump
, to further support two-way communication sharing single connection; - [ ] Better debug information;