# -*- coding: utf-8 -*- """Group Chat Logger This bot is a modified version of the echo2 bot found here: https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/echobot2.py This bot logs all messages sent in a Telegram Group to a database. """ #from __future__ import print_function import os import sys import re import unidecode import locale import traceback from time import strftime from datetime import datetime, timedelta import requests import telegram from telegram.ext import Updater, CommandHandler, MessageHandler, Filters from model import User, Message, MessageHide, UserBan, session from mwt import MWT from googletrans import Translator from textblob import TextBlob # Used with monetary formatting locale.setlocale(locale.LC_ALL, '') # Price data cache duration CACHE_DURATION = timedelta(minutes=15) # CMC IDs can be retrived at: # https://pro-api.coinmarketcap.com/v1/cryptocurrency/map?symbol=[SYMBOL] CMC_SYMBOL_TO_ID = { 'OGN': 5117, 'USDT': 825, 'USDC': 3408, 'DAI': 4943, } CMC_API_KEY = os.environ.get('CMC_API_KEY') CMC_USD_QUOTE_URL = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest?id={}' CMC_BTC_QUOTE_URL = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest?id={}&convert=BTC' def first_of(attr, match, it): """ Return the first item in a set with an attribute that matches match """ if it is not None: for i in it: try: if getattr(i, attr) == match: return i except: pass return None def command_from_message(message, default=None): """ Extracts the first command from a Telegram Message """ if not message or not message.text: return default command = None text = message.text entities = message.entities command_def = first_of('type', 'bot_command', entities) if command_def: command = text[command_def.offset:command_def.length] return command or default def cmc_get_data(jso, cmc_id, pair_symbol='USD'): """ Pull relevant data from a response object """ if not jso: return None data = jso.get('data', {}) specific_data = data.get(str(cmc_id), {}) quote = specific_data.get('quote', {}) symbol_data = quote.get(pair_symbol, {}) return { 'price': symbol_data.get('price'), 'volume': symbol_data.get('volume_24h'), 'percent_change': symbol_data.get('percent_change_24h'), 'market_cap': symbol_data.get('market_cap'), } def decimal_format(v, decimals=2): if not v: v = 0 f = locale.format_string('%.{}f'.format(decimals), v, grouping=True) return '{}'.format(f) def monetary_format(v, decimals=2): return '${}'.format(decimal_format(v, decimals)) def btc_format(v): return decimal_format(v, decimals=8) class TokenData: def __init__(self, symbol, price=None, stamp=datetime.now()): self.symbol = symbol self._price = price self._btc_price = 0 self._percent_change = 0 self._btc_percent_change = 0 self._volume = 0 self._market_cap = 0 if price is not None: self.stamp = stamp else: self.stamp = None def _fetch_from_cmc(self, url_template): """ Get quote data for a specific known symbol """ jso = None cmc_id = CMC_SYMBOL_TO_ID.get(self.symbol) url = url_template.format(cmc_id) r = requests.get(url, headers={ 'X-CMC_PRO_API_KEY': CMC_API_KEY, 'Accept': 'application/json', }) if r.status_code != 200: print('Failed to fetch price data for id: {}'.format(cmc_id)) return None try: jso = r.json() except Exception: print('Error parsing JSON') return None return jso def update(self): """ Fetch price from binance """ jso = None data = None if self.stamp is None or ( self.stamp is not None and self.stamp < datetime.now() - CACHE_DURATION ): # CMC USD try: jso = self._fetch_from_cmc(CMC_USD_QUOTE_URL) data = cmc_get_data(jso, CMC_SYMBOL_TO_ID[self.symbol]) except Exception as err: print('Error fetching data: ', str(err)) print(traceback.format_exc()) if data is not None: self._price = data.get('price') self._percent_change = data.get('percent_change') self._volume = data.get('volume') self._market_cap = data.get('market_cap') self.stamp = datetime.now() # CMC BTC try: jso = self._fetch_from_cmc(CMC_BTC_QUOTE_URL) data = cmc_get_data(jso, CMC_SYMBOL_TO_ID[self.symbol], 'BTC') except Exception as err: print('Error fetching data: ', str(err)) print(traceback.format_exc()) if data is not None: self._btc_price = data.get('price') self._btc_percent_change = data.get('percent_change') @property def price(self): self.update() return self._price @property def btc_price(self): self.update() return self._btc_price @property def btc_percent_change(self): self.update() return self._btc_percent_change @property def volume(self): self.update() return self._volume @property def percent_change(self): self.update() pc = str(self._percent_change) if pc and not pc.startswith('-'): pc = '+{}'.format(pc) return pc @property def market_cap(self): self.update() return self._market_cap class TelegramMonitorBot: def __init__(self): self.debug = ( (os.environ.get('DEBUG') is not None) and (os.environ.get('DEBUG').lower() != "false")) # Are admins exempt from having messages checked? self.admin_exempt = ( (os.environ.get('ADMIN_EXEMPT') is not None) and (os.environ.get('ADMIN_EXEMPT').lower() != "false")) if (self.debug): print("🔵 debug:", self.debug) print("🔵 admin_exempt:", self.admin_exempt) print("🔵 TELEGRAM_BOT_POSTGRES_URL:", os.environ["TELEGRAM_BOT_POSTGRES_URL"]) print("🔵 TELEGRAM_BOT_TOKEN:", os.environ["TELEGRAM_BOT_TOKEN"]) print("🔵 NOTIFY_CHAT:", os.environ['NOTIFY_CHAT'] if 'NOTIFY_CHAT' in os.environ else "") print("🔵 MESSAGE_BAN_PATTERNS:\n", os.environ['MESSAGE_BAN_PATTERNS']) print("🔵 MESSAGE_HIDE_PATTERNS:\n", os.environ['MESSAGE_HIDE_PATTERNS']) print("🔵 NAME_BAN_PATTERNS:\n", os.environ['NAME_BAN_PATTERNS']) print("🔵 IGNORE_USER_IDS:\n", os.environ.get('IGNORE_USER_IDS')) # Channel to notify of violoations, e.g. '@channelname' self.notify_chat = os.environ['NOTIFY_CHAT'] if 'NOTIFY_CHAT' in os.environ else None # Ignore these user IDs if not os.environ.get('IGNORE_USER_IDS'): self.ignore_user_ids = [] else: self.ignore_user_ids = list(map(int, os.environ['IGNORE_USER_IDS'].split(','))) # List of chat ids that bot should monitor self.chat_ids = ( list(map(int, os.environ['CHAT_IDS'].split(','))) if "CHAT_IDS" in os.environ else []) # Regex for message patterns that cause user ban self.message_ban_patterns = os.environ['MESSAGE_BAN_PATTERNS'] self.message_ban_re = (re.compile( self.message_ban_patterns, re.IGNORECASE | re.VERBOSE) if self.message_ban_patterns else None) # Regex for message patterns that cause message to be hidden self.message_hide_patterns = os.environ['MESSAGE_HIDE_PATTERNS'] self.message_hide_re = (re.compile( self.message_hide_patterns, re.IGNORECASE | re.VERBOSE) if self.message_hide_patterns else None) # Regex for name patterns that cause user to be banned self.name_ban_patterns = os.environ['NAME_BAN_PATTERNS'] self.name_ban_re = (re.compile( self.name_ban_patterns, re.IGNORECASE | re.VERBOSE) if self.name_ban_patterns else None) # Mime type document check # NOTE: All gifs appear to be converted to video/mp4 mime_types = os.environ.get('ALLOWED_MIME_TYPES', 'video/mp4') self.allowed_mime_types = set(map(lambda s: s.strip(), mime_types.split(','))) # Comamnds self.available_commands = ['flip', 'unflip'] if CMC_API_KEY is not None: self.available_commands.append('price') print('Available commands: {}'.format(', '.join(self.available_commands))) # Cached token prices self.cached_prices = {} self.last_message_out = None @MWT(timeout=60*60) def get_admin_ids(self, bot, chat_id): """ Returns a list of admin IDs for a given chat. Results are cached for 1 hour. """ return [admin.user.id for admin in bot.get_chat_administrators(chat_id)] def ban_user(self, update): """ Ban user """ kick_success = update.message.chat.kick_member(update.message.from_user.id) def security_check_username(self, bot, update): """ Test username for security violations """ full_name = "{} {}".format( update.message.from_user.first_name, update.message.from_user.last_name) if self.name_ban_re and self.name_ban_re.search(full_name): # Logging log_message = "❌ 🙅‍♂️ BAN MATCH FULL NAME: {}".format(full_name.encode('utf-8')) if self.debug: update.message.reply_text(log_message) print(log_message) # Ban the user self.ban_user(update) # Log in database s = session() userBan = UserBan( user_id=update.message.from_user.id, reason=log_message) s.add(userBan) s.commit() s.close() # Notify channel if self.notify_chat: bot.send_message(chat_id=self.notify_chat, text=log_message) if self.name_ban_re and self.name_ban_re.search(update.message.from_user.username or ''): # Logging log_message = "❌ 🙅‍♂️ BAN MATCH USERNAME: {}".format(update.message.from_user.username.encode('utf-8')) if self.debug: update.message.reply_text(log_message) print(log_message) # Ban the user self.ban_user(update) # Log in database s = session() userBan = UserBan( user_id=update.message.from_user.id, reason=log_message) s.add(userBan) s.commit() s.close() # Notify channel if self.notify_chat: bot.send_message(chat_id=self.notify_chat, text=log_message) def security_check_message(self, bot, update): """ Test message for security violations """ if not update.message.text: return # Remove accents from letters (é->e, ñ->n, etc...) message = unidecode.unidecode(update.message.text) # TODO: Replace lookalike unicode characters: # https://github.com/wanderingstan/Confusables # Hide forwarded messages if update.message.forward_date is not None: # Logging log_message = "❌ HIDE FORWARDED: {}".format(update.message.text.encode('utf-8')) if self.debug: update.message.reply_text(log_message) print(log_message) # Delete the message update.message.delete() # Log in database s = session() messageHide = MessageHide( user_id=update.message.from_user.id, message=update.message.text) s.add(messageHide) s.commit() s.close() # Notify channel if self.notify_chat: bot.send_message(chat_id=self.notify_chat, text=log_message) if self.message_ban_re and self.message_ban_re.search(message): # Logging log_message = "❌ 🙅‍♂️ BAN MATCH: {}".format(update.message.text.encode('utf-8')) if self.debug: update.message.reply_text(log_message) print(log_message) # Any message that causes a ban gets deleted update.message.delete() # Ban the user self.ban_user(update) # Log in database s = session() userBan = UserBan( user_id=update.message.from_user.id, reason=update.message.text) s.add(userBan) s.commit() s.close() # Notify channel if self.notify_chat: bot.send_message(chat_id=self.notify_chat, text=log_message) elif self.message_hide_re and self.message_hide_re.search(message): # Logging log_message = "❌ 🙈 HIDE MATCH: {}".format(update.message.text.encode('utf-8')) if self.debug: update.message.reply_text(log_message) print(log_message) # Delete the message update.message.delete() # Log in database s = session() messageHide = MessageHide( user_id=update.message.from_user.id, message=update.message.text) s.add(messageHide) s.commit() s.close() # Notify channel if self.notify_chat: bot.send_message(chat_id=self.notify_chat, text=log_message) def attachment_check(self, bot, update): """ Hide messages with attachments (except photo or video) """ if (update.message.audio or update.message.document or update.message.game or update.message.voice): # Logging if update.message.document: # GIFs are documents and allowed mime_type = update.message.document.mime_type if mime_type and mime_type in self.allowed_mime_types: return log_message = "❌ HIDE DOCUMENT: {}".format(update.message.document.__dict__) else: log_message = "❌ HIDE NON-DOCUMENT ATTACHMENT" if self.debug: update.message.reply_text(log_message) print(log_message) # Delete the message update.message.delete() # Log in database s = session() messageHide = MessageHide( user_id=update.message.from_user.id, message=update.message.text) s.add(messageHide) s.commit() s.close() # Notify channel if self.notify_chat: bot.send_message(chat_id=self.notify_chat, text=log_message) def logger(self, bot, update): """ Primary Logger. Handles incoming bot messages and saves them to DB :param bot: telegram.Bot https://python-telegram-bot.readthedocs.io/en/stable/telegram.bot.html :param update: telegram.Update https://python-telegram-bot.readthedocs.io/en/stable/telegram.update.html """ if ( update.effective_user is None or update.effective_user.id in self.ignore_user_ids ): print("{}: Ignoring update.".format(update.update_id)) return try: message = update.message # message is optional if message is None: if update.effective_message is None: print("No message included in update") return message = update.effective_message if message: user = message.from_user # Limit bot to monitoring certain chats if message.chat_id not in self.chat_ids: from_user = "UNKNOWN" if user: from_user = user.id print("Message from user {} is from chat_id not being monitored: {}".format( from_user, message.chat_id) ) return if self.id_exists(user.id): self.log_message(user.id, message.text, message.chat_id) else: add_user_success = self.add_user( user.id, user.first_name, user.last_name, user.username) if add_user_success: self.log_message( user.id, message.text, message.chat_id) print("User added: {}".format(user.id)) else: print("Something went wrong adding the user {}".format(user.id), file=sys.stderr) user_name = ( user.username or "{} {}".format(user.first_name, user.last_name) or "").encode('utf-8') if message.text: print("{} {} ({}) : {}".format( strftime("%Y-%m-%dT%H:%M:%S"), user.id, user_name, update.message.text.encode('utf-8')) ) else: print("{} {} ({}) : non-message".format( strftime("%Y-%m-%dT%H:%M:%S"), user.id, user_name) ) else: print("Update and user not logged because no message was found") # Don't check admin activity is_admin = False if message: is_admin = message.from_user.id in self.get_admin_ids(bot, message.chat_id) if is_admin and self.admin_exempt: print("👮‍♂️ Skipping checks. User is admin: {}".format(user.id)) else: # Security checks self.attachment_check(bot, update) self.security_check_username(bot, update) self.security_check_message(bot, update) except Exception as e: print("Error[521]: {}".format(e)) print(traceback.format_exc()) print('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e) # DB queries def id_exists(self, id_value): s = session() bool_set = False for id1 in s.query(User.id).filter_by(id=id_value): if id1: bool_set = True s.close() return bool_set def log_message(self, user_id, user_message, chat_id): if user_message is None: user_message = "[NO MESSAGE]" try: s = session() language_code = english_message = "" polarity = subjectivity = 0.0 try: # translate to English & log the original language translator = Translator() translated = translator.translate(user_message) language_code = translated.src english_message = translated.text # run basic sentiment analysis on the translated English string analysis = TextBlob(english_message) polarity = analysis.sentiment.polarity subjectivity = analysis.sentiment.subjectivity except Exception as e: print("Error translating message: {}".format(e)) msg1 = Message(user_id=user_id, message=user_message, chat_id=chat_id, language_code=language_code, english_message=english_message, polarity=polarity, subjectivity=subjectivity) s.add(msg1) s.commit() s.close() except Exception as e: print("Error logging message: {}".format(e)) print(traceback.format_exc()) def add_user(self, user_id, first_name, last_name, username): try: s = session() user = User( id=user_id, first_name=first_name, last_name=last_name, username=username) s.add(user) s.commit() s.close() return self.id_exists(user_id) except Exception as e: print("Error[347]: {}".format(e)) print(traceback.format_exc()) def handle_command(self, bot, update): """ Handles commands Note: Args reversed from docs? Maybe version differences? Docs say cb(update, context) but we're getting cb(bot, update). update: Update: https://python-telegram-bot.readthedocs.io/en/stable/telegram.update.html#telegram.Update context: CallbackContext: https://python-telegram-bot.readthedocs.io/en/stable/telegram.ext.callbackcontext.html hi: says hi price: prints the OGN price """ chat_id = None command = None message_id = update.effective_message.message_id command = command_from_message(update.effective_message) if update.effective_message.chat: chat_id = update.effective_message.chat.id print('command: {} seen in chat_id {}'.format(command, chat_id)) if command == '/hi': bot.send_message(chat_id, 'Yo whattup, @{}!'.format(update.effective_user.username)) elif command == '/flip': bot.send_message(chat_id, '╯°□°)╯︵ ┻━┻') elif command == '/unflip': bot.send_message(chat_id, '┬──┬ ¯\\_(՞▃՞ ¯\\_)') elif command == '/price': """ Price, 24 hour %, 24 hour volume, and market cap """ symbol = 'OGN' if symbol not in self.cached_prices: self.cached_prices[symbol] = TokenData(symbol) pdata = self.cached_prices[symbol] message = """ *Origin Token* (OGN) *USD Price*: {} ({}%) *BTC Price*: {} ({}%) *Market Cap*: {} *Volume(24h)*: {} @{}""".format( monetary_format(pdata.price, decimals=5), pdata.percent_change, btc_format(pdata.btc_price), pdata.btc_percent_change, monetary_format(pdata.market_cap), monetary_format(pdata.volume), update.effective_user.username, ) # If the last message we sent was price, delete it to reduce spam if ( self.last_message_out and self.last_message_out.get('type') == 'price' and self.last_message_out['message'].message_id ): try: bot.delete_message( chat_id, self.last_message_out['message'].message_id ) except Exception as err: print('Unable to delete previous price message: ', err) print(traceback.format_exc()) self.last_message_out = { 'type': 'price', 'message': bot.send_message( chat_id, message, parse_mode=telegram.ParseMode.MARKDOWN ), } # Delete the command message as well try: bot.delete_message(chat_id, message_id) except Exception: # nbd if we cannot delete it pass def error(self, bot, update, error): """ Log Errors caused by Updates. """ print("Update '{}' caused error '{}'".format(update, error), file=sys.stderr) def start(self): """ Start the bot. """ # Create the EventHandler and pass it your bot's token. updater = Updater(os.environ["TELEGRAM_BOT_TOKEN"]) # Get the dispatcher to register handlers dp = updater.dispatcher # on different commands - answer in Telegram # on commands dp.add_handler( CommandHandler( command=self.available_commands, callback=self.handle_command, filters=Filters.all, ) ) # on noncommand i.e message - echo the message on Telegram dp.add_handler(MessageHandler( Filters.all, lambda bot, update : self.logger(bot, update) )) # dp.add_handler(MessageHandler(Filters.status_update, status)) # log all errors dp.add_error_handler( lambda bot, update, error : self.error(bot, update, error) ) # Start the Bot updater.start_polling() print("Bot started. Montitoring chats: {}".format(self.chat_ids)) # Run the bot until you press Ctrl-C or the process receives SIGINT, # SIGTERM or SIGABRT. This should be used most of the time, since # start_polling() is non-blocking and will stop the bot gracefully. updater.idle() if __name__ == '__main__': c = TelegramMonitorBot() c.start()