HTTP + SSE Transport
Deep dive into Server-Sent Events transport with adaptive polling fallback.
The @open-ot/transport-http-sse package provides two transport adapters: a basic SSE transport and a sophisticated hybrid transport that intelligently switches between SSE and HTTP polling.
npm install @open-ot/transport-http-sse
This package exports two transport implementations:
HttpSseTransport
A simple, unidirectional Server-Sent Events transport. Messages flow from server to client via SSE, while client-to-server communication uses HTTP POST.
HybridTransport
A transport that combines SSE and HTTP polling and switches modes automatically based on user inactivity, repeated SSE connection failures, and renewed user activity.
The basic SSE transport is ideal when you have reliable long-lived connections.
import { HttpSseTransport } from "@open-ot/transport-http-sse";
const transport = new HttpSseTransport("http://localhost:3000", {
: "/events", // SSE endpoint path
: "/messages", // POST endpoint path
  headers: {
    // Optional custom headers
    Authorization: "Bearer token",
  },
});
import { HttpSseTransport } from "@open-ot/transport-http-sse";
import type { Message } from "./types";
const transport = new HttpSseTransport<Message>("http://localhost:3000");
// Connect and start receiving messages
await transport.connect((message) => {
  console.log("Received:", message.type);
});
// Send operations to server
await transport.send({
  type: "op",
  op: [{ i: "Hello" }],
  revision: 0,
});
// Cleanup
await transport.disconnect();
Authentication and custom headers are supported for the POST endpoint:
import { HttpSseTransport } from "@open-ot/transport-http-sse";
const transport = new HttpSseTransport("http://localhost:3000", {
  headers: {
    Authorization: "Bearer eyJhbGc...",
    "X-Client-ID": "client-123",
  },
});
Standard EventSource doesn't support custom headers for SSE connections. Use cookies or query parameters for SSE authentication.
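Because the browser's EventSource constructor accepts only a URL and an optional withCredentials flag, one common workaround is to carry a short-lived token in the query string. The sketch below builds such a URL. The `buildEventsUrl` helper and the `token` parameter name are illustrative assumptions, not part of @open-ot/transport-http-sse, and the server must be written to read the token from the query.

```typescript
// Hypothetical helper: not part of @open-ot/transport-http-sse.
// Builds an SSE endpoint URL that carries credentials in the query string,
// since EventSource cannot attach an Authorization header.
function buildEventsUrl(baseUrl: string, eventsPath: string, token: string): string {
  const url = new URL(eventsPath, baseUrl);
  // "token" is an assumed parameter name; use whatever your server expects.
  url.searchParams.set("token", token);
  return url.toString();
}

const eventsUrl = buildEventsUrl("http://localhost:3000", "/events", "abc123");
console.log(eventsUrl); // http://localhost:3000/events?token=abc123
```

In a browser, the result can be passed to `new EventSource(eventsUrl)`. For cookie-based sessions, `new EventSource(url, { withCredentials: true })` sends cookies on cross-origin requests. Tokens in URLs tend to end up in access logs, so short-lived tokens are the safer choice.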
import { HttpSseTransport } from "@open-ot/transport-http-sse";
const transport = new HttpSseTransport("http://localhost:3000");
// Connection errors
try {
  await transport.connect((message) => {
    console.log(message);
  });
} catch (error) {
  console.error("Failed to establish SSE connection:", error);
}
// Sending errors
try {
  await transport.send({ type: "op", op: [], revision: 0 });
} catch (error) {
  console.error("Failed to send operation:", error);
  // Handle send failure (retry, queue, etc.)
}
The hybrid transport provides production-grade resilience with automatic fallback strategies.
import { HybridTransport } from "@open-ot/transport-http-sse";
const transport = new HybridTransport({
  // Required
  docId: "document-123",
// Optional - Connection settings
: "/api/ot", // Default: '/api/ot'
  headers: {
    // Custom headers for requests
    "X-Session-ID": "abc123",
  },
  // Optional - Inactivity detection
  inactivityTimeout: 120000, // Default: 2 minutes (120000ms)
  pollingInterval: 5000, // Default: 5 seconds (5000ms)
  // Optional - Reconnection strategy
  maxReconnectAttempts: 5, // Default: 5
: 1000, // Default: 1000ms (with exponential backoff)
});
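To make the reconnection settings above concrete, the following sketch computes the wait before each SSE retry for a given base delay and attempt limit. It shows two common growth rules: a doubling (exponential) schedule, and a linear one in which attempt n waits n times the base delay. Which rule the transport applies internally is not confirmed here, and the function names are illustrative, so treat this as a way to reason about the settings rather than as the library's implementation.

```typescript
// Illustrative only: these helpers are not exported by the package.
type BackoffRule = "linear" | "exponential";

// Returns the delay in milliseconds to wait before the given 1-based attempt.
function retryDelay(baseDelayMs: number, attempt: number, rule: BackoffRule): number {
  if (attempt < 1) throw new RangeError("attempt must be >= 1");
  return rule === "linear"
    ? baseDelayMs * attempt // 1x, 2x, 3x, ...
    : baseDelayMs * Math.pow(2, attempt - 1); // 1x, 2x, 4x, ...
}

// Returns the full list of delays for attempts 1..maxAttempts.
function retrySchedule(baseDelayMs: number, maxAttempts: number, rule: BackoffRule): number[] {
  return Array.from({ length: maxAttempts }, (_, i) => retryDelay(baseDelayMs, i + 1, rule));
}

console.log(retrySchedule(1000, 5, "linear")); // [1000, 2000, 3000, 4000, 5000]
console.log(retrySchedule(1000, 5, "exponential")); // [1000, 2000, 4000, 8000, 16000]
```

With a 1000 ms base and 5 attempts, the linear rule spends 15 seconds waiting before the fallback to polling, while the doubling rule spends 31 seconds.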
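The transport operates in one of three modes: sse (real-time streaming), polling (periodic HTTP requests, so updates are delayed), or disconnected (offline).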
// Check current mode
const = .();
.(`Current mode: ${}`);
The transport automatically switches modes based on various conditions:
Switches from SSE to Polling when:
After inactivityTimeout milliseconds of no activity:
import { HybridTransport } from "@open-ot/transport-http-sse";
const transport = new HybridTransport({
  docId: "doc-1",
  inactivityTimeout: 120000, // Switch after 2 minutes inactive
});
When the server sends a timeout message that suggests polling:
{
  "type": "timeout",
  "suggestPolling": true,
  "message": "Connection timeout - please reconnect or switch to polling"
}
After maxReconnectAttempts consecutive failures:
import { HybridTransport } from "@open-ot/transport-http-sse";
const transport = new HybridTransport({
  docId: "doc-1",
  maxReconnectAttempts: 5, // Fall back to polling after 5 failures
});
Switches from Polling to SSE when:
User activity is detected (sending any operation):
import { HybridTransport } from "@open-ot/transport-http-sse";
const transport = new HybridTransport({ docId: "doc-1" });
// Currently in polling mode due to inactivity
// User makes an edit...
await transport.send({
  type: "op",
  op: [{ i: "Hello" }],
  revision: 5,
});
// Automatically switches back to SSE mode
Exponential Backoff Reconnection:
The transport retries SSE connections with increasing delays:
import { HybridTransport } from "@open-ot/transport-http-sse";
const transport = new HybridTransport({
  docId: "doc-1",
: 1000, // Base delay
  maxReconnectAttempts: 5,
});
// Reconnection timeline:
// Attempt 1: Wait 1000ms (1s)
// Attempt 2: Wait 2000ms (2s)
// Attempt 3: Wait 3000ms (3s)
// Attempt 4: Wait 4000ms (4s)
// Attempt 5: Wait 5000ms (5s)
// After 5 failures: Switch to polling
You can manually override the automatic mode switching:
import { HybridTransport } from "@open-ot/transport-http-sse";
const transport = new HybridTransport({ docId: "doc-1" });
await transport.connect((message) => console.log(message));
// Force switch to SSE mode
await .();
// Force switch to polling mode
.();
// Check current mode
const = .();
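.(`Mode: ${}`);
When in polling mode, the transport polls the server at a fixed interval (pollingInterval) and requests only the operations newer than its current revision:
import { HybridTransport } from "@open-ot/transport-http-sse";
const transport = new HybridTransport({
  docId: "doc-1",
  pollingInterval: 3000, // Poll every 3 seconds
});
// Makes requests to: GET /api/ot/poll?docId=doc-1&since={currentRevision}
Expected server response format: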
{
"type": "poll",
"hasUpdates": true,
"operations": [
{ "op": [{ "i": "Hello" }], "revision": 6 },
{ "op": [{ "i": " World" }], "revision": 7 }
],
"revision": 7
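}
The transport considers a user "active" when:
The client calls send() (for example, to submit an operation).
import { HybridTransport } from "@open-ot/transport-http-sse";
const transport = new HybridTransport({
  docId: "doc-1",
  inactivityTimeout: 60000,
});
await transport.connect((message) => console.log(message));
// User types something after being inactive
await transport.send({
  type: "op",
  op: [{ i: "a" }],
  revision: 10,
});
// ✓ Inactivity timer resets
// ✓ If in polling mode, switches to SSE
Both transports require specific server endpoints.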
// GET /api/ot/events?docId={docId}
// Returns: text/event-stream
const stream = new ReadableStream({
  start(controller) {
    // Send initial state
    controller.enqueue(
      new TextEncoder().encode(
        `data: ${JSON.stringify({
          type: "init",
          snapshot: documentSnapshot,
          revision: currentRevision,
        })}\n\n`
      )
    );
    // Subscribe to updates and stream them
    // Send keepalive comments every 30s
  },
});
return new Response(stream, {
  headers: {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  },
});
// POST /api/ot/messages
// Body: { docId, type, op, revision }
const { docId, type, op, revision } = await request.json();
if (type === "op") {
  const result = await otServer.submitOperation(docId, op, revision);
  // Broadcast to all clients
  await broadcastUpdate(docId, {
    type: "op",
    op: result.op,
    revision: result.revision,
  });
  return json({ success: true, revision: result.revision });
}
// GET /api/ot/poll?docId={docId}&since={revision}
const docId = params.get("docId");
// Parse the client's last known revision as a base-10 integer
const sinceRevision = parseInt(params.get("since") || "0", 10);
const doc = await getDocument(docId);
if (doc.revision <= sinceRevision) {
  // Nothing new since the client's revision
  return json({
    type: "poll",
    hasUpdates: false,
    revision: doc.revision,
  });
}
const operations = await getOperationsSince(docId, sinceRevision);
return json({
  type: "poll",
  hasUpdates: true,
  operations: operations,
  revision: doc.revision,
});
import { HybridTransport } from "@open-ot/transport-http-sse";
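class ResilientConnection {
  private transport: HybridTransport;
  private reconnectTimer: NodeJS.Timeout | null = null;
  constructor(docId: string) {
    this.transport = new HybridTransport({
      docId,
      maxReconnectAttempts: 3,
    });
  }
  async connect(onMessage: (message: unknown) => void) {
    try {
      await this.transport.connect(onMessage);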
.(`Connected in ${this..()} mode`);
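    } catch (error) {
      console.error("Connection failed:", error);
      this.scheduleReconnect(onMessage);
    }
  }
  private scheduleReconnect(onMessage: (message: unknown) => void) {
    // Retry the whole connect sequence after a fixed 5-second pause
    this.reconnectTimer = setTimeout(() => {
      console.log("Attempting to reconnect...");
      this.connect(onMessage);
    }, 5000);
  }
  async disconnect() {
    // Cancel any pending retry before closing the transport
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
    }
    await this.transport.disconnect();
  }
}
import { HybridTransport } from "@open-ot/transport-http-sse";
const transport = new HybridTransport({ docId: "doc-1" });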
// Poll mode status
let = .();
const = (() => {
const = .();
if ( !== ) {
.(`Mode changed: ${} → ${}`);
= ;
// Update UI, analytics, etc.
();
}
}, 1000);
function (: string) {
// Update connection status in UI
// 'sse' = green (realtime)
// 'polling' = yellow (delayed)
// 'disconnected' = red (offline)
}
import { HybridTransport } from "@open-ot/transport-http-sse";
// For active editing (low latency)
const editorTransport = new HybridTransport({
  docId: "doc-1",
  inactivityTimeout: 60000, // 1 minute
  pollingInterval: 3000, // 3 seconds when polling
});
// For viewing/reading (lower server load)
const viewerTransport = new HybridTransport({
  docId: "doc-1",
  inactivityTimeout: 10000, // 10 seconds
  pollingInterval: 10000, // 10 seconds when polling
});
Enable verbose logging for troubleshooting:
import { HybridTransport } from "@open-ot/transport-http-sse";
const transport = new HybridTransport({
  docId: "doc-1",
});
// The transport logs mode changes to console:
// "SSE connected"
// "User inactive, switching to polling to save costs..."
// "Activity detected, switching back to SSE..."
// "Max reconnect attempts reached, switching to polling"
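The log messages above correspond to a small set of mode transitions. As a summary of the switching rules described in this guide, here is a minimal sketch of that logic as a pure function. The event names and the `nextMode` function are assumptions made for illustration; the transport's internal state handling may differ.

```typescript
// Illustrative model of the documented switching rules; not the library's code.
type Mode = "sse" | "polling" | "disconnected";

type TransportEvent =
  | { kind: "sseConnected" }
  | { kind: "inactive" } // no activity for inactivityTimeout ms
  | { kind: "serverTimeout"; suggestPolling: boolean }
  | { kind: "reconnectFailed"; attempts: number; maxAttempts: number }
  | { kind: "userActivity" } // e.g. an operation was sent
  | { kind: "disconnect" };

// Returns the mode that follows from the current mode and an event.
function nextMode(mode: Mode, event: TransportEvent): Mode {
  switch (event.kind) {
    case "sseConnected":
      return "sse";
    case "inactive":
      return mode === "sse" ? "polling" : mode;
    case "serverTimeout":
      return mode === "sse" && event.suggestPolling ? "polling" : mode;
    case "reconnectFailed":
      // Fall back to polling only once the attempt limit is reached.
      return event.attempts >= event.maxAttempts ? "polling" : mode;
    case "userActivity":
      return mode === "polling" ? "sse" : mode;
    case "disconnect":
      return "disconnected";
  }
}

// Walk through a typical session.
let current: Mode = "disconnected";
current = nextMode(current, { kind: "sseConnected" });
current = nextMode(current, { kind: "inactive" });
current = nextMode(current, { kind: "userActivity" });
console.log(current); // sse
```

Keeping the rules in a pure function like this makes them easy to unit test separately from network code.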