diff --git a/worker-server.js b/worker-server.js index 101a811..aebe41c 100644 --- a/worker-server.js +++ b/worker-server.js @@ -1,11 +1,11 @@ const cluster = require('cluster'); const numCPUs = require('os').cpus().length; +const net = require('net'); if (cluster.isMaster) { console.log(`Master ${process.pid} is running`); - console.log(`Total Workers ${numCPUs}`) - // Fork workers - for (let i = 0; i < numCPUs; i++) { + console.log(`Total Workers ${numCPUs * 6}`); + for (let i = 0; i < numCPUs * 4; i++) { cluster.fork(); } @@ -18,14 +18,10 @@ if (cluster.isMaster) { const httpProxy = require('http-proxy'); const HyperDHTServer = require('hyperdht'); const b32 = require("hi-base32"); - const net = require("net"); - let invalid = false; - - const tunnels = {}; const agent = new http.Agent({ maxSockets: Number.MAX_VALUE }); const content = fs.readFileSync('404.txt', 'utf8'); - const DEBUG = 0; // Set DEBUG to 1 to enable debug mode - const CONINFO = 0; // Set CONINFO to 1 for a smaller breakdown of connections. + const DEBUG = 0; + const CONINFO = 0; const dhtServer = new HyperDHTServer(); const startServer = async () => { @@ -40,103 +36,65 @@ if (cluster.isMaster) { const server = http.createServer(async function (req, res) { try { - if (DEBUG === 1 || CONINFO === 1) console.log("Incoming HTTP request..."); const split = req.headers.host.split('.'); - if (DEBUG === 1) console.log("Request Headers Host Split: ", split); const publicKey = Buffer.from(b32.decode.asBytes(split[0].toUpperCase())); - if (DEBUG === 1) { - console.log("Public Key Buffer: ", publicKey); - console.log("Length: " + publicKey.length); - } - if (!(publicKey.length >= 32)) { - console.log("Invalid Connection!") + + if (publicKey.length < 32) { + console.log("Invalid Connection!"); res.writeHead(418, { 'Content-Type': 'text/html' }); res.end(content); - invalid = true - } else { - invalid = false - } - if (invalid == true) return - if (!tunnels[publicKey]) { - if (DEBUG === 1 || CONINFO === 1) console.log("No tunnel found for public key, creating new tunnel..."); - const port = await getAvailablePort(); // Get an available port - const server = net.createServer(function (servsock) { - if (DEBUG === 1 || CONINFO === 1) console.log('Incoming Connection, Connecting to:', publicKey.toString('hex')); - if (DEBUG === 1) console.log("Public Key Length: ", publicKey.toString('hex').length); - const socket = dhtServer.connect(publicKey); - - const local = servsock; - let open = { local: true, remote: true }; - local.on('data', (d) => { - if (DEBUG === 1) console.log("Local Data: ", d); - socket.write(d); - }); - socket.on('data', (d) => { - if (DEBUG === 1) console.log("Socket Data: ", d); - local.write(d); - }); - - const remoteend = (type) => { - if (DEBUG === 1) console.log('Local Connection Ended, Ending Remote Connection'); - if (open.remote) socket.end(); - open.remote = false; - }; - - const localend = (type) => { - if (DEBUG === 1) console.log('Remote Connection Ended, Ending Local Connection'); - if (open.local) local.end(); - open.local = false; - }; - - local.on('error', remoteend); - local.on('finish', remoteend); - local.on('end', remoteend); - socket.on('finish', localend); - socket.on('error', localend); - socket.on('end', localend); - }); - - // Check if the port is available - server.listen(port, "127.0.0.1", () => { - if (DEBUG === 1 | CONINFO === 1) console.log(`Tunnel server listening on port ${port}`); - tunnels[publicKey] = port; - if (DEBUG === 1 || CONINFO === 1) console.log("New tunnel created. Public Key:", publicKey, "Port:", port); - proxy.web(req, res, { - target: 'http://127.0.0.1:' + port - }, function (e) { - console.log("Proxy Web Error: ", e); - res.writeHead(404, { 'Content-Type': 'text/html' }); - res.end(content); - }); - }); return; - } else { - if (DEBUG === 1) console.log('Tunnel exists for public key:', publicKey); + } + + // Create a unique tunnel for each request + const port = await getAvailablePort(); + const tunnelServer = net.createServer(function (servsock) { + const socket = dhtServer.connect(publicKey); + let open = { local: true, remote: true }; + + servsock.on('data', (d) => socket.write(d)); + socket.on('data', (d) => servsock.write(d)); + + const remoteend = () => { + if (open.remote) socket.end(); + open.remote = false; + }; + const localend = () => { + if (open.local) servsock.end(); + open.local = false; + }; + + servsock.on('error', remoteend); + servsock.on('finish', remoteend); + servsock.on('end', remoteend); + socket.on('finish', localend); + socket.on('error', localend); + socket.on('end', localend); + }); + + tunnelServer.listen(port, "127.0.0.1", () => { + if (DEBUG === 1 || CONINFO === 1) console.log(`Tunnel server listening on port ${port}`); + proxy.web(req, res, { - target: 'http://127.0.0.1:' + tunnels[publicKey] + target: 'http://127.0.0.1:' + port }, function (e) { console.log("Proxy Web Error: ", e); - res.writeHead(418, { 'Content-Type': 'text/html' }); + res.writeHead(404, { 'Content-Type': 'text/html' }); res.end(content); }); - } + + // Close tunnel and release resources after response is sent + res.on('finish', () => { + tunnelServer.close(() => { + if (DEBUG === 1 || CONINFO === 1) console.log("Tunnel closed after request completion."); + }); + }); + }); } catch (e) { console.error("Error Occurred: ", e); } }); - server.on('upgrade', function (req, socket, head) { - if (DEBUG === 1) console.log('Upgrade Request'); - const split = req.headers.host.split('.'); - const publicKey = Buffer.from(b32.decode.asBytes(split[0].toUpperCase())); - proxy.ws(req, socket, { - target: 'http://127.0.0.1:' + tunnels[publicKey] - }, function (e) { - console.error("Proxy WS Error: ", e); - socket.end(); - }); - }); - server.listen(8081, () => { console.log(`Worker ${process.pid} listening on port 8081`); }); @@ -145,10 +103,23 @@ if (cluster.isMaster) { startServer().catch(console.error); async function getAvailablePort() { - let port; - do { - port = 1337 + Math.floor(Math.random() * 1000); // Generate a random port between 1337 and 2336 - } while (Object.values(tunnels).includes(port)); // Check if the port is already used - return port; + return new Promise((resolve, reject) => { + const tryPort = () => { + const port = 1337 + Math.floor(Math.random() * 1000); + const tester = net.createServer() + .once('error', (err) => { + if (err.code === 'EADDRINUSE') { + tryPort(); // Port in use, try another + } else { + reject(err); + } + }) + .once('listening', () => { + tester.close(() => resolve(port)); // Port is available + }) + .listen(port, '127.0.0.1'); + }; + tryPort(); + }); } }