WebSocket Transport
Low-latency bidirectional real-time communication using WebSockets.
The @open-ot/transport-websocket package provides a TransportAdapter implementation using WebSockets for low-latency, bidirectional real-time communication.
npm install @open-ot/transport-websocket ws

The ws package is only required for Node.js environments. Browser
environments use the native WebSocket API.
WebSocket transport is ideal for:
import { } from "@open-ot/transport-websocket";
import type { Message } from "./types";
const = new <Message>("ws://localhost:3000");
// Connect and start receiving messages
await .(() => {
.("Received:", .);
});
// Send a message
await .({
: "op",
: [{ : "Hello" }],
: 0,
});
// Cleanup
await .();import { } from "ws";
const = new ({ : 3000 });
.("connection", () => {
.("Client connected");
.("message", () => {
const = .(.());
.("Received:", );
// Echo back to client
.(.({ : "ack" }));
});
.("close", () => {
.("Client disconnected");
});
});import { } from "@open-ot/transport-websocket";
const = new ("ws://localhost:3000");
Parameters:
url: WebSocket server URL (e.g. "ws://localhost:3000" or "wss://api.example.com")

connect()
Establishes the WebSocket connection and registers the message handler.
import { } from "@open-ot/transport-websocket";
const = new <string>("ws://localhost:3000");
await .((message) => { .("Received:", );
});

Parameters:
onReceive: Callback function invoked for each received message

Returns: Promise<void> that resolves when the connection is established
Throws: Error if the connection fails

send()
Sends a message to the server.
import { } from "@open-ot/transport-websocket";
const = new ("ws://localhost:3000");
await .(() => {});
await .({
: "op",
: [{ : "Hello" }],
: 5,
});

Parameters:
msg: Message object (automatically JSON-encoded)

Returns: Promise<void>
Throws: Error if the transport is disconnected

disconnect()
Closes the WebSocket connection and cleans up resources.
import { } from "@open-ot/transport-websocket";
const = new ("ws://localhost:3000");
await .(() => {});
// Later...
await .();

Returns: Promise<void>

The transport manages connection state internally:
import { } from "@open-ot/transport-websocket";
const = new ("ws://localhost:3000");
// State: Disconnected
.("Initial state: disconnected");
try {
await .(() => {
.();
});
// State: Connected
.("Now connected");
await .({ : "ping" });
// State: Still connected
} catch () {
// State: Disconnected (connection failed)
.("Connection failed:", );
} finally {
await .();
// State: Disconnected
.("Cleanly disconnected");
}

The base WebSocketTransport doesn't include automatic reconnection. Implement reconnection logic at the application level:
import { } from "@open-ot/transport-websocket";
class < = unknown> {
private transport: <>; private : string;
private : number;
private : number;
private = 0;
constructor(: string, = 5, = 1000) {
this. = ;
this. = ;
this. = ;
this. = new <>();
}
async (: (: ) => void): <void> {
while (this. < this.) {
try {
await this..();
this. = 0; // Reset on success
return;
} catch () {
this.++;
.(`Retry ${this.}/${this.}`);
if (this. >= this.) {
throw new ("Max reconnection attempts reached");
}
await this.(this. * this.);
}
}
}
async send(: ): <void> { return this..();
}
async (): <void> {
return this..();
}
private (: number): <void> {
return new (() => (, ));
}
}

All messages are automatically JSON-encoded/decoded.
{
"type": "op",
"op": [{ "i": "Hello" }],
"revision": 5
}

Acknowledgment:
{
"type": "ack"
}

Operation broadcast:
{
"type": "op",
"op": [{ "i": " World" }],
"revision": 6
}import { } from "ws";
import { } from "@open-ot/server";
import { } from "@open-ot/server";
import { } from "@open-ot/core";
const = new ();
const = new ();
.();
await .("doc-1", "text", "");
const = new ({ : 3000 });
.("connection", () => {
.("message", async () => {
const = .(.());
if (.type === "op") {
try {
const = await .(
"doc-1",
.op,
.revision
);
// Acknowledge to sender
.(
.({
: "ack",
: .,
})
);
// Broadcast to all other clients
const = .({
: "op",
: .,
: .,
});
..(() => {
if ( !== && . === 1) {
.();
}
});
} catch () {
.(
.({
: "error",
: instanceof ? . : "Unknown error",
})
);
}
}
});
});
.("WebSocket server listening on port 3000");

For horizontal scaling, use Redis Pub/Sub to synchronize across server instances:
import { } from "ws";
import { } from "@open-ot/server";
import { } from "@open-ot/adapter-redis";
import { } from "@open-ot/core";
const = new (..!);
const = new ();
.();
const = new ({ : 3000 });
// Subscribe to Redis pub/sub for cross-instance communication
const = new <string, () => void>();
async function (: string) {
if (!.()) {
const = await .(
`doc:${}`,
() => {
// Broadcast to all clients on this instance
..(() => {
if (. === 1) {
.();
}
});
}
);
.(, );
}
}
.("connection", async () => {
await ("doc-1");
.("message", async () => {
const = .(.());
if (.type === "op") {
try {
const = await .(
.docId || "doc-1",
.op,
.revision
);
// Acknowledge to sender
.(
.({
: "ack",
: .,
})
);
// Publish to Redis (will broadcast to all instances)
await .(
`doc:${.docId || "doc-1"}`,
.({
: "op",
: .,
: .,
})
);
} catch () {
.(
.({
: "error",
: instanceof ? . : "Unknown error",
})
);
}
}
});
});

Always use encrypted WebSockets in production:
// Browser - automatically determine protocol
const = .. === "https:" ? "wss:" : "ws:";
const = ..;
const = new (
await import("@open-ot/transport-websocket")
).(`${}//${}`);

Server configuration with TLS:
import { } from "ws";
import { } from "https";
import { } from "fs";
const = ({
: ("/path/to/cert.pem"),
: ("/path/to/key.pem"),
});
const = new ({ });
.(443);

Prevent idle connection timeouts with periodic pings:
import { } from "ws";
const = new ({ : 3000 });
.("connection", () => {
let = true;
.("pong", () => {
= true;
});
const = (() => {
if (!) {
.();
return;
}
= false;
.();
}, 30000); // Ping every 30 seconds
.("close", () => {
();
});
});

Protect your server from excessive messages:
import { } from "ws";
const = new ({ : 3000 });
interface RateLimitInfo {
: number;
: number;
}
const = new <any, RateLimitInfo>();
const = 100; // messages
const = 60000; // per 60 seconds
.("connection", () => {
.(, {
: 0,
: .() + ,
});
.("message", () => {
const = .()!;
const = .();
// Reset if window expired
if ( > .) {
. = 0;
. = + ;
}
.++;
if (. > ) {
.(
.({
: "error",
: "Rate limit exceeded",
})
);
return;
}
// Process message...
});
.("close", () => {
.();
});
});

Authenticate WebSocket connections:
import { } from "ws";
import { } from "http";
const = new ({ : true });
const = (await import("http")).();
.("upgrade", (: , , ) => {
// Extract token from query string or headers
const = new (.!, `http://${..}`);
const = ..("token");
if (!()) {
.("HTTP/1.1 401 Unauthorized\r\n\r\n");
.();
return;
}
.(, , , () => {
.("connection", , );
});
});
function (: string | null): boolean {
// Implement your auth logic
return === "valid-token";
}
.(3000);import { } from "@open-ot/transport-websocket";
const = new ("ws://localhost:3000");
try {
await .(() => {
.();
});
} catch () {
if ( instanceof ) {
// Handle specific error types
if (..("ECONNREFUSED")) {
.("Server is not running");
} else if (..("timeout")) {
.("Connection timeout");
} else {
.("Connection failed:", .);
}
}
}import { } from "@open-ot/transport-websocket";
const = new ("ws://localhost:3000");
await .(() => {});
try {
await .({
: "op",
: [{ : "Hello" }],
: 5,
});
} catch () {
// Transport disconnected or send failed
.("Send failed:", );
// Queue message for retry or notify user
}

Browsers have native WebSocket support:
import { } from "@open-ot/transport-websocket";
// Uses native browser WebSocket API
const = new ("wss://api.example.com");

Node.js requires the ws package:
import { } from "@open-ot/transport-websocket";
// Uses 'ws' package under the hood
const = new ("ws://localhost:3000");

| Feature | WebSocket | SSE |
|---|---|---|
| Latency | 10-50ms | 50-200ms |
| Bidirectional | ✅ Native | ❌ Requires separate HTTP |
| Protocol Overhead | Lower | Higher |
| Browser Support | Excellent | Excellent |
| Serverless | ❌ No | ✅ Yes (with limitations) |
| Reconnection | Manual | Built into EventSource |
import { } from "ws";
const = new ({ : 3000 });
.("connection", (, ) => {
const = ..;
.(`[${}] Connected`);
.("message", () => {
.(`[${}] Received:`, .());
});
.("close", (, ) => {
.(`[${}] Disconnected: ${} ${}`);
});
.("error", () => {
.(`[${}] Error:`, );
});
});import { } from "@open-ot/transport-websocket";
const = new ("ws://localhost:3000");
// Wrap connect to log state changes
const = ..();
. = async () => {
.("Connecting...");
try {
await ();
.("Connected successfully");
} catch () {
.("Connection failed:", );
throw ;
}
};
// Same for send
const = ..();
. = async () => {
.("Sending:", );
await ();
};

If you're migrating from HttpSseTransport:
import { } from "@open-ot/transport-http-sse";
const = new ("http://localhost:3000", {
: "/events",
: "/messages",
});
await .(() => {
.();
});
await .({ : "op", : [], : 0 });import { } from "@open-ot/transport-websocket";
const = new ("ws://localhost:3000");
await .(() => {
.();
});
await .({ : "op", : [], : 0 });

Server changes: