Skip to content

Commit

Permalink
Optimize time of sess.gremlin and sess.close (#170)
Browse files Browse the repository at this point in the history
* Optimize time of sess.gremlin and sess.close
* Pass waiting for delete to InteractiveQuery
* Launch gremlin server's pod implicitly when a property graph is created.
  • Loading branch information
lidongze0629 authored Mar 2, 2021
1 parent 5980176 commit b3cf8f6
Show file tree
Hide file tree
Showing 18 changed files with 190 additions and 52 deletions.
3 changes: 3 additions & 0 deletions coordinator/gscoordinator/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ def get_vineyard_rpc_endpoint(self):
def get_pods_list(self):
return self._pod_name_list

def waiting_for_delete(self):
return self._waiting_for_delete

def get_namespace(self):
return self._namespace

Expand Down
13 changes: 6 additions & 7 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,6 @@ def CloseSession(self, request, context):
)

def CreateInteractiveInstance(self, request, context):
logger.info(
"Coordinator create interactive instance with object id %ld",
request.object_id,
)
object_id = request.object_id
gremlin_server_cpu = request.gremlin_server_cpu
gremlin_server_mem = request.gremlin_server_mem
Expand Down Expand Up @@ -493,9 +489,12 @@ def CloseInteractiveInstance(self, request, context):
self._gie_graph_manager_service_name,
self._k8s_namespace,
)
close_url = (
"%s/instance/close?graphName=%ld&podNameList=%s&containerName=%s"
% (manager_host, object_id, pod_name_list, ENGINE_CONTAINER)
close_url = "%s/instance/close?graphName=%ld&podNameList=%s&containerName=%s&waitingForDelete=%s" % (
manager_host,
object_id,
pod_name_list,
ENGINE_CONTAINER,
str(self._launcher.waiting_for_delete()),
)
logger.info("Coordinator close interactive instance with url[%s]" % close_url)
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ function _delete_maxgraph_instance {
parallel_run "$kill_cmd" "$pod_hosts" $ENGINE_CONTAINER
}
function _delete_pod {
kubectl delete -f /root/maxgraph/pod_${object_id}.yaml
kubectl delete -f /root/maxgraph/pod_${object_id}.yaml --wait=${waiting_for_delete}
kubectl delete configmap config-${object_id}
kubectl delete service gremlin-${object_id}
}
export object_id=$1
export pod_hosts=`echo $2 | awk -F"," '{for(i=1;i<=NF;++i) {print $i" "}}'`
export ENGINE_CONTAINER=$3
export waiting_for_delete=$4
_delete_maxgraph_instance
rm -rf /home/maxgraph/config_$object_id /root/maxgraph/pod_${object_id}.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ fi

cat $inner_config

sleep 10
#server_id=$RANDOM
flag="maxgraph"$object_id"executor"
RUST_BACKTRACE=full /home/maxgraph/executor --config $inner_config $flag $server_id 1>> logs_$object_id/maxgraph-executor.out 2>> logs_$object_id/maxgraph-executor.err &
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ public CreateInstanceEntity createInstanceByPath(@RequestParam("graphName") Stri
@RequestMapping("close")
public CloseInstanceEntity closeInstance(@RequestParam("graphName") String graphName,
@RequestParam("podNameList") String podNameList,
@RequestParam("containerName") String containerName) {
@RequestParam("containerName") String containerName,
@RequestParam("waitingForDelete") String waitingForDelete) {
int errorCode;
String errorMessage;
try {
Expand All @@ -283,6 +284,7 @@ public CloseInstanceEntity closeInstance(@RequestParam("graphName") String graph
closeCommandList.add(graphName);
closeCommandList.add(podNameList);
closeCommandList.add(containerName);
closeCommandList.add(waitingForDelete);
String command = StringUtils.join(closeCommandList, " ");
logger.info("start to close instance with command " + command);
Process process = Runtime.getRuntime().exec(command);
Expand Down
81 changes: 63 additions & 18 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
from graphscope.framework.errors import LearningEngineInternalError
from graphscope.framework.errors import check_argument
from graphscope.framework.operation import Operation
from graphscope.interactive.query import InteractiveQuery
from graphscope.interactive.query import InteractiveQueryStatus
from graphscope.proto import message_pb2
from graphscope.proto import op_def_pb2
from graphscope.proto import types_pb2
Expand Down Expand Up @@ -894,12 +896,27 @@ def group_property_types(props):
handle_json_string = json.dumps(handle)
return base64.b64encode(handle_json_string.encode("utf-8")).decode("utf-8")

@set_defaults(gs_config)
def gremlin(self, graph, engine_params=None):
"""Get an interactive engine handler to execute gremlin queries.
Note that this method is executed implicitly when a property graph is created,
and an instance of InteractiveQuery is cached in the session, if `initializing_interactive_engine`
is True. If you want to create a new instance for the same graph with different params,
you should close the existing instance first.
.. code:: python
>>> # close and recreate InteractiveQuery.
>>> interactive_query = sess.gremlin(g)
>>> interactive_query.close()
>>> interactive_query = sess.gremlin(g, engine_params={"xxx":"xxx"})
Args:
graph (:class:`Graph`): Use the graph to create interactive instance.
engine_params (dict, optional): Configure startup parameters of interactive engine.
You can also configure this param by `graphscope.set_option(engine_params={})`.
See a list of configurable keys in
`interactive_engine/deploy/docker/dockerfile/executor.vineyard.properties`
Expand All @@ -909,40 +926,64 @@ def gremlin(self, graph, engine_params=None):
Returns:
:class:`InteractiveQuery`
"""

# self._interactive_instance_dict[graph.vineyard_id] will be None if
# the InteractiveQuery has been closed
if (
graph.vineyard_id in self._interactive_instance_dict
and self._interactive_instance_dict[graph.vineyard_id] is not None
):
return self._interactive_instance_dict[graph.vineyard_id]
interactive_query = self._interactive_instance_dict[graph.vineyard_id]
if interactive_query.status == InteractiveQueryStatus.Running:
return interactive_query
elif interactive_query.status == InteractiveQueryStatus.Failed:
raise InteractiveEngineInternalError(interactive_query.error_msg)
else:
# Initializing.
# while True is ok, as the status is either running or failed eventually after timeout.
while True:
time.sleep(1)
if interactive_query.status == InteractiveQueryStatus.Running:
return interactive_query
elif interactive_query.status == InteractiveQueryStatus.Failed:
raise InteractiveEngineInternalError(
interactive_query.error_msg
)

if not graph.loaded():
raise InvalidArgumentError("The graph has already been unloaded")
if not graph.graph_type == types_pb2.ARROW_PROPERTY:
raise InvalidArgumentError("The graph should be a property graph.")

interactive_query = InteractiveQuery(session=self, object_id=graph.vineyard_id)
self._interactive_instance_dict[graph.vineyard_id] = interactive_query

if engine_params is not None:
engine_params = {
str(key): str(value) for key, value in engine_params.items()
}
else:
engine_params = {}
from graphscope.interactive.query import InteractiveQuery

response = self._grpc_client.create_interactive_engine(
object_id=graph.vineyard_id,
schema_path=graph.schema_path,
gremlin_server_cpu=gs_config.k8s_gie_gremlin_server_cpu,
gremlin_server_mem=gs_config.k8s_gie_gremlin_server_mem,
engine_params=engine_params,
)
interactive_query = InteractiveQuery(
graphscope_session=self,
object_id=graph.vineyard_id,
front_ip=response.frontend_host,
front_port=response.frontend_port,
)
self._interactive_instance_dict[graph.vineyard_id] = interactive_query
graph.attach_interactive_instance(interactive_query)

try:
response = self._grpc_client.create_interactive_engine(
object_id=graph.vineyard_id,
schema_path=graph.schema_path,
gremlin_server_cpu=gs_config.k8s_gie_gremlin_server_cpu,
gremlin_server_mem=gs_config.k8s_gie_gremlin_server_mem,
engine_params=engine_params,
)
except Exception as e:
interactive_query.status = InteractiveQueryStatus.Failed
interactive_query.error_msg = str(e)
raise InteractiveEngineInternalError(str(e)) from e
else:
interactive_query.set_frontend(
front_ip=response.frontend_host, front_port=response.frontend_port
)
interactive_query.status = InteractiveQueryStatus.Running
graph.attach_interactive_instance(interactive_query)

return interactive_query

def learning(self, graph, nodes=None, edges=None, gen_labels=None):
Expand Down Expand Up @@ -1093,6 +1134,8 @@ def set_option(**kwargs):
- k8s_engine_cpu
- k8s_engine_mem
- k8s_waiting_for_delete
- engine_params
- initializing_interactive_engine
- timeout_seconds
Args:
Expand Down Expand Up @@ -1138,6 +1181,8 @@ def get_option(key):
- k8s_engine_cpu
- k8s_engine_mem
- k8s_waiting_for_delete
- engine_params
- initializing_interactive_engine
- timeout_seconds
Args:
Expand Down
8 changes: 8 additions & 0 deletions python/graphscope/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,12 @@ class GSConfig(object):
show_log = False
log_level = "INFO"

# GIE engine params
engine_params = None

# A GIE instance is created automatically when a property graph is loaded.
# If `initializing_interactive_engine` is False, you should create a GIE
# instance manually with `sess.gremlin`.
initializing_interactive_engine = True

timeout_seconds = 600
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

import graphscope

graphscope.set_option(show_log=True)


@pytest.fixture(scope="session")
def graphscope_session():
graphscope.set_option(show_log=True)
graphscope.set_option(initializing_interactive_engine=False)

sess = graphscope.session(run_on_local=True, num_workers=1)

sess.as_default()
Expand Down
5 changes: 3 additions & 2 deletions python/graphscope/experimental/nx/tests/classes/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

import graphscope

graphscope.set_option(show_log=True)


@pytest.fixture(scope="session")
def graphscope_session():
graphscope.set_option(show_log=True)
graphscope.set_option(initializing_interactive_engine=False)

sess = graphscope.session(run_on_local=True, num_workers=1)
sess.as_default()
yield sess
Expand Down
5 changes: 3 additions & 2 deletions python/graphscope/experimental/nx/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@

import graphscope

graphscope.set_option(show_log=True)


@pytest.fixture(scope="session")
def graphscope_session():
graphscope.set_option(show_log=True)
graphscope.set_option(initializing_interactive_engine=False)

sess = graphscope.session(run_on_local=True)

sess.as_default()
Expand Down
5 changes: 3 additions & 2 deletions python/graphscope/experimental/nx/tests/test_nx.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
from graphscope.framework.loader import Loader
from graphscope.proto import types_pb2

graphscope.set_option(show_log=True)


@pytest.fixture(scope="session")
def graphscope_session():
graphscope.set_option(show_log=True)
graphscope.set_option(initializing_interactive_engine=False)

sess = graphscope.session(run_on_local=True, num_workers=1)
sess.as_default()
yield sess
Expand Down
24 changes: 24 additions & 0 deletions python/graphscope/framework/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import hashlib
import json
import logging
import threading
from typing import Mapping

import vineyard

from graphscope.client.session import get_session_by_id
from graphscope.config import GSConfig as gs_config
from graphscope.framework.dag_utils import add_column
from graphscope.framework.dag_utils import copy_graph
from graphscope.framework.dag_utils import create_graph
Expand Down Expand Up @@ -112,6 +114,7 @@ def __init__(self, session_id, incoming_data=None):
self._session_id = session_id
self._detached = False

self._interactive_instance_launching_thread = None
self._interactive_instance_list = []
self._learning_instance_list = []

Expand Down Expand Up @@ -140,6 +143,13 @@ def __init__(self, session_id, incoming_data=None):
# init saved_signature (must be after init schema)
self._saved_signature = self.signature

# create gremlin server pod asynchronously
if gs_config.initializing_interactive_engine:
self._interactive_instance_launching_thread = threading.Thread(
target=self._launch_interactive_instance_impl, args=()
)
self._interactive_instance_launching_thread.start()

def __del__(self):
# cleanly ignore all exceptions, cause session may already closed / destroyed.
try:
Expand All @@ -159,6 +169,15 @@ def _close_learning_instances(self):
instance.close()
self._learning_instance_list.clear()

def _launch_interactive_instance_impl(self):
    """Thread target that eagerly creates this graph's interactive engine.

    Looks up the session that owns this graph and calls ``sess.gremlin(self)``.
    This runs on the background thread started from ``__init__``, so there is
    no caller to propagate an exception to: failures are logged at debug
    level and otherwise suppressed.
    """
    try:
        sess = get_session_by_id(self.session_id)
        sess.gremlin(self)
    except Exception:
        # When the engine launch itself fails, `sess.gremlin` records the
        # error message on the `InteractiveQuery`. Failures raised before
        # that point (e.g. session lookup or argument checks) would be lost
        # entirely, so keep a debug-level trace instead of a silent `pass`.
        # `Exception` rather than a bare `except` so that interpreter-exit
        # signals are not swallowed.
        logger.debug(
            "Failed to launch interactive instance implicitly", exc_info=True
        )

@property
def op(self):
"""The DAG op of this graph."""
Expand Down Expand Up @@ -265,6 +284,11 @@ def unload(self):
raise RuntimeError("The graph is not registered in remote.")
# close interactive instances first
try:
if (
self._interactive_instance_launching_thread is not None
and self._interactive_instance_launching_thread.is_alive()
):
self._interactive_instance_launching_thread.join()
self._close_interactive_instances()
except Exception as e:
logger.error("Failed to close interactive instances: %s" % e)
Expand Down
3 changes: 2 additions & 1 deletion python/graphscope/framework/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import numpy as np
import pandas as pd

from graphscope.client.session import get_default_session
from graphscope.framework import utils
from graphscope.framework.errors import check_argument
from graphscope.proto import attr_value_pb2
Expand Down Expand Up @@ -276,6 +275,8 @@ def func(source, storage_options, read_options, sess):
self.preprocessor = func

def finish(self):
from graphscope.client.session import get_default_session

if self.finished:
return
if self.preprocessor is not None:
Expand Down
2 changes: 2 additions & 0 deletions python/graphscope/interactive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

# gremlinpython has an async event loop, which may conflict with
# Jupyter notebook's event loop.

import nest_asyncio

nest_asyncio.apply()

from graphscope.interactive.query import InteractiveQuery
from graphscope.interactive.query import InteractiveQueryStatus
Loading

0 comments on commit b3cf8f6

Please sign in to comment.