Skip to content

Commit

Permalink
VC node hard assignment (#698)
Browse files Browse the repository at this point in the history
* VC node hard assignment

* try catch invalid cpu and memory spec

* Add comma

* Update

* Set gpuType=None in pod description for CPU jobs

* Fix logic for cpu jobs

* Add command explanation
  • Loading branch information
Anbang-Hu authored and hongzhili committed Dec 4, 2019
1 parent 082a66d commit 37c778c
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,6 @@ src/ClusterBootstrap/services/monitor/alert-templates.yaml
src/ClusterBootstrap/services/jobmanager/dlws-scripts.yaml
src/ClusterBootstrap/services/monitor/alerting/kill-idle.rules
src/ClusterBootstrap/template/*config.yaml.template

.idea
venv
19 changes: 18 additions & 1 deletion src/ClusterBootstrap/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3026,6 +3026,17 @@ def kubernetes_label_sku():
kubernetes_label_node("--overwrite", machine_name, "sku=%s" % sku)


def kubernetes_label_vc():
    """Label kubernetes nodes with vc=<vc_value>"""
    # Every machine gets a vc label; machines without an explicit
    # (non-None) "vc" entry fall back to the shared "default" VC.
    for name, info in get_machines_by_roles("all", config).items():
        assigned_vc = info.get("vc")
        if assigned_vc is None:
            assigned_vc = "default"
        kubernetes_label_node("--overwrite", name, "vc=%s" % assigned_vc)


def kubernetes_patch_nodes_provider (provider, scaledOnly):
nodes = []
if scaledOnly:
Expand Down Expand Up @@ -3892,6 +3903,9 @@ def run_command( args, command, nargs, parser ):
elif command == "labelsku":
kubernetes_label_sku()

elif command == "labelvc":
kubernetes_label_vc()

elif command == "genscripts":
gen_platform_wise_config()
gen_dns_config_script()
Expand Down Expand Up @@ -4216,7 +4230,10 @@ def upgrade_masters(hypekube_url="gcr.io/google-containers/hyperkube:v1.15.2"):
upgrade_workers [nodes] Upgrade the worker nodes. If no additional node is specified, all nodes will be updated.
upgrade [nodes] Upgrade the cluster and nodes. If no additional node is specified, all nodes will be updated.
labelcpuworker Label CPU nodes with "worker" role with cpuworker=active if their SKU is defined in sku_meta.
labelsku Label nodes with sku=<sku_value> if their SKU is defined in sku_meta.
labelsku Label nodes with "sku=<sku_value>" if their SKU is defined in sku_meta. In order to run distributed
CPU jobs, ./deploy.py labelcpuworker must be executed as well.
labelvc Label nodes with "vc=<vc_value>" if vc is defined in machine's property in machines sections in config.
Default to "vc=default".
''') )
parser.add_argument("-y", "--yes",
help="Answer yes automatically for all prompt",
Expand Down
2 changes: 1 addition & 1 deletion src/ClusterBootstrap/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@
# 3. default_cpu_sku is set to a valid value that exists in sku_meta
"enable_cpuworker": False,
"enable_blobfuse": False,
"enable_custom_registry_secrets": False
"enable_custom_registry_secrets": False,
"default_cpu_sku": "Standard_D2s_v3",

# SKU meta defines different types of resources for each SKU
Expand Down
1 change: 1 addition & 0 deletions src/ClusterBootstrap/template/RestfulAPI/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,4 @@ local_fast_storage: {{cnf["local_fast_storage"]}}
enable_custom_registry_secrets: {{cnf["enable_custom_registry_secrets"]}}
master_token: {{cnf["master_token"]}}
nccl_ib_disable: {{cnf["nccl_ib_disable"]}}
vc_node_hard_assignment: {{cnf["vc_node_hard_assignment"]}}
13 changes: 13 additions & 0 deletions src/ClusterManager/dist_pod_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,19 @@ def generate_pods(self, job):
params["nodeSelector"] = {}
if "gpuType" in params:
params["nodeSelector"]["gpuType"] = params["gpuType"]

# Set up VC dedicated node usage
vc_node_hard_assignment = job.get_vc_node_hard_assignment()
if isinstance(vc_node_hard_assignment, dict):
vc = params["vcName"]
# Only consider GPU jobs
if vc in vc_node_hard_assignment and \
vc_node_hard_assignment[vc] is True and \
params["resourcegpu"] > 0:
params["nodeSelector"]["vc"] = vc
else:
params["nodeSelector"]["vc"] = "default"

assignedRack = job.get_rack()
if assignedRack is not None:
params["nodeSelector"]["rack"] = assignedRack
Expand Down
4 changes: 4 additions & 0 deletions src/ClusterManager/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,13 @@ def get_enable_blobfuse(self):

def get_enable_custom_registry_secrets(self):
    """Return the cluster's "enable_custom_registry_secrets" config value.

    Delegates to _get_cluster_config, so the result is whatever the
    cluster config holds for this key (None/default if absent —
    see _get_cluster_config for the exact fallback behavior).
    """
    return self._get_cluster_config("enable_custom_registry_secrets")

def get_nccl_ib_disable(self):
    """Return the cluster's "nccl_ib_disable" config value.

    Delegates to _get_cluster_config; presumably used to set the
    NCCL_IB_DISABLE environment variable for jobs — TODO confirm
    against the callers.
    """
    return self._get_cluster_config("nccl_ib_disable")

def get_vc_node_hard_assignment(self):
    """Return the cluster's "vc_node_hard_assignment" config value.

    Delegates to _get_cluster_config. Callers in the pod templates
    treat a dict value as a mapping of vc name -> bool (True means
    GPU pods for that VC are pinned to vc-labeled nodes); any
    non-dict value disables the feature.
    """
    return self._get_cluster_config("vc_node_hard_assignment")

def _get_cluster_config(self, key):
if key in self.cluster:
return self.cluster[key]
Expand Down
12 changes: 12 additions & 0 deletions src/ClusterManager/pod_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ def generate_pods(self, job):
if "gpuType" in params:
params["nodeSelector"]["gpuType"] = params["gpuType"]

# Set up VC dedicated node usage
vc_node_hard_assignment = job.get_vc_node_hard_assignment()
if isinstance(vc_node_hard_assignment, dict):
vc = params["vcName"]
# Only consider GPU jobs
if vc in vc_node_hard_assignment and \
vc_node_hard_assignment[vc] is True and \
params["resourcegpu"] > 0:
params["nodeSelector"]["vc"] = vc
else:
params["nodeSelector"]["vc"] = "default"

params = enable_cpu_config(params, job.cluster)

local_pod_path = job.get_hostpath(job.job_path, "master")
Expand Down
44 changes: 28 additions & 16 deletions src/ClusterManager/pod_template_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/python3
#!/usr/bin/python
def cpu_format(cpu, ratio=1.0):
"""Convert number of cpu to cpu cycle.
Expand All @@ -9,9 +9,12 @@ def cpu_format(cpu, ratio=1.0):
Returns:
Formatted string of cpu cycle if cpu is valid, None otherwise.
"""
if cpu is None:
try:
cpu = float(cpu)
except:
return None
return "%dm" % int(ratio * cpu * 1000)
else:
return "%dm" % int(ratio * cpu * 1000)


def mem_format(memory, ratio=1.0):
Expand All @@ -24,9 +27,12 @@ def mem_format(memory, ratio=1.0):
Returns:
Formatted string of memory size if memory is valid, None otherwise.
"""
if memory is None:
try:
memory = float(memory)
except:
return None
return "%dM" % int(ratio * memory * 1024)
else:
return "%dM" % int(ratio * memory * 1024)


def get_sku_info(sku, config):
Expand Down Expand Up @@ -83,22 +89,31 @@ def enable_cpu_config(pod, config):
Returns:
Potentially modified pod.
"""
# Only works for 0-GPU pod
if "resourcegpu" not in pod or int(pod["resourcegpu"]) != 0:
return pod

# Ignore if cpuworker is not enabled
enable_cpuworker = config.get("enable_cpuworker", False)
if enable_cpuworker is False:
return pod

# Add node selector cpuworker=active
# Only works for 0-GPU job
if "resourcegpu" not in pod or int(pod["resourcegpu"]) != 0:
return pod

# When cpuworker is enabled, CPU job should have gpuType=None
if "nodeSelector" not in pod:
pod["nodeSelector"] = {}
pod["nodeSelector"]["gpuType"] = "None"

job_training_type = pod.get("jobtrainingtype", None)
dist_role = pod.get("distRole", None)

# No special config for ps pod. It is always co-located with a worker
if dist_role == "ps":
return pod

# Add node selector cpuworker=active
pod["nodeSelector"]["cpuworker"] = "active"

# Add node selector sku=<sku_value>
job_training_type = pod.get("jobtrainingtype", None)
default_cpu_sku = config.get("default_cpu_sku", None)
if "sku" in pod:
pod["nodeSelector"]["sku"] = pod["sku"]
Expand All @@ -115,7 +130,7 @@ def enable_cpu_config(pod, config):
default_mem_request = None
default_mem_limit = None

if job_training_type == "PSDistJob" and pod["distRole"] == "worker":
if job_training_type == "PSDistJob" and dist_role == "worker":
full_node = True
else:
full_node = False
Expand All @@ -124,14 +139,11 @@ def enable_cpu_config(pod, config):
sku = pod["nodeSelector"].get("sku", None)
sku_info = get_sku_info(sku=sku, config=config)
if sku_info is not None:
# Do not restrict the limit for full node worker
default_cpu_request = cpu_format(sku_info["cpu"],
sku_info["cpu_ratio"])
default_cpu_limit = cpu_format(sku_info["cpu"],
sku_info["cpu_ratio"])
default_mem_request = mem_format(sku_info["memory"],
sku_info["memory_ratio"])
default_mem_limit = mem_format(sku_info["memory"],
sku_info["memory_ratio"])
else:
default_cpu_request = cpu_format(config.get("default_cpurequest"))
default_cpu_limit = cpu_format(config.get("default_cpulimit"))
Expand Down

0 comments on commit 37c778c

Please sign in to comment.