add a hidden feature for deploy.py to separate code and config (#736)
xudifsd authored and Anbang-Hu committed Jan 4, 2020
1 parent 025f934 commit bd891b7
139 changes: 69 additions & 70 deletions src/ClusterBootstrap/deploy.py
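This commit threads an optional config directory through deploy.py's kubectl helpers so that cluster.yaml, config.yaml, the cluster ID, and the TLS certificates can be read from a directory outside the source tree. A hypothetical invocation of the new flag (the path is illustrative; the --directory/-d flag and its default of "." come from the diff below):

    ./deploy.py --directory /data/mycluster-config kubectl get nodes

Leaving the flag off keeps the previous behaviour of resolving configuration relative to the current directory and ./deploy.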
@@ -2792,21 +2792,26 @@ def update_config_nodes():
update_config_node( node )

# Running a kubectl commands.
def run_kube( prog, commands ):
def run_kube(prog, commands, config_dir):
one_command = " ".join(commands)
kube_command = ""
if (config["isacs"]):
kube_command = "%s --kubeconfig=./deploy/%s %s" % (prog, config["acskubeconfig"], one_command)
kubeconfig_path = os.path.join(config_dir, config["acskubeconfig"])
kube_command = "%s --kubeconfig=%s %s" % (prog, kubeconfig_path, one_command)
else:
ca_path = os.path.join(config_dir, "ssl/ca/ca.pem")
key_path = os.path.join(config_dir, "ssl/kubelet/apiserver-key.pem")
pem_path = os.path.join(config_dir, "ssl/kubelet/apiserver.pem")

nodes = get_ETCD_master_nodes(config["clusterId"])
master_node = random.choice(nodes)
kube_command = ("%s --server=https://%s:%s --certificate-authority=%s --client-key=%s --client-certificate=%s %s" % (prog, master_node, config["k8sAPIport"], "./deploy/ssl/ca/ca.pem", "./deploy/ssl/kubelet/apiserver-key.pem", "./deploy/ssl/kubelet/apiserver.pem", one_command) )
kube_command = ("%s --server=https://%s:%s --certificate-authority=%s --client-key=%s --client-certificate=%s %s" % (prog, master_node, config["k8sAPIport"], ca_path, key_path, pem_path, one_command) )
if verbose:
print kube_command
print("executing command %s" % (kube_command))
os.system(kube_command)

def run_kubectl( commands ):
run_kube( "./deploy/bin/kubectl", commands)
def run_kubectl(commands, config_dir="./deploy"):
run_kube("./deploy/bin/kubectl", commands, config_dir)

def kubernetes_get_node_name(node):
kube_node_name = ""
@@ -2893,8 +2898,8 @@ def get_service_yaml( use_service ):
fname = servicedic[use_service]
return fname

def kubernetes_label_node(cmdoptions, nodename, label):
run_kubectl(["label nodes %s %s %s" % (cmdoptions, nodename, label)])
def kubernetes_label_node(cmdoptions, nodename, label, config_dir):
run_kubectl(["label nodes %s %s %s" % (cmdoptions, nodename, label)], config_dir)

# Get the list of nodes for a particular service
#
@@ -2940,7 +2945,7 @@ def get_node_lists_for_service(service):
# worker_node: all worker node
# The kubernete node will be marked accordingly to facilitate the running of daemon service.

def kubernetes_label_nodes( verb, servicelists, force ):
def kubernetes_label_nodes(verb, servicelists, force, config_dir):
servicedic = get_all_services()
print "servicedic\n", servicedic
get_nodes(config["clusterId"])
@@ -2969,11 +2974,11 @@ def kubernetes_label_nodes( verb, servicelists, force ):
for node in nodes:
nodename = kubernetes_get_node_name(node)
if verb == "active":
kubernetes_label_node(cmdoptions, nodename, label+"=active")
kubernetes_label_node(cmdoptions, nodename, label+"=active", config_dir)
elif verb == "inactive":
kubernetes_label_node(cmdoptions, nodename, label+"=inactive")
kubernetes_label_node(cmdoptions, nodename, label+"=inactive", config_dir)
elif verb == "remove":
kubernetes_label_node(cmdoptions, nodename, label+"-")
kubernetes_label_node(cmdoptions, nodename, label+"-", config_dir)


# Label kubernete nodes with gpu types.skip for CPU workers
@@ -3085,7 +3090,7 @@ def kubernetes_patch_nodes_provider (provider, scaledOnly):
# Label kubernete nodes according to property of node (usually specified in config.yaml or cluster.yaml)
# Certain property of node:
# E.g., rack
def kubernetes_mark_nodes( marklist, bMark ):
def kubernetes_mark_nodes(marklist, bMark, config_dir):
if marklist == []:
marklist = config["kubemarks"]
if verbose:
@@ -3100,11 +3105,11 @@ def kubernetes_mark_nodes( marklist, bMark ):
for mark in marklist:
if mark in nodeconfig:
if bMark:
kubernetes_label_node( "--overwrite", nodename, mark+"="+nodeconfig[mark] )
kubernetes_label_node( "--overwrite", nodename, mark+"="+nodeconfig[mark], config_dir)
else:
kubernetes_label_node( "", nodename, mark+"-" )
kubernetes_label_node( "", nodename, mark+"-", config_dir)

def start_one_kube_service(fname):
def start_one_kube_service(fname, config_dir):
if verbose:
# use try/except because yaml.load cannot load yaml file with multiple documents.
try:
@@ -3118,14 +3123,14 @@ def start_one_kube_service(fname):

if fname == "./deploy/services/jobmanager/jobmanager.yaml":
# recreate the configmap dlws-scripts
run_kubectl( ["create configmap dlws-scripts --from-file=../Jobs_Templete/ -o yaml --dry-run | ./deploy/bin/kubectl apply -f -"] )
run_kubectl(["create configmap dlws-scripts --from-file=../Jobs_Templete/ -o yaml --dry-run | ./deploy/bin/kubectl apply -f -"], config_dir)

run_kubectl( ["create", "-f", fname ] )
run_kubectl(["create", "-f", fname], config_dir)

def stop_one_kube_service(fname):
run_kubectl( ["delete", "-f", fname ] )
def stop_one_kube_service(fname, config_dir):
run_kubectl(["delete", "-f", fname], config_dir)

def start_kube_service( servicename ):
def start_kube_service(servicename, config_dir):
fname = get_service_yaml( servicename )
dirname = os.path.dirname(fname)
if os.path.exists(os.path.join(dirname,"launch_order")) and "/" not in servicename:
@@ -3141,7 +3146,7 @@ def start_kube_service( servicename ):
else:
start_one_kube_service(fname)

def stop_kube_service( servicename ):
def stop_kube_service(servicename, config_dir):
fname = get_service_yaml( servicename )
dirname = os.path.dirname(fname)
if os.path.exists(os.path.join(dirname,"launch_order")) and "/" not in servicename:
@@ -3151,27 +3156,27 @@ def stop_kube_service( servicename ):
# If this line is a sleep tag, skip this line.
if not filename.startswith("SLEEP"):
filename = filename.strip('\n')
stop_one_kube_service(os.path.join(dirname,filename))
stop_one_kube_service(os.path.join(dirname,filename), config_dir)
else:
stop_one_kube_service(fname)
stop_one_kube_service(fname, config_dir)


def replace_kube_service( servicename ):
fname = get_service_yaml( servicename )
run_kubectl( ["replace --force", "-f", fname ] )
def replace_kube_service(servicename, config_dir):
fname = get_service_yaml(servicename)
run_kubectl(["replace --force", "-f", fname], config_dir)

def run_kube_command_node(verb, nodes):
def run_kube_command_node(verb, nodes, config_dir):
for node in nodes:
nodename = kubernetes_get_node_name(node)
run_kubectl( [verb, nodename ] )
run_kubectl([verb, nodename], config_dir)

def run_kube_command_on_nodes( nargs ):
def run_kube_command_on_nodes(nargs, config_dir):
verb = nargs[0]
if len(nargs)>1:
nodes = nargs[1:]
else:
nodes = get_ETCD_master_nodes(config["clusterId"])
run_kube_command_node( verb, nodes)
run_kube_command_node(verb, nodes, config_dir)

def render_docker_images():
if verbose:
@@ -3256,40 +3261,33 @@ def run_command( args, command, nargs, parser ):
exit()

# Cluster Config
config_cluster = os.path.join(dirpath,"cluster.yaml")
config_cluster = os.path.join(args.directory, "cluster.yaml")
if os.path.exists(config_cluster):
merge_config( config, yaml.load(open(config_cluster)))

merge_config(config, yaml.load(open(config_cluster)))

config_file = os.path.join(dirpath,"config.yaml")
config_file = os.path.join(args.directory, "config.yaml")
if not os.path.exists(config_file):
parser.print_help()
print "ERROR: config.yaml does not exist!"
exit()

f = open(config_file)
merge_config(config, yaml.load(f))
f.close()
if os.path.exists("./deploy/clusterID.yml"):
f = open("./deploy/clusterID.yml")
tmp = yaml.load(f)
f.close()
if "clusterId" in tmp:
config["clusterId"] = tmp["clusterId"]
if "copy_sshtemp" in config and config["copy_sshtemp"]:
if "ssh_origfile" not in config:
config["ssh_origfile"] = config["ssh_cert"]
sshfile = os.path.join(dirpath,config["ssh_origfile"])
if os.path.exists(sshfile):
_, sshtempfile = tempfile.mkstemp(dir='/tmp')
if verbose:
print "SSH file is now {0}".format(sshtempfile)
with open (sshtempfile, 'wb') as output:
with open (sshfile, 'rb') as input:
output.write(input.read())
config["ssh_cert"] = sshtempfile
else:
print "SSH Key {0} not found using original".format(sshfile)
with open(config_file) as f:
merge_config(config, yaml.load(f))

cluster_id_f = os.path.join(args.directory, "clusterID/clusterID.yml")
if os.path.exists(cluster_id_f):
with open(cluster_id_f) as f:
tmp = yaml.load(f)
if "clusterId" in tmp:
config["clusterId"] = tmp["clusterId"]
else:
cluster_id_f = os.path.join(dirpath, "deploy/clusterID.yml")
if os.path.exists(cluster_id_f):
with open(cluster_id_f) as f:
tmp = yaml.load(f)
if "clusterId" in tmp:
config["clusterId"] = tmp["clusterId"]

add_acs_config(command)
if verbose and config["isacs"]:
print "Using Azure Container Services"
@@ -3669,11 +3667,11 @@ def run_command( args, command, nargs, parser ):
dockername = fetch_config_and_check(config, ["glusterFS", "glusterfs_docker"])
push_docker_images( [dockername] )
elif nargs[0] == "start":
start_kube_service("glusterFS")
start_kube_service("glusterFS", args.directory)
launch_glusterFS_endpoint( nodesinfo, glusterFSargs )
elif nargs[0] == "stop":
stop_glusterFS_endpoint()
stop_kube_service("glusterFS")
stop_kube_service("glusterFS", args.directory)
else:
parser.print_help()
print "Unknown subcommand for glusterFS: " + nargs[0]
@@ -3871,7 +3869,7 @@ def run_command( args, command, nargs, parser ):
update_config_nodes()

elif command == "kubectl":
run_kubectl(nargs)
run_kubectl(nargs, args.directory)

elif command == "kubernetes":
configuration( config, verbose )
@@ -3884,7 +3882,7 @@ def run_command( args, command, nargs, parser ):
for service in allservices:
servicenames.append(service)
generate_hdfs_containermounts()
configuration( config, verbose )
configuration(config, verbose )
if nargs[0] == "start":
if args.force and "hdfsformat" in servicenames:
print("This operation will WIPEOUT HDFS namenode, and erase all data on the HDFS cluster, " )
@@ -3893,20 +3891,20 @@ def run_command( args, command, nargs, parser ):
config["hdfsconfig"]["formatoptions"] = "--force "
# Start a kubelet service.
for servicename in servicenames:
start_kube_service(servicename)
start_kube_service(servicename, args.directory)
elif nargs[0] == "stop":
# stop a kubelet service.
for servicename in servicenames:
stop_kube_service(servicename)
stop_kube_service(servicename, args.directory)
elif nargs[0] == "restart":
# restart a kubelet service.
for servicename in servicenames:
replace_kube_service(servicename)
replace_kube_service(servicename, args.directory)
elif nargs[0] == "labels":
if len(nargs)>=2 and ( nargs[1] == "active" or nargs[1] == "inactive" or nargs[1] == "remove" ):
kubernetes_label_nodes(nargs[1], nargs[2:], args.yes)
kubernetes_label_nodes(nargs[1], nargs[2:], args.yes, args.directory)
elif len(nargs)==1:
kubernetes_label_nodes("active", [], args.yes )
kubernetes_label_nodes("active", [], args.yes, args.directory)
else:
parser.print_help()
print "Error: kubernetes labels expect a verb which is either active, inactive or remove, but get: %s" % nargs[1]
@@ -3920,11 +3918,11 @@ def run_command( args, command, nargs, parser ):
else:
print "Error: kubernetes patchprovider expect a verb which is either aztools, gstools or awstools."
elif nargs[0] == "mark":
kubernetes_mark_nodes( nargs[1:], True)
kubernetes_mark_nodes(nargs[1:], True, args.directory)
elif nargs[0] == "unmark":
kubernetes_mark_nodes( nargs[1:], False)
kubernetes_mark_nodes(nargs[1:], False, args.directory)
elif nargs[0] == "cordon" or nargs[0] == "uncordon":
run_kube_command_on_nodes(nargs)
run_kube_command_on_nodes(nargs, args.directory)
elif nargs[0] == "labelvc":
kubernetes_label_vc(True)
else:
@@ -4319,6 +4317,7 @@ def upgrade_masters(hypekube_url="gcr.io/google-containers/hyperkube:v1.15.2"):
action="store",
default=None
)
parser.add_argument("--directory", "-d", help = "config root dir to read from", default=".")

parser.add_argument("command",
help = "See above for the list of valid command" )
