Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture KeyboardInterrupt during the session launching and cleanup re… #162

Merged
merged 4 commits into from
Feb 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 74 additions & 40 deletions coordinator/gscoordinator/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#

import base64
import copy
import json
import logging
import os
Expand Down Expand Up @@ -62,6 +63,66 @@
logger = logging.getLogger("graphscope")


class ResourceManager(object):
"""A class to manager kubernetes object.

Object managed by this class will dump meta info to disk file
for pod preStop lifecycle management.

meta info format:

{
"my-deployment": "Deployment",
"my-service": "Service"
}
"""

_resource_object_path = "/tmp/resource_object" # fixed

def __init__(self, api_client):
self._api_client = api_client
self._resource_object = []
self._meta_info = {}

def append(self, target):
self._resource_object.append(target)
self._meta_info.update(
get_kubernetes_object_info(api_client=self._api_client, target=target)
)
self.dump()

def extend(self, targets):
self._resource_object.extend(targets)
for target in targets:
self._meta_info.update(
get_kubernetes_object_info(api_client=self._api_client, target=target)
)
self.dump()

def clear(self):
self._resource_object.clear()
self._meta_info.clear()

def __str__(self):
return str(self._meta_info)

def __getitem__(self, index):
return self._resource_object[index]

def dump(self):
with open(self._resource_object_path, "w") as f:
json.dump(self._meta_info, f)

def dump_with_extra_resource(self, resource):
"""Also dump with extra resources. A typical scenario is
dump meta info of namespace for coordinator dangling processing.
"""
rlt = copy.deepcopy(self._meta_info)
rlt.update(resource)
with open(self._resource_object_path, "w") as f:
json.dump(rlt, f)


class KubernetesClusterLauncher(Launcher):
_gs_etcd_builder_cls = GSEtcdBuilder
_gs_engine_builder_cls = GSEngineBuilder
Expand Down Expand Up @@ -90,8 +151,6 @@ class KubernetesClusterLauncher(Launcher):

_vineyard_service_port = 9600 # fixed

_resource_object_path = "/tmp/resource_object" # fixed

def __init__(
self,
namespace=None,
Expand Down Expand Up @@ -156,7 +215,7 @@ def __init__(
self._coordinator_name = coordinator_name
self._coordinator_service_name = coordinator_service_name

self._resource_object = []
self._resource_object = ResourceManager(self._api_client)

# engine container info
self._gs_image = gs_image
Expand Down Expand Up @@ -238,7 +297,6 @@ def get_gie_graph_manager_service_name(self):

def _create_engine_replicaset(self):
logger.info("Launching GraphScope engines pod ...")
targets = []
labels = {"name": self._engine_name}
# create engine replicaset
engine_builder = self._gs_engine_builder_cls(
Expand Down Expand Up @@ -296,17 +354,15 @@ def _create_engine_replicaset(self):
)
for name in self._image_pull_secrets:
engine_builder.add_image_pull_secret(name)
targets.append(

self._resource_object.append(
self._app_api.create_namespaced_replica_set(
self._namespace, engine_builder.build()
)
)

self._resource_object.extend(targets)

def _create_etcd(self):
logger.info("Launching etcd ...")
targets = []
labels = {"name": self._etcd_name}
# should create service first
service_builder = ServiceBuilder(
Expand All @@ -315,7 +371,7 @@ def _create_etcd(self):
port=self._random_etcd_listen_client_service_port,
selector=labels,
)
targets.append(
self._resource_object.append(
self._core_api.create_namespaced_service(
self._namespace, service_builder.build()
)
Expand Down Expand Up @@ -343,31 +399,26 @@ def _create_etcd(self):
listen_peer_service_port=self._random_etcd_listen_peer_service_port,
listen_client_service_port=self._random_etcd_listen_client_service_port,
)
targets.append(
self._resource_object.append(
self._app_api.create_namespaced_deployment(
self._namespace, etcd_builder.build()
)
)

self._resource_object.extend(targets)

def _create_vineyard_service(self):
targets = []
labels = {"name": self._engine_name} # vineyard in engine pod
service_builder = ServiceBuilder(
self._vineyard_service_name,
service_type=self._service_type,
port=self._vineyard_service_port,
selector=labels,
)
targets.append(
self._resource_object.append(
self._core_api.create_namespaced_service(
self._namespace, service_builder.build()
)
)

self._resource_object.extend(targets)

def _get_vineyard_service_endpoint(self):
# Always len(endpoints) >= 1
endpoints = get_service_endpoints(
Expand Down Expand Up @@ -423,15 +474,14 @@ def _parse_graphlearn_service_endpoint(self, object_id):

def _create_interactive_engine_service(self):
logger.info("Launching GIE graph manager ...")
targets = []
labels = {"app": self._gie_graph_manager_name}
service_builder = ServiceBuilder(
name=self._gie_graph_manager_service_name,
service_type="ClusterIP",
port=self._interactive_engine_manager_port,
selector=labels,
)
targets.append(
self._resource_object.append(
self._core_api.create_namespaced_service(
self._namespace, service_builder.build()
)
Expand Down Expand Up @@ -475,14 +525,12 @@ def _create_interactive_engine_service(self):
port=self._zookeeper_port,
)

targets.append(
self._resource_object.append(
self._app_api.create_namespaced_deployment(
self._namespace, graph_manager_builder.build()
)
)

self._resource_object.extend(targets)

def _waiting_interactive_engine_service_ready(self):
start_time = time.time()
event_messages = []
Expand Down Expand Up @@ -623,30 +671,16 @@ def _waiting_for_services_ready(self):
self._vineyard_service_endpoint = self._get_vineyard_service_endpoint()
logger.info("GraphScope engines pod is ready.")

def _dump_cluster_logs(self):
log_dict = dict()
pod_items = self._core_api.list_namespaced_pod(self._namespace).to_dict()
for item in pod_items["items"]:
log_dict[item["metadata"]["name"]] = self._core_api.read_namespaced_pod_log(
name=item["metadata"]["name"], namespace=self._namespace
)
return log_dict

def _dump_resource_object(self):
rlt = {}
for target in self._resource_object:
rlt.update(
get_kubernetes_object_info(api_client=self._api_client, target=target)
)
resource = {}
if self._delete_namespace:
rlt[self._namespace] = "Namespace"
resource[self._namespace] = "Namespace"
else:
# coordinator info
rlt[self._coordinator_name] = "Deployment"
rlt[self._coordinator_service_name] = "Service"
resource[self._coordinator_name] = "Deployment"
resource[self._coordinator_service_name] = "Service"

with open(self._resource_object_path, "w") as f:
json.dump(rlt, f)
self._resource_object.dump_with_extra_resource(resource)

def _get_etcd_service_endpoint(self):
# Always len(endpoints) >= 1
Expand Down
2 changes: 2 additions & 0 deletions python/graphscope/client/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def __init__(self, endpoint):
def waiting_service_ready(self, timeout_seconds=60, enable_k8s=True):
begin_time = time.time()
request = message_pb2.HeartBeatRequest()
# Do not drop this line, which is for handling KeyboardInterrupt.
response = None
lidongze0629 marked this conversation as resolved.
Show resolved Hide resolved
while True:
try:
response = self._stub.HeartBeat(request)
Expand Down
4 changes: 3 additions & 1 deletion python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import graphscope
from graphscope.client.rpc import GRPCClient
from graphscope.client.utils import CaptureKeyboardInterrupt
from graphscope.client.utils import GSLogger
from graphscope.client.utils import set_defaults
from graphscope.config import GSConfig as gs_config
Expand Down Expand Up @@ -422,7 +423,8 @@ def __init__(

atexit.register(self.close)
# create and connect session
self._proc, self._endpoint = self._connect()
with CaptureKeyboardInterrupt(self.close):
self._proc, self._endpoint = self._connect()

self._disconnected = False

Expand Down
28 changes: 28 additions & 0 deletions python/graphscope/client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,34 @@ def update():
GSLogger.init()


class CaptureKeyboardInterrupt(object):
"""Context Manager for capture keyboard interrupt

Args:
callback: function
Callback function when KeyboardInterrupt occurs.

Examples:
>>> with CaptureKeyboardInterrupt(callback):
>>> do_somethings()
"""

def __init__(self, callback=None):
self._callback = callback

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, exc_tb):
if exc_type is not None:
if self._callback:
try:
self._callback()
except: # noqa: E722
pass
return False


def set_defaults(defaults):
"""Decorator to update default params to the latest defaults value.

Expand Down