accelerate jobmanager loop #642

Merged
merged 3 commits on Nov 7, 2019
12 changes: 12 additions & 0 deletions src/ClusterBootstrap/services/jobmanager/jobmanager.yaml
@@ -72,6 +72,18 @@ spec:
hostPort: 9205
name: endpoint-mgr
protocol: TCP
- containerPort: 9206
hostPort: 9206
name: job-mgr2
protocol: TCP
- containerPort: 9207
hostPort: 9207
name: job-mgr3
protocol: TCP
- containerPort: 9208
hostPort: 9208
name: job-mgr4
protocol: TCP
readinessProbe:
failureThreshold: 3
initialDelaySeconds: 3
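The three new hostPort entries mirror the extra job_manager instances added below (ports 9206-9208 expose their exporters on the node). A minimal smoke test for the mapping — note the /metrics path follows the usual prometheus_client convention and is an assumption, not something this diff states:

```python
# Probe the new hostPorts from the node (assumes the managers are up and
# serve metrics over plain HTTP; /metrics is the conventional path).
import urllib.request

for port in (9206, 9207, 9208):
    url = "http://localhost:%d/metrics" % port
    try:
        with urllib.request.urlopen(url, timeout=3) as resp:
            print(port, "ok", resp.status)
    except OSError as exc:
        print(port, "unreachable:", exc)
```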
18 changes: 14 additions & 4 deletions src/ClusterManager/cluster_manager.py
@@ -87,10 +87,18 @@ def Run(args):

cwd = os.path.dirname(__file__)
cmds = {
"job_manager1":
["python", os.path.join(cwd, "job_manager.py"), "--port", str(args.j1), "--updateblock", "1"],
"job_manager2":
["python", os.path.join(cwd, "job_manager.py"), "--port", str(args.j2), "--updateblock", "2"],
"job_manager_killing,pausing,unapproved":
["python", os.path.join(cwd, "job_manager.py"), "--port", str(args.j1),
"--status", "killing,pausing,unapproved"],
"job_manager_running":
["python", os.path.join(cwd, "job_manager.py"), "--port", str(args.j2),
"--status", "running"],
"job_manager_scheduling":
["python", os.path.join(cwd, "job_manager.py"), "--port", str(args.j3),
"--status", "scheduling"],
"job_manager_queued":
["python", os.path.join(cwd, "job_manager.py"), "--port", str(args.j4),
"--status", "queued"],
"user_manager":
["python", os.path.join(cwd, "user_manager.py"), "--port", str(args.u)],
"node_manager":
@@ -149,6 +157,8 @@ def work(cmds, childs, FNULL):
parser.add_argument("--tictoc", help="how many seconds to wait until kill subprocess", type=int, default=600)
parser.add_argument("-j1", help="port of job_manager", type=int, default=9200)
parser.add_argument("-j2", help="port of job_manager", type=int, default=9206)
parser.add_argument("-j3", help="port of job_manager", type=int, default=9207)
parser.add_argument("-j4", help="port of job_manager", type=int, default=9208)
parser.add_argument("-u", help="port of user_manager", type=int, default=9201)
parser.add_argument("-n", help="port of node_manager", type=int, default=9202)
parser.add_argument("-l", help="port of joblog_manager", type=int, default=9203)
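With this change, cluster_manager.py no longer runs one job_manager per numeric update block but one per status group, each on its own port. A minimal sketch of the resulting process layout (script path simplified; the real code builds it with os.path.join(cwd, ...) and supervises the children in its work() loop):

```python
# Spawn the four status-specific job managers, as the new cmds dict implies.
import subprocess

managers = {
    9200: "killing,pausing,unapproved",
    9206: "running",
    9207: "scheduling",
    9208: "queued",
}
procs = [
    subprocess.Popen(["python", "job_manager.py",
                      "--port", str(port), "--status", statuses])
    for port, statuses in managers.items()
]
for proc in procs:
    proc.wait()
```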
85 changes: 60 additions & 25 deletions src/ClusterManager/job_launcher.py
@@ -10,6 +10,7 @@
import time
import datetime
import base64
import multiprocessing

from kubernetes import client, config as k8s_config
from kubernetes.client.rest import ApiException
@@ -271,7 +272,7 @@ def pod_exec(self, pod_name, exec_command, timeout=60):
return [-1, err.message]


class JobRole:
class JobRole(object):
MARK_ROLE_READY_FILE = "/pod/running/ROLE_READY"

@staticmethod
@@ -286,30 +287,34 @@ def get_job_roles(job_id):
role = pod.metadata.labels["distRole"]
else:
role = "master"
job_role = JobRole(role, pod_name)
job_role = JobRole(role, pod_name, pod)
job_roles.append(job_role)
return job_roles

def __init__(self, role_name, pod_name):
def __init__(self, role_name, pod_name, pod):
self.role_name = role_name
self.pod_name = pod_name
self.pod = pod

def status(self):
# will query api server if refresh is True
def status(self, refresh=False):
"""
Return role status in ["NotFound", "Pending", "Running", "Succeeded", "Failed", "Unknown"]
        It's slightly different from the pod phase; when the pod is running:
        CONTAINER_READY -> WORKER_READY -> JOB_READY (then the job is finally in "Running" status.)
"""
# pod-phase: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
# node condition: https://kubernetes.io/docs/concepts/architecture/nodes/#condition
deployer = JobDeployer()
pods = deployer.get_pods(field_selector="metadata.name={}".format(self.pod_name))
logging.debug("Pods: {}".format(pods))
if(len(pods) < 1):
return "NotFound"
if refresh:
deployer = JobDeployer()
pods = deployer.get_pods(field_selector="metadata.name={}".format(self.pod_name))
logging.debug("Pods: {}".format(pods))
if(len(pods) < 1):
return "NotFound"

assert(len(pods) == 1)
self.pod = pods[0]

assert(len(pods) == 1)
self.pod = pods[0]
phase = self.pod.status.phase

        # !!! The pod being "Running" doesn't mean the "Role" is ready and running.
@@ -319,29 +324,28 @@ def status(self):
return "Unknown"

        # Check whether the user command has been run.
if not self.isRoleReady():
if not self._is_role_ready():
return "Pending"

return phase

    # TODO should be called after status(), or self.pod would be None
def pod_details(self):
return self.pod

def isFileExisting(self, file):
def _is_file_exist(self, file):
deployer = JobDeployer()
status_code, _ = deployer.pod_exec(self.pod_name, ["/bin/sh", "-c", "ls -lrt {}".format(file)])
return status_code == 0

def isRoleReady(self):
def _is_role_ready(self):
for container in self.pod.spec.containers:
if container.name == self.pod_name and container.readiness_probe is not None:
for status in self.pod.status.container_statuses:
if status.name == self.pod_name:
                    logging.info("pod %s has readiness_probe result", self.pod_name)
return status.ready
# no readiness_probe defined, fallback to old way
return self.isFileExisting(JobRole.MARK_ROLE_READY_FILE)
return self._is_file_exist(JobRole.MARK_ROLE_READY_FILE)


# Interface class for managing life time of job
@@ -352,9 +356,6 @@ def __init__(self):
def start(self):
pass

def get_job_status_detail(self, job_id):
pass

def submit_job(self, job_desc):
pass

@@ -404,14 +405,24 @@ def job_status_detail_with_finished_time(job_status_detail, status, msg=""):


class PythonLauncher(Launcher):
def __init__(self):
pass
def __init__(self, pool_size=3):
self.processes = []
self.queue = None
self.pool_size = pool_size
        # items in queue should be a tuple of 3 elements: (function name, args, kwargs)

def start(self):
pass
if len(self.processes) == 0:
self.queue = multiprocessing.JoinableQueue()

def get_job_status_detail(self, job_id):
pass
for i in range(self.pool_size):
p = multiprocessing.Process(target=self.run,
args=(self.queue,), name="py-launcher-" + str(i))
self.processes.append(p)
p.start()

def wait_tasks_done(self):
self.queue.join()

def _all_pods_not_existing(self, job_id):
job_deployer = JobDeployer()
@@ -421,6 +432,9 @@ def _all_pods_not_existing(self, job_id):
return all([status == "NotFound" for status in statuses])

def submit_job(self, job):
self.queue.put(("submit_job", (job,), {}))

def submit_job_impl(self, job):
# check if existing any pod with label: run=job_id
assert("jobId" in job)
job_id = job["jobId"]
@@ -524,7 +538,10 @@ def submit_job(self, job):
dataHandler.Close()
return ret

def kill_job(self, job_id, desired_state="killed", dataHandlerOri=None):
def kill_job(self, job_id, desired_state="killed"):
self.queue.put(("kill_job", (job_id,), {"desired_state": desired_state}))

def kill_job_impl(self, job_id, desired_state="killed", dataHandlerOri=None):
if dataHandlerOri is None:
dataHandler = DataHandler()
else:
@@ -552,3 +569,21 @@ def kill_job(self, job_id, desired_state="killed", dataHandlerOri=None):
dataHandler.Close()
logging.error("Kill job failed with errors: {}".format(errors))
return False

def run(self, queue):
        # TODO maintain a data_handler so we do not need to init it every time
while True:
func_name, args, kwargs = queue.get(True)

try:
if func_name == "submit_job":
self.submit_job_impl(*args, **kwargs)
elif func_name == "kill_job":
self.kill_job_impl(*args, **kwargs)
else:
                    logging.error("unknown func_name %s, with args %s %s",
func_name, args, kwargs)
except Exception:
logging.exception("processing job failed")
finally:
queue.task_done()
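PythonLauncher now acts as a producer/consumer pool: submit_job and kill_job only enqueue (function name, args, kwargs) tuples, the worker processes drain them, and wait_tasks_done() relies on JoinableQueue.join() to block until every queued task has been acknowledged with task_done(). A self-contained sketch of that pattern (toy payloads; the worker body here just prints where the real code dispatches to submit_job_impl/kill_job_impl):

```python
import multiprocessing

def worker(queue):
    while True:
        func_name, args, kwargs = queue.get(True)
        try:
            # a real worker would dispatch to submit_job_impl / kill_job_impl
            print("handling", func_name, args, kwargs)
        finally:
            queue.task_done()

if __name__ == "__main__":
    queue = multiprocessing.JoinableQueue()
    for i in range(3):  # mirrors pool_size=3
        multiprocessing.Process(target=worker, args=(queue,),
                                name="py-launcher-%d" % i, daemon=True).start()
    queue.put(("submit_job", ({"jobId": "job-1"},), {}))
    queue.put(("kill_job", ("job-1",), {"desired_state": "killed"}))
    queue.join()  # returns once both tasks have been task_done()'d
```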
71 changes: 35 additions & 36 deletions src/ClusterManager/job_manager.py
Original file line number Diff line number Diff line change
@@ -443,9 +443,7 @@ def get_job_priority(priority_dict, job_id):


@record
def TakeJobActions(redis_conn, launcher, jobs):
data_handler = DataHandler()

def TakeJobActions(data_handler, redis_conn, launcher, jobs):
vc_list = data_handler.ListVCs()
cluster_status, _ = data_handler.GetClusterStatus()
cluster_total = cluster_status["gpu_capacity"]
@@ -569,60 +567,61 @@ def TakeJobActions(redis_conn, launcher, jobs):

logging.info("TakeJobActions : job desired actions taken")


def Run(redis_port, updateblock=0):
def Run(redis_port, target_status):
register_stack_trace_dump()
create_log()

notifier = notify.Notifier(config.get("job-manager"))
notifier.start()
create_log()

launcher = PythonLauncher()
launcher.start()

redis_conn = redis.StrictRedis(host="localhost",
port=redis_port, db=0)

process_name = "job_manager_" + target_status

while True:
if updateblock == 0:
update_file_modification_time("job_manager")
else:
update_file_modification_time("job_manager" + str(updateblock))
update_file_modification_time(process_name)

with manager_iteration_histogram.labels("job_manager").time():
with manager_iteration_histogram.labels(process_name).time():
try:
config["racks"] = k8sUtils.get_node_labels("rack")
config["skus"] = k8sUtils.get_node_labels("sku")
except Exception as e:
logging.exception("get node labels failed")

try:
dataHandler = DataHandler()
launcher.wait_tasks_done() # wait for tasks from previous batch done

if updateblock == 0 or updateblock == 1:
pendingJobs = dataHandler.GetPendingJobs()
TakeJobActions(redis_conn, launcher, pendingJobs)
dataHandler = DataHandler()

pendingJobs = dataHandler.GetPendingJobs()
logging.info("Updating status for %d jobs" % len(pendingJobs))
for job in pendingJobs:
try:
if target_status == "queued":
jobs = dataHandler.GetJobList("all", "all", num=None,
status="queued,scheduling,running")
TakeJobActions(dataHandler, redis_conn, launcher, jobs)
else:
jobs = dataHandler.GetJobList("all", "all", num=None,
status=target_status)
logging.info("Updating status for %d %s jobs",
len(jobs), target_status)

for job in jobs:
logging.info("Processing job: %s, status: %s" % (job["jobId"], job["jobStatus"]))
if updateblock == 0 or updateblock == 2:
if job["jobStatus"] == "killing":
launcher.kill_job(job["jobId"], "killed", dataHandlerOri=dataHandler)
elif job["jobStatus"] == "pausing":
launcher.kill_job(job["jobId"], "paused", dataHandlerOri=dataHandler)
elif job["jobStatus"] == "running":
UpdateJobStatus(redis_conn, launcher, job, notifier, dataHandlerOri=dataHandler)

if updateblock == 0 or updateblock == 1:
if job["jobStatus"] == "scheduling":
UpdateJobStatus(redis_conn, launcher, job, notifier, dataHandlerOri=dataHandler)
elif job["jobStatus"] == "unapproved":
ApproveJob(redis_conn, job, dataHandlerOri=dataHandler)
except Exception as e:
logging.warning(e, exc_info=True)
if job["jobStatus"] == "killing":
launcher.kill_job(job["jobId"], "killed")
elif job["jobStatus"] == "pausing":
launcher.kill_job(job["jobId"], "paused")
elif job["jobStatus"] == "running":
UpdateJobStatus(redis_conn, launcher, job, notifier, dataHandlerOri=dataHandler)
elif job["jobStatus"] == "scheduling":
UpdateJobStatus(redis_conn, launcher, job, notifier, dataHandlerOri=dataHandler)
elif job["jobStatus"] == "unapproved":
ApproveJob(redis_conn, job, dataHandlerOri=dataHandler)
else:
logging.error("unknown job status %s for job %s",
job["jobStatus"], job["jobId"])
except Exception as e:
logging.warning("Process job failed!", exc_info=True)
finally:
@@ -638,9 +637,9 @@ def Run(redis_port, updateblock=0):
parser = argparse.ArgumentParser()
parser.add_argument("--redis_port", "-r", help="port of redis", type=int, default=9300)
parser.add_argument("--port", "-p", help="port of exporter", type=int, default=9200)
parser.add_argument("--updateblock", "-u", help="updateblock", type=int, default=0)
parser.add_argument("--status", "-s", help="target status to update, queued is a special status", type=str, default="queued")

args = parser.parse_args()
setup_exporter_thread(args.port)

Run(args.redis_port, args.updateblock)
Run(args.redis_port, args.status)
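The "queued" manager is the special one: instead of fetching only its own status it pulls queued, scheduling, and running jobs, because TakeJobActions must subtract the GPUs already claimed by in-flight jobs before deciding whether a queued job fits. A toy model of that admission logic with made-up numbers (the real code works from GetClusterStatus()'s gpu_capacity plus per-VC quotas and priorities):

```python
jobs = [
    {"jobId": "a", "jobStatus": "running",    "gpus": 4},
    {"jobId": "b", "jobStatus": "scheduling", "gpus": 2},
    {"jobId": "c", "jobStatus": "queued",     "gpus": 3},
    {"jobId": "d", "jobStatus": "queued",     "gpus": 2},
]
gpu_capacity = 8  # stand-in for cluster_status["gpu_capacity"]

used = sum(j["gpus"] for j in jobs
           if j["jobStatus"] in ("running", "scheduling"))
for job in (j for j in jobs if j["jobStatus"] == "queued"):
    if used + job["gpus"] <= gpu_capacity:
        used += job["gpus"]
        print("admit", job["jobId"])   # would transition to "scheduling"
    else:
        print("defer", job["jobId"])   # stays "queued"
```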
6 changes: 5 additions & 1 deletion src/utils/MySQLDataHandler.py
@@ -604,9 +604,13 @@ def GetJobList(self, userName, vcName, num = None, status = None, op = ("=","or")
ret = []
cursor = self.conn.cursor()
try:
query = "SELECT `jobId`,`jobName`,`userName`, `vcName`, `jobStatus`, `jobStatusDetail`, `jobType`, `jobDescriptionPath`, `jobDescription`, `jobTime`, `endpoints`, `jobParams`,`errorMsg` ,`jobMeta` FROM `%s` where `vcName` = '%s'" % (self.jobtablename, vcName)
query = "SELECT `jobId`,`jobName`,`userName`, `vcName`, `jobStatus`, `jobStatusDetail`, `jobType`, `jobDescriptionPath`, `jobDescription`, `jobTime`, `endpoints`, `jobParams`,`errorMsg` ,`jobMeta` FROM `%s` where 1" % (self.jobtablename)
if userName != "all":
query += " and `userName` = '%s'" % userName

if vcName != "all":
query += " and `vcName` = '%s'" % vcName

if status is not None:
if "," not in status:
query += " and `jobStatus` %s '%s'" % (op[0], status)
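The rewritten GetJobList builds its WHERE clause by appending interpolated strings, which keeps the change small but leaves the query open to injection if userName or vcName ever carry user input. A sketch of the same conditional filtering with DB-API placeholders — a hardening idea, not what this PR implements; assumes a PEP 249 cursor with %s paramstyle, e.g. MySQLdb/pymysql:

```python
def build_job_query(table, user_name, vc_name, status=None, op=("=", "or")):
    # Table names cannot be bound as parameters, so the table is still
    # interpolated; everything user-controllable goes through params.
    query = ("SELECT `jobId`, `jobName`, `userName`, `vcName`, `jobStatus` "
             "FROM `{}` WHERE 1".format(table))
    params = []
    if user_name != "all":
        query += " AND `userName` = %s"
        params.append(user_name)
    if vc_name != "all":
        query += " AND `vcName` = %s"
        params.append(vc_name)
    if status is not None:
        values = status.split(",")
        clause = " {} ".format(op[1]).join("`jobStatus` {} %s".format(op[0])
                                           for _ in values)
        query += " AND (" + clause + ")"
        params.extend(values)
    return query, params

# usage: cursor.execute(*build_job_query(self.jobtablename, "all", "all",
#                                        status="queued,scheduling,running"))
```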