Next.js + SSE
Real-time collaboration with Server-Sent Events and Redis.
Real-time collaboration with Server-Sent Events and Redis.
This guide demonstrates how to integrate OpenOT with Next.js App Router using Server-Sent Events (SSE) and Redis.
Deployment Considerations
SSE requires long-lived connections, which have limitations in different environments:
Traditional Serverless (Vercel, AWS Lambda):
HybridTransport with polling fallback
Recommended Platforms:
This guide focuses on the hybrid approach that works everywhere, gracefully degrading to polling when SSE isn't viable.
npm install @open-ot/core @open-ot/client @open-ot/react @open-ot/server @open-ot/adapter-redis @open-ot/transport-http-sse
You'll need a Redis instance with Pub/Sub support:
Development:
docker run -d -p 6379:6379 redis:alpine
Production:
If you're using Upstash or another Redis provider without Pub/Sub, you'll need to implement polling-based synchronization or use a different backend adapter.
The @open-ot/adapter-redis includes built-in Pub/Sub support for real-time broadcasting.
import { Server } from "@open-ot/server";
import { RedisAdapter } from "@open-ot/adapter-redis";
import { TextType } from "@open-ot/core";
// Shape of the slots used to keep the server and adapter on the global object.
type OTGlobals = {
  otServer: Server;
  redisAdapter: RedisAdapter;
};

const globalForOT = global as unknown as OTGlobals;

// Redis adapter (storage and pub/sub): reuse the instance stored on the
// global object when there is one, otherwise connect using REDIS_URL or the
// local default.
export const redisAdapter = globalForOT.redisAdapter
  ? globalForOT.redisAdapter
  : new RedisAdapter(process.env.REDIS_URL || "redis://localhost:6379");

// OpenOT server, reused from the global object in the same way.
export const otServer = globalForOT.otServer
  ? globalForOT.otServer
  : new Server(redisAdapter);

otServer.registerType(TextType);

// Outside production, stash both instances on the global object so a later
// evaluation of this module picks them up instead of creating new ones.
const isProduction = process.env.NODE_ENV === "production";
if (!isProduction) {
  Object.assign(globalForOT, { otServer, redisAdapter });
}

/**
 * Creates the demo document. Any failure is ignored, which is what makes
 * repeated module loads safe once the document exists.
 */
async function seedDemoDocument(): Promise<void> {
  try {
    await redisAdapter.createDocument(
      "demo-doc",
      "text",
      "Hello Next.js + Redis!"
    );
  } catch {
    // Presumably the document already exists; other errors are swallowed too.
  }
}

void seedDemoDocument();
The adapter's publish and subscribe methods handle Redis Pub/Sub automatically.
import { NextRequest, NextResponse } from "next/server";
import { otServer, redisAdapter } from "@/lib/ot-server";
// Open SSE stream controllers, keyed by document id. The GET /events handler
// adds a controller when a stream starts; the Redis subscription callback
// enqueues each published message to every controller in the document's set.
// NOTE(review): this is module-level state, so presumably it is per server
// process and not shared across instances — confirm for your deployment.
const clients = new Map<string, Set<ReadableStreamDefaultController>>();
// Unsubscribe callbacks returned by redisAdapter.subscribe, keyed by document
// id. An entry's presence is what ensureSubscription checks to avoid
// subscribing to the same document channel again.
const subscriptions = new Map<string, () => void>();
// Subscribe calls that have been started but have not resolved yet, keyed by
// document id. Concurrent callers await the same promise instead of each
// opening their own Redis subscription.
const pendingSubscriptions = new Map<string, Promise<void>>();

// TextEncoder holds no per-call state, so one instance serves all broadcasts.
const sseEncoder = new TextEncoder();

/**
 * Makes sure a Redis Pub/Sub subscription exists for `doc:${docId}` and that
 * its messages are fanned out to every SSE client registered for the document.
 *
 * Safe to call concurrently: while the first subscribe call is still in
 * flight, later callers wait on it rather than creating a second subscription
 * (which would deliver every message twice).
 *
 * @param docId - Document whose channel should be subscribed.
 * @throws Whatever `redisAdapter.subscribe` rejects with. The in-flight entry
 *   is cleared on failure, so a later call retries.
 */
async function ensureSubscription(docId: string): Promise<void> {
  if (subscriptions.has(docId)) {
    return;
  }

  let pending = pendingSubscriptions.get(docId);
  if (!pending) {
    pending = (async () => {
      const unsubscribe = await redisAdapter.subscribe(
        `doc:${docId}`,
        (message) => {
          const docClients = clients.get(docId);
          if (!docClients || docClients.size === 0) {
            return;
          }
          // Encode the SSE frame once per message, not once per client.
          const frame = sseEncoder.encode(`data: ${message}\n\n`);
          docClients.forEach((controller) => {
            try {
              controller.enqueue(frame);
            } catch (e) {
              // enqueue throws once the stream is closed; drop the dead client.
              docClients.delete(controller);
            }
          });
        }
      );
      subscriptions.set(docId, unsubscribe);
    })().finally(() => {
      pendingSubscriptions.delete(docId);
    });
    pendingSubscriptions.set(docId, pending);
  }

  await pending;
}
/** Interval between SSE keepalive comments, in milliseconds. */
const KEEPALIVE_INTERVAL_MS = 30000;

/**
 * Parses the `since` query parameter of the poll endpoint.
 *
 * @param raw - Raw parameter value; `null` or empty means "from revision 0".
 * @returns The revision as a non-negative safe integer, or `null` when the
 *   value is not a plain decimal integer.
 */
function parseSinceRevision(raw: string | null): number | null {
  const text = raw || "0";
  if (!/^\d+$/.test(text)) {
    return null;
  }
  const revision = Number.parseInt(text, 10);
  return Number.isSafeInteger(revision) ? revision : null;
}

/**
 * Opens an SSE stream for one client of `docId`: registers the stream's
 * controller in `clients`, sends the current document state as an `init`
 * message, then emits a keepalive comment on a timer.
 *
 * The controller and timer are captured outside the stream callbacks so that
 * `cancel` can remove the client and stop the timer when the client
 * disconnects.
 */
function openEventStream(docId: string): NextResponse {
  const encoder = new TextEncoder();
  let streamController: ReadableStreamDefaultController<Uint8Array> | undefined;
  let keepalive: ReturnType<typeof setInterval> | undefined;
  let closed = false;

  // Idempotent teardown shared by cancel() and a failed keepalive write.
  const detach = () => {
    closed = true;
    if (keepalive !== undefined) {
      clearInterval(keepalive);
      keepalive = undefined;
    }
    if (streamController) {
      clients.get(docId)?.delete(streamController);
    }
  };

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      streamController = controller;

      let docClients = clients.get(docId);
      if (!docClients) {
        docClients = new Set();
        clients.set(docId, docClients);
      }
      docClients.add(controller);

      // Send initial document state
      try {
        const doc = await redisAdapter.getRecord(docId);
        // The client may have disconnected while the record was loading.
        if (closed) {
          return;
        }
        const initMsg = JSON.stringify({
          type: "init",
          snapshot: doc.data,
          revision: doc.v,
        });
        controller.enqueue(encoder.encode(`data: ${initMsg}\n\n`));
      } catch (e) {
        console.error("Failed to get initial state:", e);
      }

      if (closed) {
        return;
      }

      // Keepalive every 30s
      keepalive = setInterval(() => {
        try {
          controller.enqueue(encoder.encode(": keepalive\n\n"));
        } catch (e) {
          detach();
        }
      }, KEEPALIVE_INTERVAL_MS);
    },
    cancel() {
      detach();
    },
  });

  return new NextResponse(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache, no-transform",
      Connection: "keep-alive",
      "X-Accel-Buffering": "no",
    },
  });
}

/**
 * Polling fallback for HybridTransport: reports whether the document has
 * moved past `since` and, if so, returns the history from `since` to the
 * current revision.
 *
 * Responds 400 when `since` is not a non-negative integer and 500 when the
 * adapter call fails.
 */
async function pollForUpdates(
  docId: string,
  sinceParam: string | null
): Promise<NextResponse> {
  const sinceRevision = parseSinceRevision(sinceParam);
  if (sinceRevision === null) {
    return NextResponse.json(
      { error: "Invalid 'since' revision" },
      { status: 400 }
    );
  }

  try {
    const doc = await redisAdapter.getRecord(docId);
    if (doc.v <= sinceRevision) {
      return NextResponse.json({
        type: "poll",
        hasUpdates: false,
        revision: doc.v,
      });
    }
    const ops = await redisAdapter.getHistory(docId, sinceRevision, doc.v);
    return NextResponse.json({
      type: "poll",
      hasUpdates: true,
      operations: ops,
      revision: doc.v,
    });
  } catch (error) {
    console.error("Polling error:", error);
    return NextResponse.json({ error: "Internal error" }, { status: 500 });
  }
}

/**
 * GET route handler. Dispatches on the path suffix:
 * - `/events` — SSE stream of document updates
 * - `/poll`   — polling fallback, takes an optional `since` revision
 * Any other path gets a 404. `docId` defaults to "demo-doc".
 */
export async function GET(req: NextRequest) {
  const { searchParams, pathname } = req.nextUrl;
  const docId = searchParams.get("docId") || "demo-doc";

  // SSE endpoint
  if (pathname.endsWith("/events")) {
    await ensureSubscription(docId);
    return openEventStream(docId);
  }

  // Polling endpoint (fallback for HybridTransport)
  if (pathname.endsWith("/poll")) {
    return pollForUpdates(docId, searchParams.get("since"));
  }

  return new NextResponse("Not Found", { status: 404 });
}
/**
 * POST route handler for `/messages`: submits a client operation to the OT
 * server and broadcasts the result on the document's Redis channel.
 *
 * Responses:
 * - 404 when the path does not end in `/messages`
 * - 400 when the body is not a JSON object or its `type` is not "op"
 * - 500 when the OT server rejects or fails to process the operation
 * - 200 `{ success: true, revision }` once the operation has been accepted
 */
export async function POST(req: NextRequest) {
  if (!req.nextUrl.pathname.endsWith("/messages")) {
    return new NextResponse("Not Found", { status: 404 });
  }

  // A malformed body is a client error, not a server failure.
  const msg = await req.json().catch(() => undefined);
  if (!msg || typeof msg !== "object") {
    return NextResponse.json(
      { error: "Request body must be a JSON object" },
      { status: 400 }
    );
  }
  if (msg.type !== "op") {
    return NextResponse.json(
      { error: "Unsupported message type" },
      { status: 400 }
    );
  }

  const docId = msg.docId || "demo-doc";

  try {
    const result = await otServer.submitOperation(docId, msg.op, msg.revision);

    // Broadcast via Redis Pub/Sub. The operation has already been accepted at
    // this point, so a broadcast failure is logged rather than reported as a
    // failed submit, which would invite the client to send the op again.
    try {
      await redisAdapter.publish(
        `doc:${docId}`,
        JSON.stringify({
          type: "op",
          op: result.op,
          revision: result.revision,
        })
      );
    } catch (publishErr) {
      console.error("Failed to broadcast op:", publishErr);
    }

    return NextResponse.json({
      success: true,
      revision: result.revision,
    });
  } catch (err) {
    console.error("Failed to process op:", err);
    return NextResponse.json(
      { error: "Failed to process operation" },
      { status: 500 }
    );
  }
}
The HybridTransport automatically switches between SSE and polling based on connection stability and user activity. This ensures your app works reliably across all deployment environments.
"use client";
import { useMemo } from "react";
import { useOTClient } from "@open-ot/react";
import { HybridTransport } from "@open-ot/transport-http-sse";
import { TextType } from "@open-ot/core";
/** One component of a text operation: retain `r` characters, insert `i`, or delete `d` characters. */
type TextOpComponent = { r: number } | { i: string } | { d: number };

/**
 * Describes the change from `before` to `after` as a single contiguous edit:
 * retain the common prefix, delete what was removed, insert what was added,
 * retain the common suffix. Zero-length components are omitted, and an
 * unchanged text yields an empty array.
 *
 * Still a naive diff (use fast-diff in production), but unlike a pure
 * append/truncate check it also covers edits in the middle of the text.
 *
 * NOTE(review): the trailing retain assumes TextType accepts (or requires) an
 * operation that spans the whole document — confirm against @open-ot/core.
 */
function diffToOp(before: string, after: string): TextOpComponent[] {
  if (before === after) {
    return [];
  }

  const shorter = Math.min(before.length, after.length);

  let prefix = 0;
  while (prefix < shorter && before[prefix] === after[prefix]) {
    prefix++;
  }

  // The suffix may not overlap the prefix in either string.
  const maxSuffix = shorter - prefix;
  let suffix = 0;
  while (
    suffix < maxSuffix &&
    before[before.length - 1 - suffix] === after[after.length - 1 - suffix]
  ) {
    suffix++;
  }

  const deleted = before.length - prefix - suffix;
  const inserted = after.slice(prefix, after.length - suffix);

  const op: TextOpComponent[] = [];
  if (prefix > 0) op.push({ r: prefix });
  if (deleted > 0) op.push({ d: deleted });
  if (inserted.length > 0) op.push({ i: inserted });
  if (suffix > 0) op.push({ r: suffix });
  return op;
}

/**
 * Collaborative textarea bound to the "demo-doc" document through a
 * HybridTransport. Local edits are turned into text operations and applied
 * via the OT client; the textarea always renders the client's snapshot.
 */
export function Editor() {
  // Create the transport once per mounted component.
  const transport = useMemo(
    () =>
      new HybridTransport({
        docId: "demo-doc",
        baseUrl: "/api/ot",
        inactivityTimeout: 2 * 60 * 1000, // Switch to polling after 2min inactive
        pollingInterval: 5000, // Poll every 5s when in polling mode
      }),
    []
  );

  const { client, snapshot } = useOTClient({
    type: TextType,
    initialSnapshot: "Hello Next.js + Redis!",
    initialRevision: 0,
    transport: transport,
  });

  const handleChange = (e: React.ChangeEvent<HTMLTextAreaElement>) => {
    const op = diffToOp(snapshot, e.target.value);
    // Nothing to send when the text did not actually change.
    if (op.length > 0) {
      client.applyLocal(op);
    }
  };

  return (
    <div className="space-y-2">
      <h2 className="text-lg font-semibold">Collaborative Editor</h2>
      <textarea
        className="w-full h-64 p-4 border rounded font-mono"
        value={snapshot}
        onChange={handleChange}
      />
      <p className="text-sm text-muted-foreground">
        Open in multiple tabs to see real-time sync!
      </p>
      <p className="text-xs text-muted-foreground">
        Connection: {transport.getCurrentMode()}
      </p>
    </div>
  );
}
Add to .env.local:
REDIS_URL=redis://localhost:6379
# For Redis Cloud: redis://default:password@redis-xxxxx.cloud.redislabs.com:12345
Deploy to platforms that support persistent connections:
Railway:
Set REDIS_URL from the Redis plugin
Render:
Set the REDIS_URL environment variable
Fly.io:
Set REDIS_URL in your app secrets
fly deploy
For edge deployment with persistent connections, see the Cloudflare integration guide.
If deploying to traditional serverless:
HybridTransport will automatically fall back to polling due to connection timeouts.
Set inactivityTimeout: 0 to use polling-only mode.
For the best serverless experience, consider using Cloudflare Durable Objects or deploying to a platform with long-lived connection support.
HybridTransport automatically switches to polling