Skip to content

Commit

Permalink
support dangling handle for each session connection (#198)
Browse the repository at this point in the history
* support dangling handle for each session connection
lidongze0629 authored Mar 23, 2021
1 parent b8cb7c4 commit 509b915
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 39 deletions.
42 changes: 30 additions & 12 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class CoordinatorServiceServicer(
"""

def __init__(self, launcher, dangling_seconds, log_level="INFO"):
def __init__(self, launcher, dangling_timeout_seconds, log_level="INFO"):
self._launcher = launcher

self._request = None
Expand Down Expand Up @@ -149,10 +149,15 @@ def __init__(self, launcher, dangling_seconds, log_level="INFO"):
self._streaming_logs = True

# dangling check
self._dangling_seconds = dangling_seconds
if self._dangling_seconds >= 0:
self._dangling_timeout_seconds = dangling_timeout_seconds
if self._dangling_timeout_seconds >= 0:
self._dangling_detecting_timer = threading.Timer(
interval=self._dangling_seconds, function=self._cleanup, args=(True,)
interval=self._dangling_timeout_seconds,
function=self._cleanup,
args=(
True,
True,
),
)
self._dangling_detecting_timer.start()

Expand Down Expand Up @@ -219,13 +224,21 @@ def ConnectSession(self, request, context):
)

def HeartBeat(self, request, context):
if self._dangling_seconds >= 0:
if self._request and self._request.dangling_timeout_seconds >= 0:
# Reset dangling detect timer
self._dangling_detecting_timer.cancel()
if self._dangling_detecting_timer:
self._dangling_detecting_timer.cancel()

self._dangling_detecting_timer = threading.Timer(
interval=self._dangling_seconds, function=self._cleanup, args=(True,)
interval=self._request.dangling_timeout_seconds,
function=self._cleanup,
args=(
self._request.cleanup_instance,
True,
),
)
self._dangling_detecting_timer.start()

# analytical engine
request = message_pb2.HeartBeatRequest()
try:
Expand Down Expand Up @@ -442,7 +455,9 @@ def CloseSession(self, request, context):
"Session handle does not match",
)

self._cleanup(stop_instance=request.stop_instance)
self._cleanup(
cleanup_instance=self._request.cleanup_instance, is_dangling=False
)
self._request = None

# Session closed, stop streaming logs
Expand Down Expand Up @@ -589,7 +604,7 @@ def _make_response(resp_cls, code, error_msg="", op=None, **args):
resp.status.op.CopyFrom(op)
return resp

def _cleanup(self, stop_instance=True, is_dangling=False):
def _cleanup(self, cleanup_instance=True, is_dangling=False):
# clean up session resources.
for key in self._object_manager.keys():
obj = self._object_manager.get(key)
Expand Down Expand Up @@ -625,12 +640,14 @@ def _cleanup(self, stop_instance=True, is_dangling=False):

self._object_manager.clear()

self._request = None

# cancel dangling detect timer
if self._dangling_detecting_timer:
self._dangling_detecting_timer.cancel()

# close engines
if stop_instance:
if cleanup_instance:
self._analytical_engine_stub = None
self._analytical_engine_endpoint = None
self._launcher.stop(is_dangling=is_dangling)
Expand Down Expand Up @@ -958,7 +975,7 @@ def launch_graphscope():

coordinator_service_servicer = CoordinatorServiceServicer(
launcher=launcher,
dangling_seconds=args.dangling_timeout_seconds,
dangling_timeout_seconds=args.dangling_timeout_seconds,
log_level=args.log_level,
)

Expand All @@ -974,7 +991,8 @@ def launch_graphscope():

# handle SIGTERM signal
def terminate(signum, frame):
del coordinator_service_servicer # noqa: F821
global coordinator_service_servicer
del coordinator_service_servicer

signal.signal(signal.SIGTERM, terminate)

Expand Down
3 changes: 2 additions & 1 deletion proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ message ResponseStatus {
////////////////////////////////////////////////////////////////////////////////

message ConnectSessionRequest {
bool cleanup_instance = 1;
int32 dangling_timeout_seconds = 2;
}

message ConnectSessionResponse {
Expand Down Expand Up @@ -129,7 +131,6 @@ message CloseSessionRequest {
// REQUIRED: session_id must be returned by a CreateSession call
// to the same master service.
string session_id = 1;
bool stop_instance = 2;
}

message CloseSessionResponse {
Expand Down
38 changes: 23 additions & 15 deletions python/graphscope/client/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ def waiting_service_ready(self, timeout_seconds=60):
msg = response.status.error_msg
raise ConnectionError("Connect coordinator timeout, {}".format(msg))

def connect(self):
return self._connect_session_impl()
def connect(self, cleanup_instance=True, dangling_timeout_seconds=60):
return self._connect_session_impl(
cleanup_instance=cleanup_instance,
dangling_timeout_seconds=dangling_timeout_seconds,
)

@property
def session_id(self):
Expand Down Expand Up @@ -188,21 +191,28 @@ def close_learning_engine(self, object_id):
response = self._stub.CloseLearningInstance(request)
return check_grpc_response(response)

def close(self, stop_instance=True):
"""
Args:
stop_instance (bool, optional): If true,
also delete graphscope instance (such as pod) in closing process.
"""
def close(self):
if self._session_id:
self._close_session_impl(stop_instance=stop_instance)
self._close_session_impl()
self._session_id = None
if self._logs_fetching_thread:
self._logs_fetching_thread.join(timeout=5)

@catch_grpc_error
def _connect_session_impl(self):
request = message_pb2.ConnectSessionRequest()
def _connect_session_impl(self, cleanup_instance=True, dangling_timeout_seconds=60):
"""
Args:
cleanup_instance (bool, optional): If True, also delete graphscope
instance (such as pod) in closing process.
dangling_timeout_seconds (int, optional): After seconds of client
disconnect, coordinator will kill this graphscope instance.
Disable dangling check by setting -1.
"""
request = message_pb2.ConnectSessionRequest(
cleanup_instance=cleanup_instance,
dangling_timeout_seconds=dangling_timeout_seconds,
)

response = self._stub.ConnectSession(request)
response = check_grpc_response(response)
Expand All @@ -228,10 +238,8 @@ def _fetch_logs_impl(self):
logger.info(message, extra={"simple": True})

@catch_grpc_error
def _close_session_impl(self, stop_instance=True):
request = message_pb2.CloseSessionRequest(
session_id=self._session_id, stop_instance=stop_instance
)
def _close_session_impl(self):
request = message_pb2.CloseSessionRequest(session_id=self._session_id)
response = self._stub.CloseSession(request)
return check_grpc_response(response)

Expand Down
12 changes: 8 additions & 4 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ def __init__(
dangling_timeout_seconds (int, optional): After seconds of client disconnect,
coordinator will kill this graphscope instance. Defaults to 600.
Expect this value to be greater than 5 (heartbeat interval).
Disable dangling check by setting -1.
k8s_waiting_for_delete (bool, optional): Waiting for service delete or not. Defaults to False.
Expand Down Expand Up @@ -563,9 +564,7 @@ def close(self):
self._learning_instance_dict.clear()

if self._grpc_client:
self._grpc_client.close(
stop_instance=False if self._config_params["addr"] else True
)
self._grpc_client.close()
self._grpc_client = None
_session_dict.pop(self._session_id, None)

Expand Down Expand Up @@ -778,7 +777,12 @@ def _connect(self):
self._pod_name_list,
self._config_params["num_workers"],
self._config_params["k8s_namespace"],
) = self._grpc_client.connect()
) = self._grpc_client.connect(
cleanup_instance=not bool(self._config_params["addr"]),
dangling_timeout_seconds=self._config_params[
"dangling_timeout_seconds"
],
)
# fetch logs
if self._config_params["enable_k8s"]:
self._grpc_client.fetch_logs()
Expand Down
13 changes: 6 additions & 7 deletions python/graphscope/framework/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,6 @@ def _from_graph_def(self, graph_def):
self._e_labels = self._schema.edge_labels
self._e_relationships = self._schema.edge_relationships

# create gremlin server pod asynchronously
if gs_config.initializing_interactive_engine:
self._interactive_instance_launching_thread = threading.Thread(
target=self._launch_interactive_instance_impl, args=()
)
self._interactive_instance_launching_thread.start()

def _ensure_loaded(self):
if self._key is not None and self._pending_op is None:
return
Expand All @@ -211,6 +204,12 @@ def _ensure_loaded(self):
self._unsealed_edges.clear()
# init saved_signature (must be after init schema)
self._saved_signature = self.signature
# create gremlin server pod asynchronously
if gs_config.initializing_interactive_engine:
self._interactive_instance_launching_thread = threading.Thread(
target=self._launch_interactive_instance_impl, args=()
)
self._interactive_instance_launching_thread.start()

@property
def key(self):
Expand Down

0 comments on commit 509b915

Please sign in to comment.