major refactoring

* move all scripts not intended to be called directly to helpers/ dir
* introduce sequencer.py as replacement for various scripts
* introduce --save option to generate.py
* other smaller changes/bugfixes
This commit is contained in:
2022-03-09 15:52:02 +01:00
parent da116d7b01
commit 151f11d90b
18 changed files with 164 additions and 125 deletions

10
helpers/compile_presentation.sh Executable file
View File

@@ -0,0 +1,10 @@
#!/bin/sh
set -e
dest_file="presentation_$(date +%Y-%m-%d).tex"
echo Compiling
mkdir -p data/presentation
cd data/presentation/
latexmk -pdf "../$dest_file"
ln -srnf "presentation_$(date +%Y-%m-%d).pdf" ../../presentation.pdf

336
helpers/generate.py Executable file
View File

@@ -0,0 +1,336 @@
#!/usr/bin/python3
import mailbox
import jinja2
import email.header
import email.utils
import yaml
import datetime
import sys, os
import pypandoc
import argparse
import quopri
import requests
import json
import subprocess
import re
from pprint import pprint
from pathlib import Path
CONFIG_FILE = "generator.conf"
WEEKDAYS = { 0: "Montag",
1: "Dienstag",
2: "Mittwoch",
3: "Donnerstag",
4: "Freitag",
5: "Samstag",
6: "Sonntag" }
IMPORT_RE = re.compile(r"@import\((.*)\)")
FILE_RE = re.compile(r"@file\((.*)\)")
def decode_header(header):
decoded_headers = email.header.decode_header(header)
header_strs = []
for dheader in decoded_headers:
encoding = dheader[1] or "ascii"
if encoding == "unknown-8bit":
encoding = "ascii"
result = dheader[0].decode(encoding, errors="replace") if isinstance(dheader[0], bytes) else dheader[0]
header_strs.append(result)
header_text = "".join(header_strs)
header_text = re.sub(r"\n(\s)", r" ", header_text)
#header_text = re.sub(r"\n(\s)", r"", header_text)
return header_text
def get_body_text(msg):
# from https://stackoverflow.com/a/1463144
for part in msg.walk():
# each part is a either non-multipart, or another multipart message
# that contains further parts... Message is organized like a tree
if part.get_content_type() == 'text/plain':
payload = part.get_payload()
if part["Content-Transfer-Encoding"] == "quoted-printable":
payload = quopri.decodestring(payload.encode("ascii")).decode(part.get_content_charset("utf-8"))
return payload
# from https://stackoverflow.com/a/49986645
def deEmojify(text):
regrex_pattern = re.compile(pattern = "["
u"\U0001F600-\U0001F64F" # emoticons
u"\U0001F300-\U0001F5FF" # symbols & pictographs
u"\U0001F680-\U0001F6FF" # transport & map symbols
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
"]+", flags = re.UNICODE)
return regrex_pattern.sub(r'',text)
class Top:
def __init__(self, title=None, sender=None, body=None, protostub=None, message=None):
if message:
subject = message["Subject"]
needs_stripping = subject[:6] == "[top] "
self.title = deEmojify(decode_header(subject[6:] if needs_stripping else subject))
real_name, address = email.utils.parseaddr(message["From"])
real_name = decode_header(real_name)
self.sender = real_name or address
payload = get_body_text(message)
self.body = str(payload.rpartition("\n--")[0] if "\n--" in payload else payload)
elif title:
self.title = title
self.sender = sender
self.body = body
self.protostub = protostub
else:
raise ValueError("One of title or message is needed")
if self.body is None:
self.body = ""
def __repr__(self):
return "<TOP "+self.title+">"
# from https://stackoverflow.com/questions/6558535/find-the-date-for-the-first-monday-after-a-given-a-date
def next_weekday(d, weekday):
days_ahead = weekday - d.weekday()
if days_ahead < 0: # Target day already happened this week
days_ahead += 7
return d + datetime.timedelta(days_ahead)
def last_weekday(d, weekday):
days_ahead = weekday - d.weekday()
if days_ahead >= 0: # Target day already happened this week
days_ahead -= 7
return d + datetime.timedelta(days_ahead)
def wiki2latex(intext):
intext = intext.replace(":\n", ":\n\n")
return pypandoc.convert_text(intext, 'latex', format='md')
def j2replace(intext):
return j2env.from_string(intext).render(context)
def date(indate):
return indate.strftime("%d.%m.%Y")
def time(intime):
return intime.strftime("%H:%M")
def weekday(indate):
return WEEKDAYS[indate.weekday()]
def prototop(top):
result = ""
if "protostub" in dir(top) and top.protostub:
result = j2env.from_string(top.protostub).render(context, top=top)
elif top.body:
result = j2env.from_string(top.body).render(context)
for search,replace in config["protoreplace"].items():
result = result.replace(search, replace)
return result
def conf2top(top):
sender = None
body = None
protostub = None
try:
sender = top["sender"]
except KeyError:
pass
try:
body = top["body"]
except KeyError:
pass
try:
protostub = top["protostub"]
except KeyError:
pass
if "file" in top:
try:
body = open(top["file"]).read()
except OSError as e:
print("Warning: Error opening", top["file"], file=sys.stderr)
if "command" in top:
body = subprocess.run(top["command"], shell=True, text=True, capture_output=True, check=args.allowfailcommand).stdout
if "proto_command" in top:
protostub = subprocess.run(top["proto_command"], shell=True, text=True, capture_output=True, check=args.allowfailcommand).stdout
return Top(top["title"], sender, body, protostub)
import_cache = {}
def get_imported_conf_entry(f, key):
if f not in import_cache:
import_cache[f] = yaml.safe_load(open(f))
return import_cache[f][key]
def do_imports(entry):
if isinstance(entry, dict):
d = entry.items()
result = {}
for key,value in d:
try:
result[key] = do_imports(value)
except KeyError:
pass
return result
if isinstance(entry, list):
l = iter(entry)
result = []
for item in l:
result.append(do_imports(item))
return result
if isinstance(entry, str):
match = IMPORT_RE.match(entry)
if match:
f, key = match.group(1).split(":")
return get_imported_conf_entry(f, key)
match = FILE_RE.match(entry)
if match:
with open(match.group(1)) as f:
return f.read().strip()
return entry
def get_config(f):
raw_config = yaml.safe_load(open(f))
config = do_imports(raw_config)
return config
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", default=CONFIG_FILE)
mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument("--invite", action="store_true")
mode.add_argument("--mm-invite", action="store_true")
mode.add_argument("--presentation", action="store_true")
mode.add_argument("--protocol", action="store_true")
parser.add_argument("--debug", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--print-config", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--write-mbox", action="store_true")
parser.add_argument("--send-mm", action="store_true")
parser.add_argument("--send-mail", action="store_true")
parser.add_argument("--save", action="store_true")
parser.add_argument("--allowfailcommand", action="store_false")
parser.add_argument("--time")
parser.add_argument("--date")
args = parser.parse_args()
config = get_config(args.config)
if args.print_config:
pprint(config)
sys.exit(0)
if args.invite:
template_file = config["invite_template_file"]
elif args.presentation:
template_file = config["presentation_template_file"]
elif args.mm_invite:
template_file = config["mminvite_template_file"]
elif args.protocol:
template_file = config["protocol_template_file"]
else:
raise Exception("Should never happen")
j2env = jinja2.Environment()
j2env.filters["wiki2latex"] = wiki2latex
j2env.filters["j2replace"] = j2replace
j2env.filters["date"] = date
j2env.filters["time"] = time
j2env.filters["weekday"] = weekday
j2env.filters["prototop"] = prototop
template = j2env.from_string(open(template_file).read())
mbox = mailbox.mbox(config["top_mbox_file"])
current_date = next_weekday(datetime.date.today(), config["default_weekday"])
if args.date:
current_date = datetime.date.fromisoformat(args.date)
#next_date = current_date + datetime.timedelta(days=7)
next_date = next_weekday(current_date, config["default_weekday"])
last_date = last_weekday(current_date, config["default_weekday"])
time = datetime.time.fromisoformat(args.time or config["default_time"])
pre_tops = []
post_tops = []
for top in config["pre_tops"]:
pre_tops.append(conf2top(top))
for top in config["post_tops"]:
post_tops.append(conf2top(top))
email_tops = []
for message in mbox:
if args.debug:
print(message.get_payload())
top = Top(message=message)
email_tops.append(top)
to = pre_tops + email_tops + post_tops
context = {"to": to,
"redeleitung": config["redeleitung"],
"protokoll": config["protokoll"],
"date": current_date,
"time": time,
"place": config["place"],
"next_date": next_date,
"last_date": last_date,
"meeting_link": config["meeting_link"],
"email_tops": email_tops,
"WEEKDAYS": WEEKDAYS}
if args.debug:
for top in to:
pprint(top.__dict__)
pprint(context)
elif args.save:
if args.invite:
filename = Path(config["invite_save_path"]) / Path("invite_"+datetime.date.today().isoformat()+".txt")
elif args.presentation:
filename = Path(config["presentation_save_path"]) / Path("presentation_"+datetime.date.today().isoformat()+".tex")
elif args.protocol:
filename = Path(config["protocol_save_path"]) / Path(datetime.date.today().isoformat())
else:
raise Exception("Should never happen")
with open(filename, "w") as file:
file.write(template.render(context))
elif args.send_mail:
msg = email.message.EmailMessage()
msg.set_content(template.render(context))
msg["Subject"] = j2env.from_string(config["invite_subject"]).render(context)
msg["From"] = email.utils.formataddr((config["redeleitung"]["name"], config["redeleitung"]["email"]))
msg["To"] = config["invite_mail"]
subprocess.run([*config["sendmail"], config["invite_mail"]], input=str(msg), text=True, check=True)
elif args.write_mbox:
msg = email.message.EmailMessage()
msg.set_content(template.render(context))
msg["Subject"] = j2env.from_string(config["invite_subject"]).render(context)
msg["From"] = email.utils.formataddr((config["redeleitung"]["name"], config["redeleitung"]["email"]))
msg["To"] = config["invite_mail"]
mbox = mailbox.mbox(config["mbox_out"])
mbox.add(msg)
mbox.close()
print(mbox)
elif args.send_mm:
headers = {'Content-Type': 'application/json',}
values = json.dumps({ "text": template.render(context), "username": "test"})
response = requests.post(config["mm_url"], headers=headers, data=values)
else:
print(template.render(context))

17
helpers/get_uvproto.sh Executable file
View File

@@ -0,0 +1,17 @@
#!/bin/sh
set -e
: "${FSR_GEN_SSH_TO:=fsmi-login.fsmi.uni-karlsruhe.de}"
: "${FSR_GEN_SSH:=$(test "$(hostname -d)" = "fsmi.uni-karlsruhe.de" || echo 1)}"
QUERY="select '* FSR-Protokoll vom '||datum|| case when protokoll like '%TODO%' then ' (hat noch TODOs)' else '' end from protokolle where ist_veroeffentlicht=false and name is null order by datum asc"
cmd="psql --no-align --tuples-only service=fsmi -c \"$QUERY\""
if [ -z "$FSR_GEN_SSH" ] || [ "$FSR_GEN_SSH" -eq 0 ]; then
raw_proto="$(sh -c "$cmd")"
else
raw_proto="$(ssh -- "$FSR_GEN_SSH_TO" "$cmd")"
fi
echo "$raw_proto"

4
helpers/list_termine.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/bin/sh -e
khal list --day-format "" --format "* {start} {title}" -a calendars_fsmi today 30d | grep -v Fachschaftsrat | grep -v Feriensprechstunde || true

4
helpers/list_termine_proto.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/bin/sh -e
echo "{{'{|'}}"
khal list --day-format "" --format "{{{{'{{{{'}}}}Termin|was={title}|wann={start}{{{{'}}}}'}}}}" -a calendars_fsmi today 30d | grep -v Fachschaftsrat | grep -v Feriensprechstunde || true
echo "{{'|}'}}"

38
helpers/read_db.sh Executable file
View File

@@ -0,0 +1,38 @@
#!/bin/sh
set -e
: "${FSR_GEN_SSH_TO:=fsmi-login.fsmi.uni-karlsruhe.de}"
: "${FSR_GEN_SSH:=$(test "$(hostname -d)" = "fsmi.uni-karlsruhe.de" || echo 1)}"
sql() {
# $1: select
# $2: order_by
select="$1"
order_by="$2"
printf "
SELECT %s FROM protokolle
WHERE ist_veroeffentlicht=false AND name IS NULL
ORDER BY %s
" "$select" "$order_by" | tr '\n' ' '
}
cmd="psql --no-align --tuples-only service=fsmi -c"
cmd_raw="$cmd '$(sql "datum" "datum ASC")'"
cmd_last="$cmd '$(sql "protokoll" "datum DESC LIMIT 1")'"
if [ -z "$FSR_GEN_SSH" ] || [ "$FSR_GEN_SSH" -eq 0 ]; then
raw_proto="$(sh -c "$cmd_raw")"
sh -c "$cmd_last" >data/last_proto
else
raw_proto="$(ssh -- "$FSR_GEN_SSH_TO" "$cmd_raw")"
ssh -- "$FSR_GEN_SSH_TO" "$cmd_last" >data/last_proto
fi
for proto in $raw_proto; do
echo "* FSR-Protokoll vom $proto"
done > data/uvproto.txt
echo "$proto" > data/last_date
grep -ioP '(?<=nächste Redeleitung: ).*(?=</li>)' data/last_proto > data/redeleitung
grep -ioP '(?<=nächstes Protokoll: ).*(?=</li>)' data/last_proto > data/protokoll
grep -ioP '(?<=Ort: ).*(?=</li>)' data/last_proto > data/ort

55
helpers/read_ubmails.py Executable file
View File

@@ -0,0 +1,55 @@
#!/usr/bin/python3
import mailbox
import jinja2
import email.header
import email.utils
import yaml
import datetime
import sys, os
import pypandoc
import argparse
import quopri
from pprint import pprint
from dateutil import parser as dateutilparser
import generate
CONFIG_FILE = "generator.conf"
def decode_header(header):
decoded_header = email.header.decode_header(header)[0]
encoding = decoded_header[1] or "ascii"
if encoding == "unknown-8bit":
encoding = "ascii"
return decoded_header[0].decode(encoding, errors="replace") if isinstance(decoded_header[0], bytes) else decoded_header[0]
if __name__ == "__main__":
aparser = argparse.ArgumentParser()
aparser.add_argument("--config", "-c", default=CONFIG_FILE)
args = aparser.parse_args()
config = generate.get_config(args.config)
mbox = mailbox.Maildir(config["ubmails_inbox_maildir"])
latest = None
latest_date = None
for message in mbox:
if message["Subject"]:
if decode_header(message["Subject"]).strip() == "Unbeantwortete Mails":
date = dateutilparser.parse(message["Date"])
if latest is None:
latest = message
latest_date = date
elif latest_date < date:
latest = message
latest_date = date
if not latest:
print("ERROR: No Mail found", file=sys.stderr)
sys.exit(1)
if latest_date.date() != datetime.date.today():
print("WARNING: Mail is not from today", file=sys.stderr)
payload = latest.get_payload(decode=True).decode("utf8").strip()
result = (payload.rpartition("\n--")[0].strip() if "\n--" in payload else payload)
print(result)

101
helpers/sequencer.py Executable file
View File

@@ -0,0 +1,101 @@
#!/usr/bin/python3
import sys
import argparse
import subprocess
import os
import mailbox
import datetime
import pytz
from dateutil import parser as dateutilparser
import generate
class Actions:
@staticmethod
def clean_data():
open(config["top_mbox_file"], 'w').close()
for top in config["pre_tops"]:
if "file" in top and os.path.isfile(top["file"]):
os.remove(top["file"])
for top in config["post_tops"]:
if "file" in top and os.path.isfile(top["file"]):
os.remove(top["file"])
@staticmethod
def read_db():
subprocess.run(["helpers/read_db.sh"], check=True)
@staticmethod
def read_topmails():
in_mbox = mailbox.Maildir(config["top_inbox_maildir"])
out_mbox = mailbox.mbox(config["top_mbox_file"])
out_mbox.clear()
buffer = []
timezone = pytz.timezone("Europe/Berlin")
last_fsr_date = datetime.date.fromisoformat(open(config["last_date_file"]).read().strip())
last_fsr_datetime = datetime.datetime.combine(last_fsr_date, datetime.time(17, 30), timezone)
for message in in_mbox:
if message["List-Id"]:
if config["top_list_id"] in generate.decode_header(message["List-Id"]).strip():
date = dateutilparser.parse(message["Date"])
if date > last_fsr_datetime:
buffer.append(message)
for message in sorted(buffer, key=lambda x: dateutilparser.parse(x["Date"])):
out_mbox.add(message)
out_mbox.close()
@staticmethod
def generate(*args):
subprocess.run(["helpers/generate.py", "--config", cliargs.config, *args], check=True)
@staticmethod
def compile_presentation():
subprocess.run(["helpers/compile_presentation.sh"], check=True)
def dispatch(action, args=[]):
if action in dir(Actions):
getattr(Actions, action)(*args)
elif action in config["sequencer"]:
for item in config["sequencer"][action]:
ilist = item.split()
if not ilist[0] in cliargs.skip:
dispatch(ilist[0], ilist[1:])
else:
raise ValueError(f"Action {action} not found")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", default=generate.CONFIG_FILE)
parser.add_argument("--skip", action='append')
parser.add_argument("action", nargs="?")
cliargs = parser.parse_args()
if cliargs.skip is None:
cliargs.skip = []
config = generate.get_config(cliargs.config)
if os.path.dirname(cliargs.config) != "":
os.chdir(os.path.dirname(cliargs.config))
commandname = sys.argv[0]
commandname = commandname.split("/")[-1]
if commandname != "sequencer.py":
dispatch(commandname)
elif cliargs.action:
dispatch(cliargs.action)
else:
print("No action specified")
sys.exit(1)