Files
2022-10-06 13:36:57 +02:00

194 lines
6.6 KiB
Python

import flask
import psycopg2
import psycopg2.extras
import functools
import datetime
import decimal
from flask import request, session
from inventorysystem import app
# Register psycopg2's UUID support once at import time; the delete route in
# this module passes a UUID route parameter straight into a query.
psycopg2.extras.register_uuid()
def show_message(message):
    """Render the ``message.html`` template with ``message`` and return it."""
    template_name = "message.html"
    return flask.render_template(template_name, message=message)
def get_db():
    """Open a new psycopg2 connection using the DSN from the app config.

    Cursors created from the returned connection are ``NamedTupleCursor``
    instances, so row columns can be read as attributes.
    """
    dsn = app.config["DSN"]
    cursor_cls = psycopg2.extras.NamedTupleCursor
    return psycopg2.connect(dsn, cursor_factory=cursor_cls)
def current_user_has_permission(permission, oe=None):
    """Check ``permission`` (optionally for unit ``oe``) for the session user.

    Reads ``session["user_id"]`` and delegates to ``user_has_permission``.
    """
    session_user_id = session["user_id"]
    return user_has_permission(session_user_id, permission, oe)
def user_has_permission(user_id, permission, oe=None):
    """Return True if ``user_id`` holds ``permission`` (or ``admin``).

    A permission row matches when its ``oe`` equals the given ``oe`` or when
    its ``oe`` is NULL.

    Args:
        user_id: Value compared against the ``"user"`` column.
        permission: Permission name to look for; ``'admin'`` rows also match.
        oe: Organizational unit id to check against, or None.

    Returns:
        bool: True when at least one matching row exists.
    """
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute(
            "select count(*) from permissions where \"user\"=%s and (permission=%s or permission='admin') and (oe=%s or oe is NULL)",
            (user_id, permission, oe),
        )
        return cur.fetchone()[0] > 0
    finally:
        # Close the connection even if the query fails, so it is not leaked.
        db.close()
def login_required(f):
    """Decorator that redirects to the ``login`` view unless a user is logged in.

    A request counts as logged in when ``"username"`` is present in the
    session; in that case the wrapped view is called with its arguments.
    """
    @functools.wraps(f)
    def guarded_view(*args, **kwargs):
        if "username" in session:
            return f(*args, **kwargs)
        return flask.redirect(flask.url_for("login"))

    return guarded_view
def permission_required(f, permission=None, oe=None):
    """Require a logged-in user holding ``permission`` before running a view.

    Two call styles are supported:

    * ``permission_required(view, "perm", oe)`` wraps ``view`` directly.
    * ``@permission_required("perm")`` / ``@permission_required("perm", oe)``
      returns a decorator. In this form the first argument is the permission
      name and the second one (positional or ``oe=``) is the unit.

    The wrapped view redirects to ``login`` when no user is in the session
    and renders "Permission denied" when the permission check fails.

    Raises:
        TypeError: if a view is passed without a permission name.
    """
    if not callable(f):
        # Decorator-factory form: ``f`` is actually the permission name and
        # ``permission`` (if given) is the organizational unit.
        required_permission = f
        required_oe = permission if permission is not None else oe

        def decorator(view):
            return permission_required(view, required_permission, required_oe)

        return decorator

    if permission is None:
        raise TypeError(
            "permission_required() missing required argument: 'permission'"
        )

    @functools.wraps(f)
    def inner_function(*args, **kwargs):
        if "username" not in session:
            return flask.redirect(flask.url_for("login"))
        if not current_user_has_permission(permission, oe):
            return show_message("Permission denied")
        return f(*args, **kwargs)

    return inner_function
@app.route('/login', methods=["GET", "POST"])
def login():
    """Show the login form, or authenticate the posted credentials.

    Already logged-in users are redirected to ``index``. On POST the form
    fields ``user`` and ``pass`` are checked against the ``users`` table; on
    success ``username``, ``user_id`` and ``full_name`` are stored in the
    session and the user is redirected to ``index``.
    """
    if "username" in session:
        return flask.redirect(flask.url_for("index"))
    # Anything that is not a POST (GET, and HEAD which Flask routes alongside
    # GET) gets the form, so the view never falls through returning None.
    if request.method != "POST":
        return flask.render_template("login.html")

    username = request.form["user"]
    password = request.form["pass"]
    # FIXME hash password
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute("select id,full_name from users where username=%s and password=%s", (username, password))
        user = cur.fetchone()
    finally:
        # The connection was previously never closed on this path.
        db.close()

    if user is None:
        return show_message("Failed to log in, are username and password correct?")

    session["username"] = username
    session["user_id"] = user.id
    session["full_name"] = user.full_name
    return flask.redirect(flask.url_for("index"))
@app.route('/')
@login_required
def index():
    """Render ``index.html`` with the id and name of every organizational unit."""
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute("select id, name from organizational_units")
        result = cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        db.close()
    return flask.render_template("index.html", dbresult=result)
def list_of_dicts_to_table(l, headers, default=None):
    """Build a ``{"headers": ..., "rows": ...}`` table from a sequence of records.

    Each record contributes one row holding, for every name in ``headers``,
    the record's attribute of that name, or ``default`` when the record has
    no such attribute. Despite the function name, values are read with
    ``getattr``, not by key.
    """
    rows = [
        [getattr(record, column, default) for column in headers]
        for record in l
    ]
    return {"headers": headers, "rows": rows}
@app.route("/inventory/<int:oe>")
@login_required
def show_inventory(oe):
    """Render the inventory table of organizational unit ``oe``.

    Requires a logged-in user with the ``show_inventory`` permission for the
    unit; otherwise a "Permission denied" page is returned with status 403.
    """
    # login_required guarantees the session is populated, so the permission
    # lookup no longer raises KeyError for anonymous visitors.
    if not current_user_has_permission("show_inventory", oe):
        return show_message("Permission denied"), 403

    db = get_db()
    try:
        cur = db.cursor()
        cur.execute("select id,serial,innenauftrag,description,location,purchase_date,purchase_price,old_inventory_id from inventory where oe=%s", (oe,))
        items = cur.fetchall()
        cur.execute("select id,name from organizational_units where id=%s", (oe,))
        unit = cur.fetchone()
    finally:
        # Close the connection even when a query raises.
        db.close()

    table = list_of_dicts_to_table(items, ["id", "serial", "description", "location", "innenauftrag", "purchase_date", "purchase_price", "old_inventory_id"])
    return flask.render_template("show_inventory.html", table=table, oe=unit)
@app.route("/inventory/<int:oe>/new", methods=["GET", "POST"])
@login_required
def new_inventory(oe):
    """Show the "new inventory entry" form for unit ``oe``, or create an entry.

    Requires a logged-in user with the ``create_inventory_entry`` permission
    for the unit (403 otherwise). On POST, empty form fields are stored as
    NULL; ``purchase_date`` is parsed as an ISO date/time and
    ``purchase_price`` as a decimal, and a 400 page is returned when either
    cannot be parsed. After a successful insert the user is redirected to the
    unit's inventory list.
    """
    if not current_user_has_permission("create_inventory_entry", oe):
        return show_message("Permission denied"), 403

    # Only POST writes; GET (and HEAD, which Flask routes alongside GET)
    # just renders the form.
    if request.method != "POST":
        db = get_db()
        try:
            cur = db.cursor()
            cur.execute("select id,name from organizational_units where id=%s", (oe,))
            unit = cur.fetchone()
        finally:
            db.close()
        return flask.render_template("new_inventory.html", oe=unit)

    form = request.form
    description = form["description"] or None
    innenauftrag = form["innenauftrag"] or None
    serial = form["serial"] or None
    location = form["location"] or None
    old_inventory_id = form["old_inventory_id"] or None
    purchase_date = form["purchase_date"] or None
    purchase_price = form["purchase_price"] or None
    try:
        # Convert only when a value was supplied: fromisoformat(None) and
        # Decimal(None) raise TypeError, which broke empty optional fields.
        if purchase_date is not None:
            purchase_date = datetime.datetime.fromisoformat(purchase_date)
        if purchase_price is not None:
            purchase_price = decimal.Decimal(purchase_price)
    except (ValueError, decimal.InvalidOperation):
        return show_message("Invalid purchase date or purchase price"), 400

    db = get_db()
    try:
        cur = db.cursor()
        try:
            cur.execute("insert into inventory (serial, innenauftrag, description, oe, location, purchase_date, purchase_price, old_inventory_id) values (%s,%s,%s,%s,%s,%s,%s,%s)", (serial, innenauftrag, description, oe, location, purchase_date, purchase_price, old_inventory_id))
        except Exception:
            db.rollback()
            raise
        else:
            db.commit()
    finally:
        # The connection was previously never closed on the POST path.
        db.close()
    return flask.redirect(flask.url_for("show_inventory", oe=oe))
@app.route("/inventory/<uuid:id>/delete", methods=["GET", "POST"])
@login_required
def delete_inventory(id):
    """Show a delete confirmation for an inventory entry, or delete it on POST.

    Responds with 404 when no entry has the given id and with a 403
    "Permission denied" page when the current user lacks the
    ``delete_inventory_entry`` permission for the entry's unit. After a
    successful delete the user is redirected to that unit's inventory list.
    """
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute("select * from inventory where id=%s", (id,))
        entry = cur.fetchone()
        if entry is None:
            # Previously an unknown id crashed with AttributeError on None.
            flask.abort(404)
        inventory_oe = entry.oe

        if not current_user_has_permission("delete_inventory_entry", inventory_oe):
            return show_message("Permission denied"), 403

        # Only POST deletes; GET (and HEAD) render the confirmation page.
        if request.method != "POST":
            return flask.render_template("delete_inventory.html", dbresult=entry._asdict().items())

        try:
            cur.execute("delete from inventory where id=%s", (id,))
        except Exception:
            db.rollback()
            raise
        else:
            db.commit()
    finally:
        # The connection was previously never closed in this view.
        db.close()
    return flask.redirect(flask.url_for("show_inventory", oe=inventory_oe))
@app.route("/users")
# permission_required takes the view as its first argument, so the permission
# name is bound with functools.partial to make it usable as a decorator.
@functools.partial(permission_required, permission="user_admin")
def list_users():
    """Render a table of all users (id, username, full name).

    Requires a logged-in user holding the ``user_admin`` permission.
    """
    db = get_db()
    try:
        cur = db.cursor()
        # The query has no placeholders, so no parameters are passed; the
        # earlier copy-pasted ``(oe,)`` referred to a name that does not
        # exist in this view.
        cur.execute("select id,username,full_name from users")
        users = cur.fetchall()
    finally:
        db.close()

    # Table columns match the selected user columns rather than the
    # inventory columns.
    table = list_of_dicts_to_table(users, ["id", "username", "full_name"])
    # NOTE(review): this still renders the inventory template, now with
    # oe=None because users are not tied to one unit here. Confirm the
    # template tolerates that, or switch to a dedicated users template.
    return flask.render_template("show_inventory.html", table=table, oe=None)