Files
tg4xmpp/xmpp_tg/message_handlers.py

360 lines
15 KiB
Python

import re, sys, os, io, sqlite3, hashlib, time, datetime
import xml.etree.ElementTree as ET
import logging, traceback, pprint
from xmpp_tg.mtproto import TelegramGateClient
from xmpp_tg.utils import var_dump, display_tg_name, get_contact_jid, localtime
# modified by adnidor
class MessageHandler():
_on_connect = lambda: None
def _unknown_command_handler(self, *args, **kwargs):
return "Unknown command, for a list send !help"
class WrongNumberOfArgsError(Exception):
pass
def _min_args(self, num_args):
if len(self.arguments) < num_args:
raise self.WrongNumberOfArgsError("!{} needs at least {} arguments".format(self._command, num_args))
def __init__(self, msg):
self._command = msg["body"].split(" ")[0][1:]
self._handler = getattr(self, self._command, self._unknown_command_handler)
self.type = "groupchat" if msg["type"] == "groupchat" else "chat"
self.sender = msg["from"]
self.jid = msg["from"].bare
self.replyto = self.sender.full if self.type == "chat" else self.sender.bare
self.arguments = msg["body"].split(" ")[1:]
self.msg = msg
def _update(self, text):
xmpp.send_message(mto=self.replyto, mtype=self.type, mbody=text)
def debug(self, *args, **kwargs):
"""Show debug info"""
return pprint.pformat(self.__dict__)
def help(self, *args, **kwargs):
"""List available commands"""
#taken from https://www.python.org/dev/peps/pep-0257/#handling-docstring-indentation
def trim(docstring):
if not docstring:
return ''
# Convert tabs to spaces (following the normal Python rules)
# and split into a list of lines:
lines = docstring.expandtabs().splitlines()
# Determine minimum indentation (first line doesn't count):
indent = 500
for line in lines[1:]:
stripped = line.lstrip()
if stripped:
indent = min(indent, len(line) - len(stripped))
# Remove indentation (first line is special):
trimmed = [lines[0].strip()]
if indent < 500:
for line in lines[1:]:
trimmed.append(line[indent:].rstrip())
# Strip off trailing and leading blank lines:
while trimmed and not trimmed[-1]:
trimmed.pop()
while trimmed and not trimmed[0]:
trimmed.pop(0)
# Return a single string:
return '\n'.join(trimmed)
if len(self.arguments) == 0:
methods = [func for func in dir(self) if not func.startswith("_") and callable(getattr(self, func))]
reply = "Available commands:"
for method in methods:
docstring = getattr(self, method).__doc__
if docstring is None:
docstring = "No description available"
reply += "\n"+method+" ("+docstring.split("\n")[0]+")"
return reply
else:
method = getattr(self,self.arguments[0])
reply = trim(method.__doc__)
return reply
class GateMessageHandler(MessageHandler):
def configure(hndl, self):
"""Get config/set config options"""
config_exclude = ['jid', 'tg_phone']
option = hndl.arguments[0] if len(hndl.arguments) >= 1 else None
value = hndl.arguments[1] if len(hndl.arguments) == 2 else None
if value is not None and option not in config_exclude:
self.db_connection.execute("update accounts set {} = ? where jid = ?".format(option), (value,hndl.jid,) )
self.accounts[hndl.jid] = self.db_connection.execute("SELECT * FROM accounts where jid = ?", (hndl.jid,) ).fetchone()
message = "=== Your current configuration ===\n\n"
for param, value in self.accounts[hndl.jid].items():
message = message + "<%s>: %s" % (param, value) + "\n"
message = message + "\nTo modify some option, please, send !configure param value"
return message
def login(hndl, self): #1 arg
"""Initiates Telegram session
Usage: !login <phone number in international format>"""
hndl._min_args(1)
phone_no = hndl.arguments[0]
hndl._update("Please wait...")
self.spawn_tg_client(hndl.jid, phone_no)
if self.tg_connections[hndl.jid].is_user_authorized():
self.send_presence(pto=jid, pfrom=self.boundjid.bare, ptype='online', pstatus='connected')
return "You are already authenticated in Telegram"
else:
# remove old sessions for this JID #
self.db_connection.execute("DELETE from accounts where jid = ?", (hndl.jid, ) )
self.tg_connections[hndl.jid].send_code_request(phone_no)
self._update('Gate is connected. Telegram should send SMS message to you.')
return 'Please enter one-time code via !code 12345.'
def code(hndl, self):
hndl._min_args(1)
code = hndl.arguments[0]
jid = hndl.jid
if not self.tg_connections[jid].is_user_authorized():
try:
hndl._update('Trying authenticate...')
self.tg_connections[jid].sign_in(self.tg_phones[jid], code)
except SessionPasswordNeededError:
self.gate_reply_message(iq, 'Two-factor authentication detected.')
self.gate_reply_message(iq, 'Please enter your password via !password abc123.')
return
if self.tg_connections[jid].is_user_authorized():
self.send_presence(pto=jid, pfrom=self.boundjid.bare, ptype='online', pstatus='connected')
self.gate_reply_message(iq, 'Authentication successful. Initiating Telegram...')
self.db_connection.execute("INSERT INTO accounts(jid, tg_phone) VALUES(?, ?)", (jid, self.tg_phones[jid],))
self.accounts[jid] = self.db_connection.execute("SELECT * FROM accounts where jid = ?", (jid,) ).fetchone()
self.init_tg(jid)
else:
return 'Authentication failed.'
else:
return 'You are already authenticated. Please use !logout before new login.'
def password(hndl, self):
hndl._min_args(1)
password = hndl.arguments[1]
jid = self.jid
if not self.tg_connections[jid].is_user_authorized():
self.gate_reply_message(iq, 'Checking password...')
self.tg_connections[jid].sign_in(password=password)
if self.tg_connections[jid].is_user_authorized():
self.send_presence(pto=jid, pfrom=self.boundjid.bare, ptype='online', pstatus='connected')
self.gate_reply_message(iq, 'Authentication successful. Initiating Telegram...')
self.db_connection.execute("INSERT INTO accounts(jid, tg_phone) VALUES(?, ?)", (jid, self.tg_phones[jid],))
self.accounts[jid] = self.db_connection.execute("SELECT * FROM accounts where jid = ?", (jid,) ).fetchone()
self.init_tg(jid)
else:
self.gate_reply_message(iq, 'Authentication failed.')
else:
self.gate_reply_message(iq, 'You are already authenticated. Please use !logout before new login.')
def list_sessions(hndl, self):
if not self.tg_connections[hndl.jid].is_user_authorized():
return "Error"
sessions = self.tg_connections[hndl.jid].invoke(GetAuthorizationsRequest())
return str(sessions)
def reload_dialogs(hndl, self):
if not self.tg_connections[hndl.jid].is_user_authorized():
return "Error"
self.tg_process_dialogs(hndl.jid)
return "Dialogs reloadad"
def logout(hndl, self):
"""Deletes current Telegram session at Gateway"""
self.tg_connections[hndl.jid].log_out()
self.db_connection.execute("DELETE FROM accounts WHERE jid = ?", (hndl.jid,))
return 'Your Telegram session was deleted'
def add(hndl, self): #1 arg
"""Add contact by nickname or t.me link"""
hndl._min_args(1)
argument = hndl.arguments[0]
jid = hndl.jid
result = self.tg_connections[jid].get_entity(argument)
if type(result) == User:
tg_peer = InputPeerUser( result.id, result.access_hash )
result = self.tg_connections[jid].invoke( SendMessageRequest(tg_peer, 'Hello! I just want to add you in my contact list.', generate_random_long() ) )
elif type(result) == Channel:
tg_peer = InputPeerChannel( result.id, result.access_hash )
self.tg_connections[jid].invoke(JoinChannelRequest( InputPeerChannel(result.id, result.access_hash) ) )
else:
self.gate_reply_message(iq, 'Sorry, nothing found.')
return
self.tg_process_dialogs(jid)
def join(hndl, self): #1 arg
"""Join conference via invite link"""
hndl._min_args(1)
argument = hndl.arguments[0]
jid = hndl.jid
link = argument.split('/') # https://t.me/joinchat/HrCmckx_SkMbSGFLhXCvSg
self.tg_connections[jid].invoke(ImportChatInviteRequest(link[4]))
time.sleep(1)
self.tg_process_dialogs(jid)
def group(hndl, self): #2 args
"""Creat a Group"""
hndl._min_args(2)
# group name? #
groupname = parsed[1]
# group users? #
groupuser = self.tg_connections[jid].get_entity(parsed[2])
# we re ready to make group
self.tg_connections[jid].invoke(CreateChatRequest([groupuser], groupname))
self.tg_process_dialogs(jid)
def supergroup(hndl, self): #1 arg
"""Create a channel"""
hndl._min_args(1)
groupname = hndl.arguments[0]
self.tg_connections[hndl.jid].invoke(CreateChannelRequest(groupname, groupname, megagroup = True))
self.tg_process_dialogs(hndl.jid)
channel = supergroup
def username(hndl, self): #1 arg
"""Change your Telegram nickname"""
hndl._min_args(1)
username = hndl.arguments[0]
self.tg_connections[hndl.jid].invoke(UpdateUsernameRequest(username))
def name(hndl, self): #1 or 2 args
"""Change your Telegram display name"""
hndl._min_args(1)
firstname = hndl.arguments[0]
lastname = hndl.arguments[1] if len(hndl.arguments) > 1 else None
self.tg_connections[hndl.jid].invoke(UpdateProfileRequest(first_name = firstname, last_name = lastname))
def about(hndl, self): #>0 args
"""Change your Telegram 'about' text"""
hndl._min_args(1)
about = hndl.msg['body'][7:]
self.tg_connections[hndl.jid].invoke(UpdateProfileRequest(about = about))
def import_contact(hndl, self): #2 args
"""Add contact by phone number"""
hndl._min_args(2)
phone = hndl.arguments[0]
firstname = hndl.arguments[1]
lastname = hndl.arguments[2] if len(hndl.arguments) > 2 else None
contact = InputPhoneContact(client_id=generate_random_long(), phone=phone, first_name=firstname, last_name=lastname)
self.tg_connections[hndl.jid].invoke(ImportContactsRequest([contact]))
self.tg_process_dialogs(jid)
def roster(hndl, self):
"""Send Telegram contact list back, with JIDs"""
response = "Telegram chats:\n"
for jid,tid in self.contact_list[hndl.jid].items():
response += "{}: {}\n".format(tid, jid)
return response
def oldhelp(hndl, self):
"""Old !help message"""
return ('=== Available gateway commands ===:\n\n'
'!help - Displays this text\n'
'!login +123456789 - Initiates Telegram session\n'
'!code 12345 - Entering one-time code during auth\n'
'!password abc123 - Entering password during two-factor auth\n'
'!configure - Configure transport settings\n'
#'!list_sessions - List all created sessions at Telegram servers\n'
#'!delete_session 123 - Delete session\n'
'!logout - Deletes current Telegram session at gate\n'
'!reload_dialogs - Reloads dialogs list from Telegram\n\n'
'!add - Find and add Telegram contact. Any formats accepted (nickname or t.me link)\n\n'
'!join - Join Telegram conference via invite link \n\n'
'!import phone firstname lastname - Add Telegram contact with phone number \n\n'
'!group GroupName @InviteContact - Create a normal group\n'
'!supergroup SupergroupName - Create a supergroup\n'
'!channel ChannelName - Create a channel\n\n'
'!name first last - Change your name in Telegram\n'
'!about text - Change about text in Telegram\n'
'!username - Changes your @username in Telegram\n\n'
'!roster - Lists yout TG roster\n')
class ChatCommandHandler(MessageHandler):
def __init__(self, msg):
super(ChatCommandHandler, self).__init__(msg)
if self._command.startswith("s/"):
self._handler = self._replace
self.tg_id = int(msg['to'].node[1:])
def block(hndl, self):
nickname = display_tg_name(self.tg_dialogs[hndl.jid]['users'][hndl.tg_id])
self.tg_connections[hndl.jid].invoke(BlockRequest( InputPeerUser(hndl.tg_id, self.tg_dialogs[jid]['users'][hndl.tg_id].access_hash) ) )
self.gate_reply_message(iq, 'User %s blacklisted!' % nickname)
def unblock(hndl, self):
nickname = display_tg_name(self.tg_dialogs[hndl.jid]['users'][hndl.tg_id])
self.tg_connections[hndl.jid].invoke(UnblockRequest( InputPeerUser(hndl.tg_id, self.tg_dialogs[jid]['users'][hndl.tg_id].access_hash) ) )
self.gate_reply_message(iq, 'User %s unblacklisted!' % nickname)
def remove(hndl, self):
peer = InputPeerUser(hndl.tg_id, self.tg_dialogs[hndl.jid]['users'][hndl.tg_id].access_hash)
c_jid = get_contact_jid(self.tg_dialogs[hndl.jid]['users'][hndl.tg_id], self.boundjid.bare)
self.tg_connections[hndl.jid].invoke( DeleteContactRequest(peer) )
self.tg_connections[hndl.jid].invoke( DeleteHistoryRequest( peer, max_id = 0, just_clear = None ) )
self.send_presence(pto = hndl.jid, pfrom = c_jid, ptype = 'unavailable')
self.send_presence(pto = hndl.jid, pfrom = c_jid, ptype = 'unsubscribed')
self.send_presence(pto = hndl.jid, pfrom = c_jid, ptype = 'unsubscribe')
def _replace(hndl, self):
peer = InputPeerUser(hdnl.tg_id, self.tg_dialogs[hndl.jid]['users'][hndl.tg_id].access_hash)
msg_id, edited = self.edit_message(hdnl.jid, hdnl.tg_id, hndl.msg['body'])
if not edited: return
# and send it
if edited != '' and edited != ' ':
self.tg_dialogs[hndl.jid]['messages'][hndl.tg_id]["body"] = edited
self.tg_connections[hndl.jid].invoke( EditMessageRequest(peer, msg_id, message = edited) )
else:
del(self.tg_dialogs[hndl.jid]['messages'][hndl.tg_id])
self.tg_connections[hndl.jid].invoke( DeleteMessagesRequest([msg_id], revoke = True) )