#!/usr/bin/python3 import mailbox import jinja2 import email.header import email.utils import yaml import datetime import sys, os import pypandoc import argparse import quopri import requests import json import subprocess import re from pprint import pprint CONFIG_FILE = "generator.conf" WEEKDAYS = { 0: "Montag", 1: "Dienstag", 2: "Mittwoch", 3: "Donnerstag", 4: "Freitag", 5: "Samstag", 6: "Sonntag" } def decode_header(header): decoded_header = email.header.decode_header(header)[0] encoding = decoded_header[1] or "ascii" if encoding == "unknown-8bit": encoding = "ascii" return decoded_header[0].decode(encoding, errors="replace") if isinstance(decoded_header[0], bytes) else decoded_header[0] def get_body_text(msg): # from https://stackoverflow.com/a/1463144 for part in msg.walk(): # each part is a either non-multipart, or another multipart message # that contains further parts... Message is organized like a tree if part.get_content_type() == 'text/plain': payload = part.get_payload() if part["Content-Transfer-Encoding"] == "quoted-printable": payload = quopri.decodestring(payload.encode("ascii")).decode(part.get_content_charset("utf-8")) return payload # from https://stackoverflow.com/a/49986645 def deEmojify(text): regrex_pattern = re.compile(pattern = "[" u"\U0001F600-\U0001F64F" # emoticons u"\U0001F300-\U0001F5FF" # symbols & pictographs u"\U0001F680-\U0001F6FF" # transport & map symbols u"\U0001F1E0-\U0001F1FF" # flags (iOS) "]+", flags = re.UNICODE) return regrex_pattern.sub(r'',text) class Top: def __init__(self, title=None, sender=None, body=None, protostub=None, message=None): if message: subject = message["Subject"] needs_stripping = subject[:6] == "[top] " self.title = deEmojify(decode_header(subject[6:] if needs_stripping else subject)) real_name, address = email.utils.parseaddr(message["From"]) real_name = decode_header(real_name) self.sender = real_name or address payload = get_body_text(message) self.body = str(payload.rpartition("\n--")[0] if "\n--" in payload else payload) elif title: self.title = title self.sender = sender self.body = body self.protostub = protostub else: raise ValueError("One of title or message is needed") if self.body is None: self.body = "" def __repr__(self): return "" # from https://stackoverflow.com/questions/6558535/find-the-date-for-the-first-monday-after-a-given-a-date def next_weekday(d, weekday): days_ahead = weekday - d.weekday() if days_ahead < 0: # Target day already happened this week days_ahead += 7 return d + datetime.timedelta(days_ahead) def last_weekday(d, weekday): days_ahead = weekday - d.weekday() if days_ahead >= 0: # Target day already happened this week days_ahead -= 7 return d + datetime.timedelta(days_ahead) def wiki2latex(intext): intext = intext.replace(":\n", ":\n\n") return pypandoc.convert_text(intext, 'latex', format='md') def j2replace(intext): return j2env.from_string(intext).render(context) def date(indate): return indate.strftime("%d.%m.%Y") def time(intime): return intime.strftime("%H:%M") def weekday(indate): return WEEKDAYS[indate.weekday()] def prototop(top): result = "" if "protostub" in dir(top) and top.protostub: result = j2env.from_string(top.protostub).render(context, top=top) elif top.body: result = j2env.from_string(top.body).render(context) for search,replace in config["protoreplace"].items(): result = result.replace(search, replace) return result def conf2top(top): sender = None body = None protostub = None try: sender = top["sender"] except KeyError: pass try: body = top["body"] except KeyError: pass try: protostub = top["protostub"] except KeyError: pass if "file" in top: try: body = open(top["file"]).read() except OSError as e: print("Warning: Error opening", top["file"], file=sys.stderr) if "command" in top: body = subprocess.run(top["command"], shell=True, text=True, capture_output=True, check=True).stdout if "proto_command" in top: protostub = subprocess.run(top["proto_command"], shell=True, text=True, capture_output=True, check=True).stdout return Top(top["title"], sender, body, protostub) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--config", "-c", default=CONFIG_FILE) mode = parser.add_mutually_exclusive_group(required=True) mode.add_argument("--invite", action="store_true") mode.add_argument("--mm-invite", action="store_true") mode.add_argument("--presentation", action="store_true") mode.add_argument("--protocol", action="store_true") parser.add_argument("--debug", action="store_true", help=argparse.SUPPRESS) parser.add_argument("--write-mbox", action="store_true") parser.add_argument("--send-mm", action="store_true") parser.add_argument("--send-mail", action="store_true") args = parser.parse_args() config = yaml.safe_load(open(args.config)) if config["redeleitung"]["name"].startswith("./"): with open(config["redeleitung"]["name"]) as f: config["redeleitung"]["name"] = f.read().strip() if config["protokoll"]["name"].startswith("./"): with open(config["protokoll"]["name"]) as f: config["protokoll"]["name"] = f.read().strip() if config["place"].startswith("./"): with open(config["place"]) as f: config["place"] = f.read().strip() if args.invite: template_file = config["invite_template_file"] elif args.presentation: template_file = config["presentation_template_file"] elif args.mm_invite: template_file = config["mminvite_template_file"] elif args.protocol: template_file = config["protocol_template_file"] else: raise Exception("Should never happen") j2env = jinja2.Environment() j2env.filters["wiki2latex"] = wiki2latex j2env.filters["j2replace"] = j2replace j2env.filters["date"] = date j2env.filters["time"] = time j2env.filters["weekday"] = weekday j2env.filters["prototop"] = prototop template = j2env.from_string(open(template_file).read()) mbox = mailbox.mbox(config["top_mbox_file"]) current_date = next_weekday(datetime.date.today(), config["default_weekday"]) next_date = current_date + datetime.timedelta(days=7) last_date = last_weekday(current_date, config["default_weekday"]) time = datetime.time.fromisoformat(config["default_time"]) pre_tops = [] post_tops = [] for top in config["pre_tops"]: pre_tops.append(conf2top(top)) for top in config["post_tops"]: post_tops.append(conf2top(top)) email_tops = [] for message in mbox: if args.debug: print(message.get_payload()) top = Top(message=message) email_tops.append(top) to = pre_tops + email_tops + post_tops context = {"to": to, "redeleitung": config["redeleitung"], "protokoll": config["protokoll"], "date": current_date, "time": time, "place": config["place"], "next_date": next_date, "last_date": last_date, "meeting_link": config["meeting_link"], "email_tops": email_tops, "WEEKDAYS": WEEKDAYS} if args.debug: for top in to: pprint(top.__dict__) pprint(context) elif args.send_mail: msg = email.message.EmailMessage() msg.set_content(template.render(context)) msg["Subject"] = j2env.from_string(config["invite_subject"]).render(context) msg["From"] = email.utils.formataddr((config["redeleitung"]["name"], config["redeleitung"]["email"])) msg["To"] = config["invite_mail"] subprocess.run([*config["sendmail"], config["invite_mail"]], input=str(msg), text=True) elif args.write_mbox: msg = email.message.EmailMessage() msg.set_content(template.render(context)) msg["Subject"] = j2env.from_string(config["invite_subject"]).render(context) msg["From"] = email.utils.formataddr((config["redeleitung"]["name"], config["redeleitung"]["email"])) msg["To"] = config["invite_mail"] mbox = mailbox.mbox(config["mbox_out"]) mbox.add(msg) mbox.close() print(mbox) elif args.send_mm: headers = {'Content-Type': 'application/json',} values = json.dumps({ "text": template.render(context), "username": "test"}) response = requests.post(config["mm_url"], headers=headers, data=values) else: print(template.render(context))