Files
fsr_generator/helpers/generate.py
2022-03-15 14:42:18 +01:00

347 lines
12 KiB
Python
Executable File

#!/usr/bin/python3
import mailbox
import jinja2
import email.header
import email.utils
import yaml
import datetime
import sys, os
import pypandoc
import argparse
import quopri
import requests
import json
import subprocess
import re
from pprint import pprint
from pathlib import Path
CONFIG_FILE = "generator.conf"
WEEKDAYS = { 0: "Montag",
1: "Dienstag",
2: "Mittwoch",
3: "Donnerstag",
4: "Freitag",
5: "Samstag",
6: "Sonntag" }
IMPORT_RE = re.compile(r"@import\((.*)\)")
FILE_RE = re.compile(r"@file\((.*)\)")
def decode_header(header):
decoded_headers = email.header.decode_header(header)
header_strs = []
for dheader in decoded_headers:
encoding = dheader[1] or "ascii"
if encoding == "unknown-8bit":
encoding = "ascii"
result = dheader[0].decode(encoding, errors="replace") if isinstance(dheader[0], bytes) else dheader[0]
header_strs.append(result)
header_text = "".join(header_strs)
header_text = re.sub(r"\n(\s)", r" ", header_text)
#header_text = re.sub(r"\n(\s)", r"", header_text)
return header_text
def get_body_text(msg):
# from https://stackoverflow.com/a/1463144
for part in msg.walk():
# each part is a either non-multipart, or another multipart message
# that contains further parts... Message is organized like a tree
if part.get_content_type() == 'text/plain':
payload = part.get_payload()
if part["Content-Transfer-Encoding"] == "quoted-printable":
payload = quopri.decodestring(payload.encode("ascii")).decode(part.get_content_charset("utf-8"))
return payload
# from https://stackoverflow.com/a/49986645
def deEmojify(text):
regrex_pattern = re.compile(pattern = "["
u"\U0001F600-\U0001F64F" # emoticons
u"\U0001F300-\U0001F5FF" # symbols & pictographs
u"\U0001F680-\U0001F6FF" # transport & map symbols
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
"]+", flags = re.UNICODE)
return regrex_pattern.sub(r'',text)
class Top:
def __init__(self, title=None, sender=None, body=None, protostub=None, message=None):
if message:
subject = message["Subject"]
needs_stripping = subject[:6] == "[top] "
self.title = deEmojify(decode_header(subject[6:] if needs_stripping else subject))
real_name, address = email.utils.parseaddr(message["From"])
real_name = decode_header(real_name)
self.sender = real_name or address
payload = get_body_text(message)
self.body = str(payload.rpartition("\n--")[0] if "\n--" in payload else payload)
elif title:
self.title = title
self.sender = sender
self.body = body
self.protostub = protostub
else:
raise ValueError("One of title or message is needed")
if self.body is None:
self.body = ""
def __repr__(self):
return "<TOP "+self.title+">"
# from https://stackoverflow.com/questions/6558535/find-the-date-for-the-first-monday-after-a-given-a-date
def next_weekday(d, weekday):
days_ahead = weekday - d.weekday()
if days_ahead < 0: # Target day already happened this week
days_ahead += 7
return d + datetime.timedelta(days_ahead)
def last_weekday(d, weekday):
days_ahead = weekday - d.weekday()
if days_ahead >= 0: # Target day already happened this week
days_ahead -= 7
return d + datetime.timedelta(days_ahead)
def wiki2latex(intext):
intext = intext.replace(":\n", ":\n\n")
return pypandoc.convert_text(intext, 'latex', format='md')
def j2replace(intext):
return j2env.from_string(intext).render(context)
def date(indate):
return indate.strftime("%d.%m.%Y")
def time(intime):
return intime.strftime("%H:%M")
def weekday(indate):
return WEEKDAYS[indate.weekday()]
def prototop(top):
result = ""
if "protostub" in dir(top) and top.protostub:
result = j2env.from_string(top.protostub).render(context, top=top)
elif top.body:
result = j2env.from_string(top.body).render(context)
for search,replace in config["protoreplace"].items():
result = result.replace(search, replace)
return result
def conf2top(top):
sender = None
body = None
protostub = None
try:
sender = top["sender"]
except KeyError:
pass
try:
body = top["body"]
except KeyError:
pass
try:
protostub = top["protostub"]
except KeyError:
pass
if "file" in top:
try:
body = open(top["file"]).read()
except OSError as e:
print("Warning: Error opening", top["file"], file=sys.stderr)
if "command" in top:
try:
body = subprocess.run(top["command"], shell=True, text=True, capture_output=True, check=True).stdout
except subprocess.CalledProcessError as e:
print(f"Warning: Command for '{top['title']}' returned non-zero exit code, output not used")
if "proto_command" in top:
try:
protostub = subprocess.run(top["proto_command"], shell=True, text=True, capture_output=True, check=True).stdout
except subprocess.CalledProcessError as e:
print(f"Warning: Protocommand for '{top['title']}' returned non-zero exit code, output not used")
return Top(top["title"], sender, body, protostub)
import_cache = {}
def get_imported_conf_entry(f, key):
if f not in import_cache:
import_cache[f] = yaml.safe_load(open(f))
return import_cache[f][key]
def do_imports(entry):
if isinstance(entry, dict):
d = entry.items()
result = {}
for key,value in d:
try:
result[key] = do_imports(value)
except KeyError:
pass
except OSError as e:
print(f"Warning: reading {key} yielded error {e}")
return result
if isinstance(entry, list):
l = iter(entry)
result = []
for item in l:
result.append(do_imports(item))
return result
if isinstance(entry, str):
match = IMPORT_RE.match(entry)
if match:
f, key = match.group(1).split(":")
return get_imported_conf_entry(f, key)
match = FILE_RE.match(entry)
if match:
with open(match.group(1)) as f:
return f.read().strip()
return entry
def get_config(f):
raw_config = yaml.safe_load(open(f))
config = do_imports(raw_config)
if "override_file" in config:
override_config = yaml.safe_load(open(config["override_file"]))
config.update(override_config)
return config
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", default=CONFIG_FILE)
mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument("--invite", action="store_true")
mode.add_argument("--mm-invite", action="store_true")
mode.add_argument("--presentation", action="store_true")
mode.add_argument("--protocol", action="store_true")
parser.add_argument("--debug", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--print-config", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--write-mbox", action="store_true")
parser.add_argument("--send-mm", action="store_true")
parser.add_argument("--send-mail", action="store_true")
parser.add_argument("--save", action="store_true")
parser.add_argument("--time")
parser.add_argument("--date")
args = parser.parse_args()
config = get_config(args.config)
if args.print_config:
pprint(config)
sys.exit(0)
if args.invite:
template_file = config["invite_template_file"]
elif args.presentation:
template_file = config["presentation_template_file"]
elif args.mm_invite:
template_file = config["mminvite_template_file"]
elif args.protocol:
template_file = config["protocol_template_file"]
else:
raise Exception("Should never happen")
j2env = jinja2.Environment()
j2env.filters["wiki2latex"] = wiki2latex
j2env.filters["j2replace"] = j2replace
j2env.filters["date"] = date
j2env.filters["time"] = time
j2env.filters["weekday"] = weekday
j2env.filters["prototop"] = prototop
template = j2env.from_string(open(template_file).read())
mbox = mailbox.mbox(config["top_mbox_file"])
current_date = next_weekday(datetime.date.today(), config["default_weekday"])
if args.date:
current_date = datetime.date.fromisoformat(args.date)
#next_date = current_date + datetime.timedelta(days=7)
next_date = next_weekday(current_date, config["default_weekday"])
last_date = last_weekday(current_date, config["default_weekday"])
time = datetime.time.fromisoformat(args.time or config["default_time"])
pre_tops = []
post_tops = []
for top in config["pre_tops"]:
pre_tops.append(conf2top(top))
for top in config["post_tops"]:
post_tops.append(conf2top(top))
email_tops = []
for message in mbox:
if args.debug:
print(message.get_payload())
top = Top(message=message)
email_tops.append(top)
to = pre_tops + email_tops + post_tops
context = {"to": to,
"redeleitung": config["redeleitung"],
"protokoll": config["protokoll"],
"date": current_date,
"time": time,
"place": config["place"],
"next_date": next_date,
"last_date": last_date,
"meeting_link": config["meeting_link"],
"email_tops": email_tops,
"WEEKDAYS": WEEKDAYS}
if args.debug:
for top in to:
pprint(top.__dict__)
pprint(context)
elif args.save:
if args.invite:
filename = Path(config["invite_save_path"]) / Path("invite_"+datetime.date.today().isoformat()+".txt")
elif args.presentation:
filename = Path(config["presentation_save_path"]) / Path("presentation_"+datetime.date.today().isoformat()+".tex")
elif args.protocol:
filename = Path(config["protocol_save_path"]) / Path(datetime.date.today().isoformat())
else:
raise Exception("Should never happen")
with open(filename, "w") as file:
file.write(template.render(context))
elif args.send_mail:
msg = email.message.EmailMessage()
msg.set_content(template.render(context))
msg["Subject"] = j2env.from_string(config["invite_subject"]).render(context)
msg["From"] = email.utils.formataddr((config["redeleitung"]["name"], config["redeleitung"]["email"]))
msg["To"] = config["invite_mail"]
subprocess.run([*config["sendmail"], config["invite_mail"]], input=str(msg), text=True, check=True)
elif args.write_mbox:
msg = email.message.EmailMessage()
msg.set_content(template.render(context))
msg["Subject"] = j2env.from_string(config["invite_subject"]).render(context)
msg["From"] = email.utils.formataddr((config["redeleitung"]["name"], config["redeleitung"]["email"]))
msg["To"] = config["invite_mail"]
mbox = mailbox.mbox(config["mbox_out"])
mbox.add(msg)
mbox.close()
print(mbox)
elif args.send_mm:
headers = {'Content-Type': 'application/json',}
values = json.dumps({ "text": template.render(context), "username": "test"})
response = requests.post(config["mm_url"], headers=headers, data=values)
else:
print(template.render(context))