Skip to content

Commit

Permalink
Some fixes for running with helm deployed (#194)
Browse files Browse the repository at this point in the history
* Some Fixes for running GNN with helm deployed
* Stop streaming logs when session close
* Return namespace info from coordinator
  • Loading branch information
lidongze0629 authored Mar 19, 2021
1 parent b76b193 commit 7a8f3f6
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 24 deletions.
4 changes: 3 additions & 1 deletion charts/graphscope/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ GraphScope charts

```shell
$ helm repo add graphscope https://dl.bintray.com/graphscope/charts/
$ helm update
$ helm repo update
```
See [*helm repo*](https://helm.sh/docs/helm/helm_repo/) for command documentation.

Expand All @@ -28,6 +28,8 @@ See [*helm install*](https://helm.sh/docs/helm/helm_install/) for command docume

## Get GraphScope Service Endpoint

Note that pulling the image for the first time may take a few minutes; you can check the status by running `helm test` repeatedly.

```shell
# Helm 3 or 2
# After installation, you can check service availability by:
Expand Down
26 changes: 18 additions & 8 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def __init__(self, launcher, dangling_seconds, log_level="INFO"):
)
else:
self._pods_list = [] # locally launched
self._k8s_namespace = ""

# analytical engine
self._analytical_engine_stub = self._create_grpc_stub()
Expand Down Expand Up @@ -202,13 +203,19 @@ def ConnectSession(self, request, context):
self._session_id = self._generate_session_id()
self._udf_app_workspace = os.path.join(WORKSPACE, self._session_id)

# Session connected, fetch logs via gRPC.
self._streaming_logs = True
sys.stdout.drop(False)

return self._make_response(
message_pb2.ConnectSessionResponse,
code=error_codes_pb2.OK,
session_id=self._session_id,
session_type=self._launcher.type(),
num_workers=self._launcher.num_workers,
engine_config=json.dumps(self._analytical_engine_config),
pod_name_list=self._pods_list,
namespace=self._k8s_namespace,
)

def HeartBeat(self, request, context):
Expand Down Expand Up @@ -417,9 +424,12 @@ def FetchLogs(self, request, context):
except queue.Empty:
pass
else:
yield self._make_response(
message_pb2.FetchLogsResponse, error_codes_pb2.OK, message=message
)
if self._streaming_logs:
yield self._make_response(
message_pb2.FetchLogsResponse,
error_codes_pb2.OK,
message=message,
)

def CloseSession(self, request, context):
"""
Expand All @@ -434,8 +444,11 @@ def CloseSession(self, request, context):

self._cleanup(stop_instance=request.stop_instance)
self._request = None
if request.stop_instance:
self._streaming_logs = False

# Session closed, stop streaming logs
sys.stdout.drop(True)
self._streaming_logs = False

return self._make_response(message_pb2.CloseSessionResponse, error_codes_pb2.OK)

def CreateInteractiveInstance(self, request, context):
Expand Down Expand Up @@ -942,9 +955,6 @@ def launch_graphscope():
log_level=args.log_level,
)

# after GraphScope ready, fetch logs via gRPC.
sys.stdout.drop(False)

# register gRPC server
server = grpc.server(futures.ThreadPoolExecutor(os.cpu_count() or 1))
coordinator_service_pb2_grpc.add_CoordinatorServiceServicer_to_server(
Expand Down
9 changes: 6 additions & 3 deletions proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package gs.rpc;
import "proto/error_codes.proto";
import "proto/graph_def.proto";
import "proto/op_def.proto";
import "proto/types.proto";

message ResponseStatus {
enum NullDetail { NULL_DETAIL = 0; }
Expand Down Expand Up @@ -49,9 +50,11 @@ message ConnectSessionResponse {
// The client must arrange to call CloseSession with this returned
// session handle to close the session.
string session_id = 2;
string engine_config = 3;
repeated string pod_name_list = 4;
int32 num_workers = 5;
SessionType session_type = 3;
string engine_config = 4;
repeated string pod_name_list = 5;
int32 num_workers = 6;
string namespace = 7;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
7 changes: 3 additions & 4 deletions python/graphscope/client/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def __init__(self, endpoint):
self._session_id = None
self._logs_fetching_thread = None

def waiting_service_ready(self, timeout_seconds=60, enable_k8s=True):
def waiting_service_ready(self, timeout_seconds=60):
begin_time = time.time()
request = message_pb2.HeartBeatRequest()
# Do not drop this line, which is for handling KeyboardInterrupt.
Expand All @@ -106,9 +106,6 @@ def waiting_service_ready(self, timeout_seconds=60, enable_k8s=True):
response = None
finally:
if response is not None:
# connnect to coordinator, fetch log
if enable_k8s:
self.fetch_logs()
if response.status.code == error_codes_pb2.OK:
logger.info("GraphScope coordinator service connected.")
break
Expand Down Expand Up @@ -213,9 +210,11 @@ def _connect_session_impl(self):
self._session_id = response.session_id
return (
response.session_id,
response.session_type,
json.loads(response.engine_config),
response.pod_name_list,
response.num_workers,
response.namespace,
)

@suppress_grpc_error
Expand Down
24 changes: 16 additions & 8 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,16 @@ def __init__(
if kw:
raise ValueError("Not recognized value: ", list(kw.keys()))

logger.info(
"Initializing graphscope session with parameters: %s", self._config_params
)
if self._config_params["addr"]:
logger.info(
"Connecting graphscope session with address: %s",
self._config_params["addr"],
)
else:
logger.info(
"Initializing graphscope session with parameters: %s",
self._config_params,
)

self._closed = False

Expand Down Expand Up @@ -691,7 +698,6 @@ def _parse_value(self, op, response: message_pb2.RunStepResponse):
def _connect(self):
if self._config_params["addr"] is not None:
# try connect to exist coordinator
self._session_type = types_pb2.REMOTE
proc, endpoint = None, self._config_params["addr"]
elif self._config_params["enable_k8s"]:
if (
Expand All @@ -703,7 +709,6 @@ def _connect(self):
**self._config_params["k8s_client_config"]
)
proc = None
self._session_type = types_pb2.K8S
self._k8s_cluster = KubernetesCluster(
api_client=api_client,
namespace=self._config_params["k8s_namespace"],
Expand Down Expand Up @@ -748,25 +753,28 @@ def _connect(self):
):
# lanuch coordinator with hosts
proc, endpoint = _launch_coordinator_on_local(self._config_params)
self._session_type = types_pb2.HOSTS
else:
raise RuntimeError("Session initialize failed.")

# waiting service ready
self._grpc_client = GRPCClient(endpoint)
self._grpc_client.waiting_service_ready(
timeout_seconds=self._config_params["timeout_seconds"],
enable_k8s=self._config_params["enable_k8s"],
)

# connect to rpc server
# connect and fetch logs from rpc server
try:
(
self._session_id,
self._session_type,
self._engine_config,
self._pod_name_list,
self._config_params["num_workers"],
self._config_params["k8s_namespace"],
) = self._grpc_client.connect()
# fetch logs
if self._config_params["enable_k8s"]:
self._grpc_client.fetch_logs()
_session_dict[self._session_id] = self
except Exception:
if proc is not None and proc.poll() is None:
Expand Down

0 comments on commit 7a8f3f6

Please sign in to comment.