let events = require(`events`) let fs = require('fs') let path = require('path') // const environment = process.env['NODE_ENV'] || 'development' class devNull { info() { }; error() { }; }; class Tail extends events.EventEmitter { constructor(filename, options = {}) { super(); this.filename = filename; this.absPath = path.dirname(this.filename); this.separator = (options.separator !== undefined) ? options.separator : /[\r]{0,1}\n/;// null is a valid param this.fsWatchOptions = options.fsWatchOptions || {}; this.follow = options['follow'] != undefined ? options['follow'] : true; this.logger = options.logger || new devNull(); this.useWatchFile = options.useWatchFile || false; this.flushAtEOF = options.flushAtEOF || false; this.encoding = options.encoding || 'utf-8'; const fromBeginning = options.fromBeginning || false; this.nLines = options.nLines || undefined; this.logger.info(`Tail starting...`) this.logger.info(`filename: ${this.filename}`); this.logger.info(`encoding: ${this.encoding}`); try { fs.accessSync(this.filename, fs.constants.F_OK); } catch (err) { if (err.code == 'ENOENT') { throw err } } this.buffer = ''; this.internalDispatcher = new events.EventEmitter(); this.queue = []; this.isWatching = false; this.pos = 0; // this.internalDispatcher.on('next',this.readBlock); this.internalDispatcher.on('next', () => { this.readBlock(); }); this.logger.info(`fromBeginning: ${fromBeginning}`); let startingCursor; if (fromBeginning) { startingCursor = 0; } else if (this.nLines !== undefined) { const data = fs.readFileSync(this.filename, { flag: 'r', encoding: this.encoding }); const tokens = data.split(this.separator); const dropLastToken = (tokens[tokens.length - 1] === '') ? 1 : 0;//if the file ends with empty line ignore line NL if (tokens.length - this.nLines - dropLastToken <= 0) { //nLines is bigger than avaiable tokens: tail from the begin startingCursor = 0; } else { const match = data.match(new RegExp(`(?:[^\r\n]*[\r]{0,1}\n){${tokens.length - this.nLines - dropLastToken}}`)); startingCursor = (match && match.length) ? Buffer.byteLength(match[0], this.encoding) : this.latestPosition(); } } else { startingCursor = this.latestPosition(); } if (startingCursor === undefined) throw new Error("Tail can't initialize."); const flush = fromBeginning || (this.nLines != undefined); try { this.watch(startingCursor, flush); } catch (err) { this.logger.error(`watch for ${this.filename} failed: ${err}`); this.emit("error", `watch for ${this.filename} failed: ${err}`); } } latestPosition() { try { return fs.statSync(this.filename).size; } catch (err) { this.logger.error(`size check for ${this.filename} failed: ${err}`); this.emit("error", `size check for ${this.filename} failed: ${err}`); throw err; } } readBlock() { if (this.queue.length >= 1) { const block = this.queue[0]; if (block.end > block.start) { let stream = fs.createReadStream(this.filename, { start: block.start, end: block.end - 1, encoding: this.encoding }); stream.on('error', (error) => { this.logger.error(`Tail error: ${error}`); this.emit('error', error); }); stream.on('end', () => { let _ = this.queue.shift(); if (this.queue.length > 0) { this.internalDispatcher.emit('next'); } if (this.flushAtEOF && this.buffer.length > 0) { this.emit('line', this.buffer); this.buffer = ""; } }); stream.on('data', (d) => { if (this.separator === null) { this.emit("line", d); } else { this.buffer += d; let parts = this.buffer.split(this.separator); this.buffer = parts.pop(); for (const chunk of parts) { this.emit("line", chunk); } } }); } } } change() { let p = this.latestPosition() if (p < this.currentCursorPos) {//scenario where text is not appended but it's actually a w+ this.currentCursorPos = p } else if (p > this.currentCursorPos) { this.queue.push({ start: this.currentCursorPos, end: p }); this.currentCursorPos = p if (this.queue.length == 1) { this.internalDispatcher.emit("next"); } } } watch(startingCursor, flush) { if (this.isWatching) return; this.logger.info(`filesystem.watch present? ${fs.watch != undefined}`); this.logger.info(`useWatchFile: ${this.useWatchFile}`); this.isWatching = true; this.currentCursorPos = startingCursor; //force a file flush is either fromBegining or nLines flags were passed. if (flush) this.change(); if (!this.useWatchFile && fs.watch) { this.logger.info(`watch strategy: watch`); this.watcher = fs.watch(this.filename, this.fsWatchOptions, (e, filename) => { this.watchEvent(e, filename); }); } else { this.logger.info(`watch strategy: watchFile`); fs.watchFile(this.filename, this.fsWatchOptions, (curr, prev) => { this.watchFileEvent(curr, prev) }); } } rename(filename) { //TODO //MacOS sometimes throws a rename event for no reason. //Different platforms might behave differently. //see https://nodejs.org/api/fs.html#fs_fs_watch_filename_options_listener //filename might not be present. //https://nodejs.org/api/fs.html#fs_filename_argument //Better solution would be check inode but it will require a timeout and // a sync file read. if (filename === undefined || filename !== this.filename) { this.unwatch(); if (this.follow) { this.filename = path.join(this.absPath, filename); this.rewatchId = setTimeout((() => { try { this.watch(this.currentCursorPos); } catch (ex) { this.logger.error(`'rename' event for ${this.filename}. File not available anymore.`); this.emit("error", ex); } }), 1000); } else { this.logger.error(`'rename' event for ${this.filename}. File not available anymore.`); this.emit("error", `'rename' event for ${this.filename}. File not available anymore.`); } } else { // this.logger.info("rename event but same filename") } } watchEvent(e, evtFilename) { try { if (e === 'change') { this.change(); } else if (e === 'rename') { this.rename(evtFilename); } } catch (err) { this.logger.error(`watchEvent for ${this.filename} failed: ${err}`); this.emit("error", `watchEvent for ${this.filename} failed: ${err}`); } } watchFileEvent(curr, prev) { if (curr.size > prev.size) { this.currentCursorPos = curr.size; //Update this.currentCursorPos so that a consumer can determine if entire file has been handled this.queue.push({ start: prev.size, end: curr.size }); if (this.queue.length == 1) { this.internalDispatcher.emit("next"); } } } unwatch() { if (this.watcher) { this.watcher.close(); } else { fs.unwatchFile(this.filename); } if (this.rewatchId) { clearTimeout(this.rewatchId); this.rewatchId = undefined; } this.isWatching = false; this.queue = [];// TODO: is this correct behaviour? if (this.logger) { this.logger.info(`Unwatch ${this.filename}`); } } } exports.Tail = Tail