-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.py
147 lines (115 loc) · 4.91 KB
/
index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import json
import os
from PIL import Image
from utils import sqlite
from quart import Quart, send_from_directory, request
# Load runtime settings once at import time; "config.json" is resolved against
# the current working directory.
with open("config.json", "r") as config_file:
    config = json.load(config_file)

# Name of the directory (relative to the working directory) that holds the
# per-folder image storage; falls back to "database" when not configured.
db_foldername = config.get("database_folder", "database")

# Web application object the route decorators in this module attach to.
app = Quart(__name__)

# Database handle shared by every request handler.
db = sqlite.Database()
db.create_tables()  # Attempt to make tables
def standard_json(message: str, code: int = 200):
    """Build the body/status pair used for plain-message replies.

    Args:
        message: Text placed under the "message" key of the body.
        code: Status code; it is embedded in the body under the "code" key
            and also returned as the second element of the result.

    Returns:
        A ``(body, code)`` tuple where ``body`` is a dict.
    """
    body = {"message": message, "code": code}
    return body, code
@app.route("/")
async def index():
    """Return a mapping of each available endpoint to a short description."""
    endpoints = {}
    endpoints["POST /<folder>"] = "Upload file to storage"
    endpoints["DELETE /<folder>/<filename>"] = "Delete file from storage"
    endpoints["GET /<folder>/<filename>"] = "Get file from storage"
    endpoints["GET /<folder>/<filename>/stats"] = "Get file stats from storage"
    return endpoints
@app.route("/<folder>/<filename>", methods=["GET"])
async def get_image(folder: str, filename: str):
    """Serve a stored image and increment its view counter.

    Args:
        folder: Storage folder name taken from the URL.
        filename: Image file name taken from the URL.

    Returns:
        The file response on success, or the JSON 404 produced by
        ``standard_json`` when the folder is a dot segment or the file does
        not exist.
    """
    # "." and ".." would make the path below point at the storage root or the
    # application directory instead of a storage folder, so never serve them.
    if folder in (".", ".."):
        return standard_json("Image not found", 404)

    directory = f"./{db_foldername}/{folder}"

    # Check up front so a missing file always yields the JSON 404, in line
    # with the stats/delete handlers.
    # NOTE(review): send_from_directory may signal a missing file with an HTTP
    # NotFound error rather than FileNotFoundError - confirm for the Quart
    # version in use.
    if not os.path.isfile(os.path.join(directory, filename)):
        return standard_json("Image not found", 404)

    try:
        img = await send_from_directory(directory, filename)
    except FileNotFoundError:
        # The file disappeared between the check above and the send.
        return standard_json("Image not found", 404)

    # Count the view only once the file response has been built.
    db.execute("UPDATE image SET views = views + 1 WHERE name = ?", (filename,))
    return img
@app.route("/<folder>/<filename>/stats", methods=["GET"])
async def head_image_stats(folder: str, filename: str):
    """Return the stored metadata for an image.

    Args:
        folder: Storage folder name taken from the URL.
        filename: Image file name taken from the URL.

    Returns:
        A dict with "code", "name", "created_at", "user_id", "channel_id" and
        "guild_id" on success, or a JSON 404 from ``standard_json`` when the
        folder, the file, or its database row is missing.
    """
    folder_path = f"./{db_foldername}/{folder}"

    # "." and ".." are not storage folders; treating them as such would let
    # callers probe paths outside the storage tree.
    if folder in (".", "..") or not os.path.isdir(folder_path):
        return standard_json("Folder not found", 404)

    if not os.path.isfile(f"{folder_path}/{filename}"):
        return standard_json("Image not found", 404)

    data = db.fetchrow("SELECT * FROM image WHERE name=?", (filename,))
    if not data:
        # The file is on disk but has no metadata row; report it as missing
        # instead of failing while indexing the empty result.
        # NOTE(review): assumes fetchrow returns a falsy value when no row
        # matches - confirm against utils.sqlite.
        return standard_json("Image not found", 404)

    return {
        "code": 200,
        "name": data["name"],
        "created_at": str(data["created_at"]),
        "user_id": data["user_id"],
        "channel_id": data["channel_id"],
        "guild_id": data["guild_id"],
    }
@app.route("/<folder>/<filename>", methods=["DELETE"])
async def delete_image(folder: str, filename: str):
    """Delete an image file and its database row.

    Args:
        folder: Storage folder name taken from the URL.
        filename: Image file name taken from the URL.

    Returns:
        The ``standard_json`` reply "Image deleted" on success, or a JSON 404
        when the folder or the file does not exist.
    """
    folder_path = f"./{db_foldername}/{folder}"

    # "." and ".." would let a request remove files from the storage root or
    # the application directory, so they are never valid folders.
    if folder in (".", "..") or not os.path.isdir(folder_path):
        return standard_json("Folder not found", 404)

    file_path = f"{folder_path}/{filename}"
    # isfile (not exists) so a directory name is reported as "not found"
    # instead of making os.remove raise.
    if not os.path.isfile(file_path):
        return standard_json("Image not found", 404)

    try:
        os.remove(file_path)
    except FileNotFoundError:
        # Another request removed the file after the check above.
        return standard_json("Image not found", 404)

    db.execute("DELETE FROM image WHERE name=?", (filename,))
    return standard_json("Image deleted")
@app.route("/<folder>", methods=["POST"])
async def post_image(folder: str):
    """Store one or more uploaded images in ``folder``.

    Request inputs:
        * ``user_id`` header (required, integer).
        * ``channel_id`` / ``guild_id`` headers (optional, integer).
        * ``overwrite`` query argument; only the exact value "true" allows an
          existing file to be replaced.
        * Multipart files; for each form key the first file is processed.

    Args:
        folder: Storage folder name taken from the URL; it must already exist.

    Returns:
        A ``standard_json`` reply:
        404 when the folder is missing, 400 for bad headers, no files, a
        malformed or unsupported file name, an unreadable image, or when every
        file was a duplicate, 206 when some files were skipped as duplicates,
        and 200 otherwise.
        Files are processed in order, so a 400 raised for a later file does
        not undo files already stored earlier in the same request.
    """
    folder_path = f"./{db_foldername}/{folder}"

    # "." and ".." are rejected so uploads cannot land outside a real storage
    # folder; isdir also rejects a regular file that shares the folder's name.
    if folder in (".", "..") or not os.path.isdir(folder_path):
        return standard_json("DB folder not found", 404)

    user_id = request.headers.get("user_id", None)
    channel_id = request.headers.get("channel_id", None)
    guild_id = request.headers.get("guild_id", None)
    overwrite = request.args.get("overwrite", False) == "true"

    if not user_id:
        return standard_json("Missing user_id header", 400)

    try:
        user_id = int(user_id)
        channel_id = int(channel_id) if channel_id else None
        guild_id = int(guild_id) if guild_id else None
    except ValueError:
        return standard_json("Invalid user_id, channel_id, or guild_id header, must be int value", 400)

    files = await request.files
    if not files:
        return standard_json("No file found", 400)

    duplicate = []
    for g in files:
        image = files[g]

        # The name is client-supplied: it may be absent and may carry
        # directory components (with either separator). Keep only the final
        # path component so the file always lands directly in folder_path.
        raw_name = os.path.basename((image.filename or "").replace("\\", "/"))

        # Split on the last dot; dots inside the stem are dropped, which
        # matches how names have always been stored.
        stem, dot, extension = raw_name.rpartition(".")
        stem = stem.replace(".", "")
        file_ext = extension.lower()

        if not dot or not stem:
            # No "name.ext" shape at all (missing name, missing extension, or
            # nothing left before the extension).
            return standard_json(
                "The file(s) sent must must have the following format: "
                "{\"filename.ext\": \"image-bytes\"}",
                400
            )

        full_filename = f"{stem}.{file_ext}"

        if file_ext not in ("png", "jpg", "jpeg", "gif"):
            return standard_json(f"Invalid file type '{file_ext}'", 400)

        try:
            img = Image.open(image)
        except OSError:
            return standard_json(f"The image '{full_filename}' is not a real image...", 400)

        target_path = f"{folder_path}/{full_filename}"

        if os.path.exists(target_path):
            if not overwrite:
                duplicate.append(full_filename)
                continue

            # Replacing an existing file: refresh the ownership columns.
            db.execute(
                "UPDATE image SET user_id=?, channel_id=?, guild_id=? WHERE name=?",
                (user_id, channel_id, guild_id, full_filename)
            )
        else:
            db.execute(
                "INSERT INTO image (name, user_id, channel_id, guild_id) VALUES (?, ?, ?, ?)",
                (full_filename, user_id, channel_id, guild_id)
            )

        img.save(target_path)

    if duplicate and len(duplicate) == len(files):
        return standard_json("All images you attempted to upload already exist", 400)
    elif duplicate:
        return standard_json(
            "Image(s) uploaded, however the following "
            f"files were not uploaded: {duplicate}",
            206
        )
    else:
        return standard_json("Image(s) uploaded")
# Start the built-in server only when this file is executed directly, so that
# importing the module (for example from an ASGI server or a test) does not
# block inside app.run().
if __name__ == "__main__":
    app.run(port=config["port"], debug=config["debug"])