[SVN] bump to version 2.0 [UPD] now transport working with telethon 0.15.5 and sleekxmpp 1.3.2 [FIX] fixed everlasting authorization requests. if you got deauth message — ignore it, FROM subscription is enough. [ADD] implemented roster exchange via XEP-0144 [ADD] we will send authorization request when unknown contact sent us a message [ADD] correct presence handling for transport and users [ADD] fixed presence spam (by default, we updating presence once for 60 seconds -- look at `status_update_interval` in mtproto.py) [ADD] we will automatically connect to all actual sessions after transport start
500 lines
24 KiB
Python
500 lines
24 KiB
Python
from telethon import TelegramClient
|
||
from telethon.utils import get_extension
|
||
from telethon.tl.types import UpdateShortMessage, UpdateShortChatMessage, UpdateEditMessage, UpdateDeleteMessages, \
|
||
UpdateNewMessage, UpdateUserStatus, UpdateShort, Updates, UpdateNewChannelMessage,\
|
||
UpdateChannelTooLong, UpdateDeleteChannelMessages, UpdateEditChannelMessage,\
|
||
UpdateUserName
|
||
from telethon.tl.types import InputPeerChat, InputPeerUser, InputPeerChannel, InputUser
|
||
from telethon.tl.types import MessageMediaDocument, MessageMediaPhoto, MessageMediaUnsupported, MessageMediaContact,\
|
||
MessageMediaGeo, MessageMediaEmpty, MessageMediaVenue
|
||
from telethon.tl.types import DocumentAttributeAnimated, DocumentAttributeAudio, DocumentAttributeFilename,\
|
||
DocumentAttributeSticker, DocumentAttributeVideo, DocumentAttributeHasStickers
|
||
from telethon.tl.types import MessageService, MessageActionChannelCreate, MessageActionChannelMigrateFrom,\
|
||
MessageActionChatCreate, MessageActionChatAddUser, MessageActionChatDeleteUser,\
|
||
MessageActionChatEditTitle, MessageActionChatJoinedByLink, MessageActionChatMigrateTo,\
|
||
MessageActionPinMessage
|
||
from telethon.tl.types import UserStatusOnline, UserStatusOffline, UserStatusRecently
|
||
from telethon.tl.types import User, Chat, Channel
|
||
from telethon.tl.types import PeerUser, PeerChat, PeerChannel
|
||
from telethon.tl.functions.users import GetFullUserRequest
|
||
from telethon.tl.functions.messages import ReadHistoryRequest, GetFullChatRequest
|
||
from telethon.tl.functions.channels import ReadHistoryRequest as ReadHistoryChannel, GetParticipantRequest
|
||
from telethon.tl.functions.updates import GetDifferenceRequest
|
||
from telethon.tl.functions.contacts import ResolveUsernameRequest
|
||
|
||
import hashlib
|
||
import os
|
||
import queue
|
||
import threading
|
||
import time
|
||
from xmpp_tg.utils import display_tg_name
|
||
|
||
from .utils import var_dump
|
||
import traceback
|
||
|
||
|
||
class TelegramGateClient(TelegramClient):
|
||
def __init__(self, session, api_id, api_hash, xmpp_gate, jid, phone, proxy=None):
|
||
super().__init__(session, api_id, api_hash, proxy=proxy, update_workers = 4)
|
||
|
||
self.me = None
|
||
|
||
self.xmpp_gate = xmpp_gate
|
||
self.jid = jid
|
||
self.phone = phone
|
||
self.user_options = {'nl_after_info': True, 'status_update_interval': 60}
|
||
|
||
self._media_queue = queue.Queue()
|
||
self._media_thread = threading.Thread(name='MediaDownloaderThread', target=self.media_thread_downloader)
|
||
self._media_thread.start()
|
||
|
||
self._groups_users = dict()
|
||
self._message_cache_users = dict()
|
||
self._message_cache_groups = dict()
|
||
self._message_cache_supergroups = dict()
|
||
|
||
self._status_last = dict()
|
||
|
||
self._del_pts = 0
|
||
|
||
|
||
def xmpp_update_handler(self, obj):
|
||
print('new update for ' + self.jid)
|
||
print(type(obj), obj.__dict__)
|
||
|
||
if not self.me:
|
||
self.me = self.get_me()
|
||
|
||
'''
|
||
Боты
|
||
Сделать запоминание ростера в бд
|
||
Сделать лучше хендлинг ошибок
|
||
Доделать все типы информационных сообщений
|
||
Сделать джойны по линкам в чаты/каналы
|
||
Сделать поиск и добавление пользователей
|
||
Сделать листание истории
|
||
Сделать отправку всех непрочтенных сообщений при входе
|
||
'''
|
||
|
||
# Здесь будет очень длинный пиздец ^__^
|
||
|
||
nl = '\n' if self.user_options['nl_after_info'] else ''
|
||
|
||
try:
|
||
|
||
# message from normal chat #
|
||
if type(obj) in [UpdateShortMessage] and not obj.out:
|
||
|
||
fwd_from = self._process_forward_msg(obj) if obj.fwd_from else '' # process forward messages
|
||
self.gate_send_message( mfrom='u' + str(obj.user_id), mbody = '[MSG {}] {}{}'.format(obj.id, fwd_from, obj.message) )
|
||
|
||
if obj.user_id in self.xmpp_gate.tg_dialogs[self.jid]['users']: # make as read
|
||
usr = self.xmpp_gate.tg_dialogs[self.jid]['users'][obj.user_id]
|
||
self.invoke(ReadHistoryRequest( InputPeerUser(usr.id, usr.access_hash), obj.id ))
|
||
|
||
# message from normal group #
|
||
if type(obj) in [UpdateShortChatMessage] and not obj.out:
|
||
fwd_from = self._process_forward_msg(obj) if obj.fwd_from else '' # process forward messages
|
||
nickname = ''
|
||
|
||
# get sender information from chat info
|
||
if obj.from_id not in self._groups_users:
|
||
chat_info = self.invoke(GetFullChatRequest(obj.chat_id))
|
||
|
||
for usr in chat_info.users:
|
||
self._groups_users[usr.id] = usr
|
||
|
||
nickname = display_tg_name(self._groups_users[obj.from_id].first_name, self._groups_users[obj.from_id].last_name)
|
||
|
||
# send message
|
||
self.gate_send_message(mfrom='g' + str(obj.chat_id), mbody ='[MSG {}] [User: {}] {}{}'.format(obj.id, nickname, fwd_from, obj.message) )
|
||
self.invoke(ReadHistoryRequest(InputPeerChat(obj.chat_id), obj.id))
|
||
|
||
|
||
# message from supergroup or media message #
|
||
if type(obj) in [UpdateNewMessage, UpdateNewChannelMessage, UpdateEditMessage, UpdateEditChannelMessage] and not obj.message.out:
|
||
|
||
cid = None
|
||
mid = obj.message.id
|
||
msg = obj.message.message
|
||
fwd_from = ''
|
||
|
||
# detect message type
|
||
is_user = type(obj.message.to_id) is PeerUser
|
||
is_group = type(obj.message.to_id) is PeerChat
|
||
is_supergroup = type(obj.message.to_id) is PeerChannel
|
||
|
||
# is forwarded?
|
||
if obj.message.fwd_from:
|
||
fwd_from = self._process_forward_msg(obj.message)
|
||
|
||
# detect from id
|
||
if is_user:
|
||
cid = obj.message.from_id
|
||
usr = self.xmpp_gate.tg_dialogs[self.jid]['users'][cid]
|
||
prefix = 'u'
|
||
elif is_group:
|
||
cid = obj.message.to_id.chat_id
|
||
prefix = 'g'
|
||
elif is_supergroup:
|
||
cid = obj.message.to_id.channel_id
|
||
access_hash = self.xmpp_gate.tg_dialogs[self.jid]['supergroups'][cid].access_hash if cid in self.xmpp_gate.tg_dialogs[self.jid]['supergroups'] else None
|
||
prefix = 's'
|
||
|
||
# maybe its channel? #
|
||
if obj.message.post:
|
||
prefix = 'c'
|
||
|
||
# maybe its forwarded? #
|
||
|
||
# get sender information from chat info #
|
||
if not is_user and not obj.message.post:
|
||
if obj.message.from_id not in self._groups_users:
|
||
|
||
chat_info = self.invoke(GetFullChatRequest(cid)) if is_group else self.invoke(GetParticipantRequest(InputPeerChannel(cid, access_hash), InputPeerUser(self.me.id, self.me.access_hash)))
|
||
for usr in chat_info.users:
|
||
self._groups_users[usr.id] = usr
|
||
|
||
nickname = display_tg_name(self._groups_users[obj.message.from_id].first_name, self._groups_users[obj.message.from_id].last_name)
|
||
msg = '[User: {}] {}'.format(nickname, msg)
|
||
|
||
|
||
# message media #
|
||
if obj.message.media:
|
||
msg = '{} {}'.format( msg, self._process_media_msg(obj.message.media) )
|
||
|
||
# edited #
|
||
if obj.message.edit_date:
|
||
msg = '[Edited] {}'.format(msg)
|
||
|
||
# send message #
|
||
self.gate_send_message(prefix + str(cid), mbody = '[MSG {}] {}{}'.format(mid, fwd_from, msg) )
|
||
|
||
# delivery report
|
||
if is_user and usr.access_hash: # make as read
|
||
self.invoke(ReadHistoryRequest( InputPeerUser(usr.id, usr.access_hash), mid ))
|
||
if is_group:
|
||
self.invoke(ReadHistoryRequest(InputPeerChat(cid), mid))
|
||
if is_supergroup and access_hash:
|
||
self.invoke(ReadHistoryChannel(InputPeerChannel(cid, access_hash), mid))
|
||
|
||
|
||
# Status Updates #
|
||
if type(obj) is UpdateUserStatus:
|
||
|
||
print(self._status_last)
|
||
print(time.time())
|
||
|
||
# save last update time #
|
||
if (obj.user_id in self._status_last) and ( (time.time() - self._status_last[obj.user_id]['time'] < self.user_options['status_update_interval']) or self._status_last[obj.user_id]['status'] == obj.status ):
|
||
return
|
||
|
||
self._status_last[obj.user_id] = {'status': obj.status, 'time': time.time()}
|
||
|
||
# process status update #
|
||
if type(obj.status) is UserStatusOnline:
|
||
self.xmpp_gate.send_presence( pto=self.jid, pfrom='u'+str(obj.user_id)+'@'+self.xmpp_gate.config['jid'])
|
||
elif type(obj.status) is UserStatusOffline:
|
||
self.xmpp_gate.send_presence( pto=self.jid, pfrom='u'+str(obj.user_id)+'@'+self.xmpp_gate.config['jid'], ptype='xa', pstatus=obj.status.was_online.strftime('Last seen at %H:%M %d/%m/%Y') )
|
||
elif type(obj.status) is UserStatusRecently:
|
||
self.xmpp_gate.send_presence( pto=self.jid, pfrom='u' + str(obj.user_id) + '@' + self.xmpp_gate.config['jid'], pstatus='Last seen recently' )
|
||
else:
|
||
print(type(obj.status))
|
||
print(obj.update.status.__dict__)
|
||
|
||
|
||
except Exception:
|
||
print('Exception occurs!')
|
||
print(traceback.format_exc())
|
||
|
||
print(' ')
|
||
|
||
def gate_send_message(self, mfrom, mbody):
|
||
tg_from = int(mfrom[1:])
|
||
if not tg_from in self.xmpp_gate.tg_dialogs[self.jid]['users'] and not tg_from in self.xmpp_gate.tg_dialogs[self.jid]['groups'] and not tg_from in self.xmpp_gate.tg_dialogs[self.jid]['supergroups']:
|
||
print('Re-init dialog list...')
|
||
self.xmpp_gate.tg_process_dialogs( self.jid )
|
||
|
||
self.xmpp_gate.send_message( mto=self.jid, mfrom=mfrom + '@' + self.xmpp_gate.config['jid'], mtype='chat', mbody=mbody)
|
||
|
||
def generate_media_link(self, media):
|
||
"""
|
||
Генерирует будующее имя и ссылку на скачиваемое медиа-вложения из сообщения
|
||
:param media:
|
||
:return:
|
||
"""
|
||
if type(media) is MessageMediaPhoto:
|
||
media_id = media.photo.id
|
||
elif type(media) is MessageMediaDocument:
|
||
media_id = media.document.id
|
||
else:
|
||
return None
|
||
|
||
ext = get_extension(media)
|
||
if ext == '.oga':
|
||
ext = '.ogg'
|
||
|
||
file_name = hashlib.new('sha256')
|
||
file_name.update(str(media_id).encode('ascii'))
|
||
file_name.update(str(os.urandom(2)).encode('ascii'))
|
||
file_name = file_name.hexdigest() + ext
|
||
|
||
link = self.xmpp_gate.config['media_web_link_prefix'] + file_name
|
||
|
||
return {'name': file_name, 'link': link}
|
||
|
||
@staticmethod
|
||
def get_document_attribute(attributes, match):
|
||
"""
|
||
Находит заданных аттрибут в списке. Используется при разборе медиа-вложений типа Документ.
|
||
:param attributes:
|
||
:param match:
|
||
:return:
|
||
"""
|
||
for attrib in attributes:
|
||
if type(attrib) == match:
|
||
return attrib
|
||
return None
|
||
|
||
def _process_forward_msg(self, message):
|
||
"""
|
||
Обрабатывает информацию в пересланном сообщении (от кого оно и/или из какого канала). Требует дополнительно
|
||
предоставление информации об пользователях/каналах.
|
||
:param message:
|
||
:param users:
|
||
:param channels:
|
||
:return:
|
||
"""
|
||
if message.fwd_from.from_id: # От пользователя
|
||
|
||
usr = self.get_entity(message.fwd_from.from_id) if not message.fwd_from.from_id in self.xmpp_gate.tg_dialogs[self.jid]['users'] else self.xmpp_gate.tg_dialogs[self.jid]['users'][message.fwd_from.from_id]
|
||
print(usr)
|
||
if usr.access_hash:
|
||
self.xmpp_gate.tg_dialogs[self.jid]['users'][message.fwd_from.from_id] = usr
|
||
fwd_from = display_tg_name(usr.first_name, usr.last_name)
|
||
else:
|
||
fwd_from = '<Unknown User>'
|
||
|
||
if message.fwd_from.channel_id: # От канала
|
||
fwd_from = '<Channel {}>'.format(message.fwd_from.channel_id)
|
||
|
||
# let's construct
|
||
fwd_reply = '|Forwarded from [{}]|'.format(fwd_from)
|
||
return fwd_reply
|
||
|
||
def _process_media_msg(self, media):
|
||
"""
|
||
Обрабатывает медиа-вложения в сообщениях. Добавляет их в очередь на загрузку. Производит разбор с генерацию
|
||
готового для вывода сообщения с информацией о медиа и сгенерированной ссылкой на него.
|
||
:param media:
|
||
:return:
|
||
"""
|
||
msg = ''
|
||
# print(var_dump(media))
|
||
|
||
if type(media) is MessageMediaDocument: # Документ или замаскированная сущность
|
||
attributes = media.document.attributes
|
||
attributes_types = [type(a) for a in attributes] # Документами могут быть разные вещи и иметь аттрибуты
|
||
|
||
size_text = '|Size: {:.2f} Mb'.format(media.document.size / 1024 / 1024)
|
||
|
||
if media.document.size > self.xmpp_gate.config['media_max_download_size']: # Не загружаем большие файлы
|
||
g_link = {'link': 'File is too big to be downloaded via Telegram <---> XMPP Gateway. Sorry.'}
|
||
else:
|
||
g_link = self.generate_media_link(media) # Добавляем файл в очередь на загрузку в отдельном потоке
|
||
self._media_queue.put({'media': media, 'file': g_link['name']})
|
||
|
||
attr_fn = self.get_document_attribute(attributes, DocumentAttributeFilename)
|
||
if attr_fn: # Если есть оригинальное имя файла, то выводим
|
||
msg = '[FileName:{}{}] {}'.format(attr_fn.file_name, size_text, g_link['link'])
|
||
else:
|
||
msg = g_link['link']
|
||
|
||
if DocumentAttributeSticker in attributes_types: # Стикер
|
||
smile = self.get_document_attribute(attributes, DocumentAttributeSticker).alt
|
||
msg = '[Sticker {}] {}'.format(smile, g_link['link']) # У стикеров свой формат вывода
|
||
elif DocumentAttributeAudio in attributes_types: # Аудио файл / Голосовое сообщение
|
||
attr_a = self.get_document_attribute(attributes, DocumentAttributeAudio)
|
||
|
||
if attr_a.voice: # Голосовое сообщение
|
||
msg = '[VoiceMessage|{} sec] {}'.format(attr_a.duration, g_link['link']) # Тоже свой формат
|
||
else: # Приложенный аудиофайл, добавляем возможную информацию из его тегов
|
||
attr_f = self.get_document_attribute(attributes, DocumentAttributeFilename)
|
||
msg = '[Audio|File:{}{}|Performer:{}|Title:{}|Duration:{} sec] {}' \
|
||
.format(attr_f.file_name, size_text, attr_a.performer, attr_a.title,
|
||
attr_a.duration, g_link['link'])
|
||
elif DocumentAttributeVideo in attributes_types: # Видео
|
||
video_type = 'Video'
|
||
video_file = ''
|
||
caption = ''
|
||
|
||
if DocumentAttributeAnimated in attributes_types: # Проверка на "gif"
|
||
video_type = 'AnimatedVideo'
|
||
|
||
if DocumentAttributeFilename in attributes_types: # Если есть оригинальное имя файла - указываем
|
||
attr_v = self.get_document_attribute(attributes, DocumentAttributeFilename)
|
||
video_file = '|File:{}'.format(attr_v.file_name)
|
||
|
||
if media.caption:
|
||
caption = media.caption + ' '
|
||
|
||
# Тоже свой формат
|
||
msg = '[{}{}{}] {}{}'.format(video_type, video_file, size_text, caption, g_link['link'])
|
||
elif type(media) is MessageMediaPhoto: # Фотография (сжатая, jpeg)
|
||
g_link = self.generate_media_link(media)
|
||
msg = g_link['link']
|
||
|
||
self._media_queue.put({'media': media, 'file': g_link['name']})
|
||
|
||
if media.caption: # Если есть описание - указываем
|
||
msg = '{} {}'.format(media.caption, msg)
|
||
|
||
elif type(media) is MessageMediaContact: # Контакт (с номером)
|
||
msg = 'First name: {} / Last name: {} / Phone: {}'\
|
||
.format(media.first_name, media.last_name, media.phone_number)
|
||
elif type(media) in [MessageMediaGeo, MessageMediaVenue]: # Адрес на карте
|
||
map_link_template = 'https://maps.google.com/maps?q={0:.4f},{1:.4f}&ll={0:.4f},{1:.4f}&z=16'
|
||
map_link = map_link_template.format(media.geo.lat, media.geo.long)
|
||
msg = map_link
|
||
|
||
if type(media) is MessageMediaVenue:
|
||
msg = '[Title: {}|Address: {}|Provider: {}] {}'.format(media.title, media.address, media.provider, msg)
|
||
|
||
return msg
|
||
|
||
@staticmethod
|
||
def _process_info_msg(message, users):
|
||
"""
|
||
Обрабатывает информационные сообщения в групповых чатах. Возвращает готовое для вывода сообщение.
|
||
:param message:
|
||
:param users:
|
||
:return:
|
||
"""
|
||
alt_msg = None
|
||
nickname = display_tg_name(users[0].first_name, users[0].last_name)
|
||
uid = users[0].id
|
||
|
||
# MessageActionChatEditPhoto
|
||
|
||
# Создана супергруппа
|
||
if type(message.action) is MessageActionChannelCreate:
|
||
# Пока нет смысла - поддержка каналов не реализована
|
||
pass
|
||
# Создана группа
|
||
elif type(message.action) is MessageActionChatCreate:
|
||
pass
|
||
# Добавлен пользователь в чат
|
||
elif type(message.action) is MessageActionChatAddUser:
|
||
if len(users) == 2: # Кто-то добавил другого пользователя
|
||
j_name = display_tg_name(users[1].first_name, users[1].last_name)
|
||
j_uid = users[1].id
|
||
alt_msg = 'User [{}] (UID:{}) added [{}] (UID:{})'.format(nickname, uid,
|
||
j_name, j_uid)
|
||
else: # Пользователь вошел сам
|
||
alt_msg = 'User [{}] (UID:{}) joined'.format(nickname, uid)
|
||
# Пользователь удален/вышел/забанен
|
||
elif type(message.action) is MessageActionChatDeleteUser:
|
||
pass
|
||
# Пользователь вошел по инвайт ссылке
|
||
elif type(message.action) is MessageActionChatJoinedByLink:
|
||
alt_msg = 'User [{}] (UID:{}) joined via invite link'.format(nickname, uid)
|
||
# Изменено название чата
|
||
elif type(message.action) is MessageActionChatEditTitle:
|
||
g_title = message.action.title
|
||
alt_msg = 'User [{}] (UID:{}) changed title to [{}]'.format(nickname, uid, g_title)
|
||
# Прикреплено сообщение в чате
|
||
elif type(message.action) is MessageActionPinMessage:
|
||
# Notify all members реализовано путем указания, что пользователя упомянули,
|
||
# то есть флаг mentioned=True. Но для транспорта он не имеет смысла.
|
||
p_mid = message.reply_to_msg_id # Наркоманы
|
||
alt_msg = 'User [{}] (UID:{}) pinned message with MID:{}'.format(nickname, uid, p_mid)
|
||
# Группа была преобразована в супергруппу
|
||
elif type(message.action) is MessageActionChatMigrateTo:
|
||
# Это сложный ивент, который ломает текущую реализацию хендлинга
|
||
# (ибо в доках, которых нет, не сказано, что так можно было)
|
||
# Пусть полежит до рефакторинга
|
||
pass
|
||
# Супергруппа была технически создана из группы
|
||
elif type(message.action) is MessageActionChannelMigrateFrom:
|
||
# ---...---...---
|
||
# ---...---...---
|
||
# ---...---...---
|
||
pass
|
||
|
||
return alt_msg
|
||
|
||
def get_cached_message(self, dlg_id, msg_id, user=False, group=False, supergroup=False):
|
||
"""
|
||
Получает из кэша сообщение диалога указанной группы (для работы цитат в последних сообщениях)
|
||
:param dlg_id:
|
||
:param msg_id:
|
||
:param user:
|
||
:param group:
|
||
:param supergroup:
|
||
:return:
|
||
"""
|
||
if user:
|
||
obj = self._message_cache_users
|
||
elif group:
|
||
obj = self._message_cache_groups
|
||
elif supergroup:
|
||
obj = self._message_cache_supergroups
|
||
else:
|
||
return None
|
||
|
||
if dlg_id in obj:
|
||
if msg_id in obj[dlg_id]:
|
||
return obj[dlg_id][msg_id]
|
||
|
||
return None
|
||
|
||
def set_cached_message(self, dlg_id, msg_id, msg, user=False, group=False, supergroup=False):
|
||
"""
|
||
Кэширует сообщение из диалога указанной группы (для работы цитат в последних сообщениях)
|
||
:param dlg_id:
|
||
:param msg_id:
|
||
:param msg:
|
||
:param user:
|
||
:param group:
|
||
:param supergroup:
|
||
:return:
|
||
"""
|
||
if user:
|
||
obj = self._message_cache_users
|
||
elif group:
|
||
obj = self._message_cache_groups
|
||
elif supergroup:
|
||
obj = self._message_cache_supergroups
|
||
else:
|
||
return
|
||
|
||
if dlg_id not in obj:
|
||
obj[dlg_id] = dict()
|
||
|
||
obj[dlg_id][msg_id] = msg
|
||
|
||
# Удаляем старые сообщения из кэша
|
||
if len(obj[dlg_id]) > self.xmpp_gate.config['messages_max_max_cache_size']:
|
||
del obj[dlg_id][sorted(obj[dlg_id].keys())[0]]
|
||
|
||
def media_thread_downloader(self):
|
||
"""
|
||
Этот метод запускается в отдельном потоке и скачивает по очереди все медиа вложения из сообщений
|
||
:return:
|
||
"""
|
||
while True:
|
||
try:
|
||
if self._media_queue.empty(): # Нет медиа в очереди - спим
|
||
time.sleep(0.1)
|
||
else: # Иначе скачиваем медиа
|
||
print('MTD ::: Queue is not empty. Downloading...')
|
||
media = self._media_queue.get()
|
||
file_path = self.xmpp_gate.config['media_store_path'] + media['file']
|
||
if os.path.isfile(file_path):
|
||
print('MTD ::: File already exists')
|
||
else:
|
||
self.download_media(media['media'], file_path, False)
|
||
print('MTD ::: Media downloaded')
|
||
except Exception:
|
||
print(traceback.format_exc())
|