Move session params show_log and log_level as a global configuration. #42

Merged: 5 commits, Dec 25, 2020
30 changes: 17 additions & 13 deletions coordinator/gscoordinator/cluster.py
@@ -220,6 +220,7 @@ def get_gie_graph_manager_service_name(self):
return self._gie_graph_manager_service_name

def _create_engine_replicaset(self):
logger.info("Launching GraphScope engines pod ...")
targets = []
labels = {"name": self._engine_name}
# create engine replicaset
@@ -283,6 +284,7 @@ def _create_engine_replicaset(self):
self._resource_object.extend(targets)

def _create_etcd(self):
logger.info("Launching etcd ...")
targets = []
labels = {"name": self._etcd_name}
# should create service first
@@ -397,6 +399,7 @@ def _parse_graphlearn_service_endpoint(self, object_id):
raise RuntimeError("Get graphlearn service endpoint failed.")

def _create_interactive_engine_service(self):
logger.info("Launching GIE graph manager ...")
targets = []
labels = {"app": self._gie_graph_manager_name}
service_builder = ServiceBuilder(
@@ -500,7 +503,7 @@ def _waiting_interactive_engine_service_ready(self):
):
raise TimeoutError("Waiting GIE graph manager start timeout.")
time.sleep(2)
logger.info("Interactive engine graph manager service is ready.")
logger.info("GIE graph manager service is ready.")

def _create_services(self):
# create interactive engine service
@@ -568,9 +571,8 @@ def _waiting_for_services_ready(self):
self._timeout_seconds
and self._timeout_seconds + start_time < time.time()
):
raise TimeoutError("Engine launching timeout.")
raise TimeoutError("GraphScope Engines launching timeout.")
time.sleep(2)
logger.info("Analytical engine service is ready.")

self._pod_name_list = []
self._pod_ip_list = []
@@ -591,6 +593,7 @@ def _waiting_for_services_ready(self):

# get vineyard service endpoint
self._vineyard_service_endpoint = self._get_vineyard_service_endpoint()
logger.info("GraphScope engines pod is ready.")

def _dump_cluster_logs(self):
log_dict = dict()
@@ -627,7 +630,13 @@ def _get_etcd_service_endpoint(self):
)
return endpoints[0]

def _launch_engine_locally(self):
def _launch_analytical_engine_locally(self):
logger.info(
"Starting GAE rpc service on {} ...".format(
str(self._analytical_engine_endpoint)
)
)

# generate and distribute hostfile
with open("/tmp/kube_hosts", "w") as f:
for i in range(len(self._pod_ip_list)):
@@ -716,18 +725,13 @@ def start(self):
self._create_services()
self._waiting_for_services_ready()
self._dump_resource_object()
logger.info("Engine pod name list: {}".format(self._pod_name_list))
logger.info("Engine pod ip list: {}".format(self._pod_ip_list))
logger.info("Engine pod host ip list: {}".format(self._pod_host_ip_list))
logger.info(
"Analytical engine service endpoint: {}".format(
self._analytical_engine_endpoint
)
)
logger.info("Engines pod name list: {}".format(self._pod_name_list))
logger.info("Engines pod ip list: {}".format(self._pod_ip_list))
logger.info("Engines pod host ip list: {}".format(self._pod_host_ip_list))
logger.info(
"Vineyard service endpoint: {}".format(self._vineyard_service_endpoint)
)
self._launch_engine_locally()
self._launch_analytical_engine_locally()
except Exception as e:
time.sleep(1)
logger.error(
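A note on the readiness checks in this file: the `_waiting_*_ready` helpers touched above all share the same poll-and-timeout shape. A minimal standalone sketch of that pattern, with function and parameter names that are illustrative rather than taken from the patch:

```python
import time


def wait_until_ready(is_ready, timeout_seconds=None, interval=2):
    """Poll ``is_ready()`` until it returns True, else raise TimeoutError.

    Mirrors the loop used in ``_waiting_for_services_ready``; the names
    here are illustrative only.
    """
    start_time = time.time()
    while not is_ready():
        if timeout_seconds and start_time + timeout_seconds < time.time():
            raise TimeoutError("GraphScope engines launching timeout.")
        time.sleep(interval)
```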
7 changes: 5 additions & 2 deletions coordinator/gscoordinator/coordinator.py
@@ -100,7 +100,7 @@ class CoordinatorServiceServicer(

"""

def __init__(self, launcher, dangling_seconds, log_level="info"):
def __init__(self, launcher, dangling_seconds, log_level="INFO"):
self._launcher = launcher

self._request = None
@@ -825,13 +825,16 @@ def launch_graphscope():
log_level=args.log_level,
)

# after GraphScope ready, fetch logs via gRPC.
sys.stdout.drop(False)

# register gRPC server
server = grpc.server(futures.ThreadPoolExecutor(os.cpu_count() or 1))
coordinator_service_pb2_grpc.add_CoordinatorServiceServicer_to_server(
coordinator_service_servicer, server
)
server.add_insecure_port("0.0.0.0:{}".format(args.port))
logger.info("server listen at 0.0.0.0:%d", args.port)
logger.info("Coordinator server listen at 0.0.0.0:%d", args.port)

server.start()
try:
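Side note on the `log_level="INFO"` default: the uppercase value lines up with the stdlib `logging` level names. A minimal sketch of how such a string level could be applied, using a hypothetical helper that is not part of this patch:

```python
import logging


def init_logger(log_level="INFO"):
    # Hypothetical helper: translate the textual level into a stdlib
    # logging level; unknown values fall back to INFO. The real
    # coordinator may configure handlers and formatters differently.
    level = getattr(logging, log_level.upper(), logging.INFO)
    logging.basicConfig(
        level=level, format="%(asctime)s [%(levelname)s] %(message)s"
    )
    return logging.getLogger("graphscope")
```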
9 changes: 7 additions & 2 deletions coordinator/gscoordinator/io_utils.py
@@ -21,21 +21,26 @@


class StdoutWrapper(object):
def __init__(self, stdout, queue=None):
def __init__(self, stdout, queue=None, drop=True):
self._stdout_backup = stdout
if queue is None:
self._lines = Queue()
else:
self._lines = queue
self._drop = drop

@property
def stdout(self):
return self._stdout_backup

def drop(self, drop=True):
self._drop = drop

def write(self, line):
line = line.encode("ascii", "ignore").decode("ascii")
self._stdout_backup.write(line)
self._lines.put(line)
if not self._drop:
self._lines.put(line)

def flush(self):
self._stdout_backup.flush()
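For reviewers, a usage sketch of the new `drop` flag: before the coordinator is ready, writes only pass through to the real stdout; after `drop(False)` (the call added in coordinator.py above), they are also queued for the gRPC log stream. The import path is assumed from the file layout:

```python
import sys
from queue import Queue

from gscoordinator.io_utils import StdoutWrapper  # import path assumed

log_queue = Queue()
sys.stdout = StdoutWrapper(sys.stdout, queue=log_queue, drop=True)

sys.stdout.write("dropped: echoed to the real stdout, not queued\n")
sys.stdout.drop(False)  # the coordinator flips this once GraphScope is ready
sys.stdout.write("kept: echoed and queued for clients to fetch\n")
assert log_queue.qsize() == 1  # only the second line was retained
```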
34 changes: 18 additions & 16 deletions coordinator/gscoordinator/utils.py
@@ -184,28 +184,29 @@ def compile_app(workspace: str, app_name: str, attr, engine_config: dict):
f.write(content)

# compile
logger.info("Building app ...")
cmake_process = subprocess.Popen(
cmake_commands,
env=os.environ.copy(),
universal_newlines=True,
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
cmake_stdout_watcher = PipeWatcher(cmake_process.stdout, sys.stdout)
setattr(cmake_process, "stdout_watcher", cmake_stdout_watcher)
cmake_stderr_watcher = PipeWatcher(cmake_process.stderr, sys.stdout)
setattr(cmake_process, "stderr_watcher", cmake_stderr_watcher)
cmake_process.wait()

make_process = subprocess.Popen(
["make", "-j4"],
env=os.environ.copy(),
universal_newlines=True,
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
make_stdout_watcher = PipeWatcher(make_process.stdout, sys.stdout)
setattr(make_process, "stdout_watcher", make_stdout_watcher)
make_stderr_watcher = PipeWatcher(make_process.stderr, sys.stdout)
setattr(make_process, "stderr_watcher", make_stderr_watcher)
make_process.wait()

return get_lib_path(app_dir, app_name)
@@ -293,28 +294,29 @@ def compile_graph_frame(
f.write(content)

# compile
logger.info("Building graph library ...")
cmake_process = subprocess.Popen(
cmake_commands,
env=os.environ.copy(),
universal_newlines=True,
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
cmake_stdout_watcher = PipeWatcher(cmake_process.stdout, sys.stdout)
setattr(cmake_process, "stdout_watcher", cmake_stdout_watcher)
cmake_stderr_watcher = PipeWatcher(cmake_process.stderr, sys.stdout)
setattr(cmake_process, "stderr_watcher", cmake_stderr_watcher)
cmake_process.wait()

make_process = subprocess.Popen(
["make", "-j4"],
env=os.environ.copy(),
universal_newlines=True,
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
make_stdout_watcher = PipeWatcher(make_process.stdout, sys.stdout)
setattr(make_process, "stdout_watcher", make_stdout_watcher)
make_stderr_watcher = PipeWatcher(make_process.stderr, sys.stdout)
setattr(make_process, "stderr_watcher", make_stderr_watcher)
make_process.wait()

return get_lib_path(frame_dir, frame_name)
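The build hunks above silence the child's stdout and hand stderr to `PipeWatcher`. A self-contained sketch of that drain-stderr-in-the-background pattern, hedged: the real `PipeWatcher` class in the repo may behave differently, and the helper below is illustrative only.

```python
import os
import subprocess
import sys
import threading


def run_quietly(cmd):
    """Run ``cmd`` discarding its stdout and forwarding stderr to our stdout.

    stderr is drained on a background thread so the child never blocks on
    a full pipe, similar to what PipeWatcher is used for above.
    """
    proc = subprocess.Popen(
        cmd,
        env=os.environ.copy(),
        universal_newlines=True,
        encoding="utf-8",
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
    )

    def _drain(pipe, sink):
        for line in pipe:
            sink.write(line)

    t = threading.Thread(target=_drain, args=(proc.stderr, sys.stdout), daemon=True)
    t.start()
    proc.wait()
    t.join()
    return proc.returncode


# e.g. run_quietly(["make", "-j4"])
```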
2 changes: 2 additions & 0 deletions docs/reference/session.rst
@@ -18,3 +18,5 @@ Session Functions

graphscope.session
graphscope.get_default_session
graphscope.set_option
graphscope.get_option
1 change: 0 additions & 1 deletion k8s/kube_ssh
@@ -1,5 +1,4 @@
#!/bin/sh
set -x
POD_NAME=$1
shift
kubectl exec ${POD_NAME} -c engine -- /bin/sh -c "cat /etc/hosts_of_nodes >> /etc/hosts && $*"
2 changes: 2 additions & 0 deletions python/graphscope/__init__.py
@@ -21,7 +21,9 @@
from graphscope.analytical.udf.types import Vertex
from graphscope.client.session import Session
from graphscope.client.session import get_default_session
from graphscope.client.session import get_option
from graphscope.client.session import session
from graphscope.client.session import set_option
from graphscope.framework.errors import *
from graphscope.framework.graph import Graph
from graphscope.framework.graph_utils import g
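Since the point of the PR is to expose `show_log` and `log_level` globally, a usage sketch of the new exports; the exact keyword names and signatures are assumed from the PR title and may differ from the final API:

```python
import graphscope

# Global configuration instead of per-session parameters (assumed keywords).
graphscope.set_option(show_log=True, log_level="DEBUG")
print(graphscope.get_option("show_log"))  # assumed key-based getter

# Sessions created afterwards pick up the global settings, so show_log and
# log_level no longer need to be passed to graphscope.session(...).
sess = graphscope.session(num_workers=1)
```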
11 changes: 5 additions & 6 deletions python/graphscope/client/rpc.py
@@ -26,6 +26,7 @@

import grpc

from graphscope.config import GSConfig as gs_config
from graphscope.framework.errors import ConnectionError
from graphscope.framework.errors import GRPCError
from graphscope.framework.errors import check_grpc_response
@@ -92,9 +93,7 @@ def __init__(self, endpoint):
self._session_id = None
self._logs_fetching_thread = None

def waiting_service_ready(
self, timeout_seconds=60, enable_k8s=False, show_log=False
):
def waiting_service_ready(self, timeout_seconds=60, enable_k8s=True):
begin_time = time.time()
request = message_pb2.HeartBeatRequest()
while True:
@@ -106,10 +105,10 @@ def waiting_service_ready(self, timeout_seconds=60, enable_k8s=True):
finally:
if response is not None:
# connect to coordinator, fetch log
if enable_k8s and show_log:
if enable_k8s:
self.fetch_logs()
if response.status.code == error_codes_pb2.OK:
logger.info("GraphScope coordinator service ready.")
logger.info("GraphScope coordinator service connected.")
break
time.sleep(1)
if time.time() - begin_time >= timeout_seconds:
@@ -208,7 +207,7 @@ def _fetch_logs_impl(self):
resp = check_grpc_response(resp)
message = resp.message.rstrip()
if message:
print(message, end="\n", file=sys.stdout, flush=True)
logger.info(message, extra={"simple": True})

@catch_grpc_error
def _close_session_impl(self):
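`_fetch_logs_impl` now routes coordinator output through `logger.info(..., extra={"simple": True})` instead of `print`. A sketch of one way a handler could honor that flag so remote lines are emitted without the client's own prefix; this is an assumption, not the formatter actually used by graphscope:

```python
import logging


class SimpleAwareFormatter(logging.Formatter):
    """Records flagged with extra={"simple": True} are printed verbatim;
    everything else keeps the normal prefix. Illustrative only."""

    def format(self, record):
        if getattr(record, "simple", False):
            return record.getMessage()
        return super().format(record)


handler = logging.StreamHandler()
handler.setFormatter(SimpleAwareFormatter("%(asctime)s [%(levelname)s] %(message)s"))
logging.getLogger("graphscope").addHandler(handler)
```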