diff --git a/worker-server.js b/worker-server.js index 959cde1..26b4151 100644 --- a/worker-server.js +++ b/worker-server.js @@ -1,186 +1,418 @@ const cluster = require('cluster'); const numCPUs = require('os').cpus().length; const net = require('net'); +const fs = require('fs'); +const http = require('http'); +const httpProxy = require('http-proxy'); +const HyperDHTServer = require('hyperdht'); +const b32 = require('hi-base32'); + +// Configuration constants +const CONFIG = { + PORT: 8081, + WORKER_MULTIPLIER: 4, + PROXY_TIMEOUT: 360000, // 6 minutes + TUNNEL_TIMEOUT: 20000, // Reduced to 20 seconds for faster socket cleanup + DEBUG: false, // Keep enabled for diagnostics + CONINFO: false, + MAX_SOCKETS: 500, + MAX_TUNNELS: 1000, // Increased to handle high load + PORT_RANGE_START: 10000, + PORT_RANGE_END: 20000, + CONNECT_RETRY_MS: 100, + MAX_CONNECT_RETRIES: 3, + TUNNEL_FORCE_CLOSE_MS: 10000, // Reduced to 10 seconds for faster cleanup +}; + +// Shared HTTP agent +const agent = new http.Agent({ + maxSockets: CONFIG.MAX_SOCKETS, + keepAlive: true, + keepAliveMsecs: 10000, + timeout: 30000, +}); + +// Static 404 content +const content = fs.readFileSync('404.txt', 'utf8'); if (cluster.isMaster) { - console.log(`Master ${process.pid} is running`); - console.log(`Total Workers ${numCPUs * 6}`); - for (let i = 0; i < numCPUs * 4; i++) { - cluster.fork(); - } + console.log(`Master ${process.pid} is running`); + const workerCount = numCPUs * CONFIG.WORKER_MULTIPLIER; + console.log(`Starting ${workerCount} workers`); - cluster.on('exit', (worker, code, signal) => { - console.log(`Worker ${worker.process.pid} died`); - }); + for (let i = 0; i < workerCount; i++) { + cluster.fork(); + } + + cluster.on('exit', (worker, code, signal) => { + console.log(`Worker ${worker.process.pid} died with code ${code}, signal ${signal}`); + console.log('Starting new worker...'); + cluster.fork(); + }); } else { - const fs = require('fs'); - const http = require('http'); - const httpProxy = require('http-proxy'); - const HyperDHTServer = require('hyperdht'); - const b32 = require("hi-base32"); - const agent = new http.Agent({ maxSockets: Number.MAX_VALUE }); - const content = fs.readFileSync('404.txt', 'utf8'); - const DEBUG = 0; - const CONINFO = 0; - const dhtServer = new HyperDHTServer(); + const dhtServer = new HyperDHTServer(); + const activeTunnels = new Map(); // Map tunnelServer to metadata + const usedPorts = new Set(); - const startServer = async () => { - console.log(`Worker ${process.pid} started`); - await dhtServer.ready(); + const startServer = async () => { + console.log(`Worker ${process.pid} started`); + await dhtServer.ready(); - const proxy = httpProxy.createProxyServer({ - ws: true, - agent: agent, - timeout: 360000 + const proxy = httpProxy.createProxyServer({ + ws: true, + agent, + timeout: CONFIG.PROXY_TIMEOUT, + }); + + const server = http.createServer(async (req, res) => { + let tunnelServer = null; + try { + const split = req.headers.host?.split('.'); + if (!split || split.length < 1) { + res.writeHead(400, { 'Content-Type': 'text/html' }); + res.end('Invalid host header'); + return; + } + const publicKey = Buffer.from(b32.decode.asBytes(split[0].toUpperCase())); + if (publicKey.length < 32) { + console.log('Invalid public key'); + res.writeHead(418, { 'Content-Type': 'text/html' }); + res.end(content); + return; + } + + if (activeTunnels.size >= CONFIG.MAX_TUNNELS) { + throw new Error(`Max tunnels reached (${activeTunnels.size})`); + } + + const port = await getAvailablePort(); + tunnelServer = await createTunnelServer(publicKey, port); + const tunnelId = `tunnel-${port}-${Date.now()}`; + activeTunnels.set(tunnelServer, { id: tunnelId, port, created: Date.now() }); + if (CONFIG.DEBUG) console.log(`Created ${tunnelId}, active tunnels: ${activeTunnels.size}`); + + const forceCloseTimeout = setTimeout(() => { + // console.warn(`Force closing ${tunnelId} after ${CONFIG.TUNNEL_FORCE_CLOSE_MS}ms`); + closeTunnel(tunnelServer); + }, CONFIG.TUNNEL_FORCE_CLOSE_MS); + + await proxyWithRetry(req, res, port, proxy, () => { + clearTimeout(forceCloseTimeout); + closeTunnel(tunnelServer); }); - const server = http.createServer(async function (req, res) { - try { - const split = req.headers.host.split('.'); - const publicKey = Buffer.from(b32.decode.asBytes(split[0].toUpperCase())); + res.on('finish', () => { + clearTimeout(forceCloseTimeout); + closeTunnel(tunnelServer); + }); + res.on('error', (err) => { + if (CONFIG.DEBUG) console.error(`Response error for ${tunnelId}:`, err.message); + clearTimeout(forceCloseTimeout); + closeTunnel(tunnelServer); + }); + } catch (e) { + console.error(`HTTP Request Error for ${tunnelServer ? activeTunnels.get(tunnelServer)?.id : 'unknown'}: ${e.message}`); + if (!res.headersSent) { + res.writeHead(500, { 'Content-Type': 'text/html' }); + res.end('Internal Server Error'); + } + closeTunnel(tunnelServer); + } + }); - if (publicKey.length < 32) { - console.log("Invalid Connection!"); - res.writeHead(418, { 'Content-Type': 'text/html' }); - res.end(content); - return; - } + server.on('upgrade', async (req, socket, head) => { + let tunnelServer = null; + try { + const split = req.headers.host?.split('.'); + if (!split || split.length < 1) { + socket.destroy(); + return; + } + const publicKey = Buffer.from(b32.decode.asBytes(split[0].toUpperCase())); + if (publicKey.length < 32) { + console.log('Invalid public key'); + socket.destroy(); + return; + } - // Create a unique tunnel for each request - const port = await getAvailablePort(); - const tunnelServer = net.createServer(function (servsock) { - const socket = dhtServer.connect(publicKey); - let open = { local: true, remote: true }; + if (activeTunnels.size >= CONFIG.MAX_TUNNELS) { + throw new Error(`Max tunnels reached (${activeTunnels.size})`); + } - servsock.on('data', (d) => socket.write(d)); - socket.on('data', (d) => servsock.write(d)); + const port = await getAvailablePort(); + tunnelServer = await createTunnelServer(publicKey, port); + const tunnelId = `ws-tunnel-${port}-${Date.now()}`; + activeTunnels.set(tunnelServer, { id: tunnelId, port, created: Date.now() }); + if (CONFIG.DEBUG) console.log(`Created ${tunnelId}, active tunnels: ${activeTunnels.size}`); - const remoteend = () => { - if (open.remote) socket.end(); - open.remote = false; - }; - const localend = () => { - if (open.local) servsock.end(); - open.local = false; - }; + const forceCloseTimeout = setTimeout(() => { + console.warn(`Force closing ${tunnelId} after ${CONFIG.TUNNEL_FORCE_CLOSE_MS}ms`); + closeTunnel(tunnelServer); + }, CONFIG.TUNNEL_FORCE_CLOSE_MS); - servsock.on('error', remoteend); - servsock.on('finish', remoteend); - servsock.on('end', remoteend); - socket.on('finish', localend); - socket.on('error', localend); - socket.on('end', localend); - }); - - tunnelServer.listen(port, "127.0.0.1", () => { - if (DEBUG === 1 || CONINFO === 1) console.log(`Tunnel server listening on port ${port}`); - - proxy.web(req, res, { - target: 'http://127.0.0.1:' + port - }, function (e) { - console.log("Proxy Web Error: ", e); - res.writeHead(404, { 'Content-Type': 'text/html' }); - res.end(content); - }); - - // Close tunnel and release resources after response is sent - res.on('finish', () => { - tunnelServer.close(() => { - if (DEBUG === 1 || CONINFO === 1) console.log("Tunnel closed after request completion."); - }); - }); - }); - } catch (e) { - console.error("Error Occurred: ", e); - } + await proxyWsWithRetry(req, socket, head, port, proxy, () => { + clearTimeout(forceCloseTimeout); + closeTunnel(tunnelServer); }); - // Add WebSocket upgrade handling - server.on('upgrade', async function (req, socket, head) { - try { - const split = req.headers.host.split('.'); - const publicKey = Buffer.from(b32.decode.asBytes(split[0].toUpperCase())); - - if (publicKey.length < 32) { - console.log("Invalid Connection!"); - socket.destroy(); - return; - } - - // Create a unique tunnel for each WebSocket connection - const port = await getAvailablePort(); - const tunnelServer = net.createServer(function (servsock) { - const socketConn = dhtServer.connect(publicKey); - let open = { local: true, remote: true }; - - servsock.on('data', (d) => socketConn.write(d)); - socketConn.on('data', (d) => servsock.write(d)); - - const remoteend = () => { - if (open.remote) socketConn.end(); - open.remote = false; - }; - const localend = () => { - if (open.local) servsock.end(); - open.local = false; - }; - - servsock.on('error', remoteend); - servsock.on('finish', remoteend); - servsock.on('end', remoteend); - socketConn.on('finish', localend); - socketConn.on('error', localend); - socketConn.on('end', localend); - }); - - tunnelServer.listen(port, "127.0.0.1", () => { - if (DEBUG === 1 || CONINFO === 1) console.log(`WebSocket tunnel server listening on port ${port}`); - - proxy.ws(req, socket, head, { - target: 'http://127.0.0.1:' + port - }, function (e) { - console.log("Proxy WS Error: ", e); - socket.destroy(); - }); - - // Clean up tunnel after WebSocket disconnects - socket.on('close', () => { - tunnelServer.close(() => { - if (DEBUG === 1 || CONINFO === 1) console.log("WebSocket tunnel closed after disconnect."); - }); - }); - }); - } catch (e) { - console.error("Error Occurred: ", e); - socket.destroy(); - } + socket.on('close', () => { + clearTimeout(forceCloseTimeout); + closeTunnel(tunnelServer); }); - - server.listen(8081, () => { - console.log(`Worker ${process.pid} listening on port 8081`); + socket.on('error', (err) => { + if (CONFIG.DEBUG) console.error(`WebSocket error for ${tunnelId}:`, err.message); + clearTimeout(forceCloseTimeout); + closeTunnel(tunnelServer); }); - }; + } catch (e) { + console.error(`WebSocket Upgrade Error for ${tunnelServer ? activeTunnels.get(tunnelServer)?.id : 'unknown'}: ${e.message}`); + socket.destroy(); + closeTunnel(tunnelServer); + } + }); - startServer().catch(console.error); + server.listen(CONFIG.PORT, () => { + console.log(`Worker ${process.pid} listening on port ${CONFIG.PORT}`); + }); - async function getAvailablePort() { - return new Promise((resolve, reject) => { - const tryPort = () => { - const port = 1337 + Math.floor(Math.random() * 1000); - const tester = net.createServer() - .once('error', (err) => { - if (err.code === 'EADDRINUSE') { - tryPort(); // Port in use, try another - } else { - reject(err); - } - }) - .once('listening', () => { - tester.close(() => resolve(port)); // Port is available - }) - .listen(port, '127.0.0.1'); - }; - tryPort(); - }); + server.on('error', (err) => { + console.error('Server Error:', err.message); + }); + + // Periodic tunnel diagnostics + if (CONFIG.DEBUG) { + setInterval(() => { + if (activeTunnels.size > 0) { + console.log(`Active tunnels: ${activeTunnels.size}`); + for (const [server, meta] of activeTunnels) { + server.getConnections((err, count) => { + if (err) { + console.error(`Error getting connections for ${meta.id}:`, err.message); + } else { + console.log(`- ${meta.id}: port ${meta.port}, age ${Date.now() - meta.created}ms, connections ${count}`); + } + }); + } + } + }, 5000); // Reduced to 5 seconds for faster feedback } + }; + + async function createTunnelServer(publicKey, port) { + return new Promise((resolve, reject) => { + const clientSockets = new Set(); // Track client sockets for cleanup + const tunnelServer = net.createServer((servsock) => { + clientSockets.add(servsock); + let socketConn = null; + try { + socketConn = dhtServer.connect(publicKey); + } catch (e) { + console.error('DHT Connect Error:', e.message); + servsock.destroy(); + clientSockets.delete(servsock); + return; + } + + let open = { local: true, remote: true }; + servsock.setTimeout(CONFIG.TUNNEL_TIMEOUT); + socketConn.setTimeout(CONFIG.TUNNEL_TIMEOUT); + + servsock.on('data', (data) => { + if (open.remote && socketConn) socketConn.write(data); + }); + socketConn.on('data', (data) => { + if (open.local) servsock.write(data); + }); + + const closeRemote = () => { + if (open.remote && socketConn) { + socketConn.end(); + socketConn.destroy(); + open.remote = false; + } + }; + const closeLocal = () => { + if (open.local) { + servsock.end(); + servsock.destroy(); + open.local = false; + clientSockets.delete(servsock); + } + }; + + servsock.on('error', (err) => { + if (CONFIG.DEBUG) console.error('Local Socket Error:', err.message); + closeRemote(); + }); + servsock.on('finish', closeRemote); + servsock.on('end', closeRemote); + servsock.on('timeout', () => { + if (CONFIG.DEBUG) console.log('Local socket timeout'); + closeRemote(); + closeLocal(); + }); + + socketConn.on('error', (err) => { + if (CONFIG.DEBUG) console.error('Remote Socket Error:', err.message); + closeLocal(); + }); + socketConn.on('finish', closeLocal); + socketConn.on('end', closeLocal); + socketConn.on('timeout', () => { + if (CONFIG.DEBUG) console.log('Remote socket timeout'); + closeRemote(); + closeLocal(); + }); + }); + + tunnelServer.on('error', (err) => { + console.error('Tunnel Server Error:', err.message); + closeTunnel(tunnelServer); + reject(err); + }); + + tunnelServer.listen(port, '127.0.0.1', () => { + if (CONFIG.DEBUG) console.log(`Tunnel server listening on port ${port}`); + resolve(tunnelServer); + }); + + // Store clientSockets for cleanup + tunnelServer.__clientSockets = clientSockets; + tunnelServer.unref(); + }); + } + + function closeTunnel(tunnelServer) { + if (tunnelServer && activeTunnels.has(tunnelServer)) { + const meta = activeTunnels.get(tunnelServer); + try { + // Close all client sockets + if (tunnelServer.__clientSockets) { + for (const socket of tunnelServer.__clientSockets) { + try { + socket.end(); + socket.destroy(); + } catch (e) { + if (CONFIG.DEBUG) console.error(`Error closing socket for ${meta.id}:`, e.message); + } + } + tunnelServer.__clientSockets.clear(); + } + + tunnelServer.close(() => { + if (CONFIG.DEBUG) console.log(`Closed ${meta.id}, active tunnels: ${activeTunnels.size - 1}`); + }); + activeTunnels.delete(tunnelServer); + } catch (e) { + console.error(`Close Tunnel Error for ${meta.id}:`, e.message); + } + } + } + + async function proxyWithRetry(req, res, port, proxy, cleanup) { + let retries = 0; + while (retries < CONFIG.MAX_CONNECT_RETRIES) { + try { + await new Promise((resolve, reject) => { + proxy.web(req, res, { target: `http://127.0.0.1:${port}` }, (err) => { + if (err) reject(err); + else resolve(); + }); + }); + return; + } catch (e) { + retries++; + if (CONFIG.DEBUG) console.error(`Proxy Retry ${retries}/${CONFIG.MAX_CONNECT_RETRIES}:`, e.message); + if (retries >= CONFIG.MAX_CONNECT_RETRIES) { + console.error('Proxy Web Error:', e.message); + if (!res.headersSent) { + res.writeHead(404, { 'Content-Type': 'text/html' }); + res.end(content); + } + cleanup(); + return; + } + await new Promise((resolve) => setTimeout(resolve, CONFIG.CONNECT_RETRY_MS)); + } + } + } + + async function proxyWsWithRetry(req, socket, head, port, proxy, cleanup) { + let retries = 0; + while (retries < CONFIG.MAX_CONNECT_RETRIES) { + try { + await new Promise((resolve, reject) => { + proxy.ws(req, socket, head, { target: `http://127.0.0.1:${port}` }, (err) => { + if (err) reject(err); + else resolve(); + }); + }); + return; + } catch (e) { + retries++; + if (CONFIG.DEBUG) console.error(`WS Proxy Retry ${retries}/${CONFIG.MAX_CONNECT_RETRIES}:`, e.message); + if (retries >= CONFIG.MAX_CONNECT_RETRIES) { + console.error('Proxy WS Error:', e.message); + socket.destroy(); + cleanup(); + return; + } + await new Promise((resolve) => setTimeout(resolve, CONFIG.CONNECT_RETRY_MS)); + } + } + } + + async function getAvailablePort() { + return new Promise((resolve, reject) => { + const maxAttempts = 10; + let attempts = 0; + + const tryPort = () => { + attempts++; + const port = CONFIG.PORT_RANGE_START + + Math.floor(Math.random() * (CONFIG.PORT_RANGE_END - CONFIG.PORT_RANGE_START)); + + if (usedPorts.has(port)) { + if (attempts >= maxAttempts) { + reject(new Error('No available ports')); + } else { + setImmediate(tryPort); + } + return; + } + + const tester = net.createServer() + .once('error', (err) => { + if (err.code === 'EADDRINUSE') { + if (attempts >= maxAttempts) { + reject(new Error('No available ports')); + } else { + setImmediate(tryPort); + } + } else { + reject(err); + } + }) + .once('listening', () => { + tester.close(() => { + usedPorts.add(port); + setTimeout(() => usedPorts.delete(port), 5000); + resolve(port); + }); + }) + .listen(port, '127.0.0.1'); + }; + tryPort(); + }); + } + + startServer().catch((err) => { + console.error('Worker Startup Error:', err.message); + process.exit(1); + }); + + process.on('uncaughtException', (err) => { + console.error('Uncaught Exception:', err.message); + process.exit(1); + }); } \ No newline at end of file