from flask import Flask, request, jsonify
from signal import signal, SIGPIPE, SIG_DFL
from flask_backend.parser import SanityCheckerASTVisitor
+ from job_tracker.job import MRJob, SparkJob, KafkaJob, job_template_selector
+ from pprint import pprint

signal(SIGPIPE, SIG_DFL)

@@ -113,12 +115,23 @@ def update_submission(marks, message, data, send_mail=False):
@cross_origin()
def sanity_check():
    '''
-     Currently assuming the assignment to be a MR Job
+     BigHOST Sanity Checker
    '''
+
    jobs = json.loads(request.data)
    data = jobs

+     job_template = job_template_selector(data["assignmentId"])
+
+     job = job_template(
+         team_id=data["teamId"],
+         assignment_id=data["assignmentId"],
+         submission_id=data["submissionId"]
+     )
+
+     job.record("received")
    update_submission(marks=-1, message='Sanity Checking', data=data)
+     job.record("sanity_check_start")

    compile_path = f"{os.path.join(os.getcwd(), 'compile-test', str(data['submissionId']))}"

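For orientation, here is a minimal sketch of how the new job_template_selector dispatch could work. The class names come from the import above, but the exact assignment-id mapping (A1/A2 to MapReduce, A3 to Spark, everything else to Kafka) is an assumption, not the repository's actual implementation.

    # Hypothetical sketch only; the real selector lives in job_tracker/job.py.
    from job_tracker.job import MRJob, SparkJob, KafkaJob

    def job_template_selector(assignment_id):
        # Assumed mapping: A1/A2 are MapReduce assignments, A3 is Spark,
        # anything else falls through to Kafka.
        if "A1" in assignment_id or "A2" in assignment_id:
            return MRJob
        if "A3" in assignment_id:
            return SparkJob
        return KafkaJob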
@@ -127,7 +140,7 @@ def sanity_check():

    _ = open(os.path.join(compile_path, "__init__.py"), "w+").close()  # required for pylint to work

-     if "A3" not in data["assignmentId"]:
+     if "A1" in data["assignmentId"] or "A2" in data["assignmentId"]:
        mapper_data = data["mapper"]
        reducer_data = data['reducer']
        mapper_name = f"{data['teamId']}-{data['assignmentId']}-mapper.py"
@@ -250,10 +263,25 @@ def sanity_check():
    update_submission(marks=-1, message='Sanity Check Passed', data=data)

    data["timeout"] = get_timeouts(assignment_id=data['assignmentId'])
+
+     job.record("sanity_check_end")
+
+     if isinstance(job, MRJob):
+         job.mapper = data["mapper"]
+         job.reducer = data["reducer"]
+     elif isinstance(job, SparkJob):
+         job.spark = data["spark"]
+     elif isinstance(job, KafkaJob):
+         job.producer = data["producer"]
+         job.consumer = data["consumer"]
+
+     job.timeout = data["timeout"]

-     update_submission(marks=-1, message='Queued for Execution', data=data)
+     update_submission(marks=-1, message='Queued for Execution', data=job.get_db_fields())

-     data = pickle.dumps(data)
+     job.record("waiting_queue_entry")
+
+     data = pickle.dumps(job)
    queue.enqueue(data)

    res = {"msg": "Queued", "len": len(queue)}
@@ -280,7 +308,7 @@ def get_jobs():
        res = {"msg": "Submission Queue is currently empty.", "len": len(queue), "num_submissions": 0, "status": 200}
        return jsonify(res)

-     data = []
+     data = {}
    i = 0
    while i < prefetch_factor:
        queue_data = queue.dequeue()
@@ -290,7 +318,9 @@ def get_jobs():

        _, serialized_job = queue_data
        job = pickle.loads(serialized_job)
-         data.append(job)
+         job.record("waiting_queue_exit")
+         # pprint(job.__dict__)
+         data[f"job{i+1}"] = job.__dict__
        i += 1

    length = len(data)
@@ -303,6 +333,7 @@ def get_jobs():
        "status": 200,
        "jobs": data
    }
+
    return jsonify(res)

@app.route("/register_executor", methods=["POST"])
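Since each dequeued job is now serialised via job.__dict__ under keys job1, job2, ..., an executor polling this endpoint receives a dict of plain attribute dicts rather than a list of pickled objects. An illustrative shape of the "jobs" field, assuming typical job attributes (the inner field names and values are examples, not taken from the diff):

    # Illustrative shape of the "jobs" field returned by get_jobs()
    # (the job1..jobN keys come from this diff; the inner fields are assumed).
    jobs = {
        "job1": {
            "team_id": "BD_0001",
            "assignment_id": "A1",
            "submission_id": "1234567890",
            "timestamps": {"received": 1700000000.0, "waiting_queue_exit": 1700000005.0},
        },
    }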
@@ -405,7 +436,13 @@ def executor_log():
    if not os.path.exists(executor_log_path):
        os.makedirs(executor_log_path)

-     logs = json.loads(data["logs"])
+     logs = json.loads(data["worker_logs"])
+     for logname in logs:
+         f = open(os.path.join(executor_log_path, logname), "a+")
+         f.write(logs[logname])
+         f.close()
+
+     logs = json.loads(data["output_processor_logs"])
    for logname in logs:
        f = open(os.path.join(executor_log_path, logname), "a+")
        f.write(logs[logname])
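With this change the endpoint expects two JSON-encoded log bundles, one per worker process and one per output processor, instead of a single "logs" field. A client posting to it might build its payload roughly as below; the endpoint URL and the executor_name field are assumptions, only the two log keys are taken from the diff.

    # Hypothetical client-side payload for the executor log endpoint.
    import json
    import requests  # assumed HTTP client

    payload = {
        "executor_name": "executor-1",  # assumed field used to build executor_log_path
        "worker_logs": json.dumps({"worker.log": "processed job 42\n"}),
        "output_processor_logs": json.dumps({"output_processor.log": "scored job 42\n"}),
    }
    requests.post("http://backend:9000/executor_log", json=payload)  # URL is an example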