const express = require("express"); const app = express(); const http = require("http"); const server = http.createServer(app); const { Server } = require("socket.io"); const io = new Server(server, { cors: { origin: "*", methods: ["GET", "POST"] } }); const mediasoup = require("mediasoup"); // Password setting via environment variable, defaulting to "secret" const BROADCASTER_PASSWORD = process.env.BROADCASTER_PASSWORD; // Default to server IP if run across network; 127.0.0.1 for local testing const ANNOUNCED_IP = process.env.ANNOUNCED_IP || '127.0.0.1'; // --- MEDIASOUP SETUP --- let worker; let router; // State tracking let broadcasterSocketId = null; let producers = {}; // e.g. { video: producer1, audio: producer2 } // Map of socket.id -> { transports: {}, consumers: {} } let clients = {}; // Mediasoup media codecs const mediaCodecs = [ { kind: 'audio', mimeType: 'audio/opus', clockRate: 48000, channels: 2, parameters: { useinbandfec: 1, minptime: 10 } }, { kind: 'video', mimeType: 'video/H264', clockRate: 90000, parameters: { 'packetization-mode': 1, 'profile-level-id': '42e01f', 'level-asymmetry-allowed': 1 } }, { kind: 'video', mimeType: 'video/VP8', clockRate: 90000, parameters: {} } ]; async function startMediasoup() { worker = await mediasoup.createWorker({ logLevel: 'warn', rtcMinPort: 40000, rtcMaxPort: 49999, }); worker.on('died', () => { console.error('mediasoup worker died, exiting in 2 seconds... [pid:%d]', worker.pid); setTimeout(() => process.exit(1), 2000); }); router = await worker.createRouter({ mediaCodecs }); console.log("Mediasoup router created."); } // Serve static viewer files app.use(express.static("public")); io.on("connection", (socket) => { console.log("a user connected:", socket.id); clients[socket.id] = { transports: {}, consumers: {} }; // --- 1. Router Capabilities --- // Clients need these to initialize their mediasoup-client Device socket.on("getRouterRtpCapabilities", (callback) => { try { callback(router.rtpCapabilities); } catch (e) { callback({ error: e.message }); } }); // --- Broadcaster Auth --- socket.on("broadcaster", (password) => { if (password !== BROADCASTER_PASSWORD) { socket.emit("authError", "Invalid broadcaster password."); return; } broadcasterSocketId = socket.id; console.log("Broadcaster authenticated:", socket.id); socket.broadcast.emit("broadcasterConnected"); }); // --- 2. Create WebRTC Transport --- socket.on("createWebRtcTransport", async ({ direction }, callback) => { try { const transport = await router.createWebRtcTransport({ listenIps: [{ ip: '0.0.0.0', announcedIp: ANNOUNCED_IP }], enableUdp: true, enableTcp: true, preferUdp: true, }); transport.on("dtlsstatechange", dtlsState => { if (dtlsState === "closed") transport.close(); }); transport.on("routerclose", () => transport.close()); // Store the transport server-side tied to this socket clients[socket.id].transports[transport.id] = transport; // Send parameters back to client to create local mirrored transport callback({ id: transport.id, iceParameters: transport.iceParameters, iceCandidates: transport.iceCandidates, dtlsParameters: transport.dtlsParameters, }); } catch (e) { console.error(e); callback({ error: e.message }); } }); // --- 3. Connect Transport --- // Client sends its local DTLS parameters to establish the secure connection socket.on("connectTransport", async ({ transportId, dtlsParameters }, callback) => { try { const transport = clients[socket.id].transports[transportId]; if (!transport) throw new Error("Transport not found"); await transport.connect({ dtlsParameters }); callback(); } catch (e) { console.error(e); callback({ error: e.message }); } }); // --- 4. Produce (Broadcaster sending media TO server) --- socket.on("produce", async ({ transportId, kind, rtpParameters }, callback) => { try { if (socket.id !== broadcasterSocketId) { throw new Error("Only the authenticated broadcaster can produce media."); } const transport = clients[socket.id].transports[transportId]; if (!transport) throw new Error("Transport not found"); const producer = await transport.produce({ kind, rtpParameters }); // Store globally so viewers know what to consume producers[kind] = producer; producer.on("transportclose", () => { producer.close(); }); // Notify ALL existing viewers that a new track is available socket.broadcast.emit("newProducer", { producerId: producer.id, kind: producer.kind }); // Return the ID back to the broadcaster client callback({ id: producer.id }); } catch (e) { console.error(e); callback({ error: e.message }); } }); // --- 5. Consume (Viewers receiving media FROM server) --- socket.on("consume", async ({ transportId, producerId, rtpCapabilities }, callback) => { try { const transport = clients[socket.id].transports[transportId]; if (!transport) throw new Error("Transport not found"); if (!router.canConsume({ producerId, rtpCapabilities })) { throw new Error("Client cannot consume this producer."); } const consumer = await transport.consume({ producerId, rtpCapabilities, paused: true, // important: start paused until client confirms ready }); clients[socket.id].consumers[consumer.id] = consumer; consumer.on("transportclose", () => { consumer.close(); }); consumer.on("producerclose", () => { socket.emit("producerClosed", { consumerId: consumer.id }); consumer.close(); delete clients[socket.id].consumers[consumer.id]; }); callback({ id: consumer.id, producerId: consumer.producerId, kind: consumer.kind, rtpParameters: consumer.rtpParameters, }); } catch (e) { console.error(e); callback({ error: e.message }); } }); // Client says they successfully created the local consumer, we can resume sending RTP socket.on("resumeConsumer", async ({ consumerId }, callback) => { try { const consumer = clients[socket.id].consumers[consumerId]; if (!consumer) throw new Error("Consumer not found"); await consumer.resume(); callback(); } catch (e) { console.error(e); callback({ error: e.message }); } }); // Helper for new viewers: "What streams are currently running?" socket.on("getProducers", (callback) => { // Return array of currently active producer IDs and their kinds const activeProducers = Object.values(producers) .filter(p => !p.closed) .map(p => ({ producerId: p.id, kind: p.kind })); callback(activeProducers); }); // Viewer count: return number of connected viewers (non-broadcaster sockets) socket.on("getViewerCount", (callback) => { const count = Object.keys(clients).filter(id => id !== broadcasterSocketId).length; callback(count); }); socket.on("disconnect", () => { console.log("user disconnected", socket.id); // If the broadcaster disconnected, clean up producers if (socket.id === broadcasterSocketId) { Object.keys(producers).forEach(kind => { producers[kind].close(); }); producers = {}; broadcasterSocketId = null; socket.broadcast.emit("broadcasterDisconnected"); } // Clean up this client's transports and consumers if (clients[socket.id]) { Object.values(clients[socket.id].transports).forEach(t => t.close()); delete clients[socket.id]; } // Notify broadcaster about updated viewer count if (broadcasterSocketId && io.sockets.sockets.get(broadcasterSocketId)) { const count = Object.keys(clients).filter(id => id !== broadcasterSocketId).length; io.to(broadcasterSocketId).emit("viewerCount", count); } }); }); const PORT = process.env.PORT || 3000; startMediasoup().then(() => { server.listen(PORT, () => { console.log(`Mediasoup SFU listening on *:${PORT}`); console.log(`NOTE: If running on a network, set ANNOUNCED_IP to the server's public IP`); }); });