Skip to content

Commit 22615af

Browse files
authored
Merge pull request #717 from leigaoms/lgrestapi
perf optimization for job list and detail
2 parents 9decea4 + 4389b9c commit 22615af

8 files changed

+1216
-629
lines changed

src/ClusterManager/job_launcher.py

+26-14
Original file line numberDiff line numberDiff line change
@@ -650,11 +650,6 @@ def submit_job_impl(self, job):
650650

651651
ret["jobId"] = job_object.job_id
652652

653-
dataHandler.UpdateJobTextField(job_object.job_id, "jobStatus", "scheduling")
654-
dataHandler.UpdateJobTextField(job_object.job_id, "jobDescriptionPath", job_description_path)
655-
dataHandler.UpdateJobTextField(job_object.job_id, "jobDescription", base64.b64encode(job_description))
656-
dataHandler.UpdateJobTextField(job_object.job_id, "lastUpdated", datetime.datetime.now().isoformat())
657-
658653
jobMeta = {}
659654
jobMeta["jobDescriptionPath"] = job_description_path
660655
jobMeta["jobPath"] = job_object.job_path
@@ -663,19 +658,31 @@ def submit_job_impl(self, job):
663658
jobMeta["LaunchCMD"] = pods[0].spec.containers[0].command
664659

665660
jobMetaStr = base64.b64encode(json.dumps(jobMeta))
666-
dataHandler.UpdateJobTextField(job_object.job_id, "jobMeta", jobMetaStr)
661+
662+
dataFields = {
663+
"jobStatus": "scheduling",
664+
"jobDescriptionPath": job_description_path,
665+
"jobDescription": base64.b64encode(job_description),
666+
"lastUpdated": datetime.datetime.now().isoformat(),
667+
"jobMeta": jobMetaStr
668+
}
669+
conditionFields = {"jobId": job_object.job_id}
670+
dataHandler.UpdateJobTextFields(conditionFields, dataFields)
667671
except Exception as e:
668672
logger.error("Submit job failed: %s" % job, exc_info=True)
669673
ret["error"] = str(e)
670674
retries = dataHandler.AddandGetJobRetries(job["jobId"])
671675
if retries >= 5:
672-
dataHandler.UpdateJobTextField(job["jobId"], "jobStatus", "error")
673-
dataHandler.UpdateJobTextField(job["jobId"], "errorMsg", "Cannot submit job!" + str(e))
674-
675676
detail = get_job_status_detail(job)
676677
detail = job_status_detail_with_finished_time(detail, "error", "Server error in job submission")
677-
dataHandler.UpdateJobTextField(job["jobId"], "jobStatusDetail", base64.b64encode(json.dumps(detail)))
678678

679+
dataFields = {
680+
"jobStatus": "error",
681+
"errorMsg": "Cannot submit job!" + str(e),
682+
"jobStatusDetail": base64.b64encode(json.dumps(detail))
683+
}
684+
conditionFields = {"jobId": job["jobId"]}
685+
dataHandler.UpdateJobTextFields(conditionFields, dataFields)
679686
# Try to clean up the job
680687
try:
681688
job_deployer = JobDeployer()
@@ -705,15 +712,20 @@ def kill_job_impl(self, job_id, desired_state="killed", dataHandlerOri=None):
705712
job_deployer = JobDeployer()
706713
errors = job_deployer.delete_job(job_id, force=True)
707714

715+
dataFields = {
716+
"jobStatusDetail": base64.b64encode(json.dumps(detail)),
717+
"lastUpdated": datetime.datetime.now().isoformat()
718+
}
719+
conditionFields = {"jobId": job_id}
708720
if len(errors) == 0:
709-
dataHandler.UpdateJobTextField(job_id, "jobStatus", desired_state)
710-
dataHandler.UpdateJobTextField(job_id, "lastUpdated", datetime.datetime.now().isoformat())
721+
dataFields["jobStatus"] = desired_state
722+
dataHandler.UpdateJobTextFields(conditionFields, dataFields)
711723
if dataHandlerOri is None:
712724
dataHandler.Close()
713725
return True
714726
else:
715-
dataHandler.UpdateJobTextField(job_id, "jobStatus", "error", "{}".format(errors))
716-
dataHandler.UpdateJobTextField(job_id, "lastUpdated", datetime.datetime.now().isoformat())
727+
dataFields["jobStatus"] = "error"
728+
dataHandler.UpdateJobTextFields(conditionFields, dataFields)
717729
if dataHandlerOri is None:
718730
dataHandler.Close()
719731
logger.error("Kill job failed with errors: {}".format(errors))

src/ClusterManager/job_manager.py

+36-11
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,13 @@ def ApproveJob(redis_conn, job, dataHandlerOri=None):
209209
if "preemptionAllowed" in jobParams and jobParams["preemptionAllowed"] is True:
210210
logger.info("Job {} preemptible, approve!".format(job_id))
211211
detail = [{"message": "waiting for available preemptible resource."}]
212-
dataHandler.UpdateJobTextField(job["jobId"], "jobStatusDetail", base64.b64encode(json.dumps(detail)))
213-
dataHandler.UpdateJobTextField(job_id, "jobStatus", "queued")
212+
213+
dataFields = {
214+
"jobStatusDetail": base64.b64encode(json.dumps(detail)),
215+
"jobStatus": "queued"
216+
}
217+
conditionFields = {"jobId": job_id}
218+
dataHandler.UpdateJobTextFields(conditionFields, dataFields)
214219
update_job_state_latency(redis_conn, job_id, "approved")
215220
if dataHandlerOri is None:
216221
dataHandler.Close()
@@ -250,8 +255,13 @@ def ApproveJob(redis_conn, job, dataHandlerOri=None):
250255
return False
251256

252257
detail = [{"message": "waiting for available resource."}]
253-
dataHandler.UpdateJobTextField(job["jobId"], "jobStatusDetail", base64.b64encode(json.dumps(detail)))
254-
dataHandler.UpdateJobTextField(job_id, "jobStatus", "queued")
258+
259+
dataFields = {
260+
"jobStatusDetail": base64.b64encode(json.dumps(detail)),
261+
"jobStatus": "queued"
262+
}
263+
conditionFields = {"jobId": job_id}
264+
dataHandler.UpdateJobTextFields(conditionFields, dataFields)
255265
update_job_state_latency(redis_conn, job_id, "approved")
256266
if dataHandlerOri is None:
257267
dataHandler.Close()
@@ -293,8 +303,13 @@ def UpdateJobStatus(redis_conn, launcher, job, notifier=None, dataHandlerOri=Non
293303
# TODO: Refactor
294304
detail = get_job_status_detail(job)
295305
detail = job_status_detail_with_finished_time(detail, "finished")
296-
dataHandler.UpdateJobTextField(job["jobId"], "jobStatusDetail", base64.b64encode(json.dumps(detail)))
297-
dataHandler.UpdateJobTextField(job["jobId"], "jobStatus", "finished")
306+
307+
dataFields = {
308+
"jobStatusDetail": base64.b64encode(json.dumps(detail)),
309+
"jobStatus": "finished"
310+
}
311+
conditionFields = {"jobId": job["jobId"]}
312+
dataHandler.UpdateJobTextFields(conditionFields, dataFields)
298313

299314
# Retain the old code for reference
300315
# if jobDescriptionPath is not None and os.path.isfile(jobDescriptionPath):
@@ -311,8 +326,13 @@ def UpdateJobStatus(redis_conn, launcher, job, notifier=None, dataHandlerOri=Non
311326
if job["jobStatus"] != "running":
312327
started_at = k8sUtils.localize_time(datetime.datetime.now())
313328
detail = [{"startedAt": started_at, "message": "started at: {}".format(started_at)}]
314-
dataHandler.UpdateJobTextField(job["jobId"], "jobStatusDetail", base64.b64encode(json.dumps(detail)))
315-
dataHandler.UpdateJobTextField(job["jobId"], "jobStatus", "running")
329+
330+
dataFields = {
331+
"jobStatusDetail": base64.b64encode(json.dumps(detail)),
332+
"jobStatus":"running"
333+
}
334+
conditionFields = {"jobId": job["jobId"]}
335+
dataHandler.UpdateJobTextFields(conditionFields, dataFields)
316336

317337
elif result == "Failed":
318338
logger.warning("Job %s fails, cleaning...", job["jobId"])
@@ -326,9 +346,14 @@ def UpdateJobStatus(redis_conn, launcher, job, notifier=None, dataHandlerOri=Non
326346
# TODO: Refactor
327347
detail = get_job_status_detail(job)
328348
detail = job_status_detail_with_finished_time(detail, "failed")
329-
dataHandler.UpdateJobTextField(job["jobId"], "jobStatusDetail", base64.b64encode(json.dumps(detail)))
330-
dataHandler.UpdateJobTextField(job["jobId"], "jobStatus", "failed")
331-
dataHandler.UpdateJobTextField(job["jobId"], "errorMsg", "pod failed")
349+
350+
dataFields = {
351+
"jobStatusDetail": base64.b64encode(json.dumps(detail)),
352+
"jobStatus": "failed",
353+
"errorMsg": "pod failed"
354+
}
355+
conditionFields = {"jobId": job["jobId"]}
356+
dataHandler.UpdateJobTextFields(conditionFields, dataFields)
332357

333358
# Retain the old code for reference
334359
# if jobDescriptionPath is not None and os.path.isfile(jobDescriptionPath):

0 commit comments

Comments
 (0)