"""Flask views for the inventory system.

Covers login, the organizational-unit overview, and listing / creating /
deleting inventory entries, plus the permission helpers those views share.
"""

import contextlib
import datetime
import decimal
import functools

import flask
import psycopg2
import psycopg2.extras
from flask import request, session

from inventorysystem import app

# Register psycopg2's UUID adapter so uuid.UUID values round-trip.
psycopg2.extras.register_uuid()


def show_message(message):
    """Render ``message.html`` with the given text."""
    return flask.render_template("message.html", message=message)


def get_db():
    """Open a new connection to ``app.config["DSN"]``.

    Rows are returned as named tuples. The caller owns the connection and
    must close it.
    """
    return psycopg2.connect(
        app.config["DSN"],
        cursor_factory=psycopg2.extras.NamedTupleCursor,
    )


def current_user_has_permission(permission, oe=None):
    """Check ``permission`` for the user stored in the session.

    Raises ``KeyError`` if the session has no ``user_id``.
    """
    return user_has_permission(session["user_id"], permission, oe)


def user_has_permission(user_id, permission, oe=None):
    """Return True if ``user_id`` holds ``permission`` for unit ``oe``.

    A row with permission ``'admin'`` matches any requested permission, and
    a row whose ``oe`` is NULL matches any unit.
    """
    # closing() guarantees the connection is released even if the query fails.
    with contextlib.closing(get_db()) as db:
        cur = db.cursor()
        cur.execute(
            'select count(*) from permissions where "user"=%s'
            " and (permission=%s or permission='admin')"
            " and (oe=%s or oe is NULL)",
            (user_id, permission, oe),
        )
        return cur.fetchone()[0] > 0


def login_required(f):
    """Decorator: redirect to the login page unless a user is logged in."""

    @functools.wraps(f)
    def inner_function(*args, **kwargs):
        if "username" not in session:
            return flask.redirect(flask.url_for("login"))
        return f(*args, **kwargs)

    return inner_function


def _guard_with_permission(view, permission, oe):
    """Wrap ``view`` so it only runs for logged-in users holding ``permission``."""

    @functools.wraps(view)
    def inner_function(*args, **kwargs):
        if "username" not in session:
            return flask.redirect(flask.url_for("login"))
        if not current_user_has_permission(permission, oe):
            return show_message("Permission denied")
        return view(*args, **kwargs)

    return inner_function


def permission_required(f, permission=None, oe=None):
    """Require login and ``permission`` before running a view.

    Two call forms are supported:

    * ``permission_required(view, "perm", oe)`` wraps ``view`` directly.
    * ``@permission_required("perm")`` or ``@permission_required("perm", oe)``
      returns a decorator. This is the form the views in this module use;
      it previously raised ``TypeError`` at import time.

    Raises:
        TypeError: in the direct form when no permission name is given.
    """
    if callable(f):
        if permission is None:
            raise TypeError("permission_required() needs a permission name")
        return _guard_with_permission(f, permission, oe)
    # Decorator-factory form: the first argument is the permission name and
    # an optional second positional argument is the unit.
    scope = permission if oe is None else oe
    return functools.partial(_guard_with_permission, permission=f, oe=scope)


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or check the submitted credentials (POST)."""
    if "username" in session:
        return flask.redirect(flask.url_for("index"))
    # Flask also routes HEAD here; anything that is not a POST gets the form
    # instead of falling through and returning None.
    if request.method != "POST":
        return flask.render_template("login.html")

    username = request.form["user"]
    password = request.form["pass"]
    # FIXME hash password (the comparison below is against the stored value as-is)
    with contextlib.closing(get_db()) as db:
        cur = db.cursor()
        cur.execute(
            "select id,full_name from users where username=%s and password=%s",
            (username, password),
        )
        user = cur.fetchone()

    if user is None:
        return show_message("Failed to log in, are username and password correct?")
    session["username"] = username
    session["user_id"] = user.id
    session["full_name"] = user.full_name
    return flask.redirect(flask.url_for("index"))


@app.route("/")
@login_required
def index():
    """List all organizational units."""
    with contextlib.closing(get_db()) as db:
        cur = db.cursor()
        cur.execute("select id, name from organizational_units")
        result = cur.fetchall()
    return flask.render_template("index.html", dbresult=result)


def list_of_dicts_to_table(l, headers, default=None):
    """Build ``{"headers": ..., "rows": ...}`` from a sequence of records.

    Despite the name, fields are read with ``getattr``, so records are
    expected to expose them as attributes (as the named-tuple rows do).
    Missing attributes are replaced by ``default``.
    """
    return {
        "headers": headers,
        "rows": [[getattr(record, h, default) for h in headers] for record in l],
    }


@app.route("/inventory/<oe>")
@login_required
def show_inventory(oe):
    """Show the inventory of organizational unit ``oe``."""
    if not user_has_permission(session["user_id"], "show_inventory", oe):
        return show_message("Permission denied"), 403
    with contextlib.closing(get_db()) as db:
        cur = db.cursor()
        cur.execute(
            "select id,serial,innenauftrag,description,location,purchase_date,"
            "purchase_price,old_inventory_id from inventory where oe=%s",
            (oe,),
        )
        entries = cur.fetchall()
        cur.execute("select id,name from organizational_units where id=%s", (oe,))
        unit = cur.fetchone()
    table = list_of_dicts_to_table(
        entries,
        [
            "id",
            "serial",
            "description",
            "location",
            "innenauftrag",
            "purchase_date",
            "purchase_price",
            "old_inventory_id",
        ],
    )
    return flask.render_template("show_inventory.html", table=table, oe=unit)


def _parse_optional(raw, parser):
    """Return ``parser(raw)``, or None when ``raw`` is empty."""
    return parser(raw) if raw else None


@app.route("/inventory/<oe>/new", methods=["GET", "POST"])
@login_required
def new_inventory(oe):
    """Show the creation form, or insert a new inventory entry on POST."""
    if not user_has_permission(session["user_id"], "create_inventory_entry", oe):
        return show_message("Permission denied"), 403

    # Only POST writes; GET and the implicit HEAD render the form.
    if request.method != "POST":
        with contextlib.closing(get_db()) as db:
            cur = db.cursor()
            cur.execute("select id,name from organizational_units where id=%s", (oe,))
            unit = cur.fetchone()
        return flask.render_template("new_inventory.html", oe=unit)

    form = request.form
    # Empty form fields are stored as NULL.
    description = form["description"] or None
    innenauftrag = form["innenauftrag"] or None
    serial = form["serial"] or None
    location = form["location"] or None
    old_inventory_id = form["old_inventory_id"] or None
    try:
        # Date and price are optional: parse only non-empty values.
        purchase_date = _parse_optional(
            form["purchase_date"], datetime.datetime.fromisoformat
        )
        purchase_price = _parse_optional(form["purchase_price"], decimal.Decimal)
    except (ValueError, decimal.InvalidOperation):
        return show_message("Invalid purchase date or purchase price"), 400

    with contextlib.closing(get_db()) as db:
        # "with db" commits on success and rolls back if execute() raises.
        with db:
            db.cursor().execute(
                "insert into inventory (serial, innenauftrag, description, oe,"
                " location, purchase_date, purchase_price, old_inventory_id)"
                " values (%s,%s,%s,%s,%s,%s,%s,%s)",
                (
                    serial,
                    innenauftrag,
                    description,
                    oe,
                    location,
                    purchase_date,
                    purchase_price,
                    old_inventory_id,
                ),
            )
    return flask.redirect(flask.url_for("show_inventory", oe=oe))


@app.route("/inventory/<id>/delete", methods=["GET", "POST"])
@login_required
def delete_inventory(id):  # "id" is the URL variable name, kept for url_for callers
    """Show a confirmation page, or delete the inventory entry on POST."""
    with contextlib.closing(get_db()) as db:
        cur = db.cursor()
        cur.execute("select oe from inventory where id=%s", (id,))
        row = cur.fetchone()
        if row is None:
            return show_message("Inventory entry not found"), 404
        inventory_oe = row.oe
        if not current_user_has_permission("delete_inventory_entry", inventory_oe):
            return show_message("Permission denied"), 403

        # Delete only on an explicit POST; Flask also routes HEAD requests
        # here, and those must never remove data.
        if request.method != "POST":
            cur.execute("select * from inventory where id=%s", (id,))
            result = cur.fetchone()._asdict().items()
            return flask.render_template("delete_inventory.html", dbresult=result)

        # "with db" commits on success and rolls back if execute() raises.
        with db:
            cur.execute("delete from inventory where id=%s", (id,))
    return flask.redirect(flask.url_for("show_inventory", oe=inventory_oe))


@app.route("/users")
@permission_required("user_admin")
def list_users():
    """List all users (requires the ``user_admin`` permission)."""
    with contextlib.closing(get_db()) as db:
        cur = db.cursor()
        cur.execute("select id,username,full_name from users")
        users = cur.fetchall()
    table = list_of_dicts_to_table(users, ["id", "username", "full_name"])
    # NOTE(review): this still renders the inventory template, as before, with
    # no organizational unit. Confirm the template copes with oe=None or
    # switch to a dedicated user-list template.
    return flask.render_template("show_inventory.html", table=table, oe=None)