
Commit 4b12161: Better logging
1 parent 3fee193

12 files changed (+192 -49)


.gitignore (+2)

@@ -2,6 +2,8 @@
 *logs.txt
 compile-test/
 output/
+configs/
+logs/
 # answer/A1T1
 # answer/A1T2
 *logs.txt

config/evaluator.json (+12 -2)

@@ -1,5 +1,12 @@
 {
+    "backend": {
+        "name": "backend",
+        "log_dir": "/home/vishalr/Desktop/backend-2022/logs/",
+        "config_dir": "/home/vishalr/Desktop/backend-2022/configs/"
+    },
+
     "executor": {
+        "name": "Assignment2",
         "num_backends": 2,
         "num_workers": 1,
         "fetch_port": 9000,
@@ -9,7 +16,8 @@
         "threshold": 8,
         "global_queue_thread": true,
         "global_prefetch_thread": true,
-        "timeout": 30
+        "timeout": 30,
+        "log_dir": "/home/vishalr/Desktop/backend-2022/logs/"
     },
 
     "docker": {
@@ -23,6 +31,8 @@
         "shared_output_dir": "/home/vishalramesh01/backend-2022/output/",
         "docker_output_dir": "/output",
         "docker_memswapiness": 0,
-        "spawn_wait": 40
+        "spawn_wait": 40,
+        "blacklist_threshold": 5,
+        "blacklist_duration": 120
     }
 }
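The new top-level "backend" block is what createApp() in flask_backend/backend.py (further down in this commit) reads at startup. A minimal sketch of that lookup, assuming the server runs with the repo root as its working directory:

import json
import os

# Mirrors the config lookup added to createApp(); the keys come from the diff above.
with open(os.path.join(os.getcwd(), "config", "evaluator.json")) as f:
    configs = json.load(f)

backend = configs["backend"]
log_path = os.path.join(backend["log_dir"], backend["name"])
os.makedirs(log_path, exist_ok=True)  # sanity_checker_logs.txt ends up in here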
File renamed without changes.
File renamed without changes.

docker/hadoop_config/mapred-site.xml (-4)

@@ -17,10 +17,6 @@
 <!-- Put site-specific property overrides in this file. -->
 
 <configuration>
-    <property>
-        <name>mapreduce.framework.name</name>
-        <value>yarn</value>
-    </property>
     <property>
         <name>mapreduce.framework.name</name>
         <value>yarn</value>

docker/hadoop_server-A2.py (+23 -9)

@@ -122,8 +122,15 @@ def create_hdfs_directory(dirname: str) -> int:
     logger.mark(f"Created Directory - hdfs:{dirname}")
     return res
 
-def delete_hdfs_directories(dirname: str) -> int:
-    process = subprocess.Popen([f"{HDFS} dfs -rm -r {dirname}"], shell=True, text=True)
+def delete_hdfs_directories(dirname: List[str]) -> int:
+    if type(dirname) != list:
+        dirname = [dirname]
+    command = []
+    for dirs in dirname:
+        cmd = f"{HDFS} dfs -rm -r {dirs}"
+        command.append(cmd)
+    command = "&&".join(command)
+    process = subprocess.Popen([command], shell=True, text=True)
     res = process.wait()
     logger.mark(f"Deleted Directory - hdfs:{dirname}")
     return res
@@ -181,14 +188,14 @@ def run_hadoop_job(team_id, assignment_id, submission_id, timeout, mapper: str,
     """
 
     path = os.path.join(SUBMISSIONS, team_id)
-    if not os.path.exists(path):
-        os.mkdir(path)
+    # if not os.path.exists(path):
+    #     os.mkdir(path)
 
     logger.mark(f"Created Directory - {path}")
 
     task_path = os.path.join(path, submission_id)
     if not os.path.exists(task_path):
-        os.mkdir(task_path)
+        os.makedirs(task_path)
 
     logger.mark(f"Created Directory - {task_path}")
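A note on the mkdir-to-makedirs swap above: os.mkdir only creates the leaf directory and raises if intermediate directories are missing, while os.makedirs creates every missing level, which is why the per-team mkdir block can now be commented out. A one-line illustration (the path is hypothetical, not from the repo):

import os

# os.mkdir("/tmp/subs/teamA/sub-001") raises FileNotFoundError when
# /tmp/subs/teamA does not exist yet; os.makedirs creates the whole chain.
os.makedirs("/tmp/subs/teamA/sub-001", exist_ok=True)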

@@ -473,10 +480,17 @@ def cleanup(team_id, assign_id, task) -> int:
 
     task_path = TASK_OUTPUT_PATH[assign_id]
 
-    _ = delete_hdfs_directories(f"/{team_id}/{assign_id}/{task}/{task_path}")
-    _ = delete_hdfs_directories(f"/{team_id}/{assign_id}/{task}")
-    _ = delete_hdfs_directories(f"/{team_id}/{assign_id}")
-    _ = delete_hdfs_directories(f"/{team_id}")
+    # _ = delete_hdfs_directories(f"/{team_id}/{assign_id}/{task}/{task_path}")
+    # _ = delete_hdfs_directories(f"/{team_id}/{assign_id}/{task}")
+    # _ = delete_hdfs_directories(f"/{team_id}/{assign_id}")
+    # _ = delete_hdfs_directories(f"/{team_id}")
+
+    _ = delete_hdfs_directories([
+        f"/{team_id}/{assign_id}/{task}/{task_path}",
+        f"/{team_id}/{assign_id}/{task}",
+        f"/{team_id}/{assign_id}",
+        f"/{team_id}"
+    ])
 
     return 0
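One behavioural consequence of the batched delete above: joining the hdfs dfs -rm -r commands with "&&" makes the shell short-circuit, so if deleting the first path fails, the remaining paths are left in place; joining with ";" would attempt all of them. A small sketch of the difference (paths are illustrative):

import subprocess

cmds = [f"hdfs dfs -rm -r {d}" for d in ("/teamA/A2/T1", "/teamA/A2", "/teamA")]
subprocess.run(" && ".join(cmds), shell=True)  # stops at the first failure
subprocess.run(" ; ".join(cmds), shell=True)   # attempts every deletion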

flask_backend/__init__.py (+5 -1)

@@ -1,6 +1,10 @@
+import multiprocessing
 from redis import Redis, ConnectionPool
 from common.redisqueue import RedisQueue
 
 pool = ConnectionPool(host='localhost', port=6379, db=0)
 broker = Redis(connection_pool=pool)
-queue = RedisQueue(broker=broker, queue_name="sanity-queue")
+queue = RedisQueue(broker=broker, queue_name="sanity-queue")
+
+manager = multiprocessing.Manager()
+executor_table = manager.dict()
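multiprocessing.Manager().dict() returns a proxy backed by a separate manager process, so a write made in one process is visible to every process holding the proxy; a plain module-level dict would be copied per process and registrations would get lost. A self-contained illustration (the names here are made up, not from the repo):

import multiprocessing

def register(table, ip):
    # Write into the shared table from a child process.
    table[ip] = {"executor_name": "demo"}

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared = manager.dict()
    p = multiprocessing.Process(target=register, args=(shared, "10.0.0.5"))
    p.start()
    p.join()
    print(dict(shared))  # the child's write is visible in the parent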

flask_backend/app.py → flask_backend/backend.py (renamed, +77 -5)

@@ -12,7 +12,7 @@
 from common.utils import Tee, get_datetime
 
 from dotenv import load_dotenv
-from flask_backend import queue
+from flask_backend import queue, executor_table
 from flask_cors import cross_origin
 from flask import Flask, request, jsonify
 from signal import signal, SIGPIPE, SIG_DFL
@@ -26,12 +26,30 @@ def createApp():
     os.environ['TZ'] = 'Asia/Kolkata'
     time.tzset()
 
+    config_path = os.path.join(os.getcwd(),"config", "evaluator.json")
+
+    configs = None
+    with open(config_path, "r") as f:
+        configs = json.loads(f.read())
+
     app = Flask(__name__)
     db = DataBase()
 
-    f = open(f'./sanity_checker_logs.txt', 'a+')
+    configs = configs["backend"]
+    backend_name = configs["name"]
+    backend_logdir = configs["log_dir"]
+    backend_config_dir = configs["config_dir"]
+
+    log_path = os.path.join(backend_logdir, backend_name)
+    if not os.path.exists(log_path):
+        os.makedirs(log_path)
+
+    f = open(os.path.join(log_path, f'sanity_checker_logs.txt'), 'a+')
     backup = sys.stdout
     sys.stdout = Tee(sys.stdout, f)
+
+    def is_registered(executor_ip) -> bool:
+        return executor_ip in executor_table
 
     def delete_files(path):
         for file in os.listdir(path):
@@ -240,13 +258,17 @@ def get_jobs():
             client_addr = request.environ['REMOTE_ADDR']
         else:
             client_addr = request.environ['HTTP_X_FORWARDED_FOR'] # if behind a proxy
+
+        if not is_registered(str(client_addr)):
+            res = {"status": 401, "msg": f"This executor has not been registered with backend server. Please register before making request"}
+            return jsonify(res)
 
         prefetch_factor = int(request.args.get("prefetch_factor"))
 
         if prefetch_factor is None: prefetch_factor = 1
 
         if len(queue) == 0:
-            res = {"msg": "Submission Queue is currently empty.", "len": len(queue), "num_submissions": 0}
+            res = {"msg": "Submission Queue is currently empty.", "len": len(queue), "num_submissions": 0, "status": 200}
             return jsonify(res)
 
         data = []
@@ -269,11 +291,10 @@
 
         r = requests.post(request_url, data=data)
 
-        res = {"msg": f"Dequeued {length} submissions from queue.", "num_submissions": length, "len": len(queue)}
+        res = {"msg": f"Dequeued {length} submissions from queue.", "num_submissions": length, "len": len(queue), "status": 200}
         # res = {"msg": "dequeued from submission queue", "len": len(queue), "server_response": res}
         return jsonify(res)
 
-
     @app.route("/queue-length", methods=["GET"])
     def queue_length():
         msg = {"length": len(queue)}
@@ -285,6 +306,57 @@ def empty_queue():
         res = {"msg": "Queue Emptied"}
         return jsonify(res)
 
+    @app.route("/register_executor", methods=["POST"])
+    def register_executor():
+        executor_addr = None
+        if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
+            executor_addr = request.environ['REMOTE_ADDR']
+        else:
+            executor_addr = request.environ['HTTP_X_FORWARDED_FOR'] # if behind a proxy
+
+        executor_addr = str(executor_addr)
+
+        data = json.loads(request.data)
+
+        executor_metadata = {
+            'executor_name': data["executor_name"],
+            'executor_uuid': data["executor_uuid"],
+            'executor_log_dir': data["executor_log_dir"],
+            'num_backends': data["num_backends"],
+            'num_workers': data["num_workers"],
+            'num_threads': data["num_threads"],
+            'num_prefetch_threads': data["num_prefetch_threads"],
+            'prefetch_factor': data["prefetch_factor"],
+            'threshold': data["threshold"],
+            'timeout': data["timeout"],
+            'ipaddr': executor_addr,
+        }
+        executor_table[executor_addr] = executor_metadata
+
+        if not os.path.exists(backend_config_dir):
+            os.makedirs(backend_config_dir)
+
+        with open(os.path.join(backend_config_dir, f'{data["executor_name"]}-{data["executor_uuid"]}.json'), 'w', encoding='utf-8') as _f:
+            json.dump(executor_metadata, _f, ensure_ascii=False, indent=4)
+
+        msg = {"status": 200, "message": "Registered!"}
+        return jsonify(msg)
+
+    @app.route("/executors", methods=["GET"])
+    def get_executors():
+        files = list(os.listdir(backend_config_dir))
+        executors = {}
+        for filename in files:
+            if ".json" not in filename:
+                continue
+            path = os.path.join(backend_config_dir, filename)
+            e = open(path,"r")
+            executor = json.loads(e.read())
+            e.close()
+            executors[executor['ipaddr']] = executor
+
+        return jsonify(executors)
+
     return app
 
 if __name__ == "__main__":
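Taken together, an executor now has to register itself before asking for jobs, since get_jobs() returns a 401 payload for unknown addresses. A hedged usage sketch: the host and port are assumptions (Flask's defaults, not stated in the commit), while the payload keys are exactly the ones register_executor() reads from the request body:

import requests

BACKEND = "http://127.0.0.1:5000"  # assumed host/port, not from the commit

payload = {
    "executor_name": "Assignment2",
    "executor_uuid": "c0ffee",           # illustrative values
    "executor_log_dir": "/tmp/executor-logs",
    "num_backends": 2,
    "num_workers": 1,
    "num_threads": 4,
    "num_prefetch_threads": 1,
    "prefetch_factor": 1,
    "threshold": 8,
    "timeout": 30,
}

r = requests.post(f"{BACKEND}/register_executor", json=payload)
print(r.json())  # {"message": "Registered!", "status": 200}

# The registry, rebuilt from the JSON files in config_dir, can then be listed:
print(requests.get(f"{BACKEND}/executors").json())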
