forked from mattvaldes/vulnerable-api
-
Notifications
You must be signed in to change notification settings - Fork 27
/
vAPI.py
398 lines (379 loc) · 15.1 KB
/
vAPI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
"""
s API interacts with a user and token database.
The code is written to exemplify common API security vulnerabilities
1. No input validation
2. SQL queries are not parameterized
3. No real error handling
4. Errors that are handled give too much information
4. Tokens have an expiration date, but do not expire
5. Token string is generated with an md5 of the expire datetime string
6. Passwords are not hashed in the database
7. There is an *undocumented* GET that returns the user database
"""
from __future__ import print_function
import connexion
import sqlite3
import json
import hashlib
import time
import os
import re
import xml.etree.ElementTree as ET
import logging
import sys
import getopt
import argparse
from lxml import etree
def get_root():
"""
Give default message for a GET on root directory.
"""
src_ip = connexion.request.environ.get(
"HTTP_X_FORWARDED_FOR"
) or connexion.request.environ.get("REMOTE_ADDR")
response = {"response": {"application": "vAPI", "status": "running"}}
logging.info(
'app=vAPI: src_ip=%s action=success signature="API status request"' % src_ip
)
return response, 200
def get_token():
"""
User needs to get an auth token before actioning the database
"""
src_ip = connexion.request.environ.get(
"HTTP_X_FORWARDED_FOR"
) or connexion.request.environ.get("REMOTE_ADDR")
content_type = connexion.request.headers.get("Content-type")
if content_type == "application/xml":
try:
# LXML is vulnerable to XXE, etree is vulnerable to Billion Laughs
# So just have etree try to parse it just to watch it die
ET.parse(connexion.request.body)
except Exception:
# But etree will throw an exception for XXE, so ignore that
pass
# force unsafe external entity parsing
parser = etree.XMLParser(load_dtd=True, resolve_entities=True)
data = etree.parse(connexion.request.body, parser)
username = data.find("username").text
password = data.find("password").text
else:
data = connexion.request.get_json() if connexion.request.is_json else {}
username = data.get("username", "")
password = data.get("password", "")
conn = sqlite3.connect("vAPI.db")
c = conn.cursor()
# no data validation
# no sql parameterization
user_query = "SELECT * FROM users WHERE username = '%s' AND password = '%s'" % (
username,
password,
)
c.execute(user_query)
user = c.fetchone()
response = {}
if user:
response["access"] = {}
response["access"]["user"] = {"id": user[0], "name": user[1]}
# make sure to get most recent token in database, because we arent
# removing them...
token_query = (
"SELECT * FROM tokens WHERE userid = '%s' ORDER BY expires DESC" % (user[0])
)
c.execute(token_query)
token_record = c.fetchone()
if isinstance(token_record, tuple):
if token_record[3] < int(time.time()):
# token has expired. create new one that expires 5 minutes
# after creation
expire_stamp = int(time.time() + 300)
expire_date = time.ctime(int(expire_stamp))
token = hashlib.md5(expire_date.encode("utf-8")).hexdigest()
# we'll parameterize this one because we need this serious
# functionality
c.execute(
"INSERT INTO tokens (token, userid, expires) VALUES (?, ?, ?)",
(token, user[0], expire_stamp),
)
conn.commit()
response["access"]["token"] = {"id": token, "expires": expire_date}
logging.info(
'app=vAPI:tokens src_ip=%s user=%s action=success signature="Request token: authentication succeeded, found expired token in db"'
% (src_ip, username)
)
return response, 200
else:
# recent token hasn't expired. use same one.
expire_date = time.ctime(int(token_record[3]))
response["access"]["token"] = {
"id": token_record[1],
"expires": expire_date,
}
logging.info(
'app=vAPI:tokens src_ip=%s user=%s action=success signature="Request token: authentication succeeded, found recent token in db"'
% (src_ip, username)
)
return response, 200
else:
# no token exists. create one that expires in 5 minutes
expire_stamp = int(time.time() + 300)
expire_date = time.ctime(int(expire_stamp))
token = hashlib.md5(expire_date.encode("utf-8")).hexdigest()
# we'll parameterize this one because we need this serious
# functionality
c.execute(
"INSERT INTO tokens (token, userid, expires) VALUES (?, ?, ?)",
(token, user[0], expire_stamp),
)
conn.commit()
response["access"]["token"] = {"id": token, "expires": expire_date}
logging.info(
'app=vAPI:tokens src_ip=%s user=%s action=success signature="Request token: authentication succeeded, created new token in db"'
% (src_ip, username)
)
return response, 200
else:
# let's do another look up so we can return helpful info for failure
# cases
c.execute("SELECT * FROM users WHERE username = '%s'" % username)
user = c.fetchone()
if user:
response["error"] = {"message": "password does not match"}
logging.info(
'app=vAPI:tokens src_ip=%s user=%s action=failure signature="Request token: authentication failed, wrong password"'
% (src_ip, username)
)
else:
logging.info(
'app=vAPI:tokens src_ip=%s user=%s action=failure signature="Request token: authentication failed, user unknown"'
% (src_ip, username)
)
response["error"] = {"message": "username " + username + " not found"}
return response, 401
def get_get_token():
"""
this is an undocumented request. EASTER EGG
/tokens is only supposed to accept a POST! Are you checking the other verbs?
"""
conn = sqlite3.connect("vAPI.db")
c = conn.cursor()
query = "SELECT * FROM users"
c.execute(query)
users = c.fetchall()
c.close()
conn.close()
return {"response": users}, 200
def get_user(user):
"""
Expects a user id to return that user's data.
X-Auth-Token is also expected
"""
src_ip = connexion.request.environ.get(
"HTTP_X_FORWARDED_FOR"
) or connexion.request.environ.get("REMOTE_ADDR")
token = connexion.request.headers.get("X-Auth-Token")
try:
conn = sqlite3.connect("vAPI.db")
c = conn.cursor()
user_query = "SELECT * FROM users WHERE id = '%s'" % (user)
c.execute(user_query)
user_record = c.fetchone()
token_query = "SELECT * FROM tokens WHERE token = '%s'" % (str(token))
c.execute(token_query)
token_record = c.fetchone()
c.close()
except Exception:
return {"error": "database error"}, 500
response = {}
# you'll notice we don't actually check the token expiration date
if isinstance(token_record, tuple):
if isinstance(user_record, tuple):
if token_record[2] == user_record[0]:
response["user"] = {}
response["user"]["id"] = user_record[0]
response["user"]["name"] = user_record[1]
response["user"]["password"] = user_record[2]
logging.info(
'app=vAPI:user src_ip=%s user=%s action=success signature="Requesting user record: success for user record %s"'
% (src_ip, token_record[2], user)
)
return response, 200
else:
response["error"] = {"message": "the token and user do not match!"}
logging.info(
'app=vAPI:user src_ip=%s user=%s action=failure signature="Requesting user record: no permission to show user=%s"'
% (src_ip, token_record[2], user)
)
return response, 403
else:
response["error"] = {"message": "user id " + user + " not found"}
logging.info(
'app=vAPI:user src_ip=%s user=%s action=failure signature="Requesting user record: failed for unknown user %s"'
% (src_ip, "", user)
)
return response, 404
else:
response["error"] = {"message": "token id " + str(token) + " not found"}
logging.info(
'app=vAPI:user src_ip=%s user=%s action=failure "Requesting user record: authentication failed for user %s"'
% (src_ip, "", user)
)
return response, 401
def create_user():
src_ip = connexion.request.environ.get(
"HTTP_X_FORWARDED_FOR"
) or connexion.request.environ.get("REMOTE_ADDR")
token = connexion.request.headers.get("X-Auth-Token")
conn = sqlite3.connect("vAPI.db")
c = conn.cursor()
token_query = "SELECT * FROM tokens WHERE token = '%s' AND userid = 10" % (
str(token)
)
c.execute(token_query)
token_record = c.fetchone()
response = {}
if isinstance(token_record, tuple):
data = connexion.request.get_json() if connexion.request.is_json else {}
name = data.get("username", "")
password = data.get("password", "")
# catastrophically bad regex
match = "([a-z]+)*[0-9]"
m = re.search(match, name)
if m:
user_query = "SELECT * FROM users WHERE username = '%s'" % (name)
c.execute(user_query)
user_record = c.fetchone()
if isinstance(user_record, tuple):
response["error"] = {"message": "User %s already exists!" % name}
logging.info(
'app=vAPI:user src_ip=%s user=%s action=failure signature="Create new user: already existing user %s"'
% (src_ip, token_record[2], name)
)
return response, 403
else:
c.execute(
"INSERT INTO users (username, password) VALUES (?, ?)",
(name, password),
)
conn.commit()
response["user"] = {"username": name, "password": password}
logging.info(
'app=vAPI:user src_ip=%s user=%s action=success signature="Create new user: %s"'
% (src_ip, token_record[2], name)
)
return response, 200
else:
response["error"] = {
"message": "username {0} invalid format, check documentation!".format(
name
)
}
logging.info(
'app=vAPI:user src_ip=%s user=%s action=failure signature="Create new user: invalid name %s"'
% (src_ip, token_record[2], name)
)
return response, 403
else:
response["error"] = {"message": "must provide valid admin token"}
logging.info(
'app=vAPI:user src_ip=%s action=failure signature="Create new user: authentication failed, invalid token"'
% src_ip
)
return response, 401
def display_uptime():
return display_uptime_flag(False)
def display_uptime_flag(flag):
src_ip = connexion.request.environ.get(
"HTTP_X_FORWARDED_FOR"
) or connexion.request.environ.get("REMOTE_ADDR")
if flag:
command = "uptime -" + flag
logging.info(
'app=vAPI:uptime src_ip=%s action=success signature="Uptime request: flag=%s"'
% (src_ip, flag)
)
else:
command = "uptime"
logging.info(
'app=vAPI:uptime src_ip=%s action=success signature="Uptime request"'
% src_ip
)
output = os.popen(command).read()
response = {"response": {"command": command, "output": output}}
return json.dumps(response, sort_keys=True, indent=2), 200
def create_widget_reservation():
src_ip = connexion.request.environ.get(
"HTTP_X_FORWARDED_FOR"
) or connexion.request.environ.get("REMOTE_ADDR")
token = connexion.request.headers.get("X-Auth-Token")
conn = sqlite3.connect("vAPI.db")
c = conn.cursor()
token_query = "SELECT * FROM tokens WHERE token = '%s'" % (str(token))
c.execute(token_query)
token_record = c.fetchone()
response = {}
data = connexion.request.get_json() if connexion.request.is_json else {}
name = data.get("name", {})
c.close()
conn.close()
if isinstance(token_record, tuple):
# catastrophically bad regex
match = "([a-z]+)*[0-9]"
m = re.search(match, str(name))
if m:
response = {"message": "created reservation for widget %s" % name}
logging.info(
'app=vAPI:widget src_ip=%s user=%s action=success signature="Create reservation: widget=%s"'
% (src_ip, token_record[2], name)
)
return response, 200
else:
response["error"] = {"message": "illegal widget name"}
logging.info(
'app=vAPI:widget src_ip=%s user=%s action=failure signature="Create reservation: illegal name widget=%s"'
% (src_ip, token_record[2], name)
)
return response, 403
else:
response["error"] = {"message": "must provide valid token"}
logging.info(
'app=vAPI:widget src_ip=%s action=failure signature="Create reservation: authentication failure widget=%s"'
% (src_ip, name)
)
return response, 401
if __name__ == "__main__":
logging.basicConfig(
filename="vAPI.log",
filemode="a",
format="%(asctime)s.%(msecs)03d %(levelname)s %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
level=logging.INFO,
)
parser = argparse.ArgumentParser()
parser.add_argument(
"-p", dest="port", type=int, help="Listening port", default=8081
)
parser.add_argument(
"-s",
dest="oasfile",
type=str,
help="OpenAPI specification YAML file path",
default="vAPI.yaml",
)
args = parser.parse_args()
myport = args.port
oasfile = args.oasfile
logging.info(
'app=vAPI action=success signature="Starting vAPI on port {} using {}"'.format(
myport, oasfile
)
)
logging.info("Starting vAPI on port {} using {}".format(myport, oasfile))
logger = logging.getLogger("vAPI")
try:
app = connexion.FlaskApp(__name__, specification_dir="openapi/")
app.add_api(oasfile, arguments={"title": "Vulnerable API"})
app.run(port=myport, debug=True)
except Exception as e:
logging.error('app=vAPI action=failure signature="Starting vAPI failed with exception: {}"'.format(str(e)))
print("Starting vAPI failed with exception: {}".format(e))