Fixed checking of attachments.
Plus cleanup of old memorize util
This commit is contained in:
parent
34ba0c4d04
commit
a398742b96
26
bot.py
26
bot.py
@ -27,8 +27,14 @@ class TelegramMonitorBot:
|
|||||||
(os.environ.get('DEBUG') is not None) and
|
(os.environ.get('DEBUG') is not None) and
|
||||||
(os.environ.get('DEBUG').upper() != "false"))
|
(os.environ.get('DEBUG').upper() != "false"))
|
||||||
|
|
||||||
|
# Are admins exempt from having messages checked?
|
||||||
|
self.admin_exempt = (
|
||||||
|
(os.environ.get('ADMIN_EXEMPT') is not None) and
|
||||||
|
(os.environ.get('ADMIN_EXEMPT').upper() != "false"))
|
||||||
|
|
||||||
if (self.debug):
|
if (self.debug):
|
||||||
print("🔵 DEBUG:", os.environ["DEBUG"])
|
print("🔵 debug:", self.debug)
|
||||||
|
print("🔵 admin_exempt:", self.admin_exempt)
|
||||||
print("🔵 TELEGRAM_BOT_POSTGRES_URL:", os.environ["TELEGRAM_BOT_POSTGRES_URL"])
|
print("🔵 TELEGRAM_BOT_POSTGRES_URL:", os.environ["TELEGRAM_BOT_POSTGRES_URL"])
|
||||||
print("🔵 TELEGRAM_BOT_TOKEN:", os.environ["TELEGRAM_BOT_TOKEN"])
|
print("🔵 TELEGRAM_BOT_TOKEN:", os.environ["TELEGRAM_BOT_TOKEN"])
|
||||||
print("🔵 NOTIFY_CHAT:", os.environ['NOTIFY_CHAT'] if 'NOTIFY_CHAT' in os.environ else "<undefined>")
|
print("🔵 NOTIFY_CHAT:", os.environ['NOTIFY_CHAT'] if 'NOTIFY_CHAT' in os.environ else "<undefined>")
|
||||||
@ -124,6 +130,9 @@ class TelegramMonitorBot:
|
|||||||
def security_check_message(self, bot, update):
|
def security_check_message(self, bot, update):
|
||||||
""" Test message for security violations """
|
""" Test message for security violations """
|
||||||
|
|
||||||
|
if not update.message.text:
|
||||||
|
return
|
||||||
|
|
||||||
# Remove accents from letters (é->e, ñ->n, etc...)
|
# Remove accents from letters (é->e, ñ->n, etc...)
|
||||||
message = unidecode.unidecode(update.message.text)
|
message = unidecode.unidecode(update.message.text)
|
||||||
# TODO: Replace lookalike unicode characters:
|
# TODO: Replace lookalike unicode characters:
|
||||||
@ -196,9 +205,11 @@ class TelegramMonitorBot:
|
|||||||
update.message.document or
|
update.message.document or
|
||||||
update.message.game or
|
update.message.game or
|
||||||
update.message.voice):
|
update.message.voice):
|
||||||
print("Non message handling")
|
|
||||||
# Logging
|
# Logging
|
||||||
log_message = "❌ HIDE ATTACHMENT"
|
if update.message.document:
|
||||||
|
log_message = "❌ HIDE DOCUMENT: {}".format(update.message.document.__dict__)
|
||||||
|
else:
|
||||||
|
log_message = "❌ HIDE NON-DOCUMENT ATTACHMENT"
|
||||||
if self.debug:
|
if self.debug:
|
||||||
update.message.reply_text(log_message)
|
update.message.reply_text(log_message)
|
||||||
print(log_message)
|
print(log_message)
|
||||||
@ -258,14 +269,15 @@ class TelegramMonitorBot:
|
|||||||
(user.username or (user.first_name + " " + user.last_name) or "").encode('utf-8'))
|
(user.username or (user.first_name + " " + user.last_name) or "").encode('utf-8'))
|
||||||
)
|
)
|
||||||
|
|
||||||
if (self.debug or
|
# Don't check admin activity
|
||||||
(update.message.text and update.message.from_user.id not in self.get_admin_ids(bot, update.message.chat_id))):
|
is_admin = update.message.from_user.id not in self.get_admin_ids(bot, update.message.chat_id)
|
||||||
|
if is_admin and self.admin_exempt:
|
||||||
|
print("👮♂️ Skipping checks. User is admin: {}".format(user.id))
|
||||||
|
else:
|
||||||
# Security checks
|
# Security checks
|
||||||
self.attachment_check(bot, update)
|
self.attachment_check(bot, update)
|
||||||
self.security_check_username(bot, update)
|
self.security_check_username(bot, update)
|
||||||
self.security_check_message(bot, update)
|
self.security_check_message(bot, update)
|
||||||
else:
|
|
||||||
print("👮♂️ Skipping checks. User is admin: {}".format(user.id))
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("Error: {}".format(e))
|
print("Error: {}".format(e))
|
||||||
|
6
mwt.py
6
mwt.py
@ -1,7 +1,7 @@
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
class MWT(object):
|
class MWT(object):
|
||||||
"""Memoize With Timeout"""
|
"""Memorize With Timeout"""
|
||||||
_caches = {}
|
_caches = {}
|
||||||
_timeouts = {}
|
_timeouts = {}
|
||||||
|
|
||||||
@ -26,11 +26,11 @@ class MWT(object):
|
|||||||
key = (args, tuple(kw))
|
key = (args, tuple(kw))
|
||||||
try:
|
try:
|
||||||
v = self.cache[key]
|
v = self.cache[key]
|
||||||
print("cache")
|
# print("cache")
|
||||||
if (time.time() - v[1]) > self.timeout:
|
if (time.time() - v[1]) > self.timeout:
|
||||||
raise KeyError
|
raise KeyError
|
||||||
except KeyError:
|
except KeyError:
|
||||||
print("new")
|
# print("new")
|
||||||
v = self.cache[key] = f(*args,**kwargs),time.time()
|
v = self.cache[key] = f(*args,**kwargs),time.time()
|
||||||
return v[0]
|
return v[0]
|
||||||
func.func_name = f.__name__
|
func.func_name = f.__name__
|
||||||
|
Loading…
Reference in New Issue
Block a user