132 lines
4.0 KiB
Python
Executable File
132 lines
4.0 KiB
Python
Executable File
#!/usr/bin/python3
|
|
import mailbox
|
|
import jinja2
|
|
import email.header
|
|
import email.utils
|
|
import yaml
|
|
import datetime
|
|
import sys, os
|
|
import argparse
|
|
import quopri
|
|
import requests
|
|
import json
|
|
import subprocess
|
|
import re
|
|
from pprint import pprint
|
|
from pathlib import Path
|
|
|
|
|
|
IMPORT_RE = re.compile(r"@import\((.*)\)")
|
|
FILE_RE = re.compile(r"@file\((.*)\)")
|
|
|
|
CONFIG_FILE = "fsmi_fsr/generator.conf"
|
|
|
|
WEEKDAYS = { 0: "Montag",
|
|
1: "Dienstag",
|
|
2: "Mittwoch",
|
|
3: "Donnerstag",
|
|
4: "Freitag",
|
|
5: "Samstag",
|
|
6: "Sonntag" }
|
|
|
|
def decode_header(header):
|
|
decoded_headers = email.header.decode_header(header)
|
|
header_strs = []
|
|
for dheader in decoded_headers:
|
|
encoding = dheader[1] or "ascii"
|
|
if encoding == "unknown-8bit":
|
|
encoding = "ascii"
|
|
result = dheader[0].decode(encoding, errors="replace") if isinstance(dheader[0], bytes) else dheader[0]
|
|
header_strs.append(result)
|
|
header_text = "".join(header_strs)
|
|
header_text = re.sub(r"\n(\s)", r" ", header_text)
|
|
#header_text = re.sub(r"\n(\s)", r"", header_text)
|
|
return header_text
|
|
|
|
def get_body_text(msg):
|
|
# from https://stackoverflow.com/a/1463144
|
|
for part in msg.walk():
|
|
# each part is a either non-multipart, or another multipart message
|
|
# that contains further parts... Message is organized like a tree
|
|
if part.get_content_type() == 'text/plain':
|
|
payload = part.get_payload()
|
|
if part["Content-Transfer-Encoding"] == "quoted-printable":
|
|
payload = quopri.decodestring(payload.encode("ascii")).decode(part.get_content_charset("utf-8"))
|
|
return payload
|
|
|
|
# from https://stackoverflow.com/a/49986645
|
|
def deEmojify(text):
|
|
regrex_pattern = re.compile(pattern = "["
|
|
u"\U0001F600-\U0001F64F" # emoticons
|
|
u"\U0001F300-\U0001F5FF" # symbols & pictographs
|
|
u"\U0001F680-\U0001F6FF" # transport & map symbols
|
|
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
|
|
"]+", flags = re.UNICODE)
|
|
return regrex_pattern.sub(r'',text)
|
|
|
|
# from https://stackoverflow.com/questions/6558535/find-the-date-for-the-first-monday-after-a-given-a-date
|
|
def next_weekday(d, weekday):
|
|
days_ahead = weekday - d.weekday()
|
|
if days_ahead < 0: # Target day already happened this week
|
|
days_ahead += 7
|
|
return d + datetime.timedelta(days_ahead)
|
|
|
|
def last_weekday(d, weekday):
|
|
days_ahead = weekday - d.weekday()
|
|
if days_ahead >= 0: # Target day already happened this week
|
|
days_ahead -= 7
|
|
return d + datetime.timedelta(days_ahead)
|
|
|
|
import_cache = {}
|
|
|
|
def get_imported_conf_entry(f, key):
|
|
if f not in import_cache:
|
|
import_cache[f] = yaml.safe_load(open(f))
|
|
return import_cache[f][key]
|
|
|
|
def do_imports(entry):
|
|
if isinstance(entry, dict):
|
|
d = entry.items()
|
|
result = {}
|
|
for key,value in d:
|
|
try:
|
|
result[key] = do_imports(value)
|
|
except KeyError:
|
|
pass
|
|
except OSError as e:
|
|
print(f"Warning: reading {key} yielded error {e}")
|
|
return result
|
|
|
|
if isinstance(entry, list):
|
|
l = iter(entry)
|
|
result = []
|
|
for item in l:
|
|
result.append(do_imports(item))
|
|
return result
|
|
|
|
if isinstance(entry, str):
|
|
match = IMPORT_RE.match(entry)
|
|
if match:
|
|
f, key = match.group(1).split(":")
|
|
return get_imported_conf_entry(f, key)
|
|
match = FILE_RE.match(entry)
|
|
if match:
|
|
with open(match.group(1)) as f:
|
|
return f.read().strip()
|
|
|
|
return entry
|
|
|
|
def get_normalized_config_path(f):
|
|
if os.path.isdir(f):
|
|
f = Path(f) / Path("generator.conf")
|
|
return f
|
|
|
|
def get_config(f):
|
|
raw_config = yaml.safe_load(open(f))
|
|
config = do_imports(raw_config)
|
|
if "override_file" in config:
|
|
override_config = yaml.safe_load(open(config["override_file"]))
|
|
config.update(override_config)
|
|
return config
|
|
|