Update Worker Server

Isolated Tunnels per Request:

Each HTTP request now generates a unique tunnel server instance. This avoids potential cross-request data confusion by ensuring that no tunnel is reused across multiple requests.
Port Availability Check with getAvailablePort:

Enhanced the getAvailablePort function to check if a randomly generated port is available before using it. The function now creates a temporary server on each port to ensure it’s free, retrying until an available port is found. This prevents EADDRINUSE errors caused by attempting to bind to a port already in use.
Automatic Tunnel Closure:

Each tunnel server (tunnelServer) is configured to close automatically after the corresponding request’s response is completed. The res.on('finish') event handler triggers tunnelServer.close() to release the resources immediately, preventing any chance of accidental reuse.
Removed tunnels Object:

Since each tunnel is now temporary and used for only one request, we removed the need to track tunnels in a tunnels object. This simplifies the code and helps ensure tunnels are created and destroyed within the scope of a single request.
This commit is contained in:
hypermc 2024-11-07 18:49:56 -05:00
parent 6c5587c789
commit f31c52a4f2

View File

@ -1,11 +1,11 @@
const cluster = require('cluster'); const cluster = require('cluster');
const numCPUs = require('os').cpus().length; const numCPUs = require('os').cpus().length;
const net = require('net');
if (cluster.isMaster) { if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`); console.log(`Master ${process.pid} is running`);
console.log(`Total Workers ${numCPUs}`) console.log(`Total Workers ${numCPUs * 6}`);
// Fork workers for (let i = 0; i < numCPUs * 4; i++) {
for (let i = 0; i < numCPUs; i++) {
cluster.fork(); cluster.fork();
} }
@ -18,14 +18,10 @@ if (cluster.isMaster) {
const httpProxy = require('http-proxy'); const httpProxy = require('http-proxy');
const HyperDHTServer = require('hyperdht'); const HyperDHTServer = require('hyperdht');
const b32 = require("hi-base32"); const b32 = require("hi-base32");
const net = require("net");
let invalid = false;
const tunnels = {};
const agent = new http.Agent({ maxSockets: Number.MAX_VALUE }); const agent = new http.Agent({ maxSockets: Number.MAX_VALUE });
const content = fs.readFileSync('404.txt', 'utf8'); const content = fs.readFileSync('404.txt', 'utf8');
const DEBUG = 0; // Set DEBUG to 1 to enable debug mode const DEBUG = 0;
const CONINFO = 0; // Set CONINFO to 1 for a smaller breakdown of connections. const CONINFO = 0;
const dhtServer = new HyperDHTServer(); const dhtServer = new HyperDHTServer();
const startServer = async () => { const startServer = async () => {
@ -40,67 +36,45 @@ if (cluster.isMaster) {
const server = http.createServer(async function (req, res) { const server = http.createServer(async function (req, res) {
try { try {
if (DEBUG === 1 || CONINFO === 1) console.log("Incoming HTTP request...");
const split = req.headers.host.split('.'); const split = req.headers.host.split('.');
if (DEBUG === 1) console.log("Request Headers Host Split: ", split);
const publicKey = Buffer.from(b32.decode.asBytes(split[0].toUpperCase())); const publicKey = Buffer.from(b32.decode.asBytes(split[0].toUpperCase()));
if (DEBUG === 1) {
console.log("Public Key Buffer: ", publicKey); if (publicKey.length < 32) {
console.log("Length: " + publicKey.length); console.log("Invalid Connection!");
}
if (!(publicKey.length >= 32)) {
console.log("Invalid Connection!")
res.writeHead(418, { 'Content-Type': 'text/html' }); res.writeHead(418, { 'Content-Type': 'text/html' });
res.end(content); res.end(content);
invalid = true return;
} else {
invalid = false
} }
if (invalid == true) return
if (!tunnels[publicKey]) { // Create a unique tunnel for each request
if (DEBUG === 1 || CONINFO === 1) console.log("No tunnel found for public key, creating new tunnel..."); const port = await getAvailablePort();
const port = await getAvailablePort(); // Get an available port const tunnelServer = net.createServer(function (servsock) {
const server = net.createServer(function (servsock) {
if (DEBUG === 1 || CONINFO === 1) console.log('Incoming Connection, Connecting to:', publicKey.toString('hex'));
if (DEBUG === 1) console.log("Public Key Length: ", publicKey.toString('hex').length);
const socket = dhtServer.connect(publicKey); const socket = dhtServer.connect(publicKey);
const local = servsock;
let open = { local: true, remote: true }; let open = { local: true, remote: true };
local.on('data', (d) => {
if (DEBUG === 1) console.log("Local Data: ", d);
socket.write(d);
});
socket.on('data', (d) => {
if (DEBUG === 1) console.log("Socket Data: ", d);
local.write(d);
});
const remoteend = (type) => { servsock.on('data', (d) => socket.write(d));
if (DEBUG === 1) console.log('Local Connection Ended, Ending Remote Connection'); socket.on('data', (d) => servsock.write(d));
const remoteend = () => {
if (open.remote) socket.end(); if (open.remote) socket.end();
open.remote = false; open.remote = false;
}; };
const localend = () => {
const localend = (type) => { if (open.local) servsock.end();
if (DEBUG === 1) console.log('Remote Connection Ended, Ending Local Connection');
if (open.local) local.end();
open.local = false; open.local = false;
}; };
local.on('error', remoteend); servsock.on('error', remoteend);
local.on('finish', remoteend); servsock.on('finish', remoteend);
local.on('end', remoteend); servsock.on('end', remoteend);
socket.on('finish', localend); socket.on('finish', localend);
socket.on('error', localend); socket.on('error', localend);
socket.on('end', localend); socket.on('end', localend);
}); });
// Check if the port is available tunnelServer.listen(port, "127.0.0.1", () => {
server.listen(port, "127.0.0.1", () => { if (DEBUG === 1 || CONINFO === 1) console.log(`Tunnel server listening on port ${port}`);
if (DEBUG === 1 | CONINFO === 1) console.log(`Tunnel server listening on port ${port}`);
tunnels[publicKey] = port;
if (DEBUG === 1 || CONINFO === 1) console.log("New tunnel created. Public Key:", publicKey, "Port:", port);
proxy.web(req, res, { proxy.web(req, res, {
target: 'http://127.0.0.1:' + port target: 'http://127.0.0.1:' + port
}, function (e) { }, function (e) {
@ -108,35 +82,19 @@ if (cluster.isMaster) {
res.writeHead(404, { 'Content-Type': 'text/html' }); res.writeHead(404, { 'Content-Type': 'text/html' });
res.end(content); res.end(content);
}); });
// Close tunnel and release resources after response is sent
res.on('finish', () => {
tunnelServer.close(() => {
if (DEBUG === 1 || CONINFO === 1) console.log("Tunnel closed after request completion.");
});
}); });
return;
} else {
if (DEBUG === 1) console.log('Tunnel exists for public key:', publicKey);
proxy.web(req, res, {
target: 'http://127.0.0.1:' + tunnels[publicKey]
}, function (e) {
console.log("Proxy Web Error: ", e);
res.writeHead(418, { 'Content-Type': 'text/html' });
res.end(content);
}); });
}
} catch (e) { } catch (e) {
console.error("Error Occurred: ", e); console.error("Error Occurred: ", e);
} }
}); });
server.on('upgrade', function (req, socket, head) {
if (DEBUG === 1) console.log('Upgrade Request');
const split = req.headers.host.split('.');
const publicKey = Buffer.from(b32.decode.asBytes(split[0].toUpperCase()));
proxy.ws(req, socket, {
target: 'http://127.0.0.1:' + tunnels[publicKey]
}, function (e) {
console.error("Proxy WS Error: ", e);
socket.end();
});
});
server.listen(8081, () => { server.listen(8081, () => {
console.log(`Worker ${process.pid} listening on port 8081`); console.log(`Worker ${process.pid} listening on port 8081`);
}); });
@ -145,10 +103,23 @@ if (cluster.isMaster) {
startServer().catch(console.error); startServer().catch(console.error);
async function getAvailablePort() { async function getAvailablePort() {
let port; return new Promise((resolve, reject) => {
do { const tryPort = () => {
port = 1337 + Math.floor(Math.random() * 1000); // Generate a random port between 1337 and 2336 const port = 1337 + Math.floor(Math.random() * 1000);
} while (Object.values(tunnels).includes(port)); // Check if the port is already used const tester = net.createServer()
return port; .once('error', (err) => {
if (err.code === 'EADDRINUSE') {
tryPort(); // Port in use, try another
} else {
reject(err);
}
})
.once('listening', () => {
tester.close(() => resolve(port)); // Port is available
})
.listen(port, '127.0.0.1');
};
tryPort();
});
} }
} }