-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathk8s_helper.py
94 lines (71 loc) · 2.58 KB
/
k8s_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import yaml
from kubernetes import client, config
from os import path
import json
import datetime
# Kubernetes namespace that all jobs are created in.
NAMESPACE = "second-carrier-prediction"
# Container image executed by each job (project registry, main branch build).
IMAGE = "registry.ailab.rnd.ki.sw.ericsson.se/second-carrier-prediction/main/fl-moe"
# Number of federated trainers; scales the pod memory request (8 Gi per trainer).
num_trainers = 8
def default(o):
    """JSON serializer hook for ``json.dumps(..., default=default)``.

    Converts date/datetime objects to ISO-8601 strings. For any other
    unsupported type it raises ``TypeError`` — the contract expected by
    ``json.dumps`` — instead of implicitly returning ``None``, which
    would be silently encoded as ``null`` and mask serialization bugs.

    Args:
        o: The object ``json.dumps`` could not serialize natively.

    Returns:
        str: ISO-8601 representation of ``o`` when it is a date/datetime.

    Raises:
        TypeError: If ``o`` is not a date or datetime.
    """
    if isinstance(o, (datetime.date, datetime.datetime)):
        return o.isoformat()
    raise TypeError(
        f"Object of type {type(o).__name__} is not JSON serializable")
def create_job_object(command, gen_name="eisamar-fed-moe-job-", *,
                      cpu=46, gpus=8, memory_gi=None):
    """Build a Kubernetes ``V1Job`` that runs *command* in the fl-moe image.

    The pod mounts the project's CephFS volume at
    ``/proj/second-carrier-prediction/`` and runs with GPU tolerations so it
    can be scheduled onto GPU nodes. The resource constants of the original
    are kept as backward-compatible keyword defaults.

    Args:
        command: Container command as a list, e.g. ``["python", "train.py"]``.
        gen_name: ``generateName`` prefix for the job's metadata.
        cpu: CPU request/limit (cores). Defaults to 46 as before.
        gpus: Number of ``nvidia.com/gpu`` devices. Defaults to 8 as before.
        memory_gi: Memory request/limit in Gi. Defaults to
            ``8 * num_trainers`` (8 Gi per federated trainer) as before.

    Returns:
        kubernetes.client.V1Job: The fully assembled job object (not yet
        submitted to the cluster).
    """
    if memory_gi is None:
        memory_gi = 8 * num_trainers  # 8 Gi per federated trainer
    gpu_count = str(gpus)  # the resource API expects the GPU count as a string
    # Requests == limits: the pod gets a Guaranteed QoS class.
    resources = client.V1ResourceRequirements(
        limits={"cpu": cpu, "memory": f"{memory_gi}Gi",
                "nvidia.com/gpu": gpu_count},
        requests={"cpu": cpu, "memory": f"{memory_gi}Gi",
                  "nvidia.com/gpu": gpu_count})
    volume_mount = client.V1VolumeMount(
        name="projectdisk",
        mount_path="/proj/second-carrier-prediction/")
    container = client.V1Container(
        name="fed-moe",
        image=IMAGE,
        command=command,
        volume_mounts=[volume_mount],
        resources=resources,
        working_dir="/proj/second-carrier-prediction/federated-learning-mixture/")
    # Registry pull secret for the private project image.
    pull_secret = client.V1SecretReference(name="eisamar-fl-moe-token")
    # Tolerate the GPU-node taint so the pod may be scheduled there.
    gpu_toleration = client.V1Toleration(
        key="nvidia.com/gpu",
        operator="Exists",
        effect="NoSchedule")
    pvc = client.V1PersistentVolumeClaimVolumeSource(
        claim_name="cephfs-second-carrier-prediction")
    project_volume = client.V1Volume(
        name="projectdisk", persistent_volume_claim=pvc)
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(
            labels={"app": "fed-moe",
                    "ailab-job-type": "batch"}),
        spec=client.V1PodSpec(
            restart_policy="Never",  # let the Job controller handle retries
            containers=[container],
            tolerations=[gpu_toleration],
            image_pull_secrets=[pull_secret],
            volumes=[project_volume]))
    spec = client.V1JobSpec(
        template=template,
        backoff_limit=4,  # up to 4 pod retries before the job is marked failed
        active_deadline_seconds=3 * 24 * 60 * 60)  # hard 3-day wall-clock cap
    metadata = client.V1ObjectMeta(generate_name=gen_name)
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=metadata,
        spec=spec)
def create_job(api_instance, job):
    """Submit *job* to the cluster in the project namespace.

    Args:
        api_instance: A ``kubernetes.client.BatchV1Api`` instance.
        job: The ``V1Job`` object to create.

    Returns:
        dict: The API response converted to a plain dict so the caller can
        JSON-serialize it.
    """
    response = api_instance.create_namespaced_job(
        namespace=NAMESPACE, body=job)
    return response.to_dict()
def main():
    """Load the local kubeconfig, submit a short smoke-test job, and return
    the API response as a dict.

    The test job merely runs ``iterator_clusters.py -h`` so the pipeline
    (image pull, volume mount, scheduling) can be verified cheaply.
    """
    config.load_kube_config()  # uses the caller's local ~/.kube/config
    batch_api = client.BatchV1Api()
    smoke_test_job = create_job_object(
        ["python", "iterator_clusters.py", "-h"], gen_name="test-")
    return create_job(batch_api, smoke_test_job)
if __name__ == '__main__':
    # Print the cluster's response as JSON; `default` converts the
    # datetime fields the API response contains.
    print(json.dumps(main(), default=default))