Files
myriad/myriad/manage.py
2026-03-30 22:20:16 +01:00

789 lines
25 KiB
Python

import datetime
import json
import os
import uuid
import zipfile

from flask import (
    Blueprint,
    abort,
    current_app,
    flash,
    g,
    redirect,
    render_template,
    request,
    send_file,
    send_from_directory,
    session,
    url_for,
)
from werkzeug.utils import secure_filename

from myriad.auth import login_required
from myriad.db import get_db
from myriad.utilities import get_datetime_obj, server_time, server_time_obj
# Blueprint that groups the management views of this module under the /manage URL prefix.
bp = Blueprint('manage', __name__, url_prefix='/manage')
@bp.route('/new', methods=('GET', 'POST'))
@login_required
def new():
    """Render the "new member" form and create the member on POST.

    A POST inserts one row into ``member`` using the submitted ``name``,
    ``subtitle`` and ``bio`` fields, then redirects to the full member
    list. Any other method renders ``manage/new.html``.
    """
    if request.method != 'POST':
        return render_template('manage/new.html')

    form = request.form
    member_name = form['name']
    member_subtitle = form["subtitle"]
    member_bio = form['bio']
    # The first column of the logged-in user row is stored as the owner id.
    owner_id = g.user[0]
    created_at = get_datetime_obj(server_time())

    connection = get_db()
    connection.execute(
        "INSERT INTO member (user_id, member_name, bio, subtitle, created) VALUES (?, ?, ?, ?, ?)",
        (owner_id, member_name, member_bio, member_subtitle, created_at),
    )
    connection.commit()
    return redirect(url_for('home.full_list'))
@bp.route("/import", methods=("GET","POST"))
@login_required
def import_member_route():
    """Import a single member from an uploaded JSON export.

    On POST, the upload in the ``file`` field is parsed and handed to
    ``import_member`` when its name ends in ``.json`` (case-insensitive).
    The template receives the parsed document (or an error string) as
    ``response`` and the row id of the last insert as ``mid``.
    """
    db = get_db()
    response = ""
    mid = None
    if request.method == "POST" and "file" in request.files:
        file = request.files["file"]
        # splitext inspects only the final extension, so names without a dot
        # (or with several dots) no longer crash or get misclassified.
        extension = os.path.splitext(file.filename or "")[1].lower()
        payload = None
        if extension == ".json":
            try:
                payload = json.loads(file.read())
            except ValueError:
                # Covers both malformed JSON and undecodable bytes.
                payload = None
        if payload is None:
            response = "Not a JSON file"
        else:
            response = payload
            import_member(payload)
            last = db.execute('SELECT last_insert_rowid()').fetchone()
            mid = last[0]
    return render_template('manage/import.html', response=response, mid=mid)
@bp.route("/delete/<mid>")
@login_required
def delete(mid):
    """Delete the ``member`` row whose id is ``mid`` and return to the full member list."""
    connection = get_db()
    connection.execute("DELETE FROM member WHERE id=(?)", (mid,))
    connection.commit()
    member_list_url = url_for('home.full_list')
    return redirect(member_list_url)
def _store_upload(upload, folder_key):
    """Save ``upload`` under a random UUID-based name in ``current_app.config[folder_key]``.

    Returns the generated file name, or ``None`` when the field is absent
    or no file was actually chosen (empty filename).
    """
    if upload is None or not upload.filename:
        return None
    # Only the final extension of the client-supplied name is kept; splitext
    # copes with names that have no dot or several dots.
    extension = os.path.splitext(upload.filename)[1]
    stored_name = str(uuid.uuid4()) + extension
    upload.save(os.path.join(current_app.config[folder_key], stored_name))
    return stored_name


@bp.route("/edit/<mid>", methods=('GET', 'POST'))
@login_required
def edit(mid):
    """Edit page for member ``mid``.

    A POST may carry any of: the detail fields (``name``, ``bio``,
    ``subtitle``, ``theme``, ``privacy``), an icon upload (``file``), a
    ``blinkie`` or ``stamp`` upload, or a group change (``gid_add`` /
    ``gid_remove``). Each part that is present is applied and committed.
    ``edit_location`` names the last section that was changed and is passed
    to the template together with the member's current data.
    """
    db = get_db()
    edit_location = None
    if request.method == "POST":
        if "name" in request.form:
            name = request.form['name']
            bio = request.form['bio']
            subtitle = request.form['subtitle']
            theme = request.form["theme"]
            privacy = request.form["privacy"]
            db.execute("UPDATE member SET member_name=(?), bio=(?), subtitle=(?), public=(?), theme=(?) WHERE id=(?)",(name, bio, subtitle, privacy, theme, mid))
            db.commit()
            edit_location = "details"

        # Uploaded files are always stored under a random name, never the
        # name supplied by the client.
        icon_name = _store_upload(request.files.get("file"), "ICON_UPLOAD_FOLDER")
        if icon_name is not None:
            db.execute("INSERT INTO icons (member_id, icon_location) VALUES (?, ?)", (mid, icon_name))
            db.commit()
            edit_location = "icons"

        if "gid_add" in request.form:
            gid = request.form["gid_add"]
            db.execute("INSERT INTO group_members (group_id,member_id) VALUES (?,?)",(gid,mid))
            db.commit()
            edit_location = "groups"
        elif "gid_remove" in request.form:
            gid = request.form["gid_remove"]
            db.execute("DELETE FROM group_members WHERE group_id=(?) AND member_id=(?)",(gid,mid))
            db.commit()
            edit_location = "groups"

        blinkie_name = _store_upload(request.files.get("blinkie"), "BLINKIES_UPLOAD_FOLDER")
        if blinkie_name is not None:
            db.execute("INSERT INTO blinkies (member_id, blinkie_location) VALUES (?, ?)", (mid, blinkie_name))
            db.commit()
            edit_location = "blinkies"

        stamp_name = _store_upload(request.files.get("stamp"), "STAMPS_UPLOAD_FOLDER")
        if stamp_name is not None:
            db.execute("INSERT INTO stamps (member_id, stamp_location) VALUES (?, ?)", (mid, stamp_name))
            db.commit()
            edit_location = "stamps"

    member = db.execute("SELECT * FROM member WHERE id=(?)",(mid,)).fetchone()
    icons = db.execute("SELECT * FROM icons WHERE member_id=(?)",(mid,)).fetchall()
    blinkies = db.execute("SELECT * FROM blinkies WHERE member_id=(?)",(mid,)).fetchall()
    stamps = db.execute("SELECT * FROM stamps WHERE member_id=(?)",(mid,)).fetchall()
    groups = db.execute("SELECT * FROM groups").fetchall()
    member_groups = db.execute("SELECT * FROM group_members WHERE member_id=(?)",(mid,)).fetchall()

    # Column 1 of a group_members row is matched against column 0 of a groups
    # row to split the groups into joined and not-yet-joined.
    joined_ids = {link[1] for link in member_groups}
    joined_groups = [group for group in groups if group[0] in joined_ids]
    unjoined_groups = [group for group in groups if group[0] not in joined_ids]

    themes = os.listdir(current_app.config["THEMES_FOLDER"])
    return render_template("manage/edit.html", member=member, icons=icons, unjoined_groups=unjoined_groups, joined_groups=joined_groups, themes=themes, edit_location=edit_location, blinkies=blinkies, stamps=stamps)
@bp.route("/set_main_icon/<mid>/<icon_id>")
@login_required
def set_main_icon(mid, icon_id):
    """Store ``icon_id`` as the ``main_icon`` of member ``mid`` and return to that member's edit page."""
    connection = get_db()
    connection.execute(
        "UPDATE member SET main_icon=(?) WHERE id=(?)",
        (icon_id, mid),
    )
    connection.commit()
    edit_url = url_for("manage.edit", mid=mid)
    return redirect(edit_url)
@bp.route("/delete_icon/<mid>/<icon_id>")
@login_required
def delete_icon(mid, icon_id):
    """Delete icon ``icon_id`` (database row and stored file), then return to the edit page of ``mid``.

    An unknown ``icon_id`` or a file that is already missing from the icon
    folder is tolerated; the request still ends with the redirect.
    """
    db = get_db()
    icon = db.execute("SELECT icon_location FROM icons WHERE id=(?)",(icon_id,)).fetchone()
    if icon is not None:
        db.execute("DELETE FROM icons WHERE id=(?)",(icon_id,))
        db.commit()
        try:
            os.remove(os.path.join(current_app.config["ICON_UPLOAD_FOLDER"], icon[0]))
        except FileNotFoundError:
            # The row referenced a file that no longer exists; the row is
            # gone now, so there is nothing left to clean up.
            pass
    return redirect(url_for("manage.edit", mid=mid))
@bp.route("/delete_blinkie/<mid>/<blinkie_id>")
@login_required
def delete_blinkie(mid, blinkie_id):
    """Delete blinkie ``blinkie_id`` (database row and stored file), then return to the edit page of ``mid``.

    An unknown ``blinkie_id`` or a file that is already missing from the
    blinkies folder is tolerated; the request still ends with the redirect.
    """
    db = get_db()
    blinkie = db.execute("SELECT blinkie_location FROM blinkies WHERE id=(?)",(blinkie_id,)).fetchone()
    if blinkie is not None:
        db.execute("DELETE FROM blinkies WHERE id=(?)",(blinkie_id,))
        db.commit()
        try:
            os.remove(os.path.join(current_app.config["BLINKIES_UPLOAD_FOLDER"], blinkie[0]))
        except FileNotFoundError:
            # The row referenced a file that no longer exists; the row is
            # gone now, so there is nothing left to clean up.
            pass
    return redirect(url_for("manage.edit", mid=mid))
@bp.route("/delete_stamp/<mid>/<stamp_id>")
@login_required
def delete_stamp(mid, stamp_id):
    """Delete stamp ``stamp_id`` (database row and stored file), then return to the edit page of ``mid``.

    An unknown ``stamp_id`` or a file that is already missing from the
    stamps folder is tolerated; the request still ends with the redirect.
    """
    db = get_db()
    stamp = db.execute("SELECT stamp_location FROM stamps WHERE id=(?)",(stamp_id,)).fetchone()
    if stamp is not None:
        db.execute("DELETE FROM stamps WHERE id=(?)",(stamp_id,))
        db.commit()
        try:
            os.remove(os.path.join(current_app.config["STAMPS_UPLOAD_FOLDER"], stamp[0]))
        except FileNotFoundError:
            # The row referenced a file that no longer exists; the row is
            # gone now, so there is nothing left to clean up.
            pass
    return redirect(url_for("manage.edit", mid=mid))
@bp.route("/add_to_front/<mid>/<location>")
@login_required
def add_to_front(mid,location):
    """Mark member ``mid`` as fronting and open a ``front_log`` entry for them.

    A new log entry is only created when the member has no entry that is
    still open (``end_time IS NULL``), so repeating the request does not
    pile up open entries. The flag update and the log insert are committed
    together. ``location`` equal to ``"home"`` redirects to ``index``; any
    other value redirects to the full member list.
    """
    db = get_db()
    db.execute("UPDATE member SET front=(?) WHERE id=(?)",(1, mid))
    open_entry = db.execute("SELECT id FROM front_log WHERE member_id=(?) AND end_time IS NULL",(mid,)).fetchone()
    if open_entry is None:
        db.execute("INSERT INTO front_log (member_id, start_time) VALUES (?, ?)",(mid, server_time_obj()))
    db.commit()
    if location == "home":
        return redirect(url_for('index'))
    return redirect(url_for('home.full_list'))
@bp.route("/remove_front/<mid>/<location>")
@login_required
def remove_front(mid, location):
    """Clear the fronting flag of member ``mid`` and close their open ``front_log`` entries.

    Every entry of the member whose ``end_time`` is still NULL receives the
    current server time. When no open entry exists the update simply
    matches nothing instead of failing. ``location`` equal to ``"home"``
    redirects to ``index``; any other value redirects to the full member
    list.
    """
    db = get_db()
    db.execute("UPDATE member SET front=(?) WHERE id=(?)",(0, mid))
    db.execute("UPDATE front_log SET end_time=(?) WHERE member_id=(?) AND end_time IS NULL",(server_time_obj(), mid))
    db.commit()
    if location == "home":
        return redirect(url_for('index'))
    return redirect(url_for('home.full_list'))
@bp.route("/delete_front_log/<fid>")
@login_required
def delete_front_log(fid):
    """Delete the ``front_log`` row with id ``fid`` and return to the admin page."""
    connection = get_db()
    connection.execute("DELETE FROM front_log WHERE id=(?)", (fid,))
    connection.commit()
    admin_url = url_for('manage.admin')
    return redirect(admin_url)
@bp.route("/add_to_home/<mid>/<location>")
@login_required
def add_to_home(mid, location):
    """Set the ``homepage`` flag of member ``mid`` to 1.

    ``location`` equal to ``"home"`` redirects to ``index``; any other
    value redirects to the full member list.
    """
    connection = get_db()
    connection.execute("UPDATE member SET homepage=(?) WHERE id=(?)", (1, mid))
    connection.commit()
    destination = 'index' if location == "home" else 'home.full_list'
    return redirect(url_for(destination))
@bp.route("/remove_home/<mid>/<location>")
@login_required
def remove_home(mid,location):
    """Set the ``homepage`` flag of member ``mid`` to 0.

    ``location`` equal to ``"home"`` redirects to ``index``; any other
    value redirects to the full member list.
    """
    connection = get_db()
    connection.execute("UPDATE member SET homepage=(?) WHERE id=(?)", (0, mid))
    connection.commit()
    destination = 'index' if location == "home" else 'home.full_list'
    return redirect(url_for(destination))
def import_groups(groups):
    """Insert each exported group dict into the ``groups`` table.

    Every entry must provide ``id``, ``name``, ``description`` and
    ``privacy``; a privacy of ``"public"`` is stored as 1, anything else
    as 0.
    """
    connection = get_db()
    for entry in groups:
        row = (
            entry["id"],
            entry["name"],
            entry["description"],
            1 if entry["privacy"] == "public" else 0,
        )
        connection.execute(
            "INSERT INTO groups (id, group_name, group_description, public) VALUES (?, ?, ?, ?)",
            row,
        )
        connection.commit()
def import_member(member):
    """Insert one exported member dict, together with its related rows, into the database.

    ``member`` must provide the keys read below (``id``, ``date-created``,
    ``name``, ``subtitle``, ``description``, ``privacy``, ``theme``,
    ``homepage-pin``, ``groups``, ``blog``, ``icons``, ``blinkies``,
    ``stamps`` and, when icons are present, ``main-icon``). Group links,
    blog posts, icons, blinkies and stamps are inserted first and the
    ``member`` row last, all in one transaction with a single commit.
    """
    db = get_db()
    mid = member["id"]
    date_created_obj = get_datetime_obj(member["date-created"])
    name = member["name"]
    subtitle = member["subtitle"]
    description = member["description"]
    # Kept in its own variable: the blog loop below computes a privacy flag
    # per post and must not overwrite the member's own flag.
    member_public = 1 if member["privacy"] == "public" else 0
    theme = member["theme"]
    homepage = member["homepage-pin"]
    user_id = 0
    main_icon_id = ""

    for group in member["groups"]:
        db.execute("INSERT INTO group_members (group_id, member_id) VALUES (?, ?)",(group, mid))

    for post in member["blog"]:
        post_date_created = get_datetime_obj(post["date-created"])
        post_public = 1 if post["privacy"] == "public" else 0
        db.execute("INSERT INTO blog (member_id, created, title, content, public) VALUES (?, ?, ?, ?, ?)",(mid, post_date_created, post["title"], post["content"], post_public))

    for icon in member["icons"]:
        db.execute("INSERT INTO icons (member_id, icon_location) VALUES (?, ?)",(mid, icon))
        if icon == member["main-icon"]:
            # Remember the row id of the icon that the export names as the main icon.
            last = db.execute('SELECT last_insert_rowid()').fetchone()
            main_icon_id = last[0]

    for blinkie in member["blinkies"]:
        db.execute("INSERT INTO blinkies (member_id, blinkie_location) VALUES (?, ?)",(mid, blinkie))

    for stamp in member["stamps"]:
        db.execute("INSERT INTO stamps (member_id, stamp_location) VALUES (?, ?)",(mid, stamp))

    db.execute("INSERT INTO member (id,created,user_id, member_name,subtitle, bio,public,theme,homepage,main_icon) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",(mid, date_created_obj, user_id, name, subtitle, description, member_public, theme, homepage, main_icon_id))
    db.commit()
@bp.route("/admin", methods=("GET", "POST"))
@login_required
def admin():
    """Admin page: lists users and the front log, and accepts system imports.

    A POST with a ``json`` upload imports the groups and members it
    contains. A POST with a ``zip`` upload walks the archive: files whose
    third path component is ``tmp`` are read as system JSON and imported;
    every other file is extracted relative to the current working
    directory. Upload fields submitted without a file are ignored. Both
    import paths answer with a plain "go home" link.
    """
    db = get_db()
    if request.method == "POST":
        json_upload = request.files.get("json")
        zip_upload = request.files.get("zip")
        # An upload object is falsy when no file was chosen for the field.
        if json_upload:
            file_json = json.loads(json_upload.read())
            import_groups(file_json["groups"])
            for member in file_json["members"]:
                import_member(member)
            return "<a href='/'>go home</a>"
        elif zip_upload:
            with zipfile.ZipFile(zip_upload, "r") as zipf:
                for entry in zipf.namelist():
                    parts = entry.split("/")
                    # Entries are expected to be files at least four path
                    # components deep; skip directory entries and anything
                    # shallower instead of failing on a missing index.
                    if entry.endswith("/") or len(parts) < 4:
                        continue
                    content_type = parts[2]
                    if content_type == "tmp":
                        system_json = json.loads(zipf.read(entry).decode())
                        import_groups(system_json["groups"])
                        for m in system_json["members"]:
                            import_member(m)
                    else:
                        zipf.extract(entry)
            return "<a href='/'>go home</a>"
    users = db.execute("SELECT * FROM user").fetchall()
    front_log = db.execute("SELECT * FROM front_log ORDER BY start_time DESC").fetchall()
    return render_template("manage/admin.html", users=users, front_log=front_log)
# DATA EXPORTS
def generate_json(mid):
    """Build the export dictionary for the member with id ``mid``.

    The result holds the member's own fields plus lists of group ids, blog
    posts, icon/blinkie/stamp locations and the location of the main icon
    (empty string when no matching icon row exists). Member and blog
    columns are read by position.
    """
    db = get_db()

    def first_column(query):
        """Return column 0 of every row that ``query`` yields for this member."""
        return [row[0] for row in db.execute(query, (mid,)).fetchall()]

    member = db.execute("SELECT * FROM member WHERE id=(?)",(mid,)).fetchone()
    group_ids = first_column("SELECT group_id FROM group_members WHERE member_id=(?)")
    post_rows = db.execute("SELECT * FROM blog WHERE member_id=(?)",(mid,)).fetchall()
    icon_locations = first_column("SELECT icon_location FROM icons WHERE member_id=(?)")
    blinkie_locations = first_column("SELECT blinkie_location FROM blinkies WHERE member_id=(?)")
    stamp_locations = first_column("SELECT stamp_location FROM stamps WHERE member_id=(?)")

    posts = [
        {
            "id": post[0],
            "member-id": post[1],
            "date-created": post[2],
            "date-format": "d/m/Y, H:M:S",
            "title": post[3],
            "content": post[4],
            "privacy": "public" if post[5] == 1 else "private",
        }
        for post in post_rows
    ]

    # Column 6 of the member row holds the id of the main icon.
    main_icon_row = db.execute("SELECT icon_location FROM icons WHERE id=(?)",(member[6],)).fetchone()

    return {
        "id": mid,
        "date-created": member[2],
        "date-format": "d/m/Y, H:M:S",
        "name": member[3],
        "subtitle": member[4],
        "description": member[5],
        "privacy": "public" if member[9] == 1 else "private",
        "theme": member[10],
        "groups": group_ids,
        "blog": posts,
        "icons": icon_locations,
        "blinkies": blinkie_locations,
        "stamps": stamp_locations,
        "homepage-pin": member[7],
        "main-icon": main_icon_row[0] if main_icon_row else "",
    }
def generate_json_groups():
    """Return every row of ``groups`` as an export dictionary.

    Columns are read by position: 0 is the id, 1 the name, 2 the
    description, and column 3 equal to 1 is exported as ``"public"``
    (anything else as ``"private"``).
    """
    rows = get_db().execute("SELECT * FROM groups").fetchall()
    return [
        {
            "id": row[0],
            "name": row[1],
            "description": row[2],
            "privacy": "public" if row[3] == 1 else "private",
        }
        for row in rows
    ]
@bp.route("/export_fields/<mid>")
@login_required
def export_fields(mid):
    """Download the text fields of member ``mid`` as a JSON file.

    The export is written to ``TMP_FOLDER`` and that same file is sent as
    an attachment. Responds with 404 when the member does not exist.
    """
    db = get_db()
    member = db.execute("SELECT * FROM member WHERE id=(?)",(mid,)).fetchone()
    if member is None:
        abort(404)
    data = generate_json(mid)
    # The member name is user-controlled; secure_filename keeps path
    # separators and similar characters out of the file name.
    filename = str(mid) + "-" + secure_filename(member[3]) + ".json"
    full_path = os.path.join(current_app.config["TMP_FOLDER"], filename)
    with open(full_path, 'w') as f:
        json.dump(data, f)
    # Send exactly the file written above rather than a separately
    # hard-coded location.
    return send_file(os.path.abspath(full_path), as_attachment=True)
# THIS TECHNICALLY WORKS BUT I DONT LIKE IT RN
@bp.route("/export_icons/<mid>")
@login_required
def export_icons(mid):
    """Download a zip archive with the icon files of member ``mid``.

    The archive is written to ``TMP_FOLDER`` and that same file is sent as
    an attachment. Icon rows whose file is missing on disk are left out of
    the archive. Responds with 404 when the member does not exist.
    """
    db = get_db()
    member = db.execute("SELECT * FROM member WHERE id=(?)",(mid,)).fetchone()
    if member is None:
        abort(404)
    icons = db.execute("SELECT icon_location FROM icons WHERE member_id=(?)",(mid,)).fetchall()
    # The member name is user-controlled; secure_filename keeps path
    # separators and similar characters out of the file name.
    zip_name = str(member[0]) + "-" + secure_filename(member[3]) + "-icons.zip"
    full_path = os.path.join(current_app.config["TMP_FOLDER"], zip_name)
    icon_folder = current_app.config["ICON_UPLOAD_FOLDER"]
    with zipfile.ZipFile(full_path, "w") as zipf:
        for icon in icons:
            icon_path = os.path.join(icon_folder, icon[0])
            # A row may point at a file that no longer exists; skip it
            # rather than failing the whole export.
            if os.path.isfile(icon_path):
                zipf.write(icon_path)
    return send_file(os.path.abspath(full_path), as_attachment=True)
# THIS TECHNICALLY WORKS BUT I DONT LIKE IT RN
@bp.route("/export_blinkies/<mid>")
@login_required
def export_blinkies(mid):
    """Download a zip archive with the blinkie files of member ``mid``.

    The archive is written to ``TMP_FOLDER`` and that same file is sent as
    an attachment. Blinkie rows whose file is missing on disk are left out
    of the archive. Responds with 404 when the member does not exist.
    """
    db = get_db()
    member = db.execute("SELECT * FROM member WHERE id=(?)",(mid,)).fetchone()
    if member is None:
        abort(404)
    blinkies = db.execute("SELECT blinkie_location FROM blinkies WHERE member_id=(?)",(mid,)).fetchall()
    # The member name is user-controlled; secure_filename keeps path
    # separators and similar characters out of the file name.
    zip_name = str(member[0]) + "-" + secure_filename(member[3]) + "-blinkies.zip"
    full_path = os.path.join(current_app.config["TMP_FOLDER"], zip_name)
    blinkie_folder = current_app.config["BLINKIES_UPLOAD_FOLDER"]
    with zipfile.ZipFile(full_path, "w") as zipf:
        for blinkie in blinkies:
            blinkie_path = os.path.join(blinkie_folder, blinkie[0])
            # A row may point at a file that no longer exists; skip it
            # rather than failing the whole export.
            if os.path.isfile(blinkie_path):
                zipf.write(blinkie_path)
    return send_file(os.path.abspath(full_path), as_attachment=True)
# THIS TECHNICALLY WORKS BUT I DONT LIKE IT RN
@bp.route("/export_stamps/<mid>")
@login_required
def export_stamps(mid):
    """Download a zip archive with the stamp files of member ``mid``.

    The archive is written to ``TMP_FOLDER`` and that same file is sent as
    an attachment. Stamp rows whose file is missing on disk are left out of
    the archive. Responds with 404 when the member does not exist.
    """
    db = get_db()
    member = db.execute("SELECT * FROM member WHERE id=(?)",(mid,)).fetchone()
    if member is None:
        abort(404)
    stamps = db.execute("SELECT stamp_location FROM stamps WHERE member_id=(?)",(mid,)).fetchall()
    # The member name is user-controlled; secure_filename keeps path
    # separators and similar characters out of the file name.
    zip_name = str(member[0]) + "-" + secure_filename(member[3]) + "-stamps.zip"
    full_path = os.path.join(current_app.config["TMP_FOLDER"], zip_name)
    stamp_folder = current_app.config["STAMPS_UPLOAD_FOLDER"]
    with zipfile.ZipFile(full_path, "w") as zipf:
        for stamp in stamps:
            stamp_path = os.path.join(stamp_folder, stamp[0])
            # A row may point at a file that no longer exists; skip it
            # rather than failing the whole export.
            if os.path.isfile(stamp_path):
                zipf.write(stamp_path)
    return send_file(os.path.abspath(full_path), as_attachment=True)
# THIS TECHNICALLY WORKS BUT I DONT LIKE IT RN
@bp.route("/export_member/<mid>")
@login_required
def export_member(mid):
    """Download a zip archive with the JSON export and all asset files of member ``mid``.

    The JSON document and the archive are both written to ``TMP_FOLDER``;
    the archive holds the JSON file plus the member's icon, blinkie and
    stamp files, and is sent as an attachment. Asset rows whose file is
    missing on disk are left out. Responds with 404 when the member does
    not exist.
    """
    db = get_db()
    member = db.execute("SELECT * FROM member WHERE id=(?)",(mid,)).fetchone()
    if member is None:
        abort(404)
    icons = db.execute("SELECT icon_location FROM icons WHERE member_id=(?)",(mid,)).fetchall()
    blinkies = db.execute("SELECT blinkie_location FROM blinkies WHERE member_id=(?)",(mid,)).fetchall()
    stamps = db.execute("SELECT stamp_location FROM stamps WHERE member_id=(?)",(mid,)).fetchall()

    tmp_folder = current_app.config["TMP_FOLDER"]
    # The member name is user-controlled; secure_filename keeps path
    # separators and similar characters out of the file names.
    base_name = str(member[0]) + "-" + secure_filename(member[3])

    data = generate_json(mid)
    data_full_path = os.path.join(tmp_folder, base_name + ".json")
    with open(data_full_path, 'w') as f:
        json.dump(data, f)

    asset_groups = (
        (current_app.config["ICON_UPLOAD_FOLDER"], icons),
        (current_app.config["BLINKIES_UPLOAD_FOLDER"], blinkies),
        (current_app.config["STAMPS_UPLOAD_FOLDER"], stamps),
    )
    zip_path = os.path.join(tmp_folder, base_name + ".zip")
    with zipfile.ZipFile(zip_path, "w") as zipf:
        zipf.write(data_full_path)
        for folder, rows in asset_groups:
            for row in rows:
                asset_path = os.path.join(folder, row[0])
                # A row may point at a file that no longer exists; skip it
                # rather than failing the whole export.
                if os.path.isfile(asset_path):
                    zipf.write(asset_path)
    return send_file(os.path.abspath(zip_path), as_attachment=True)
@bp.route("/export_system")
@login_required
def export_system():
    """Download a text-only JSON export of all members and groups.

    The document has a ``members`` list (one ``generate_json`` result per
    member row) and a ``groups`` list. It is written to ``TMP_FOLDER`` as
    ``myriad_system_textonly.json`` and that same file is sent as an
    attachment.
    """
    db = get_db()
    members = db.execute("SELECT * FROM member").fetchall()
    data = {
        "members": [generate_json(member[0]) for member in members],
        "groups": generate_json_groups(),
    }
    filename = "myriad_system_textonly.json"
    file_full_path = os.path.join(current_app.config["TMP_FOLDER"], filename)
    with open(file_full_path, 'w') as f:
        json.dump(data, f)
    # Send exactly the file written above rather than a separately
    # hard-coded location.
    return send_file(os.path.abspath(file_full_path), as_attachment=True)
@bp.route("/export_system_full")
@login_required
def export_system_full():
    """Download a zip archive with the system JSON export and every asset file.

    ``myriad_system.json`` (all members and groups) and
    ``myriad_system.zip`` are written to ``TMP_FOLDER``. The archive holds
    the JSON file plus every icon, blinkie and stamp file referenced in the
    database, and is sent as an attachment. Asset rows whose file is
    missing on disk are left out.
    """
    db = get_db()
    members = db.execute("SELECT * FROM member").fetchall()
    icons = db.execute("SELECT icon_location FROM icons").fetchall()
    blinkies = db.execute("SELECT blinkie_location FROM blinkies").fetchall()
    stamps = db.execute("SELECT stamp_location FROM stamps").fetchall()

    data = {
        "members": [generate_json(member[0]) for member in members],
        "groups": generate_json_groups(),
    }
    tmp_folder = current_app.config["TMP_FOLDER"]
    file_full_path = os.path.join(tmp_folder, "myriad_system.json")
    with open(file_full_path, 'w') as f:
        json.dump(data, f)

    asset_groups = (
        (current_app.config["ICON_UPLOAD_FOLDER"], icons),
        (current_app.config["BLINKIES_UPLOAD_FOLDER"], blinkies),
        (current_app.config["STAMPS_UPLOAD_FOLDER"], stamps),
    )
    zip_path = os.path.join(tmp_folder, "myriad_system.zip")
    with zipfile.ZipFile(zip_path, "w") as zipf:
        zipf.write(file_full_path)
        for folder, rows in asset_groups:
            for row in rows:
                asset_path = os.path.join(folder, row[0])
                # A row may point at a file that no longer exists; skip it
                # rather than failing the whole export.
                if os.path.isfile(asset_path):
                    zipf.write(asset_path)
    return send_file(os.path.abspath(zip_path), as_attachment=True)
# ASSETS
@bp.route("/assets", methods=('GET', 'POST'))
@login_required
def assets():
    """Asset overview page, which also accepts ``blinkie`` and ``stamp`` uploads.

    Uploaded files keep their (sanitized) original name. The template
    receives icon rows whose file is missing from the icon folder
    (``icons``), files in the icon folder that no row references
    (``icon_storage``), and the listings of the blinkie and stamp folders.
    """
    db = get_db()
    upload_targets = (
        ("blinkie", "BLINKIES_UPLOAD_FOLDER"),
        ("stamp", "STAMPS_UPLOAD_FOLDER"),
    )
    for field, folder_key in upload_targets:
        upload = request.files.get(field)
        if upload is None:
            continue
        # secure_filename drops directory components so the upload cannot be
        # written outside the target folder; an empty result (no file chosen
        # or nothing usable in the name) is skipped.
        filename = secure_filename(upload.filename or "")
        if filename:
            upload.save(os.path.join(current_app.config[folder_key], filename))

    icons = db.execute("SELECT * FROM icons").fetchall()
    icon_storage = os.listdir(current_app.config["ICON_UPLOAD_FOLDER"])
    # Column 2 of an icon row is the value compared against file names.
    known_locations = {row[2] for row in icons}
    stored_files = set(icon_storage)
    i_storage = [name for name in icon_storage if name not in known_locations]
    unlinked_icons = [row for row in icons if row[2] not in stored_files]

    blinkies = os.listdir(current_app.config["BLINKIES_UPLOAD_FOLDER"])
    stamps = os.listdir(current_app.config["STAMPS_UPLOAD_FOLDER"])
    return render_template("manage/assets.html", icons=unlinked_icons, icon_storage=i_storage, blinkies=blinkies, stamps=stamps)
@bp.route("/delete_idb")
@login_required
def delete_idb():
    """Delete every ``icons`` row whose file is not present in the icon upload folder, then return to the assets page."""
    connection = get_db()
    icon_rows = connection.execute("SELECT * FROM icons").fetchall()
    files_on_disk = os.listdir(current_app.config["ICON_UPLOAD_FOLDER"])
    # Column 2 of an icon row is compared against the names found in the folder.
    orphaned_rows = [row for row in icon_rows if row[2] not in files_on_disk]
    for row in orphaned_rows:
        connection.execute("DELETE FROM icons WHERE id=(?)", (row[0],))
        connection.commit()
    return redirect(url_for("manage.assets"))
@bp.route("/delete_ifiles")
@login_required
def delete_ifiles():
    """Remove every file in the icon upload folder that no ``icons`` row references, then return to the assets page."""
    connection = get_db()
    icon_rows = connection.execute("SELECT * FROM icons").fetchall()
    stored_names = os.listdir(current_app.config["ICON_UPLOAD_FOLDER"])
    for stored_name in stored_names:
        # Column 2 of an icon row is compared against the file name.
        referenced = any(row[2] == stored_name for row in icon_rows)
        if referenced:
            continue
        os.remove(os.path.join(current_app.config["ICON_UPLOAD_FOLDER"], stored_name))
    return redirect(url_for("manage.assets"))
# GROUPS
@bp.route("/groups", methods=('GET', 'POST'))
@login_required
def groups():
    """List all groups; on POST, first create a group from the ``name`` and ``desc`` form fields."""
    connection = get_db()
    if request.method == "POST":
        new_group = (request.form['name'], request.form['desc'])
        connection.execute(
            "INSERT INTO groups (group_name, group_description) VALUES (?, ?)",
            new_group,
        )
        connection.commit()
    all_groups = connection.execute("SELECT * FROM groups").fetchall()
    return render_template("manage/groups.html", groups=all_groups)
@bp.route("/group_edit/<gid>", methods=("GET", "POST"))
@login_required
def group_edit(gid):
    """Update group ``gid`` from the ``name``, ``desc`` and ``privacy`` form fields on POST.

    Every request, whatever its method, ends with a redirect to the groups
    page.
    """
    connection = get_db()
    if request.method == "POST":
        form = request.form
        new_values = (form["name"], form["desc"], form["privacy"], gid)
        connection.execute(
            "UPDATE groups SET group_name=(?), group_description=(?), public=(?) WHERE id=(?)",
            new_values,
        )
        connection.commit()
    return redirect(url_for("manage.groups"))
@bp.route("/group_delete/<gid>")
@login_required
def group_delete(gid):
    """Delete group ``gid`` together with its ``group_members`` links, then return to the groups page."""
    connection = get_db()
    statements = (
        "DELETE FROM groups WHERE id=(?)",
        "DELETE FROM group_members WHERE group_id=(?)",
    )
    for statement in statements:
        connection.execute(statement, (gid,))
    connection.commit()
    return redirect(url_for("manage.groups"))