Skip to content

Commit

Permalink
add user experience related metrics (#617)
Browse files Browse the repository at this point in the history
  • Loading branch information
xudifsd authored and Anbang-Hu committed Nov 1, 2019
1 parent 79fb4c0 commit 635e81f
Showing 1 changed file with 90 additions and 0 deletions.
90 changes: 90 additions & 0 deletions src/ClusterManager/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,90 @@
float("inf")),
labelnames=("fn_name",))

# Histogram of the time (in seconds) a job takes to move into the state named
# by the "current_state" label. Finite bucket bounds double from 1s up to
# 1024s; the last bucket is +Inf.
job_state_change_histogram = Histogram(
    "job_state_change_latency_seconds",
    "latency for job to change state(seconds)",
    buckets=tuple(float(2 ** exp) for exp in range(11)) + (float("inf"),),
    labelnames=("current_state",),
)

class JobTimeRecord(object):
    """Timestamps of the state transitions observed for a single job.

    Every field starts as None and is filled in when the matching event is
    seen.
    """

    def __init__(self):
        # create -> approve -> submit is the order in which they get set.
        self.create_time = self.approve_time = self.submit_time = None


class LRUDefatulDict(object):
    """Bounded default-dict with least-recently-used eviction.

    Reading a missing key creates its value with `factory()`. Once more than
    `cap` keys are stored, the key that was touched longest ago is dropped.
    Both reads and writes count as a touch.

    Entries live in a doubly linked list ordered from most recently used
    (`head`) to least recently used (`tail`); `m` maps each key to its node.

    NOTE: the misspelled class name is kept so existing references still work.
    """

    class Node(object):
        """One key/value cell of the recency list."""

        def __init__(self, key, val, next=None, prev=None):
            self.key = key
            self.val = val
            self.next = next
            self.prev = prev

    def __init__(self, cap, factory):
        """
        cap: maximum number of keys kept, must be positive.
        factory: zero-argument callable producing the value for a new key.
        """
        assert cap > 0
        self.m = {}
        self.head = self.tail = None
        self.cap = cap
        self.factory = factory

    def __getitem__(self, key):
        return self._get(key)

    def _get(self, key):
        """Return the value for `key`, creating it with the factory if absent.

        A hit marks the key as most recently used.
        """
        node = self.m.get(key)
        if node is None:
            self._put(key, self.factory())
            node = self.m[key]
        else:
            self._move_to_head(node)
        return node.val

    def _put(self, key, value):
        """Store `value` under `key` and mark the key as most recently used.

        Inserting a new key may evict the least recently used one.
        """
        node = self.m.get(key)
        if node is not None:
            # Existing key: always take the new value, then refresh recency.
            node.val = value
            self._move_to_head(node)
            return

        node = LRUDefatulDict.Node(key, value, self.head, None)
        if self.head is None:
            self.tail = node
        else:
            self.head.prev = node
        self.head = node
        self.m[key] = node

        if len(self.m) > self.cap:
            # cap >= 1, so at least two nodes exist here and the tail always
            # has a predecessor.
            evicted = self.tail
            del self.m[evicted.key]
            self.tail = evicted.prev
            self.tail.next = None
            evicted.prev = None

    def _move_to_head(self, node):
        """Relink `node` (already in the list) to the front of the list."""
        if node is self.head:
            return
        # node is not the head, so node.prev is a real node.
        node.prev.next = node.next
        if node.next is None:
            # node was the tail; its predecessor becomes the new tail.
            self.tail = node.prev
        else:
            node.next.prev = node.prev
        node.prev = None
        node.next = self.head
        self.head.prev = node
        self.head = node

# In-memory only: holds per-job timestamps for at most 500 jobs and is not
# persisted anywhere.
job_time_recorder = LRUDefatulDict(500, JobTimeRecord)

# If the previous state has no recorded time (job_manager was restarted, or
# the entry has expired from the recorder), no latency is observed for the event.
def update_job_state_latency(job_id, state, event_time=None):
    """Record when a job reached `state` and observe the transition latency.

    job_id: key identifying the job in `job_time_recorder`.
    state: one of "create", "approve" or "submit"; any other value is ignored.
    event_time: time of the event; defaults to the current UTC time.
        NOTE(review): assumed to be a datetime that can be subtracted from /
        by `datetime.utcnow()` values -- confirm for caller-supplied times.

    "approve" observes the time since "create", and "submit" observes the time
    since "approve", under the histogram label equal to `state`. Nothing is
    observed when the earlier timestamp is unknown.
    """
    if state not in ("create", "approve", "submit"):
        return

    if event_time is None:
        event_time = datetime.datetime.utcnow()

    record = job_time_recorder[job_id]

    if state == "create":
        record.create_time = event_time
        return

    if state == "approve":
        record.approve_time = event_time
        previous_time = record.create_time
    else:
        record.submit_time = event_time
        previous_time = record.approve_time

    if previous_time is not None:
        # total_seconds() keeps the days component; `.seconds` alone would
        # wrap around for waits longer than a day or for negative deltas.
        elapsed = (event_time - previous_time).total_seconds()
        job_state_change_histogram.labels(state).observe(elapsed)

def record(fn):
@functools.wraps(fn)
def wrapped(*args, **kwargs):
Expand Down Expand Up @@ -98,6 +182,9 @@ def ApproveJob(job, dataHandlerOri=None):
try:
job_id = job["jobId"]
vcName = job["vcName"]

update_job_state_latency(job_id, "create", event_time=job["jobTime"])

jobParams = json.loads(base64.b64decode(job["jobParams"]))
job_total_gpus = GetJobTotalGpu(jobParams)

Expand All @@ -111,6 +198,7 @@ def ApproveJob(job, dataHandlerOri=None):
detail = [{"message": "waiting for available preemptible resource."}]
dataHandler.UpdateJobTextField(job["jobId"], "jobStatusDetail", base64.b64encode(json.dumps(detail)))
dataHandler.UpdateJobTextField(job_id, "jobStatus", "queued")
update_job_state_latency(job_id, "approve")
if dataHandlerOri is None:
dataHandler.Close()
return True
Expand Down Expand Up @@ -151,6 +239,7 @@ def ApproveJob(job, dataHandlerOri=None):
detail = [{"message": "waiting for available resource."}]
dataHandler.UpdateJobTextField(job["jobId"], "jobStatusDetail", base64.b64encode(json.dumps(detail)))
dataHandler.UpdateJobTextField(job_id, "jobStatus", "queued")
update_job_state_latency(job_id, "approve")
if dataHandlerOri is None:
dataHandler.Close()
return True
Expand Down Expand Up @@ -443,6 +532,7 @@ def TakeJobActions(launcher, jobs):
try:
if sji["job"]["jobStatus"] == "queued" and (sji["allowed"] is True):
launcher.submit_job(sji["job"])
update_job_state_latency(sji["jobId"], "submit")
logging.info("TakeJobActions : submitting job : %s : %s" % (sji["jobId"], sji["sortKey"]))
elif sji["preemptionAllowed"] and (sji["job"]["jobStatus"] == "scheduling" or sji["job"]["jobStatus"] == "running") and (sji["allowed"] is False):
launcher.kill_job(sji["job"]["jobId"], "queued")
Expand Down

0 comments on commit 635e81f

Please sign in to comment.