Files
yass/server.js

285 lines
8.4 KiB
JavaScript

// HTTP stack: an Express app wrapped in a plain Node HTTP server, so that
// Socket.IO (below) is attached to the same server that later listens on PORT.
const express = require("express");
const app = express();
const http = require("http");
const server = http.createServer(app);
const { Server } = require("socket.io");
// Socket.IO signalling server attached to the HTTP server.
// NOTE(review): origin "*" allows cross-origin connections from any site;
// confirm this is intended outside of local testing.
const io = new Server(server, {
cors: {
origin: "*",
methods: ["GET", "POST"]
}
});
// mediasoup supplies the worker and router created in startMediasoup().
const mediasoup = require("mediasoup");
// Broadcaster password: read from the BROADCASTER_PASSWORD environment
// variable, falling back to "secret" when it is unset or empty. Without the
// fallback the constant would be undefined, and a strict-equality comparison
// against it would succeed for a client that sent no password at all.
const BROADCASTER_PASSWORD = process.env.BROADCASTER_PASSWORD || "secret";
if (!process.env.BROADCASTER_PASSWORD) {
  console.warn('BROADCASTER_PASSWORD is not set; using the default password "secret".');
}
// Address passed as `announcedIp` to every WebRTC transport. Defaults to
// 127.0.0.1 for local testing; set ANNOUNCED_IP when clients connect over a
// network.
const ANNOUNCED_IP = process.env.ANNOUNCED_IP || '127.0.0.1';
// --- MEDIASOUP SETUP ---
// Both are assigned by startMediasoup(), which runs before the HTTP server
// starts listening.
let worker;
let router;
// --- State tracking ---
// socket.id of the currently authenticated broadcaster, or null when none.
let broadcasterSocketId = null;
// Producers keyed by media kind, e.g. { video: producer1, audio: producer2 }.
// Producing the same kind again overwrites the earlier entry.
let producers = {};
// Per-connection resources: socket.id -> { transports: {}, consumers: {} },
// with transports keyed by transport.id and consumers keyed by consumer.id.
let clients = {};
// Codec list handed to the mediasoup router when it is created:
// Opus for audio, H264 and VP8 for video.
const mediaCodecs = [
  // Audio: two-channel Opus at a 48000 Hz clock rate.
  {
    kind: "audio",
    mimeType: "audio/opus",
    clockRate: 48000,
    channels: 2,
    parameters: { useinbandfec: 1, minptime: 10 },
  },
  // Video: H264 with explicit packetization/profile parameters.
  {
    kind: "video",
    mimeType: "video/H264",
    clockRate: 90000,
    parameters: {
      "packetization-mode": 1,
      "profile-level-id": "42e01f",
      "level-asymmetry-allowed": 1,
    },
  },
  // Video: VP8 with no extra parameters.
  {
    kind: "video",
    mimeType: "video/VP8",
    clockRate: 90000,
    parameters: {},
  },
];
/**
 * Boots the media layer: creates a mediasoup worker, then a router on that
 * worker, and stores them in the module-level `worker` and `router`
 * variables.
 *
 * If the worker later emits "died", the failure is logged and the process
 * exits with code 1 after a 2 second delay.
 *
 * @returns {Promise} Resolves (with no value) once the router exists.
 */
async function startMediasoup() {
  const workerSettings = {
    logLevel: 'warn',
    rtcMinPort: 40000,
    rtcMaxPort: 49999,
  };
  worker = await mediasoup.createWorker(workerSettings);

  const exitAfterWorkerDeath = () => {
    console.error('mediasoup worker died, exiting in 2 seconds... [pid:%d]', worker.pid);
    setTimeout(() => process.exit(1), 2000);
  };
  worker.on('died', exitAfterWorkerDeath);

  const routerOptions = { mediaCodecs };
  router = await worker.createRouter(routerOptions);
  console.log("Mediasoup router created.");
}
// Serve the static viewer files from the "public" directory.
// NOTE(review): "public" is a relative path, so it is presumably resolved
// against the directory the process is started from; confirm against the
// Express documentation before running from another working directory.
app.use(express.static("public"));
io.on("connection", (socket) => {
console.log("a user connected:", socket.id);
clients[socket.id] = { transports: {}, consumers: {} };
// --- 1. Router Capabilities ---
// Clients need these to initialize their mediasoup-client Device.
// Replies through the acknowledgement callback with router.rtpCapabilities,
// or with { error } if they cannot be read.
socket.on("getRouterRtpCapabilities", (callback) => {
  // A request without an acknowledgement function has nowhere to deliver the
  // result; ignore it rather than throwing a TypeError on `callback(...)`.
  if (typeof callback !== "function") return;

  let reply;
  try {
    reply = router.rtpCapabilities;
  } catch (e) {
    reply = { error: e.message };
  }
  // Acknowledge exactly once, outside the try block, so a failure inside the
  // acknowledgement itself cannot trigger a second call from the catch path.
  callback(reply);
});
// --- Broadcaster Auth ---
// A client claims the broadcaster role by sending the shared password.
// On success this socket becomes the broadcaster and every other socket
// receives "broadcasterConnected"; on failure the sender receives "authError".
socket.on("broadcaster", (password) => {
  // Never authenticate unless a real password is configured. If
  // BROADCASTER_PASSWORD were undefined, a plain `!==` comparison would let
  // through a client that sent no password at all (undefined === undefined).
  const passwordConfigured =
    typeof BROADCASTER_PASSWORD === "string" && BROADCASTER_PASSWORD.length > 0;
  // Only a string can match; this also rejects missing or object arguments.
  const passwordMatches =
    typeof password === "string" && password === BROADCASTER_PASSWORD;

  if (!passwordConfigured || !passwordMatches) {
    socket.emit("authError", "Invalid broadcaster password.");
    return;
  }

  broadcasterSocketId = socket.id;
  console.log("Broadcaster authenticated:", socket.id);
  socket.broadcast.emit("broadcasterConnected");
});
// --- 2. Create WebRTC Transport ---
// Creates a server-side WebRTC transport for this socket and replies with the
// parameters the client needs to build its mirrored local transport:
// { id, iceParameters, iceCandidates, dtlsParameters }, or { error }.
// The request payload (clients send { direction }) is accepted but not used
// by the server, so a missing payload is tolerated.
socket.on("createWebRtcTransport", async (_payload, callback) => {
  // Without an acknowledgement function the client could never learn the
  // transport parameters; ignore the request instead of throwing.
  if (typeof callback !== "function") return;

  let transport;
  try {
    transport = await router.createWebRtcTransport({
      listenIps: [{ ip: '0.0.0.0', announcedIp: ANNOUNCED_IP }],
      enableUdp: true,
      enableTcp: true,
      preferUdp: true,
    });

    // The socket may have disconnected while the transport was being
    // created. Its entry in `clients` is gone by then, so nothing would ever
    // close this transport; close it here instead of leaking it.
    const client = clients[socket.id];
    if (!client) {
      transport.close();
      return;
    }

    transport.on("dtlsstatechange", (dtlsState) => {
      if (dtlsState === "closed") transport.close();
    });
    transport.on("routerclose", () => transport.close());

    // Store the transport server-side tied to this socket
    client.transports[transport.id] = transport;
  } catch (e) {
    console.error(e);
    callback({ error: e.message });
    return;
  }

  // Send parameters back to client to create local mirrored transport
  callback({
    id: transport.id,
    iceParameters: transport.iceParameters,
    iceCandidates: transport.iceCandidates,
    dtlsParameters: transport.dtlsParameters,
  });
});
// --- 3. Connect Transport ---
// Client sends its local DTLS parameters to establish the secure connection.
// Payload: { transportId, dtlsParameters }. Acknowledges with no arguments on
// success, or with { error } on failure.
socket.on("connectTransport", async (payload, callback) => {
  // Ignore requests that carry no acknowledgement function; calling an
  // undefined callback would throw inside this async handler.
  if (typeof callback !== "function") return;

  try {
    // Destructure inside the try block so a missing or null payload becomes
    // an { error } reply instead of an unhandled rejection.
    const { transportId, dtlsParameters } = payload || {};
    const transport = clients[socket.id].transports[transportId];
    if (!transport) throw new Error("Transport not found");
    await transport.connect({ dtlsParameters });
  } catch (e) {
    console.error(e);
    callback({ error: e.message });
    return;
  }

  // Success acknowledgement stays outside the try block so it can never be
  // followed by a second, error acknowledgement.
  callback();
});
// --- 4. Produce (Broadcaster sending media TO server) ---
// Payload: { transportId, kind, rtpParameters }. Only the authenticated
// broadcaster may produce. Acknowledges with { id } of the new producer, or
// with { error }. All other sockets are told about the track via
// "newProducer".
socket.on("produce", async (payload, callback) => {
  // Ignore requests that carry no acknowledgement function; calling an
  // undefined callback would throw inside this async handler.
  if (typeof callback !== "function") return;

  let producer;
  try {
    if (socket.id !== broadcasterSocketId) {
      throw new Error("Only the authenticated broadcaster can produce media.");
    }
    // Destructure inside the try block so a missing or null payload becomes
    // an { error } reply instead of an unhandled rejection.
    const { transportId, kind, rtpParameters } = payload || {};
    const transport = clients[socket.id].transports[transportId];
    if (!transport) throw new Error("Transport not found");

    producer = await transport.produce({ kind, rtpParameters });

    // Store globally so viewers know what to consume
    producers[kind] = producer;
    producer.on("transportclose", () => {
      producer.close();
    });

    // Notify ALL existing viewers that a new track is available
    socket.broadcast.emit("newProducer", { producerId: producer.id, kind: producer.kind });
  } catch (e) {
    console.error(e);
    callback({ error: e.message });
    return;
  }

  // Return the ID back to the broadcaster client
  callback({ id: producer.id });
});
// --- 5. Consume (Viewers receiving media FROM server) ---
// Payload: { transportId, producerId, rtpCapabilities }. Creates a paused
// consumer on the given transport and acknowledges with
// { id, producerId, kind, rtpParameters }, or with { error }.
socket.on("consume", async (payload, callback) => {
  // Ignore requests that carry no acknowledgement function; calling an
  // undefined callback would throw inside this async handler.
  if (typeof callback !== "function") return;

  let consumer;
  try {
    // Destructure inside the try block so a missing or null payload becomes
    // an { error } reply instead of an unhandled rejection.
    const { transportId, producerId, rtpCapabilities } = payload || {};
    const transport = clients[socket.id].transports[transportId];
    if (!transport) throw new Error("Transport not found");
    if (!router.canConsume({ producerId, rtpCapabilities })) {
      throw new Error("Client cannot consume this producer.");
    }

    consumer = await transport.consume({
      producerId,
      rtpCapabilities,
      paused: true, // important: start paused until client confirms ready
    });
    clients[socket.id].consumers[consumer.id] = consumer;

    consumer.on("transportclose", () => {
      consumer.close();
    });
    consumer.on("producerclose", () => {
      socket.emit("producerClosed", { consumerId: consumer.id });
      consumer.close();
      // The disconnect handler deletes this socket's entry from `clients`,
      // so check it still exists before touching its consumer map.
      const client = clients[socket.id];
      if (client) delete client.consumers[consumer.id];
    });
  } catch (e) {
    console.error(e);
    callback({ error: e.message });
    return;
  }

  callback({
    id: consumer.id,
    producerId: consumer.producerId,
    kind: consumer.kind,
    rtpParameters: consumer.rtpParameters,
  });
});
// Client says they successfully created the local consumer, we can resume sending RTP.
// Payload: { consumerId }. Acknowledges with no arguments on success, or
// with { error } on failure.
socket.on("resumeConsumer", async (payload, callback) => {
  // Ignore requests that carry no acknowledgement function; calling an
  // undefined callback would throw inside this async handler.
  if (typeof callback !== "function") return;

  try {
    // Destructure inside the try block so a missing or null payload becomes
    // an { error } reply instead of an unhandled rejection.
    const { consumerId } = payload || {};
    const consumer = clients[socket.id].consumers[consumerId];
    if (!consumer) throw new Error("Consumer not found");
    await consumer.resume();
  } catch (e) {
    console.error(e);
    callback({ error: e.message });
    return;
  }

  // Success acknowledgement stays outside the try block so it can never be
  // followed by a second, error acknowledgement.
  callback();
});
// Helper for new viewers: "What streams are currently running?"
// Acknowledges with an array of { producerId, kind } for every stored
// producer that is not closed.
socket.on("getProducers", (callback) => {
  // A request without an acknowledgement function has nowhere to deliver the
  // list; ignore it rather than throwing a TypeError on `callback(...)`.
  if (typeof callback !== "function") return;

  // Return array of currently active producer IDs and their kinds
  const activeProducers = Object.values(producers)
    .filter((p) => !p.closed)
    .map((p) => ({
      producerId: p.id,
      kind: p.kind,
    }));
  callback(activeProducers);
});
// Viewer count: return number of connected viewers (non-broadcaster sockets).
// Acknowledges with the number of entries in `clients` whose socket id is
// not the current broadcaster's.
socket.on("getViewerCount", (callback) => {
  // A request without an acknowledgement function has nowhere to deliver the
  // count; ignore it rather than throwing a TypeError on `callback(...)`.
  if (typeof callback !== "function") return;

  const count = Object.keys(clients).filter((id) => id !== broadcasterSocketId).length;
  callback(count);
});
// Socket teardown: releases everything this connection owned and, when the
// broadcaster is still online, pushes it a fresh viewer count.
socket.on("disconnect", () => {
  console.log("user disconnected", socket.id);

  // The broadcaster left: close every stored producer, reset the broadcast
  // state and tell the remaining sockets.
  const wasBroadcaster = socket.id === broadcasterSocketId;
  if (wasBroadcaster) {
    for (const producer of Object.values(producers)) {
      producer.close();
    }
    producers = {};
    broadcasterSocketId = null;
    socket.broadcast.emit("broadcasterDisconnected");
  }

  // Close the transports registered for this socket and forget the client.
  const client = clients[socket.id];
  if (client) {
    for (const transport of Object.values(client.transports)) {
      transport.close();
    }
    delete clients[socket.id];
  }

  // Report the new viewer count, but only if a broadcaster is set and its
  // socket is still known to the Socket.IO server.
  const broadcasterOnline =
    broadcasterSocketId && io.sockets.sockets.get(broadcasterSocketId);
  if (broadcasterOnline) {
    const viewerIds = Object.keys(clients).filter((id) => id !== broadcasterSocketId);
    io.to(broadcasterSocketId).emit("viewerCount", viewerIds.length);
  }
});
});
// Port for the HTTP/Socket.IO server; PORT from the environment, else 3000.
const PORT = process.env.PORT || 3000;
// Start listening only after the mediasoup worker and router exist, because
// the socket handlers use the router.
startMediasoup()
  .then(() => {
    server.listen(PORT, () => {
      console.log(`Mediasoup SFU listening on *:${PORT}`);
      console.log(`NOTE: If running on a network, set ANNOUNCED_IP to the server's public IP`);
    });
  })
  .catch((err) => {
    // Previously this promise had no rejection handler, so a failure to
    // create the worker/router (or a synchronous throw from listen) was
    // never handled explicitly. Log it and exit with a failure code.
    console.error("Failed to start server:", err);
    process.exit(1);
  });