194 lines
6.6 KiB
Python
194 lines
6.6 KiB
Python
import flask
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import functools
|
|
import datetime
|
|
import decimal
|
|
from flask import request, session
|
|
from inventorysystem import app
|
|
|
|
# Register psycopg2's UUID support once at import time so that uuid.UUID
# values (e.g. the <uuid:id> route argument of the delete view in this file)
# can be passed directly as query parameters.
psycopg2.extras.register_uuid()
|
|
|
|
|
|
def show_message(message):
    """Render the generic message page.

    ``message`` is handed unchanged to the ``message.html`` template and the
    rendered page is returned.
    """
    rendered_page = flask.render_template("message.html", message=message)
    return rendered_page
|
|
|
|
def get_db():
    """Open and return a new psycopg2 connection.

    The DSN is read from ``app.config["DSN"]``. Cursors created from the
    connection are ``NamedTupleCursor`` instances, so result columns can be
    read as attributes. The caller is responsible for closing the connection.
    """
    dsn = app.config["DSN"]
    cursor_class = psycopg2.extras.NamedTupleCursor
    return psycopg2.connect(dsn, cursor_factory=cursor_class)
|
|
|
|
def current_user_has_permission(permission, oe=None):
    """Check ``permission`` for the user stored in the session.

    Reads ``session["user_id"]`` and delegates to ``user_has_permission``
    with the same ``permission`` and ``oe`` arguments, returning its result.
    """
    logged_in_user = session["user_id"]
    return user_has_permission(logged_in_user, permission, oe)
|
|
|
|
def user_has_permission(user_id, permission, oe=None):
    """Return True if ``user_id`` holds ``permission`` for unit ``oe``.

    A row in ``permissions`` counts when it belongs to the user, names either
    the requested permission or ``'admin'``, and is either bound to ``oe`` or
    has no unit at all (``oe is NULL``). With ``oe=None`` only rows without a
    unit can match, because ``oe = NULL`` is never true in SQL.

    A fresh connection is opened for the check and is always closed again,
    also when the query raises.
    """
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute(
            "select count(*) from permissions where \"user\"=%s and (permission=%s or permission='admin') and (oe=%s or oe is NULL)",
            (user_id, permission, oe),
        )
        return cur.fetchone()[0] > 0
    finally:
        # Previously the connection leaked whenever execute()/fetchone() raised.
        db.close()
|
|
|
|
def login_required(f):
    """Decorator that only lets logged-in sessions reach the view ``f``.

    A session counts as logged in when it contains a ``"username"`` key;
    otherwise the request is redirected to the ``login`` endpoint.
    """
    @functools.wraps(f)
    def guarded_view(*args, **kwargs):
        if "username" in session:
            return f(*args, **kwargs)
        login_url = flask.url_for("login")
        return flask.redirect(login_url)

    return guarded_view
|
|
|
|
def permission_required(f, permission=None, oe=None):
    """Guard a view so only logged-in users with ``permission`` may call it.

    Two calling conventions are supported:

    * direct: ``permission_required(view, "perm", oe)`` wraps ``view`` and
      returns the guarded function (the original interface);
    * decorator factory: ``@permission_required("perm")`` or
      ``@permission_required("perm", oe)`` / ``(..., oe=oe)`` returns a
      decorator. This is the form used on the views in this module, which
      the original three-argument signature could not accept (it raised
      ``TypeError`` at import time).

    The guarded view redirects to ``login`` when the session has no
    ``"username"`` and answers "Permission denied" with status 403 when the
    permission check fails.

    Raises:
        TypeError: in the direct form when ``permission`` is omitted.
    """
    if not callable(f):
        # Decorator-factory form: the first positional argument is the
        # permission name and the second (if any) is the unit.
        required_permission = f
        required_oe = permission if oe is None else oe

        def decorator(view):
            return permission_required(view, required_permission, required_oe)

        return decorator

    if permission is None:
        raise TypeError("permission_required() missing required argument: 'permission'")

    @functools.wraps(f)
    def inner_function(*args, **kwargs):
        if "username" not in session:
            return flask.redirect(flask.url_for("login"))
        if not current_user_has_permission(permission, oe):
            # 403 keeps this consistent with the inline checks in the views.
            return show_message("Permission denied"), 403
        return f(*args, **kwargs)

    return inner_function
|
|
|
|
|
|
@app.route('/login', methods=["GET", "POST"])
def login():
    """Show the login form (GET/HEAD) or authenticate the user (POST).

    An already logged-in session is redirected to ``index``. On POST the
    form fields ``user`` and ``pass`` are compared against the ``users``
    table; on success ``username``, ``user_id`` and ``full_name`` are stored
    in the session and the user is redirected to ``index``, otherwise an
    error message page is rendered.
    """
    if "username" in session:
        return flask.redirect(flask.url_for("index"))

    # Flask also dispatches HEAD to routes that allow GET; treating every
    # non-POST request as "show the form" avoids returning None for HEAD.
    if request.method != "POST":
        return flask.render_template("login.html")

    username = request.form["user"]
    password = request.form["pass"]
    # FIXME hash password: the password is compared in plain text against the
    # stored value; this needs a schema change and is intentionally left as is.

    db = get_db()
    try:
        cur = db.cursor()
        cur.execute("select id,full_name from users where username=%s and password=%s", (username, password))
        result = cur.fetchall()
    finally:
        # The connection used to be left open on every login attempt.
        db.close()

    if not result:
        return show_message("Failed to log in, are username and password correct?")

    session["username"] = username
    session["user_id"] = result[0].id
    session["full_name"] = result[0].full_name

    return flask.redirect(flask.url_for("index"))
|
|
|
|
@app.route('/')
@login_required
def index():
    """Render the start page with the id and name of every organizational unit."""
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute("select id, name from organizational_units")
        result = cur.fetchall()
    finally:
        # Close the connection even if the query fails.
        db.close()

    return flask.render_template("index.html", dbresult=result)
|
|
|
|
def list_of_dicts_to_table(l, headers, default=None):
    """Turn a sequence of records into a ``{"headers", "rows"}`` table dict.

    Each record contributes one row, built by reading every name in
    ``headers`` from the record as an attribute (``getattr``); attributes a
    record lacks are filled with ``default``. ``headers`` itself is stored
    under the ``"headers"`` key without copying.
    """
    rows = [
        [getattr(record, column, default) for column in headers]
        for record in l
    ]
    return {"headers": headers, "rows": rows}
|
|
|
|
@app.route("/inventory/<int:oe>")
@login_required
def show_inventory(oe):
    """List the inventory of organizational unit ``oe``.

    Requires a logged-in user (otherwise redirects to ``login``) holding the
    ``show_inventory`` permission for the unit (otherwise 403). Renders
    ``show_inventory.html`` with the inventory as a table and the unit's
    ``id``/``name`` row as ``oe``.
    """
    # login_required guarantees the session is populated; without it an
    # anonymous request raised KeyError on session["user_id"].
    if not user_has_permission(session["user_id"], "show_inventory", oe):
        return show_message("Permission denied"), 403

    db = get_db()
    try:
        cur = db.cursor()
        cur.execute("select id,serial,innenauftrag,description,location,purchase_date,purchase_price,old_inventory_id from inventory where oe=%s", (oe,))
        result = cur.fetchall()
        cur.execute("select id,name from organizational_units where id=%s", (oe,))
        unit = cur.fetchone()
    finally:
        # Close the connection even if one of the queries fails.
        db.close()

    table = list_of_dicts_to_table(result, ["id", "serial", "description", "location", "innenauftrag", "purchase_date", "purchase_price", "old_inventory_id"])

    return flask.render_template("show_inventory.html", table=table, oe=unit)
|
|
|
|
|
|
@app.route("/inventory/<int:oe>/new", methods=["GET", "POST"])
@login_required
def new_inventory(oe):
    """Show the "new inventory entry" form or create an entry for unit ``oe``.

    Requires a logged-in user with the ``create_inventory_entry`` permission
    for the unit (otherwise 403). Non-POST requests render
    ``new_inventory.html`` with the unit's ``id``/``name`` row. POST reads
    the form, stores empty fields as NULL, inserts the entry and redirects
    to the unit's inventory list. A purchase date or price that cannot be
    parsed yields a 400 message page instead of an unhandled exception.
    """
    if not user_has_permission(session["user_id"], "create_inventory_entry", oe):
        return show_message("Permission denied"), 403

    # Only POST may write; Flask also routes HEAD here because GET is allowed.
    if request.method != "POST":
        db = get_db()
        try:
            cur = db.cursor()
            cur.execute("select id,name from organizational_units where id=%s", (oe,))
            unit = cur.fetchone()
        finally:
            db.close()

        return flask.render_template("new_inventory.html", oe=unit)

    # Empty form fields are stored as NULL.
    description = request.form["description"] or None
    innenauftrag = request.form["innenauftrag"] or None
    serial = request.form["serial"] or None
    location = request.form["location"] or None
    old_inventory_id = request.form["old_inventory_id"] or None
    purchase_date = request.form["purchase_date"] or None
    purchase_price = request.form["purchase_price"] or None

    # Convert only values that were actually supplied: fromisoformat(None)
    # and Decimal(None) raise TypeError, which broke every submission that
    # left one of these optional fields empty.
    try:
        if purchase_date is not None:
            purchase_date = datetime.datetime.fromisoformat(purchase_date)
        if purchase_price is not None:
            purchase_price = decimal.Decimal(purchase_price)
    except (ValueError, decimal.InvalidOperation):
        return show_message("Invalid purchase date or purchase price"), 400

    db = get_db()
    try:
        cur = db.cursor()
        cur.execute("insert into inventory (serial, innenauftrag, description, oe, location, purchase_date, purchase_price, old_inventory_id) values (%s,%s,%s,%s,%s,%s,%s,%s)", (serial, innenauftrag, description, oe, location, purchase_date, purchase_price, old_inventory_id))
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        # The connection used to stay open after every insert.
        db.close()

    return flask.redirect(flask.url_for("show_inventory", oe=oe))
|
|
|
|
@app.route("/inventory/<uuid:id>/delete", methods=["GET", "POST"])
@login_required
def delete_inventory(id):
    """Confirm (non-POST) or perform (POST) deletion of one inventory entry.

    Requires a logged-in user with the ``delete_inventory_entry`` permission
    for the unit the entry belongs to (otherwise 403). An unknown ``id``
    yields 404. Non-POST requests render ``delete_inventory.html`` with the
    entry's column/value pairs; POST deletes the entry and redirects to the
    owning unit's inventory list.
    """
    db = get_db()
    try:
        cur = db.cursor()
        # One query supplies both the owning unit for the permission check
        # and the data shown on the confirmation page.
        cur.execute("select * from inventory where id=%s", (id,))
        entry = cur.fetchone()
        if entry is None:
            # Previously this crashed with AttributeError on None.oe.
            flask.abort(404)
        inventory_oe = entry.oe

        if not current_user_has_permission("delete_inventory_entry", inventory_oe):
            return show_message("Permission denied"), 403

        # Only POST may delete. Flask also routes HEAD here because GET is
        # allowed, and the old "else" branch deleted on any non-GET method.
        if request.method != "POST":
            result = entry._asdict().items()
            return flask.render_template("delete_inventory.html", dbresult=result)

        try:
            cur.execute("delete from inventory where id=%s", (id,))
            db.commit()
        except Exception:
            db.rollback()
            raise
    finally:
        # The connection was never closed on any path before.
        db.close()

    return flask.redirect(flask.url_for("show_inventory", oe=inventory_oe))
|
|
|
|
@app.route("/users")
@login_required
def list_users():
    """List all users (id, username, full name) for user administrators.

    Requires a logged-in user holding the ``user_admin`` permission
    (otherwise 403). The check is done inline because
    ``permission_required`` as declared with ``(f, permission, oe)`` cannot
    be applied as ``@permission_required("user_admin")``.
    """
    if not current_user_has_permission("user_admin"):
        return show_message("Permission denied"), 403

    db = get_db()
    try:
        cur = db.cursor()
        # The query has no placeholders; the old code passed an undefined
        # ``oe`` as a parameter and then looked up an organizational unit
        # that does not exist in this context.
        cur.execute("select id,username,full_name from users")
        result = cur.fetchall()
    finally:
        db.close()

    # Use the columns that were actually selected (the previous header list
    # was copied from the inventory view and matched none of them).
    table = list_of_dicts_to_table(result, ["id", "username", "full_name"])

    # NOTE(review): still rendered through the inventory template with no
    # unit; a dedicated users template is probably needed - confirm which
    # templates exist and whether show_inventory.html tolerates oe=None.
    return flask.render_template("show_inventory.html", table=table, oe=None)
|
|
|
|
|
|
|
|
|