const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
const net = require('net');
const fs = require('fs');
const http = require('http');
const httpProxy = require('http-proxy');
const HyperDHTServer = require('hyperdht');
const b32 = require('hi-base32');

// Configuration constants
const CONFIG = {
  PORT: 8081,
  WORKER_MULTIPLIER: 4,
  PROXY_TIMEOUT: 360000, // 6 minutes
  TUNNEL_TIMEOUT: 3600000, // 1 hour, for long-lived WebSocket connections
  DEBUG: false, // Set to true for verbose diagnostics
  CONINFO: false,
  MAX_SOCKETS: 500,
  MAX_TUNNELS: 1000, // Per-worker cap on concurrently open tunnels
  PORT_RANGE_START: 10000,
  PORT_RANGE_END: 20000,
  CONNECT_RETRY_MS: 100,
  MAX_CONNECT_RETRIES: 3,
  TUNNEL_FORCE_CLOSE_MS: 10000, // Used for HTTP requests
};

// Shared HTTP agent
const agent = new http.Agent({
  maxSockets: CONFIG.MAX_SOCKETS,
  keepAlive: true,
  keepAliveMsecs: 10000,
  timeout: 30000,
});

// Static 404 content
const content = fs.readFileSync('404.txt', 'utf8');

/**
 * Resolves after `ms` milliseconds.
 * @param {number} ms
 * @returns {Promise<void>}
 */
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Runs a single `proxy.web` attempt.
 *
 * http-proxy treats the callback passed to `proxy.web` as an error callback,
 * so a successful proxy is detected through the response's own
 * 'finish'/'close' events instead of waiting for a callback that never comes.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse} res
 * @param {string} target - Upstream URL to proxy to.
 * @param {object} proxy - http-proxy server instance.
 * @returns {Promise<void>} Resolves when the response finishes or closes;
 *   rejects with the error reported by http-proxy.
 */
function proxyWebOnce(req, res, target, proxy) {
  return new Promise((resolve, reject) => {
    const detach = () => {
      res.off('finish', onDone);
      res.off('close', onDone);
    };
    function onDone() {
      detach();
      resolve();
    }
    res.once('finish', onDone);
    res.once('close', onDone);
    proxy.web(req, res, { target }, (err) => {
      detach();
      if (err) reject(err);
      else resolve();
    });
  });
}

/**
 * Runs a single `proxy.ws` attempt.
 *
 * As with `proxy.web`, the callback is only used to report failure, so the
 * promise resolves when the client socket closes.
 *
 * @param {http.IncomingMessage} req
 * @param {import('net').Socket} socket - Client socket from the 'upgrade' event.
 * @param {Buffer} head
 * @param {string} target - Upstream URL to proxy to.
 * @param {object} proxy - http-proxy server instance.
 * @returns {Promise<void>}
 */
function proxyWsOnce(req, socket, head, target, proxy) {
  return new Promise((resolve, reject) => {
    const onClose = () => resolve();
    socket.once('close', onClose);
    proxy.ws(req, socket, head, { target }, (err) => {
      socket.off('close', onClose);
      if (err) reject(err);
      else resolve();
    });
  });
}

/**
 * Proxies an HTTP request to the local tunnel port, retrying failed attempts
 * up to CONFIG.MAX_CONNECT_RETRIES times.
 *
 * A retry is only attempted while the response is untouched and `canRetry()`
 * returns true; otherwise the failure path runs immediately.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse} res
 * @param {number} port - Local tunnel port on 127.0.0.1.
 * @param {object} proxy - http-proxy server instance.
 * @param {Function} cleanup - Called once proxying has definitively failed.
 * @param {Function} [canRetry] - Returns false when another attempt is
 *   pointless (for example, the tunnel has already been closed).
 * @returns {Promise<void>}
 */
async function proxyWithRetry(req, res, port, proxy, cleanup, canRetry = () => true) {
  const target = `http://127.0.0.1:${port}`;
  let retries = 0;
  while (retries < CONFIG.MAX_CONNECT_RETRIES) {
    try {
      await proxyWebOnce(req, res, target, proxy);
      return;
    } catch (e) {
      retries++;
      if (CONFIG.DEBUG) console.error(`Proxy Retry ${retries}/${CONFIG.MAX_CONNECT_RETRIES}:`, e.message);
      // Once headers are out, a second attempt would try to write them again.
      const responseStarted = res.headersSent || res.writableEnded;
      if (retries >= CONFIG.MAX_CONNECT_RETRIES || responseStarted || !canRetry()) {
        console.error('Proxy Web Error:', e.message);
        if (!res.headersSent) {
          res.writeHead(404, { 'Content-Type': 'text/html' });
          res.end(content);
        } else if (!res.writableEnded) {
          // The body was cut short upstream; abort so the client is not left waiting.
          res.destroy();
        }
        cleanup();
        return;
      }
      await delay(CONFIG.CONNECT_RETRY_MS);
    }
  }
}

/**
 * Proxies a WebSocket upgrade to the local tunnel port, retrying failed
 * attempts up to CONFIG.MAX_CONNECT_RETRIES times.
 *
 * No retry is attempted once the client socket has been ended or destroyed,
 * or when `canRetry()` returns false.
 *
 * @param {http.IncomingMessage} req
 * @param {import('net').Socket} socket - Client socket from the 'upgrade' event.
 * @param {Buffer} head
 * @param {number} port - Local tunnel port on 127.0.0.1.
 * @param {object} proxy - http-proxy server instance.
 * @param {Function} cleanup - Called once proxying has definitively failed.
 * @param {Function} [canRetry] - Returns false when another attempt is pointless.
 * @returns {Promise<void>}
 */
async function proxyWsWithRetry(req, socket, head, port, proxy, cleanup, canRetry = () => true) {
  const target = `http://127.0.0.1:${port}`;
  let retries = 0;
  while (retries < CONFIG.MAX_CONNECT_RETRIES) {
    try {
      await proxyWsOnce(req, socket, head, target, proxy);
      return;
    } catch (e) {
      retries++;
      if (CONFIG.DEBUG) console.error(`WS Proxy Retry ${retries}/${CONFIG.MAX_CONNECT_RETRIES}:`, e.message);
      const socketGone = socket.destroyed || socket.writableEnded;
      if (retries >= CONFIG.MAX_CONNECT_RETRIES || socketGone || !canRetry()) {
        console.error('Proxy WS Error:', e.message);
        socket.destroy();
        cleanup();
        return;
      }
      await delay(CONFIG.CONNECT_RETRY_MS);
    }
  }
}

/**
 * Primary process: forks numCPUs * CONFIG.WORKER_MULTIPLIER workers and forks
 * a replacement whenever one exits.
 */
function runPrimary() {
  console.log(`Master ${process.pid} is running`);
  const workerCount = numCPUs * CONFIG.WORKER_MULTIPLIER;
  console.log(`Starting ${workerCount} workers`);
  for (let i = 0; i < workerCount; i++) {
    cluster.fork();
  }
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died with code ${code}, signal ${signal}`);
    console.log('Starting new worker...');
    cluster.fork();
  });
}

/**
 * Worker process: serves HTTP and WebSocket traffic on CONFIG.PORT. For each
 * request it decodes a base32 public key from the first Host label, opens a
 * loopback TCP server that pipes to a HyperDHT connection for that key, and
 * proxies the request to that loopback port.
 */
function runWorker() {
  const dhtServer = new HyperDHTServer();
  const activeTunnels = new Map(); // tunnelServer -> { id, port, created }
  const usedPorts = new Set(); // Ports reserved or recently handed out by getAvailablePort

  /**
   * Decodes the base32 host label into a public key.
   * @param {string} label - First dot-separated label of the Host header.
   * @returns {Buffer|null} The key, or null when the label is not valid
   *   base32 or decodes to fewer than 32 bytes.
   */
  function decodePublicKey(label) {
    let bytes;
    try {
      bytes = b32.decode.asBytes(label.toUpperCase());
    } catch (e) {
      // A malformed label is a client error, not a server failure.
      if (CONFIG.DEBUG) console.error('Base32 Decode Error:', e.message);
      return null;
    }
    const key = Buffer.from(bytes);
    return key.length < 32 ? null : key;
  }

  /**
   * Picks a random free loopback port in [PORT_RANGE_START, PORT_RANGE_END).
   *
   * The candidate is reserved in `usedPorts` before it is probed so that
   * concurrent callers in this worker cannot be handed the same port. The
   * reservation is dropped on probe failure, or 5 seconds after success.
   *
   * @returns {Promise<number>} Rejects with 'No available ports' after 10
   *   unsuccessful attempts, or with the probe error if it is not EADDRINUSE.
   */
  function getAvailablePort() {
    return new Promise((resolve, reject) => {
      const maxAttempts = 10;
      const span = CONFIG.PORT_RANGE_END - CONFIG.PORT_RANGE_START;
      let attempts = 0;

      const retryOrFail = () => {
        if (attempts >= maxAttempts) {
          reject(new Error('No available ports'));
        } else {
          setImmediate(tryPort);
        }
      };

      function tryPort() {
        attempts++;
        const port = CONFIG.PORT_RANGE_START + Math.floor(Math.random() * span);
        if (usedPorts.has(port)) {
          retryOrFail();
          return;
        }
        usedPorts.add(port);
        const tester = net.createServer();
        tester.once('error', (err) => {
          usedPorts.delete(port);
          if (err.code === 'EADDRINUSE') {
            retryOrFail();
          } else {
            reject(err);
          }
        });
        tester.once('listening', () => {
          tester.close(() => {
            setTimeout(() => usedPorts.delete(port), 5000);
            resolve(port);
          });
        });
        tester.listen(port, '127.0.0.1');
      }

      tryPort();
    });
  }

  /**
   * Starts a loopback TCP server on `port` that bridges every accepted
   * connection to a new DHT connection for `publicKey`.
   *
   * Accepted sockets are tracked in `tunnelServer.__clientSockets` so that
   * closeTunnel can destroy them.
   *
   * @param {Buffer} publicKey
   * @param {number} port
   * @returns {Promise<net.Server>} Resolves once the server is listening;
   *   rejects if the server emits 'error' first.
   */
  function createTunnelServer(publicKey, port) {
    return new Promise((resolve, reject) => {
      const clientSockets = new Set(); // Track client sockets for cleanup

      const tunnelServer = net.createServer((servsock) => {
        clientSockets.add(servsock);

        let socketConn;
        try {
          socketConn = dhtServer.connect(publicKey);
        } catch (e) {
          console.error('DHT Connect Error:', e.message);
          servsock.destroy();
          clientSockets.delete(servsock);
          return;
        }

        const open = { local: true, remote: true };

        // Flags are cleared before end()/destroy() so re-entrant events are no-ops.
        const closeRemote = () => {
          if (!open.remote) return;
          open.remote = false;
          socketConn.end();
          socketConn.destroy();
        };
        const closeLocal = () => {
          if (!open.local) return;
          open.local = false;
          servsock.end();
          servsock.destroy();
          clientSockets.delete(servsock);
        };
        const closeBoth = () => {
          closeRemote();
          closeLocal();
        };

        servsock.setTimeout(CONFIG.TUNNEL_TIMEOUT);
        socketConn.setTimeout(CONFIG.TUNNEL_TIMEOUT);

        servsock.on('data', (data) => {
          if (open.remote) socketConn.write(data);
        });
        socketConn.on('data', (data) => {
          if (open.local) servsock.write(data);
        });

        servsock.on('error', (err) => {
          if (CONFIG.DEBUG) console.error('Local Socket Error:', err.message);
          closeRemote();
        });
        servsock.on('finish', closeRemote);
        servsock.on('end', closeRemote);
        // 'close' also covers a socket destroyed externally (e.g. by closeTunnel),
        // so the peer connection is torn down with it.
        servsock.on('close', () => {
          open.local = false;
          clientSockets.delete(servsock);
          closeRemote();
        });
        servsock.on('timeout', () => {
          if (CONFIG.DEBUG) console.log('Local socket timeout');
          closeBoth();
        });

        socketConn.on('error', (err) => {
          if (CONFIG.DEBUG) console.error('Remote Socket Error:', err.message);
          closeLocal();
        });
        socketConn.on('finish', closeLocal);
        socketConn.on('end', closeLocal);
        socketConn.on('close', () => {
          open.remote = false;
          closeLocal();
        });
        socketConn.on('timeout', () => {
          if (CONFIG.DEBUG) console.log('Remote socket timeout');
          closeBoth();
        });
      });

      tunnelServer.on('error', (err) => {
        console.error('Tunnel Server Error:', err.message);
        closeTunnel(tunnelServer);
        reject(err);
      });

      tunnelServer.listen(port, '127.0.0.1', () => {
        if (CONFIG.DEBUG) console.log(`Tunnel server listening on port ${port}`);
        resolve(tunnelServer);
      });

      // Store clientSockets for cleanup
      tunnelServer.__clientSockets = clientSockets;
      tunnelServer.unref();
    });
  }

  /**
   * Closes a tunnel: removes it from `activeTunnels`, destroys its accepted
   * sockets and stops the server. Does nothing for null or for a server that
   * is not registered, so it is safe to call repeatedly.
   *
   * @param {net.Server|null} tunnelServer
   */
  function closeTunnel(tunnelServer) {
    if (!tunnelServer || !activeTunnels.has(tunnelServer)) return;
    const meta = activeTunnels.get(tunnelServer);
    // Unregister first so a failure below cannot leave a stale entry behind.
    activeTunnels.delete(tunnelServer);
    try {
      // Close all client sockets
      if (tunnelServer.__clientSockets) {
        for (const socket of tunnelServer.__clientSockets) {
          try {
            socket.end();
            socket.destroy();
          } catch (e) {
            if (CONFIG.DEBUG) console.error(`Error closing socket for ${meta.id}:`, e.message);
          }
        }
        tunnelServer.__clientSockets.clear();
      }
      tunnelServer.close(() => {
        if (CONFIG.DEBUG) console.log(`Closed ${meta.id}, active tunnels: ${activeTunnels.size}`);
      });
    } catch (e) {
      console.error(`Close Tunnel Error for ${meta.id}:`, e.message);
    }
  }

  /**
   * Handles one HTTP request: validates the Host header, opens a tunnel and
   * proxies the request through it. The tunnel is released when the response
   * finishes, closes or errors, when proxying fails, or after
   * CONFIG.TUNNEL_FORCE_CLOSE_MS at the latest.
   *
   * @param {http.IncomingMessage} req
   * @param {http.ServerResponse} res
   * @param {object} proxy - http-proxy server instance.
   * @returns {Promise<void>}
   */
  async function handleRequest(req, res, proxy) {
    let tunnelServer = null;
    let forceCloseTimeout = null;
    // Idempotent teardown shared by every exit path of this request.
    const release = () => {
      clearTimeout(forceCloseTimeout);
      closeTunnel(tunnelServer);
    };

    try {
      const host = req.headers.host;
      if (host == null) {
        res.writeHead(400, { 'Content-Type': 'text/html' });
        res.end('Invalid host header');
        return;
      }

      const publicKey = decodePublicKey(host.split('.')[0]);
      if (!publicKey) {
        console.log('Invalid public key');
        res.writeHead(418, { 'Content-Type': 'text/html' });
        res.end(content);
        return;
      }

      if (activeTunnels.size >= CONFIG.MAX_TUNNELS) {
        throw new Error(`Max tunnels reached (${activeTunnels.size})`);
      }

      const port = await getAvailablePort();
      tunnelServer = await createTunnelServer(publicKey, port);
      const tunnelId = `tunnel-${port}-${Date.now()}`;
      activeTunnels.set(tunnelServer, { id: tunnelId, port, created: Date.now() });
      if (CONFIG.DEBUG) console.log(`Created ${tunnelId}, active tunnels: ${activeTunnels.size}`);

      // The client may have disconnected while the tunnel was being set up.
      if (req.socket.destroyed) {
        release();
        return;
      }

      forceCloseTimeout = setTimeout(release, CONFIG.TUNNEL_FORCE_CLOSE_MS);

      // Listeners go on before proxying starts: the proxy call does not
      // return control here on success until the response is already over.
      res.on('finish', release);
      res.on('close', release);
      res.on('error', (err) => {
        if (CONFIG.DEBUG) console.error(`Response error for ${tunnelId}:`, err.message);
        release();
      });

      await proxyWithRetry(req, res, port, proxy, release, () => activeTunnels.has(tunnelServer));
    } catch (e) {
      console.error(`HTTP Request Error for ${tunnelServer ? activeTunnels.get(tunnelServer)?.id : 'unknown'}: ${e.message}`);
      if (!res.headersSent) {
        res.writeHead(500, { 'Content-Type': 'text/html' });
        res.end('Internal Server Error');
      }
      release();
    }
  }

  /**
   * Handles one WebSocket upgrade: validates the Host header, opens a tunnel
   * and proxies the upgraded socket through it. The tunnel is released when
   * the client socket closes or errors, or when proxying fails.
   *
   * @param {http.IncomingMessage} req
   * @param {import('net').Socket} socket
   * @param {Buffer} head
   * @param {object} proxy - http-proxy server instance.
   * @returns {Promise<void>}
   */
  async function handleUpgrade(req, socket, head, proxy) {
    let tunnelServer = null;
    const release = () => closeTunnel(tunnelServer);

    // Attached before any await so a socket error during tunnel setup is
    // handled here instead of surfacing as an unhandled 'error' event.
    socket.on('error', (err) => {
      if (CONFIG.DEBUG) {
        console.error(`WebSocket error for ${activeTunnels.get(tunnelServer)?.id ?? 'unknown'}:`, err.message);
      }
      release();
    });
    socket.on('close', release);

    try {
      const host = req.headers.host;
      if (host == null) {
        socket.destroy();
        return;
      }

      const publicKey = decodePublicKey(host.split('.')[0]);
      if (!publicKey) {
        console.log('Invalid public key');
        socket.destroy();
        return;
      }

      if (activeTunnels.size >= CONFIG.MAX_TUNNELS) {
        throw new Error(`Max tunnels reached (${activeTunnels.size})`);
      }

      const port = await getAvailablePort();
      tunnelServer = await createTunnelServer(publicKey, port);
      const tunnelId = `ws-tunnel-${port}-${Date.now()}`;
      activeTunnels.set(tunnelServer, { id: tunnelId, port, created: Date.now() });
      if (CONFIG.DEBUG) console.log(`Created ${tunnelId}, active tunnels: ${activeTunnels.size}`);

      // If the client went away during setup, 'close' has already fired.
      if (socket.destroyed) {
        release();
        return;
      }

      await proxyWsWithRetry(req, socket, head, port, proxy, release, () => activeTunnels.has(tunnelServer));
    } catch (e) {
      console.error(`WebSocket Upgrade Error for ${tunnelServer ? activeTunnels.get(tunnelServer)?.id : 'unknown'}: ${e.message}`);
      socket.destroy();
      release();
    }
  }

  /**
   * Waits for the DHT node to be ready, then starts the HTTP server and, when
   * CONFIG.DEBUG is set, a periodic tunnel diagnostics log.
   * @returns {Promise<void>}
   */
  const startServer = async () => {
    console.log(`Worker ${process.pid} started`);
    await dhtServer.ready();

    const proxy = httpProxy.createProxyServer({
      ws: true,
      agent,
      timeout: CONFIG.PROXY_TIMEOUT,
    });

    // Both handlers catch their own errors; the trailing catch only guards
    // against a throw from inside their error paths.
    const logHandlerFailure = (err) => console.error('Server Error:', err.message);

    const server = http.createServer((req, res) => {
      handleRequest(req, res, proxy).catch(logHandlerFailure);
    });

    server.on('upgrade', (req, socket, head) => {
      handleUpgrade(req, socket, head, proxy).catch(logHandlerFailure);
    });

    server.on('error', (err) => {
      console.error('Server Error:', err.message);
    });

    server.listen(CONFIG.PORT, () => {
      console.log(`Worker ${process.pid} listening on port ${CONFIG.PORT}`);
    });

    // Periodic tunnel diagnostics
    if (CONFIG.DEBUG) {
      setInterval(() => {
        if (activeTunnels.size === 0) return;
        console.log(`Active tunnels: ${activeTunnels.size}`);
        for (const [tunnel, meta] of activeTunnels) {
          tunnel.getConnections((err, count) => {
            if (err) {
              console.error(`Error getting connections for ${meta.id}:`, err.message);
            } else {
              console.log(`- ${meta.id}: port ${meta.port}, age ${Date.now() - meta.created}ms, connections ${count}`);
            }
          });
        }
      }, 5000);
    }
  };

  startServer().catch((err) => {
    console.error('Worker Startup Error:', err.message);
    process.exit(1);
  });

  process.on('uncaughtException', (err) => {
    console.error('Uncaught Exception:', err.message);
    process.exit(1);
  });
}

// `isPrimary` replaces the deprecated `isMaster`; fall back for older Node versions.
if (cluster.isPrimary ?? cluster.isMaster) {
  runPrimary();
} else {
  runWorker();
}