Add load balance service type (#24)
lidongze0629 authored Dec 21, 2020
1 parent edc0b0e commit 69f401e
Showing 8 changed files with 178 additions and 89 deletions.
106 changes: 44 additions & 62 deletions coordinator/gscoordinator/cluster.py
@@ -51,6 +51,7 @@
from graphscope.deploy.kubernetes.resource_builder import ServiceBuilder
from graphscope.deploy.kubernetes.utils import delete_kubernetes_object
from graphscope.deploy.kubernetes.utils import get_kubernetes_object_info
from graphscope.deploy.kubernetes.utils import get_service_endpoints
from graphscope.framework.utils import random_string
from graphscope.proto import types_pb2

@@ -84,9 +85,6 @@ class KubernetesClusterLauncher(Launcher):

_interactive_engine_manager_port = 8080 # fixed
_zookeeper_port = 2181 # fixed
_random_interactive_engine_service_port = random.randint(
30001, 31000
) # 30000-32767
_random_analytical_engine_rpc_port = random.randint(56001, 57000)
_random_etcd_listen_peer_service_port = random.randint(57001, 58000)
_random_etcd_listen_client_service_port = random.randint(58001, 59000)
@@ -98,6 +96,7 @@ class KubernetesClusterLauncher(Launcher):
def __init__(
self,
namespace=None,
service_type=None,
gs_image=None,
etcd_image=None,
zookeeper_image=None,
@@ -142,6 +141,7 @@ def __init__(
)

self._namespace = namespace
self._service_type = service_type
self._num_workers = num_workers

self._coordinator_name = coordinator_name
@@ -331,7 +331,7 @@ def _create_vineyard_service(self):
labels = {"name": self._engine_name} # vineyard in engine pod
service_builder = ServiceBuilder(
self._vineyard_service_name,
service_type="NodePort",
service_type=self._service_type,
port=self._vineyard_service_port,
selector=labels,
)
@@ -344,39 +344,24 @@ def _create_vineyard_service(self):
self._resource_object.extend(targets)

def _get_vineyard_service_endpoint(self):
services = self._core_api.list_namespaced_service(self._namespace)
for svc in services.items:
if svc.metadata.name == self._vineyard_service_name:
port = svc.spec.ports[0].node_port

if svc.status.load_balancer.ingress is not None:
ingress = svc.status.load_balancer.ingress[0]
if ingress.hostname is not None:
host = ingress.hostname
else:
host = ingress.ip
else:
selector = ""
for k, v in svc.spec.selector.items():
selector += k + "=" + v + ","
selector = selector[:-1]

# get pod
pods = self._core_api.list_namespaced_pod(
self._namespace, label_selector=selector
)
host = pods.items[0].status.host_ip
return "{}:{}".format(host, port)
raise RuntimeError("Get vineyard service endpoint failed.")
# Always len(endpoints) >= 1
endpoints = get_service_endpoints(
api_client=self._api_client,
namespace=self._namespace,
name=self._vineyard_service_name,
type=self._service_type,
)
return endpoints[0]

def _create_graphlearn_service(self, object_id, start_port, num_workers):
targets = []
labels = {"name": self._engine_name}
service_builder = ServiceBuilder(
self._gle_service_name_prefix + str(object_id),
service_type="NodePort",
service_type=self._service_type,
port=list(range(start_port, start_port + num_workers)),
selector=labels,
external_traffic_policy="Local",
)
targets.append(
self._core_api.create_namespaced_service(
@@ -387,29 +372,37 @@ def _create_graphlearn_service(self, object_id, start_port, num_workers):
self._resource_object.extend(targets)

def _parse_graphlearn_service_endpoint(self, object_id):
services = self._core_api.list_namespaced_service(self._namespace)
for svc in services.items:
if svc.metadata.name == self._gle_service_name_prefix + str(object_id):
endpoints = []
for ip, port_spec in zip(self._pod_host_ip_list, svc.spec.ports):
endpoints.append(
(
"%s:%s" % (ip, port_spec.node_port),
int(port_spec.name.split("-")[-1]),
if self._service_type == "NodePort":
services = self._core_api.list_namespaced_service(self._namespace)
for svc in services.items:
if svc.metadata.name == self._gle_service_name_prefix + str(object_id):
endpoints = []
for ip, port_spec in zip(self._pod_host_ip_list, svc.spec.ports):
endpoints.append(
(
"%s:%s" % (ip, port_spec.node_port),
int(port_spec.name.split("-")[-1]),
)
)
)
endpoints.sort(key=lambda ep: ep[1])
return [ep[0] for ep in endpoints]
endpoints.sort(key=lambda ep: ep[1])
return [ep[0] for ep in endpoints]
elif self._service_type == "LoadBalancer":
endpoints = get_service_endpoints(
api_client=self._api_client,
namespace=self._namespace,
name=self._gle_service_name_prefix + str(object_id),
type=self._service_type,
)
return endpoints
raise RuntimeError("Get graphlearn service endpoint failed.")

def _create_interactive_engine_service(self):
targets = []
labels = {"app": self._gie_graph_manager_name}
service_builder = ServiceBuilder(
name=self._gie_graph_manager_service_name,
service_type="NodePort",
service_type="ClusterIP",
port=self._interactive_engine_manager_port,
node_port=self._random_interactive_engine_service_port,
selector=labels,
)
targets.append(
@@ -625,25 +618,14 @@ def _dump_resource_object(self):
json.dump(rlt, f)

def _get_etcd_service_endpoint(self):
services = self._core_api.list_namespaced_service(namespace=self._namespace)
for svc in services.items:
if svc.metadata.name == self._etcd_service_name:
port = svc.spec.ports[0].port
selector = ""
for k, v in svc.spec.selector.items():
selector += k + "=" + v + ","
selector = selector[:-1]

# get pod
pods = self._core_api.list_namespaced_pod(
namespace=self._namespace, label_selector=selector
)
host = pods.items[0].status.pod_ip
if host is None:
time.sleep(5)
return self._get_etcd_service_endpoint()
return "{}:{}".format(host, port)
return ""
# Always len(endpoints) >= 1
endpoints = get_service_endpoints(
api_client=self._api_client,
namespace=self._namespace,
name=self._etcd_service_name,
type="ClusterIP",
)
return endpoints[0]

def _launch_engine_locally(self):
# generate and distribute hostfile
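
The lookups above now delegate to get_service_endpoints from graphscope.deploy.kubernetes.utils, whose implementation is not part of this excerpt. A minimal sketch of what such a helper could look like, assuming it mirrors the NodePort/LoadBalancer/ClusterIP handling that the removed inline code performed (function body and structure are illustrative, not the actual utility):

    # Illustrative sketch only -- the real helper lives in
    # graphscope/deploy/kubernetes/utils.py and may differ.
    from kubernetes import client as kube_client


    def get_service_endpoints(api_client, namespace, name, type):
        """Return a list of "host:port" endpoints for a named service."""
        core_api = kube_client.CoreV1Api(api_client)
        svc = core_api.read_namespaced_service(name=name, namespace=namespace)

        endpoints = []
        if type == "NodePort":
            # Reach the service through the host IPs of pods matched by its selector.
            selector = ",".join("%s=%s" % (k, v) for k, v in svc.spec.selector.items())
            pods = core_api.list_namespaced_pod(namespace, label_selector=selector)
            hosts = [pod.status.host_ip for pod in pods.items]
            for port in svc.spec.ports:
                endpoints.extend("%s:%s" % (host, port.node_port) for host in hosts)
        elif type == "LoadBalancer":
            # Waiting for the ingress to be provisioned is omitted for brevity.
            for ingress in svc.status.load_balancer.ingress or []:
                host = ingress.hostname or ingress.ip
                for port in svc.spec.ports:
                    endpoints.append("%s:%s" % (host, port.port))
        elif type == "ClusterIP":
            for port in svc.spec.ports:
                endpoints.append("%s:%s" % (svc.spec.cluster_ip, port.port))
        return endpoints
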
11 changes: 10 additions & 1 deletion coordinator/gscoordinator/coordinator.py
@@ -105,6 +105,7 @@ def __init__(self, launcher, dangling_seconds, log_level="info"):

self._request = None
self._object_manager = ObjectManager()
self._dangling_detecting_timer = None
self._config_logging(log_level)

# only one connection is allowed at the same time
@@ -564,7 +565,8 @@ def _cleanup(self, is_dangling=False):
self._analytical_engine_stub = None

# cancel dangling detect timer
self._dangling_detecting_timer.cancel()
if self._dangling_detecting_timer:
self._dangling_detecting_timer.cancel()

# close engines
self._launcher.stop(is_dangling=is_dangling)
@@ -672,6 +674,12 @@ def parse_sys_args():
default="graphscope",
help="Contains the namespace to create all resource inside, namespace must be exist.",
)
parser.add_argument(
"--k8s_service_type",
type=str,
default="NodePort",
help="Valid options are NodePort, and LoadBalancer.",
)
parser.add_argument(
"--k8s_gs_image",
type=str,
@@ -782,6 +790,7 @@ def launch_graphscope():
if args.enable_k8s:
launcher = KubernetesClusterLauncher(
namespace=args.k8s_namespace,
service_type=args.k8s_service_type,
gs_image=args.k8s_gs_image,
etcd_image=args.k8s_etcd_image,
zookeeper_image=args.k8s_zookeeper_image,
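
The commit registers --k8s_service_type as a free-form string with a NodePort default. A small sketch (not part of the commit) of how the same argument could be constrained to the two supported values so that typos fail at parse time:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--k8s_service_type",
        type=str,
        default="NodePort",
        choices=["NodePort", "LoadBalancer"],  # reject anything else up front
        help="How the GraphScope services are exposed.",
    )
    args = parser.parse_args(["--k8s_service_type", "LoadBalancer"])
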
4 changes: 3 additions & 1 deletion python/graphscope/client/rpc.py
@@ -206,7 +206,9 @@ def _fetch_logs_impl(self):
responses = self._stub.FetchLogs(request)
for resp in responses:
resp = check_grpc_response(resp)
print(resp.message, file=sys.stdout, end="", flush=True)
message = resp.message.rstrip()
if message:
print(message, end="\n", file=sys.stdout, flush=True)

@catch_grpc_error
def _close_session_impl(self):
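
The reworked _fetch_logs_impl strips trailing whitespace and skips empty messages before printing. A standalone illustration of that filtering, with made-up log lines as input:

    import sys

    for raw in ["INFO engine pod ready\n", "   \n", "", "WARN waiting for etcd\n"]:
        message = raw.rstrip()
        if message:
            print(message, end="\n", file=sys.stdout, flush=True)
    # prints:
    # INFO engine pod ready
    # WARN waiting for etcd
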
6 changes: 6 additions & 0 deletions python/graphscope/client/session.py
@@ -139,6 +139,7 @@ def __init__(
log_level=gs_config.LOG_LEVEL,
show_log=gs_config.SHOW_LOG,
k8s_namespace=gs_config.NAMESPACE,
k8s_service_type=gs_config.SERVICE_TYPE,
k8s_gs_image=gs_config.GS_IMAGE,
k8s_etcd_image=gs_config.ETCD_IMAGE,
k8s_gie_graph_manager_image=gs_config.GIE_GRAPH_MANAGER_IMAGE,
@@ -176,6 +177,9 @@ def __init__(
If this parameter is missing or the namespace does not exist, a random namespace will be created
and deleted when the service stops. Defaults to None.
k8s_service_type (str, optional): Type determines how the GraphScope service is exposed.
Valid options are NodePort and LoadBalancer. Defaults to NodePort.
k8s_gs_image (str, optional): The GraphScope engine's image.
k8s_etcd_image (str, optional): The image of etcd, which is used by vineyard.
@@ -227,6 +231,7 @@ def __init__(
"log_level",
"show_log",
"k8s_namespace",
"k8s_service_type",
"k8s_gs_image",
"k8s_etcd_image",
"k8s_image_pull_policy",
@@ -575,6 +580,7 @@ def _connect(self):
self._k8s_cluster = KubernetesCluster(
api_client=api_client,
namespace=self._config_params["k8s_namespace"],
service_type=self._config_params["k8s_service_type"],
minikube_vm_driver=self._config_params["k8s_minikube_vm_driver"],
num_workers=self._config_params["num_workers"],
log_level=self._config_params["log_level"],
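
With the new keyword wired through session.py, a client can choose the exposure type at session creation. A hypothetical usage sketch (namespace and worker count are arbitrary; other options keep their GSConfig defaults):

    import graphscope

    sess = graphscope.session(
        k8s_namespace="demo",
        k8s_service_type="LoadBalancer",  # default remains "NodePort"
        num_workers=2,
    )
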
2 changes: 2 additions & 0 deletions python/graphscope/config.py
@@ -48,6 +48,8 @@ class GSConfig(object):
ENGINE_CPU = 0.5
ENGINE_MEM = "4Gi"

SERVICE_TYPE = "NodePort"

NUM_WORKERS = 2
SHOW_LOG = False
LOG_LEVEL = "info"
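
config.py gains a SERVICE_TYPE class attribute, which session.py reads as the default for k8s_service_type. A quick sanity check of the shipped default (sketch; assumes GSConfig is importable from graphscope.config as shown in the diff header above):

    from graphscope.config import GSConfig

    assert GSConfig.SERVICE_TYPE == "NodePort"
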
41 changes: 16 additions & 25 deletions python/graphscope/deploy/kubernetes/cluster.py
@@ -40,6 +40,7 @@
from graphscope.deploy.kubernetes.resource_builder import RoleBuilder
from graphscope.deploy.kubernetes.resource_builder import ServiceBuilder
from graphscope.deploy.kubernetes.utils import delete_kubernetes_object
from graphscope.deploy.kubernetes.utils import get_service_endpoints
from graphscope.deploy.kubernetes.utils import is_minikube_cluster
from graphscope.framework.errors import K8sError
from graphscope.framework.utils import random_string
@@ -57,6 +58,9 @@ class KubernetesCluster(object):
namespace: str, optional
Kubernetes namespace. Defaults to None.
service_type: str, optional
Type determines how the GraphScope service is exposed.
minikube_vm_driver: bool, optional
True if minikube cluster :code:`--vm-driver` is not :code:`None`
@@ -132,6 +136,7 @@ def __init__(
self,
api_client=None,
namespace=None,
service_type=None,
minikube_vm_driver=None,
num_workers=None,
log_level=None,
@@ -158,6 +163,7 @@ def __init__(
self._rbac_api = kube_client.RbacAuthorizationV1Api(api_client)

self._namespace = namespace
self._service_type = service_type
self._minikube_vm_driver = minikube_vm_driver
self._gs_image = gs_image
self._num_workers = num_workers
@@ -352,7 +358,7 @@ def _create_coordinator(self):
# create coordinator service
service_builder = ServiceBuilder(
self._coordinator_service_name,
service_type="NodePort",
service_type=self._service_type,
port=self._random_coordinator_service_port,
selector=labels,
)
@@ -392,6 +398,7 @@ def _create_coordinator(self):
num_workers=self._num_workers,
log_level=self._log_level,
namespace=self._namespace,
service_type=self._service_type,
gs_image=self._gs_image,
etcd_image=self._etcd_image,
gie_graph_manager_image=self._gie_graph_manager_image,
@@ -525,36 +532,20 @@ def minikube_get_service_url(process, rlt, messages):
)

def _get_coordinator_endpoint(self):
# Note that only support NodePort service type
if is_minikube_cluster() and self._minikube_vm_driver:
return self._get_minikube_service(
self._namespace, self._coordinator_service_name
)

services = self._core_api.list_namespaced_service(self._namespace)
for svc in services.items:
if svc.metadata.name == self._coordinator_service_name:
port = svc.spec.ports[0].node_port

if svc.status.load_balancer.ingress is not None:
ingress = svc.status.load_balancer.ingress[0]
if ingress.hostname is not None:
host = ingress.hostname
else:
host = ingress.ip
else:
selector = ""
for k, v in svc.spec.selector.items():
selector += k + "=" + v + ","
selector = selector[:-1]
# Always len(endpoints) >= 1
endpoints = get_service_endpoints(
api_client=self._api_client,
namespace=self._namespace,
name=self._coordinator_service_name,
type=self._service_type,
)

# get pod
pods = self._core_api.list_namespaced_pod(
self._namespace, label_selector=selector
)
host = pods.items[0].status.host_ip
return "{}:{}".format(host, port)
raise RuntimeError("Get coordinator endpoint failed.")
return endpoints[0]

def _dump_cluster_logs(self):
log_dict = dict()
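
Throughout the diff, ServiceBuilder (defined in graphscope/deploy/kubernetes/resource_builder.py, not shown in this excerpt) now receives the configured service_type instead of a hard-coded "NodePort". A rough sketch, using the raw kubernetes client, of the kind of object such a builder would emit for the coordinator service; the function name, service name, and port below are illustrative, not the real builder:

    from kubernetes import client as kube_client


    def build_coordinator_service(name, port, labels, service_type):
        # Equivalent-in-spirit to ServiceBuilder(...); not the actual implementation.
        return kube_client.V1Service(
            api_version="v1",
            kind="Service",
            metadata=kube_client.V1ObjectMeta(name=name, labels=labels),
            spec=kube_client.V1ServiceSpec(
                type=service_type,  # "NodePort" or "LoadBalancer"
                selector=labels,
                ports=[kube_client.V1ServicePort(name="coordinator", port=port)],
            ),
        )


    svc = build_coordinator_service(
        "coordinator-service-demo", 63800, {"name": "coordinator"}, "LoadBalancer"
    )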
