[FIX] fixed sending subscription request from groups that are removed or from what you've been left and also fixed double auth request
[UPD] now using telethon version == 0.18
[UPD] code optimized and reworked
[UPD] status updates are moved to separate thread to use timer
[UPD] slightly changed status handling (now Available has "Online" status message, "Last seen recently" now is away, not XA, "Last seen ..." is now XA, "Last seen long time ago" is now DND, chats are ffc)
[UPD] command "!del" removed and replaced with another, see above
[UPD] configuration options `xmpp_use_roster_exchange` and `xmpp_keep_online` was removed from configuration file
[UPD] [BREAK] database structure was changed; please, remove and re-create db.sqlite
[ADD] [BREAK] new options in config file: `logfile` (please, specify it!), and unneccessarry `tg_server_ip`, `tg_server_port`, `tg_server_dc`
[ADD] per-user configuration, parameters stored in database. configurable params:
- use_roster_exchange: use XEP-0144 for roster import (default: false) (recommended: true, if your client supports that XEP)
- keep_online: keep telegram session even if jabber goes offline (default: false) (recommended: true, if you wants to receive all events as offline messages when you will go online)
- status_update_interval: interval (sec.) in what we will update all contact statuses to prevent presence spamming, because telegram sending status updates every fucking second (default: 60)
To modify your personal config, please, send !configure to gateway
[ADD] added new commands to gateway:
!configure (for configuration update)
!add @contact (to find Telegram contact and try to start conversation; any format accepted (t.me link, @username or maybe phone, I don't know... )
!join t.me/joinchat/secret (to join Telegram conference via invite link, https://t.me/joinchat/xxxxx accepted)
!group name @contact (try to create normal group with @contact; you can add more later) [UNTESTED]
!supergroup name (try to create supergroup) [UNTESTED]
!channel name (try to create channel) [UNTESTED]
!name first last (change telegram name)
!username usernme (change telegram @username)
!about some about text (change about text)
[ADD] added new commands to dialogs with normal users:
!help
!block (blacklists user)
!unblock (unblacklists user)
[ADD] added new commands to group/channel dialogs:
!help
!leave (leave current group or supergroup)
!invite (invite @user to current group/supergroup)
!kick (kicks @user to/from group/supergroup)
... and also small fixes and improvements
445 lines
21 KiB
Python
445 lines
21 KiB
Python
from telethon import TelegramClient
|
||
from telethon.utils import get_extension
|
||
from telethon.tl.types import UpdateShortMessage, UpdateShortChatMessage, UpdateEditMessage, UpdateDeleteMessages, \
|
||
UpdateNewMessage, UpdateUserStatus, UpdateShort, Updates, UpdateNewChannelMessage,\
|
||
UpdateChannelTooLong, UpdateDeleteChannelMessages, UpdateEditChannelMessage,\
|
||
UpdateUserName
|
||
from telethon.tl.types import InputPeerChat, InputPeerUser, InputPeerChannel, InputUser
|
||
from telethon.tl.types import MessageMediaDocument, MessageMediaPhoto, MessageMediaUnsupported, MessageMediaContact,\
|
||
MessageMediaGeo, MessageMediaEmpty, MessageMediaVenue
|
||
from telethon.tl.types import DocumentAttributeAnimated, DocumentAttributeAudio, DocumentAttributeFilename,\
|
||
DocumentAttributeSticker, DocumentAttributeVideo, DocumentAttributeHasStickers
|
||
from telethon.tl.types import Message, MessageService, MessageActionChannelCreate, MessageActionChannelMigrateFrom,\
|
||
MessageActionChatCreate, MessageActionChatAddUser, MessageActionChatDeleteUser,\
|
||
MessageActionChatEditTitle, MessageActionChatJoinedByLink, MessageActionChatMigrateTo,\
|
||
MessageActionPinMessage
|
||
from telethon.tl.types import UserStatusOnline, UserStatusOffline, UserStatusRecently
|
||
from telethon.tl.types import User, Chat, Channel
|
||
from telethon.tl.types import PeerUser, PeerChat, PeerChannel
|
||
from telethon.tl.functions.users import GetFullUserRequest
|
||
from telethon.tl.functions.messages import ReadHistoryRequest, GetFullChatRequest, GetMessagesRequest
|
||
from telethon.tl.functions.channels import ReadHistoryRequest as ReadHistoryChannel, GetParticipantRequest, GetMessagesRequest
|
||
from telethon.tl.functions.updates import GetDifferenceRequest
|
||
from telethon.tl.functions.contacts import ResolveUsernameRequest
|
||
|
||
import hashlib
|
||
import os
|
||
import queue
|
||
import threading
|
||
import time
|
||
from xmpp_tg.utils import display_tg_name
|
||
|
||
from .utils import var_dump
|
||
import traceback
|
||
|
||
|
||
class TelegramGateClient(TelegramClient):
|
||
def __init__(self, session, api_id, api_hash, xmpp_gate, jid, phone, proxy=None):
|
||
super().__init__(session, api_id, api_hash, proxy=proxy, update_workers = 4)
|
||
|
||
self.me = None
|
||
|
||
self.xmpp_gate = xmpp_gate
|
||
self.jid = jid
|
||
self.phone = phone
|
||
|
||
self._media_queue = queue.Queue()
|
||
self._media_thread = threading.Thread(name='MediaDownloaderThread', target=self.media_thread_downloader)
|
||
|
||
self._status_updates = dict()
|
||
self._status_update_thread = threading.Thread(name = 'StatusUpdateThread', target = self.status_updater_thread)
|
||
|
||
self._groups_users = dict()
|
||
self._message_cache_users = dict()
|
||
self._message_cache_groups = dict()
|
||
self._message_cache_supergroups = dict()
|
||
|
||
self._del_pts = 0
|
||
|
||
|
||
def xmpp_update_handler(self, obj):
|
||
|
||
print("We have received update for <%s>" % self.jid)
|
||
print(obj)
|
||
|
||
# we have received some updates, so we're logined and can get <me> object and start mtd / upd threads #
|
||
if not self.me:
|
||
me = self.get_me()
|
||
self.me = InputPeerUser(me.id, me.access_hash)
|
||
self._media_thread.start()
|
||
self._status_update_thread.start()
|
||
|
||
'''
|
||
Боты
|
||
Сделать запоминание ростера в бд
|
||
Сделать лучше хендлинг ошибок
|
||
Доделать все типы информационных сообщений
|
||
Сделать джойны по линкам в чаты/каналы
|
||
Сделать поиск и добавление пользователей
|
||
Сделать листание истории
|
||
Сделать отправку всех непрочтенных сообщений при входе
|
||
'''
|
||
|
||
# Здесь будет очень длинный пиздец ^__^
|
||
|
||
nl = '\n'
|
||
|
||
try:
|
||
|
||
# message from normal chat #
|
||
if type(obj) in [UpdateShortMessage] and not obj.out:
|
||
|
||
fwd_from = self._process_forward_msg(obj) if obj.fwd_from else '' # process forward messages
|
||
self.gate_send_message( mfrom='u' + str(obj.user_id), mbody = '[MSG {}] {}{}'.format(obj.id, fwd_from, obj.message) )
|
||
usr = self._get_user_information(obj.user_id) # get peer information
|
||
self.invoke(ReadHistoryRequest( InputPeerUser(usr.id, usr.access_hash), obj.id )) # delivery report
|
||
|
||
# message from normal group #
|
||
if type(obj) in [UpdateShortChatMessage] and not obj.out:
|
||
fwd_from = self._process_forward_msg(obj) if obj.fwd_from else '' # process forward messages
|
||
usr = self._get_user_information(obj.from_id)
|
||
nickname = display_tg_name(usr)
|
||
|
||
# send message
|
||
self.gate_send_message(mfrom='g' + str(obj.chat_id), mbody ='[MSG {}] [User: {}] {}{}'.format(obj.id, nickname, fwd_from, obj.message) )
|
||
self.invoke(ReadHistoryRequest(InputPeerChat(obj.chat_id), obj.id))
|
||
|
||
|
||
# message from supergroup or media message #
|
||
if type(obj) in [UpdateNewMessage, UpdateNewChannelMessage, UpdateEditMessage, UpdateEditChannelMessage] and not obj.message.out:
|
||
|
||
cid = None
|
||
msg = ''
|
||
fwd_from = ''
|
||
mid = obj.message.id
|
||
|
||
|
||
# detect message type
|
||
is_user = type(obj.message.to_id) is PeerUser
|
||
is_group = type(obj.message.to_id) is PeerChat
|
||
is_supergroup = type(obj.message.to_id) is PeerChannel
|
||
|
||
|
||
# detect from id
|
||
if is_user:
|
||
cid = obj.message.from_id
|
||
peer = InputPeerUser(cid, self.xmpp_gate.tg_dialogs[self.jid]['users'][cid].access_hash) if cid in self.xmpp_gate.tg_dialogs[self.jid]['users'] else None
|
||
prefix = 'u'
|
||
elif is_group:
|
||
cid = obj.message.to_id.chat_id
|
||
peer = InputPeerChat(cid)
|
||
prefix = 'g'
|
||
elif is_supergroup:
|
||
cid = obj.message.to_id.channel_id
|
||
peer = InputPeerChannel(cid, self.xmpp_gate.tg_dialogs[self.jid]['supergroups'][cid].access_hash) if cid in self.xmpp_gate.tg_dialogs[self.jid]['supergroups'] else None
|
||
prefix = 's'
|
||
|
||
# our message #
|
||
if type(obj.message) == MessageService:
|
||
obj.message.fwd_from, obj.message.post, obj.message.edit_date, obj.message.media = None, None, None, None
|
||
msg = self._process_info_msg(obj.message, peer)
|
||
elif type(obj.message) == Message:
|
||
msg = obj.message.message
|
||
|
||
|
||
# is forwarded?
|
||
if obj.message.fwd_from:
|
||
fwd_from = self._process_forward_msg(obj.message)
|
||
|
||
# maybe its channel? #
|
||
if obj.message.post:
|
||
prefix = 'c'
|
||
|
||
# get sender information from chat info #
|
||
if not is_user and not obj.message.post:
|
||
usr = self._get_user_information(obj.message.from_id)
|
||
nickname = display_tg_name(usr)
|
||
msg = '[User: {}] {}'.format(nickname, msg)
|
||
|
||
|
||
# message media #
|
||
if obj.message.media:
|
||
msg = '{} {}'.format( msg, self._process_media_msg(obj.message.media) )
|
||
|
||
# edited #
|
||
if obj.message.edit_date:
|
||
msg = '[Edited] {}'.format(msg)
|
||
|
||
# send message #
|
||
self.gate_send_message(prefix + str(cid), mbody = '[MSG {}] {}{}'.format(mid, fwd_from, msg) )
|
||
|
||
# delivery report
|
||
if is_supergroup:
|
||
self.invoke(ReadHistoryChannel(peer, mid))
|
||
else:
|
||
self.invoke(ReadHistoryRequest(peer, mid))
|
||
|
||
# Status Updates #
|
||
if type(obj) is UpdateUserStatus:
|
||
|
||
# process status update #
|
||
if type(obj.status) is UserStatusOnline:
|
||
self._status_updates[str(obj.user_id)] = { 'status': None, 'message': 'Online' }
|
||
elif type(obj.status) is UserStatusOffline:
|
||
self._status_updates[str(obj.user_id)] = { 'status': 'xa', 'message': obj.status.was_online.strftime('Last seen at %H:%M %d/%m/%Y') }
|
||
elif type(obj.status) is UserStatusRecently:
|
||
self._status_updates[str(obj.user_id)] = { 'status': 'away', 'message': 'Last seen recently' }
|
||
else:
|
||
pass
|
||
|
||
|
||
except Exception:
|
||
print('Exception occurs!')
|
||
print(traceback.format_exc())
|
||
|
||
def gate_send_message(self, mfrom, mbody):
|
||
tg_from = int(mfrom[1:])
|
||
if not tg_from in self.xmpp_gate.tg_dialogs[self.jid]['users'] and not tg_from in self.xmpp_gate.tg_dialogs[self.jid]['groups'] and not tg_from in self.xmpp_gate.tg_dialogs[self.jid]['supergroups']: # new contact appeared
|
||
self.xmpp_gate.tg_process_dialogs( self.jid )
|
||
|
||
self.xmpp_gate.send_message( mto=self.jid, mfrom=mfrom + '@' + self.xmpp_gate.config['jid'], mtype='chat', mbody=mbody)
|
||
|
||
def generate_media_link(self, media):
|
||
"""
|
||
Генерирует будующее имя и ссылку на скачиваемое медиа-вложения из сообщения
|
||
:param media:
|
||
:return:
|
||
"""
|
||
if type(media) is MessageMediaPhoto:
|
||
media_id = media.photo.id
|
||
elif type(media) is MessageMediaDocument:
|
||
media_id = media.document.id
|
||
else:
|
||
return None
|
||
|
||
ext = get_extension(media)
|
||
if ext == '.oga':
|
||
ext = '.ogg'
|
||
|
||
file_name = hashlib.new('sha256')
|
||
file_name.update(str(media_id).encode('ascii'))
|
||
file_name.update(str(os.urandom(2)).encode('ascii'))
|
||
file_name = file_name.hexdigest() + ext
|
||
|
||
link = self.xmpp_gate.config['media_web_link_prefix'] + file_name
|
||
|
||
return {'name': file_name, 'link': link}
|
||
|
||
@staticmethod
|
||
def get_document_attribute(attributes, match):
|
||
"""
|
||
Находит заданных аттрибут в списке. Используется при разборе медиа-вложений типа Документ.
|
||
:param attributes:
|
||
:param match:
|
||
:return:
|
||
"""
|
||
for attrib in attributes:
|
||
if type(attrib) == match:
|
||
return attrib
|
||
return None
|
||
|
||
def _get_user_information(self, uid):
|
||
|
||
if uid in self.xmpp_gate.tg_dialogs[self.jid]['users']:
|
||
return self.xmpp_gate.tg_dialogs[self.jid]['users'][uid]
|
||
|
||
entity = self.get_entity(uid)
|
||
if entity.access_hash:
|
||
self.xmpp_gate.tg_dialogs[self.jid]['users'][uid] = entity
|
||
return entity
|
||
else:
|
||
return {'first_name': 'Unknown', 'last_name': 'user', 'access_hash': -1, 'id': 0}
|
||
|
||
|
||
def _process_forward_msg(self, message):
|
||
"""
|
||
Обрабатывает информацию в пересланном сообщении (от кого оно и/или из какого канала). Требует дополнительно
|
||
предоставление информации об пользователях/каналах.
|
||
:param message:
|
||
:param users:
|
||
:param channels:
|
||
:return:
|
||
"""
|
||
if message.fwd_from.from_id: # От пользователя
|
||
|
||
usr = self._get_user_information(message.fwd_from.from_id)
|
||
fwd_from = display_tg_name(usr)
|
||
|
||
if message.fwd_from.channel_id: # От канала
|
||
fwd_from = 'Channel {}'.format(message.fwd_from.channel_id)
|
||
|
||
# let's construct
|
||
fwd_reply = '|Forwarded from [{}]|'.format(fwd_from)
|
||
return fwd_reply
|
||
|
||
def _process_media_msg(self, media):
|
||
"""
|
||
Обрабатывает медиа-вложения в сообщениях. Добавляет их в очередь на загрузку. Производит разбор с генерацию
|
||
готового для вывода сообщения с информацией о медиа и сгенерированной ссылкой на него.
|
||
:param media:
|
||
:return:
|
||
"""
|
||
msg = ''
|
||
|
||
if type(media) is MessageMediaDocument: # Документ или замаскированная сущность
|
||
attributes = media.document.attributes
|
||
attributes_types = [type(a) for a in attributes] # Документами могут быть разные вещи и иметь аттрибуты
|
||
|
||
size_text = '|Size: {:.2f} Mb'.format(media.document.size / 1024 / 1024)
|
||
|
||
if media.document.size > self.xmpp_gate.config['media_max_download_size']: # Не загружаем большие файлы
|
||
g_link = {'link': 'File is too big to be downloaded via Telegram <---> XMPP Gateway. Sorry.'}
|
||
else:
|
||
g_link = self.generate_media_link(media) # Добавляем файл в очередь на загрузку в отдельном потоке
|
||
self._media_queue.put({'media': media, 'file': g_link['name']})
|
||
|
||
attr_fn = self.get_document_attribute(attributes, DocumentAttributeFilename)
|
||
if attr_fn: # Если есть оригинальное имя файла, то выводим
|
||
msg = '[FileName:{}{}] {}'.format(attr_fn.file_name, size_text, g_link['link'])
|
||
else:
|
||
msg = g_link['link']
|
||
|
||
if DocumentAttributeSticker in attributes_types: # Стикер
|
||
smile = self.get_document_attribute(attributes, DocumentAttributeSticker).alt
|
||
msg = '[Sticker {}] {}'.format(smile, g_link['link']) # У стикеров свой формат вывода
|
||
elif DocumentAttributeAudio in attributes_types: # Аудио файл / Голосовое сообщение
|
||
attr_a = self.get_document_attribute(attributes, DocumentAttributeAudio)
|
||
|
||
if attr_a.voice: # Голосовое сообщение
|
||
msg = '[VoiceMessage|{} sec] {}'.format(attr_a.duration, g_link['link']) # Тоже свой формат
|
||
else: # Приложенный аудиофайл, добавляем возможную информацию из его тегов
|
||
attr_f = self.get_document_attribute(attributes, DocumentAttributeFilename)
|
||
msg = '[Audio|File:{}{}|Performer:{}|Title:{}|Duration:{} sec] {}' \
|
||
.format(attr_f.file_name, size_text, attr_a.performer, attr_a.title,
|
||
attr_a.duration, g_link['link'])
|
||
elif DocumentAttributeVideo in attributes_types: # Видео
|
||
video_type = 'Video'
|
||
video_file = ''
|
||
caption = ''
|
||
|
||
if DocumentAttributeAnimated in attributes_types: # Проверка на "gif"
|
||
video_type = 'AnimatedVideo'
|
||
|
||
if DocumentAttributeFilename in attributes_types: # Если есть оригинальное имя файла - указываем
|
||
attr_v = self.get_document_attribute(attributes, DocumentAttributeFilename)
|
||
video_file = '|File:{}'.format(attr_v.file_name)
|
||
|
||
if media.caption:
|
||
caption = media.caption + ' '
|
||
|
||
# Тоже свой формат
|
||
msg = '[{}{}{}] {}{}'.format(video_type, video_file, size_text, caption, g_link['link'])
|
||
elif type(media) is MessageMediaPhoto: # Фотография (сжатая, jpeg)
|
||
g_link = self.generate_media_link(media)
|
||
msg = g_link['link']
|
||
|
||
self._media_queue.put({'media': media, 'file': g_link['name']})
|
||
|
||
if media.caption: # Если есть описание - указываем
|
||
msg = '{} {}'.format(media.caption, msg)
|
||
|
||
elif type(media) is MessageMediaContact: # Контакт (с номером)
|
||
msg = 'First name: {} / Last name: {} / Phone: {}'\
|
||
.format(media.first_name, media.last_name, media.phone_number)
|
||
elif type(media) in [MessageMediaGeo, MessageMediaVenue]: # Адрес на карте
|
||
map_link_template = 'https://maps.google.com/maps?q={0:.4f},{1:.4f}&ll={0:.4f},{1:.4f}&z=16'
|
||
map_link = map_link_template.format(media.geo.lat, media.geo.long)
|
||
msg = map_link
|
||
|
||
if type(media) is MessageMediaVenue:
|
||
msg = '[Title: {}|Address: {}|Provider: {}] {}'.format(media.title, media.address, media.provider, msg)
|
||
|
||
return msg
|
||
|
||
def _process_info_msg(self, message, peer):
|
||
"""
|
||
Обрабатывает информационные сообщения в групповых чатах. Возвращает готовое для вывода сообщение.
|
||
:param message:
|
||
:param users:
|
||
:return:
|
||
"""
|
||
|
||
msg = ''
|
||
usr = self._get_user_information(message.from_id)
|
||
nickname = display_tg_name(usr)
|
||
|
||
# supergroup created #
|
||
if type(message.action) is MessageActionChannelCreate:
|
||
pass
|
||
|
||
# group created #
|
||
elif type(message.action) is MessageActionChatCreate:
|
||
pass
|
||
|
||
# user added #
|
||
elif type(message.action) is MessageActionChatAddUser:
|
||
added_users = []
|
||
for user_id in message.action.users:
|
||
usr = self._get_user_information(user_id)
|
||
added_users.append(display_tg_name(usr))
|
||
|
||
msg = 'User [{}] has just invited [{}]'.format(nickname, ','.join(added_users))
|
||
|
||
# user exit #
|
||
elif type(message.action) is MessageActionChatDeleteUser:
|
||
usr = self._get_user_information(message.action.user_id)
|
||
msg = 'User [{}] has just left the room'.format(display_tg_name(usr))
|
||
|
||
# user joined #
|
||
elif type(message.action) is MessageActionChatJoinedByLink:
|
||
usr = self._get_user_information(message.action.user_id)
|
||
msg = 'User [{}] joined the room'.format(display_tg_name(usr))
|
||
|
||
# chat name modified #
|
||
elif type(message.action) is MessageActionChatEditTitle:
|
||
msg = 'User [{}] changed title to [{}]'.format(nickname, message.action.title)
|
||
|
||
# pinned message
|
||
elif type(message.action) is MessageActionPinMessage:
|
||
pinned_mid = message.reply_to_msg_id # target message
|
||
message_req = self.invoke(GetMessagesRequest(peer, [pinned_mid]))
|
||
if len(message_req.messages) > 0:
|
||
pinned_message = message_req.messages[0].message
|
||
pinned_from = self._get_user_information(message_req.messages[0].from_id)
|
||
msg = 'User [{}] pinned message: [{}]: {}'.format(nickname, display_tg_name(pinned_from), pinned_message)
|
||
|
||
# group converted to supergroup
|
||
elif type(message.action) in [MessageActionChatMigrateTo, MessageActionChannelMigrateFrom]:
|
||
pass
|
||
|
||
return msg
|
||
|
||
def media_thread_downloader(self):
|
||
"""
|
||
Этот метод запускается в отдельном потоке и скачивает по очереди все медиа вложения из сообщений
|
||
:return:
|
||
"""
|
||
while True:
|
||
try:
|
||
if self._media_queue.empty(): # Нет медиа в очереди - спим
|
||
time.sleep(0.1)
|
||
else: # Иначе скачиваем медиа
|
||
print('MTD ::: Queue is not empty. Downloading...')
|
||
media = self._media_queue.get()
|
||
file_path = self.xmpp_gate.config['media_store_path'] + media['file']
|
||
if os.path.isfile(file_path):
|
||
print('MTD ::: File already exists')
|
||
else:
|
||
self.download_media(media['media'], file_path, False)
|
||
print('MTD ::: Media downloaded')
|
||
except Exception:
|
||
print(traceback.format_exc())
|
||
|
||
def status_updater_thread(self):
|
||
|
||
while True:
|
||
try:
|
||
if len(self._status_updates) > 0:
|
||
for uid, status in self._status_updates.items():
|
||
self.xmpp_gate.send_presence( pto=self.jid, pfrom='u'+str(uid)+'@'+self.xmpp_gate.config['jid'], pshow = status['status'], pstatus = status['message'] )
|
||
except Exception:
|
||
print(traceback.format_exc())
|
||
|
||
self._status_updates = dict()
|
||
time.sleep( self.xmpp_gate.accounts[self.jid]['status_update_interval'])
|