// Import necessary modules for the application
const axios = require('axios'); // HTTP client for the backend and the Discord webhook
const { exec } = require('child_process'); // Runs the ban command in a child process
const net = require('net'); // Validates IP addresses before they are passed to the shell
const moment = require('moment'); // Timestamp formatting for log output
const fs = require('fs'); // Directory listing
const path = require('path'); // File path construction
const Tail = require('tail').Tail; // Follows log files and emits each newly appended line

// Configuration constants
const LOG_DIRECTORY = '/dockerData/logs'; // Directory where NGINX logs are stored
const BACKEND_URL = 'http://127.0.0.1:3001'; // URL of the backend process that analyses the logs
const DISCORD_WEBHOOK_URL = 'WEBHOOKURL'; // URL of the Discord webhook for alerts and notifications

// Environment-dependent configuration
const DEBUG = process.env.DEBUG === 'true'; // Debug logging is enabled only when DEBUG=true
const LOG_BUFFER_LIMIT = 15; // Number of buffered lines that triggers a send to the backend
const TIME_LIMIT = 10 * 60 * 1000; // Interval (ms) between periodic sends: 10 minutes

const logBuffer = []; // Log lines waiting to be sent to the backend
let logTails = []; // Active Tail instances, one per monitored log file
let isSendingLogs = false; // Guards against overlapping sendLogsToBackend runs

// IP addresses that are never banned or forwarded (public DNS resolvers)
const ignoredIPs = ['1.1.1.1', '1.0.0.1', '8.8.8.8', '8.8.4.4'];

// Subnets that are never banned or forwarded (e.g. traffic arriving via Cloudflare)
const ignoredSubnets = [
  '173.245.48.0/20',
  '103.21.244.0/22',
  '103.22.200.0/22',
  '103.31.4.0/22',
  '141.101.64.0/18',
  '108.162.192.0/18',
  '190.93.240.0/20',
  '188.114.96.0/20',
  '197.234.240.0/22',
  '198.41.128.0/17',
  '162.158.0.0/15',
  '104.16.0.0/13',
  '104.24.0.0/14',
  '172.64.0.0/13',
  '131.0.72.0/22'
];

// Log files that are never tailed
const ignoredFiles = ['proxy-host-149_access.log', 'proxy-host-2_access.log', 'proxy-host-99_access.log'];

/**
 * Returns the current local time formatted as 'YYYY-MM-DD HH:mm:ss'.
 * @returns {string} Formatted timestamp.
 */
const getTimestamp = () => moment().format('YYYY-MM-DD HH:mm:ss');

// Console logging helpers, one per level. Every level writes through console.log.
const log = {
  info: (message) => console.log(`[${getTimestamp()}] [INFO] ${message}`),
  warn: (message) => console.log(`[${getTimestamp()}] [WARN] ${message}`),
  error: (message) => console.log(`[${getTimestamp()}] [ERROR] ${message}`),
  success: (message) => console.log(`[${getTimestamp()}] [SUCCESS] ${message}`),
  debug: (message) => {
    // Debug output is emitted only when DEBUG mode is enabled
    if (DEBUG) {
      console.log(`[${getTimestamp()}] [DEBUG] ${message}`);
    }
  }
};

// Parsed form of ignoredSubnets, built on first use so the subnets are not re-parsed per call
let ignoredCidrs = null;

/**
 * Checks whether an IP address is in the ignored list or inside an ignored subnet.
 * @param {string} ip - IP address to check.
 * @returns {Promise<boolean>} True when the address must be ignored.
 */
const isIgnoredIP = async (ip) => {
  if (ignoredIPs.includes(ip)) {
    return true;
  }

  if (ignoredCidrs === null) {
    // ip-cidr is loaded with a dynamic import; if loading fails the cache stays
    // empty and the next call tries again.
    const { default: CIDR } = await import('ip-cidr');
    ignoredCidrs = ignoredSubnets.map((subnet) => new CIDR(subnet));
  }

  return ignoredCidrs.some((cidr) => cidr.contains(ip));
};

/**
 * Starts tailing every eligible '.log' file in LOG_DIRECTORY.
 *
 * Existing Tail instances are stopped first. Each new line is filtered
 * (git.ssh.surf traffic and ignored client IPs are dropped), appended to
 * logBuffer, and a send is triggered once the buffer reaches LOG_BUFFER_LIMIT.
 */
const readLogs = () => {
  log.info('Initiating log reading process...');

  // Stop and forget any Tail instances from a previous run
  logTails.forEach((tail) => tail.unwatch());
  logTails = [];

  fs.readdir(LOG_DIRECTORY, (err, files) => {
    if (err) {
      log.error(`Error reading directory: ${err}`);
      return;
    }

    // Keep only '.log' files that are not explicitly ignored
    const logFiles = files.filter((file) => file.endsWith('.log') && !ignoredFiles.includes(file));
    if (logFiles.length === 0) {
      log.warn(`No log files found in directory: ${LOG_DIRECTORY}`);
      return;
    }

    log.info(`Found ${logFiles.length} log files to tail.`);

    logFiles.forEach((file) => {
      const filePath = path.join(LOG_DIRECTORY, file);
      log.info(`Starting to read log from: ${filePath}`);

      try {
        const tail = new Tail(filePath);

        tail.on('line', async (line) => {
          // The handler is async, so nothing awaits its promise; catch everything
          // here to avoid an unhandled rejection.
          try {
            if (line.includes('git.ssh.surf')) {
              log.debug(`Ignoring line involving git.ssh.surf: ${line}`);
              return;
            }

            log.debug(`Read line: ${line}`);

            // Extract the client IP from a '[Client a.b.c.d]' marker, when present
            const ipMatch = line.match(/\[Client (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\]/);
            if (ipMatch) {
              const ip = ipMatch[1];
              const isIgnored = await isIgnoredIP(ip);
              if (isIgnored) {
                log.debug(`Ignored line with IP: ${ip}`);
                return;
              }
            }

            logBuffer.push(line);

            if (logBuffer.length >= LOG_BUFFER_LIMIT) {
              await sendLogsToBackend();
            }
          } catch (lineError) {
            log.error(`Error processing log line: ${lineError.message}`);
          }
        });

        tail.on('error', (error) => {
          log.error(`Tail error: ${error}`);
        });

        tail.watch();
        log.debug(`Started tailing file: ${filePath}`);
        logTails.push(tail);
      } catch (ex) {
        log.error(`Failed to tail file ${filePath}: ${ex}`);
      }
    });
  });
};

/**
 * Counts the tokens in the 'user' and 'assistant' messages using llama-tokenizer-js.
 * Messages with any other role are not counted.
 * @param {Array<{role: string, content: string}>} messages - Messages to measure.
 * @returns {Promise<number>} Total token count.
 */
async function countLlamaTokens(messages) {
  const llamaTokenizer = await import('llama-tokenizer-js');
  let totalTokens = 0;

  for (const message of messages) {
    if (message.role === 'user' || message.role === 'assistant') {
      const encodedTokens = llamaTokenizer.default.encode(message.content);
      totalTokens += encodedTokens.length;
    }
  }

  return totalTokens;
}

/**
 * Removes whole messages from `messages` (in place) when the total token count
 * exceeds maxLength + tolerance.
 *
 * NOTE(review): a message is only ever removed whole, never shortened. For the
 * single-message array passed by sendLogsToBackend this removes nothing, so an
 * oversized batch is still sent in full. Confirm whether truncation is intended.
 *
 * @param {Array<{role: string, content: string}>} messages - Messages to trim; mutated.
 * @param {number} maxLength - Target token budget.
 * @param {number} tolerance - Extra tokens allowed above maxLength before trimming.
 * @returns {Promise<void>}
 */
async function trimConversationHistory(messages, maxLength, tolerance) {
  const tokenLength = await countLlamaTokens(messages);

  if (tokenLength > maxLength + tolerance) {
    const diff = tokenLength - (maxLength + tolerance); // Tokens that need to be removed
    let removedTokens = 0;

    // Walk from the end of the array towards the start, so the last messages go first
    for (let i = messages.length - 1; i >= 0; i--) {
      const message = messages[i];
      const messageTokens = await countLlamaTokens([message]);

      if (removedTokens + messageTokens <= diff) {
        // Removing this message does not overshoot the number of tokens to drop
        messages.splice(i, 1);
        removedTokens += messageTokens;
        console.log(`${getTimestamp()} [CLEANUP] ${removedTokens} removed | After Resize: ${await countLlamaTokens(messages)}`);
      } else {
        const messagesToRemove = Math.floor(diff / messageTokens);
        for (let j = 0; j < messagesToRemove; j++) {
          messages.splice(i, 1);
          removedTokens += messageTokens;
        }
        break;
      }
    }
  }
}

/**
 * Sends the buffered log lines to the backend and acts on the response.
 *
 * A response containing 'ALERT', 'ACTION' or 'REPORT' triggers banning of the
 * IPs it names (unless ignored) and a Discord alert; a response containing
 * 'GENERAL' is forwarded to Discord as general information. On success the sent
 * lines are removed from the buffer and the backend conversation is reset. On
 * failure the buffer is kept so the lines are retried on the next run.
 *
 * @returns {Promise<void>}
 */
const sendLogsToBackend = async () => {
  if (logBuffer.length === 0) {
    log.info('Log buffer is empty, skipping sending to backend');
    return;
  }

  if (isSendingLogs) {
    log.info('Log sending is already in progress, skipping...');
    return;
  }

  isSendingLogs = true;
  log.info('Sending logs to backend...');

  // Remember how many lines this run covers. Lines tailed while the requests
  // below are in flight are appended after this point and must survive the send.
  const batchSize = logBuffer.length;

  try {
    const messages = [{ role: 'user', content: logBuffer.slice(0, batchSize).join('\n') }];
    await trimConversationHistory(messages, 2000, 100);

    const response = await axios.post(BACKEND_URL, { message: messages.map((msg) => msg.content).join('\n') });

    const content = response.data?.content;
    if (typeof content !== 'string') {
      throw new Error('Backend response did not contain a content string');
    }

    const isAlert = content.includes('ALERT') || content.includes('ACTION') || content.includes('REPORT');

    if (isAlert) {
      log.warn('ALERT detected in response');
      const ips = extractIPsFromAlert(content);

      if (ips.length > 0) {
        const nonIgnoredIPs = [];
        for (const ip of ips) {
          if (await isIgnoredIP(ip)) {
            log.debug(`Skipping banning for ignored IP: ${ip}`);
            continue;
          }

          log.info(`Detected IP for banning: ${ip}`);
          try {
            await banIP(ip);
          } catch (banError) {
            // One failed ban must not stop the remaining bans or the alert
            log.error(`Skipping IP ${ip} after failed ban: ${banError.message}`);
            continue;
          }

          await delay(3000); // Pause between bans to avoid overloading the system
          nonIgnoredIPs.push(ip);
        }
        await sendAlertToDiscord(content, nonIgnoredIPs);
      } else {
        log.warn('No IPs detected for banning.');
        await sendAlertToDiscord(content, []);
      }
    } else if (content.includes('GENERAL')) {
      await sendGeneralToDiscord(content);
    } else {
      log.info('No alerts detected in response');
      log.info(`Response:\n ${content}`);
    }

    // Drop only the lines that were part of this batch
    logBuffer.splice(0, batchSize);
    log.info('Log buffer cleared');

    // Start the next batch with a fresh conversation on the backend
    await resetConversationHistory();
  } catch (error) {
    log.error(`Error sending logs to backend: ${error.message}`);
  } finally {
    isSendingLogs = false;
  }
};

/**
 * Extracts the unique IPv4-looking addresses wrapped as ||a.b.c.d|| in a message.
 * @param {string} message - Text returned by the backend.
 * @returns {string[]} Unique addresses in order of first appearance.
 */
const extractIPsFromAlert = (message) => {
  const ipPattern = /\|\|(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\|\|/g;
  const uniqueIPs = new Set();

  let matches;
  while ((matches = ipPattern.exec(message)) !== null) {
    uniqueIPs.add(matches[1]);
  }

  return Array.from(uniqueIPs);
};

/**
 * Bans an IP address by running /usr/bin/banIP.
 * @param {string} ip - Address to ban; must be a valid IP address.
 * @returns {Promise<void>} Resolves once the command succeeds; rejects when the
 *   address is invalid or the command fails.
 */
const banIP = (ip) => {
  return new Promise((resolve, reject) => {
    // The address is interpolated into a shell command, so refuse anything that
    // is not a well-formed IP address.
    if (net.isIP(String(ip)) === 0) {
      const invalidError = new Error(`Refusing to ban invalid IP address: ${ip}`);
      log.error(`Error banning IP address: ${invalidError.message}`);
      reject(invalidError);
      return;
    }

    log.info(`Banning IP address: ${ip}`);
    exec(`/usr/bin/banIP ${ip}`, (error, stdout, stderr) => {
      if (error) {
        log.error(`Error banning IP address: ${error.message}`);
        reject(error);
        return;
      }

      if (stderr) {
        log.warn(`stderr: ${stderr}`);
      }

      log.success(`IP address ${ip} has been banned`);
      resolve();
    });
  });
};

/**
 * Posts an alert embed to the Discord webhook, with one field per banned IP.
 * Failures are logged and not rethrown.
 * @param {string} alertMessage - Alert text used as the embed description.
 * @param {string[]} ips - Banned addresses; entries found in ignoredIPs are left out.
 * @returns {Promise<void>}
 */
const sendAlertToDiscord = async (alertMessage, ips) => {
  log.info('Sending alert to Discord...');

  try {
    await axios.post(DISCORD_WEBHOOK_URL, {
      embeds: [
        {
          title: 'Alert Detected',
          description: alertMessage,
          color: 15158332, // Red (0xE74C3C)
          fields: ips.filter((ip) => !ignoredIPs.includes(ip)).map((ip) => ({
            name: 'Banned IP',
            value: ip,
            inline: true
          })),
          timestamp: new Date()
        }
      ]
    });
    log.success('Alert sent to Discord');
  } catch (error) {
    log.error(`Error sending alert to Discord: ${error.message}`);
  }
};

/**
 * Posts a general-information embed to the Discord webhook.
 * Failures are logged and not rethrown.
 * @param {string} generalMessage - Text used as the embed description.
 * @returns {Promise<void>}
 */
const sendGeneralToDiscord = async (generalMessage) => {
  log.info('Sending general information to Discord...');

  try {
    await axios.post(DISCORD_WEBHOOK_URL, {
      embeds: [
        {
          title: 'General Information',
          description: generalMessage,
          color: 3066993, // Green (0x2ECC71)
          timestamp: new Date()
        }
      ]
    });
    log.success('General information sent to Discord');
  } catch (error) {
    log.error(`Error sending general information to Discord: ${error.message}`);
  }
};

/**
 * Asks the backend to reset its conversation history.
 * Failures are logged and not rethrown.
 * @returns {Promise<void>}
 */
const resetConversationHistory = async () => {
  log.info('Resetting conversation history...');

  try {
    // Any '/api/v1/chat' suffix on BACKEND_URL is stripped before the reset path is appended
    await axios.post(`${BACKEND_URL.replace('/api/v1/chat', '')}/api/v1/reset-conversation`);
    log.success('Conversation history reset');
  } catch (error) {
    log.error(`Error resetting conversation history: ${error.message}`);
  }
};

/**
 * Returns a promise that resolves after the given number of milliseconds.
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>}
 */
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Start tailing the log files in LOG_DIRECTORY
readLogs();

// Flush the buffer every TIME_LIMIT milliseconds, even when it has not reached LOG_BUFFER_LIMIT
setInterval(sendLogsToBackend, TIME_LIMIT);