Skip to content

Commit

Permalink
Capture KeyboardInterrupt during the session launching and cleanup re… (
Browse files Browse the repository at this point in the history
#162)

* Capture KeyboardInterrupt and exceptions during the session launching and cleanup resource
  • Loading branch information
lidongze0629 authored Feb 24, 2021
1 parent ef3df2c commit f643d9f
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 41 deletions.
114 changes: 74 additions & 40 deletions coordinator/gscoordinator/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#

import base64
import copy
import json
import logging
import os
Expand Down Expand Up @@ -62,6 +63,66 @@
logger = logging.getLogger("graphscope")


class ResourceManager(object):
    """A class to manage kubernetes objects.

    Objects handed to :meth:`append` / :meth:`extend` are remembered in
    insertion order, and their meta info (whatever
    ``get_kubernetes_object_info`` returns for them) is merged into a dict
    that is dumped as JSON to ``_resource_object_path`` for pod preStop
    lifecycle management.

    Meta info format:

        {
            "my-deployment": "Deployment",
            "my-service": "Service"
        }
    """

    _resource_object_path = "/tmp/resource_object"  # fixed

    def __init__(self, api_client):
        """Create an empty manager.

        Args:
            api_client: client object forwarded to
                ``get_kubernetes_object_info`` for every managed object.
        """
        self._api_client = api_client
        self._resource_object = []
        self._meta_info = {}

    def append(self, target):
        """Manage one more object and refresh the dumped meta info."""
        # Track the object first, so it stays managed even if collecting
        # its meta info fails.
        self._resource_object.append(target)
        self._meta_info.update(
            get_kubernetes_object_info(api_client=self._api_client, target=target)
        )
        self.dump()

    def extend(self, targets):
        """Manage every object in `targets` and refresh the dumped meta info.

        Args:
            targets: iterable of kubernetes objects; one-shot iterables
                (e.g. generators) are supported.
        """
        # Materialize once: `targets` is walked twice below, and a one-shot
        # iterable would already be exhausted on the second pass.
        targets = list(targets)
        self._resource_object.extend(targets)
        for target in targets:
            self._meta_info.update(
                get_kubernetes_object_info(api_client=self._api_client, target=target)
            )
        self.dump()

    def clear(self):
        """Forget all managed objects and their meta info.

        Only the in-memory state is reset; the dumped file is not rewritten.
        """
        self._resource_object.clear()
        self._meta_info.clear()

    def __str__(self):
        return str(self._meta_info)

    def __getitem__(self, index):
        return self._resource_object[index]

    def _write(self, meta_info):
        """Serialize `meta_info` as JSON into the resource object file."""
        with open(self._resource_object_path, "w") as f:
            json.dump(meta_info, f)

    def dump(self):
        """Dump the meta info of all managed objects to disk."""
        self._write(self._meta_info)

    def dump_with_extra_resource(self, resource):
        """Also dump with extra resources. A typical scenario is
        dumping meta info of the namespace for coordinator dangling processing.

        Args:
            resource: extra entries, in meta info format, merged into the
                dumped content only; the manager's own state is unchanged.
        """
        # Work on a copy so the extra entries never leak into self._meta_info.
        rlt = copy.deepcopy(self._meta_info)
        rlt.update(resource)
        self._write(rlt)


class KubernetesClusterLauncher(Launcher):
_gs_etcd_builder_cls = GSEtcdBuilder
_gs_engine_builder_cls = GSEngineBuilder
Expand Down Expand Up @@ -90,8 +151,6 @@ class KubernetesClusterLauncher(Launcher):

_vineyard_service_port = 9600 # fixed

_resource_object_path = "/tmp/resource_object" # fixed

def __init__(
self,
namespace=None,
Expand Down Expand Up @@ -156,7 +215,7 @@ def __init__(
self._coordinator_name = coordinator_name
self._coordinator_service_name = coordinator_service_name

self._resource_object = []
self._resource_object = ResourceManager(self._api_client)

# engine container info
self._gs_image = gs_image
Expand Down Expand Up @@ -238,7 +297,6 @@ def get_gie_graph_manager_service_name(self):

def _create_engine_replicaset(self):
logger.info("Launching GraphScope engines pod ...")
targets = []
labels = {"name": self._engine_name}
# create engine replicaset
engine_builder = self._gs_engine_builder_cls(
Expand Down Expand Up @@ -296,17 +354,15 @@ def _create_engine_replicaset(self):
)
for name in self._image_pull_secrets:
engine_builder.add_image_pull_secret(name)
targets.append(

self._resource_object.append(
self._app_api.create_namespaced_replica_set(
self._namespace, engine_builder.build()
)
)

self._resource_object.extend(targets)

def _create_etcd(self):
logger.info("Launching etcd ...")
targets = []
labels = {"name": self._etcd_name}
# should create service first
service_builder = ServiceBuilder(
Expand All @@ -315,7 +371,7 @@ def _create_etcd(self):
port=self._random_etcd_listen_client_service_port,
selector=labels,
)
targets.append(
self._resource_object.append(
self._core_api.create_namespaced_service(
self._namespace, service_builder.build()
)
Expand Down Expand Up @@ -343,31 +399,26 @@ def _create_etcd(self):
listen_peer_service_port=self._random_etcd_listen_peer_service_port,
listen_client_service_port=self._random_etcd_listen_client_service_port,
)
targets.append(
self._resource_object.append(
self._app_api.create_namespaced_deployment(
self._namespace, etcd_builder.build()
)
)

self._resource_object.extend(targets)

def _create_vineyard_service(self):
targets = []
labels = {"name": self._engine_name} # vineyard in engine pod
service_builder = ServiceBuilder(
self._vineyard_service_name,
service_type=self._service_type,
port=self._vineyard_service_port,
selector=labels,
)
targets.append(
self._resource_object.append(
self._core_api.create_namespaced_service(
self._namespace, service_builder.build()
)
)

self._resource_object.extend(targets)

def _get_vineyard_service_endpoint(self):
# Always len(endpoints) >= 1
endpoints = get_service_endpoints(
Expand Down Expand Up @@ -423,15 +474,14 @@ def _parse_graphlearn_service_endpoint(self, object_id):

def _create_interactive_engine_service(self):
logger.info("Launching GIE graph manager ...")
targets = []
labels = {"app": self._gie_graph_manager_name}
service_builder = ServiceBuilder(
name=self._gie_graph_manager_service_name,
service_type="ClusterIP",
port=self._interactive_engine_manager_port,
selector=labels,
)
targets.append(
self._resource_object.append(
self._core_api.create_namespaced_service(
self._namespace, service_builder.build()
)
Expand Down Expand Up @@ -475,14 +525,12 @@ def _create_interactive_engine_service(self):
port=self._zookeeper_port,
)

targets.append(
self._resource_object.append(
self._app_api.create_namespaced_deployment(
self._namespace, graph_manager_builder.build()
)
)

self._resource_object.extend(targets)

def _waiting_interactive_engine_service_ready(self):
start_time = time.time()
event_messages = []
Expand Down Expand Up @@ -623,30 +671,16 @@ def _waiting_for_services_ready(self):
self._vineyard_service_endpoint = self._get_vineyard_service_endpoint()
logger.info("GraphScope engines pod is ready.")

def _dump_cluster_logs(self):
log_dict = dict()
pod_items = self._core_api.list_namespaced_pod(self._namespace).to_dict()
for item in pod_items["items"]:
log_dict[item["metadata"]["name"]] = self._core_api.read_namespaced_pod_log(
name=item["metadata"]["name"], namespace=self._namespace
)
return log_dict

def _dump_resource_object(self):
rlt = {}
for target in self._resource_object:
rlt.update(
get_kubernetes_object_info(api_client=self._api_client, target=target)
)
resource = {}
if self._delete_namespace:
rlt[self._namespace] = "Namespace"
resource[self._namespace] = "Namespace"
else:
# coordinator info
rlt[self._coordinator_name] = "Deployment"
rlt[self._coordinator_service_name] = "Service"
resource[self._coordinator_name] = "Deployment"
resource[self._coordinator_service_name] = "Service"

with open(self._resource_object_path, "w") as f:
json.dump(rlt, f)
self._resource_object.dump_with_extra_resource(resource)

def _get_etcd_service_endpoint(self):
# Always len(endpoints) >= 1
Expand Down
2 changes: 2 additions & 0 deletions python/graphscope/client/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def __init__(self, endpoint):
def waiting_service_ready(self, timeout_seconds=60, enable_k8s=True):
begin_time = time.time()
request = message_pb2.HeartBeatRequest()
# Do not drop this line, which is for handling KeyboardInterrupt.
response = None
while True:
try:
response = self._stub.HeartBeat(request)
Expand Down
4 changes: 3 additions & 1 deletion python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import graphscope
from graphscope.client.rpc import GRPCClient
from graphscope.client.utils import CaptureKeyboardInterrupt
from graphscope.client.utils import GSLogger
from graphscope.client.utils import set_defaults
from graphscope.config import GSConfig as gs_config
Expand Down Expand Up @@ -422,7 +423,8 @@ def __init__(

atexit.register(self.close)
# create and connect session
self._proc, self._endpoint = self._connect()
with CaptureKeyboardInterrupt(self.close):
self._proc, self._endpoint = self._connect()

self._disconnected = False

Expand Down
28 changes: 28 additions & 0 deletions python/graphscope/client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,34 @@ def update():
GSLogger.init()


class CaptureKeyboardInterrupt(object):
    """Context manager that runs a callback when its body is interrupted.

    The callback is invoked whenever the ``with`` body exits with an
    exception; KeyboardInterrupt is the motivating case, but any exception
    triggers it. The exception itself is never suppressed: it keeps
    propagating once the callback has run.

    Args:
        callback: function
            Zero-argument callable invoked when the body raises. Errors
            raised by the callback are ignored (best-effort cleanup).
            When None, the context manager does nothing.

    Examples:
        >>> with CaptureKeyboardInterrupt(callback):
        >>>     do_somethings()
    """

    def __init__(self, callback=None):
        self._callback = callback

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        if exc_type is not None and self._callback:
            try:
                self._callback()
            except BaseException:
                # Best-effort: a failing (or interrupted) callback must not
                # mask the exception that ended the body. Leave a trace for
                # debugging instead of dropping the failure silently.
                import logging

                logging.getLogger(__name__).debug(
                    "Callback of CaptureKeyboardInterrupt failed", exc_info=True
                )
        # Never swallow the exception raised by the body.
        return False


def set_defaults(defaults):
"""Decorator to update default params to the latest defaults value.
Expand Down

0 comments on commit f643d9f

Please sign in to comment.