Skip to content

Commit

Permalink
Connect to frontend directly, bypass coordinator for GIE query (#2923)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 authored Jun 26, 2023
1 parent 96d5093 commit 25496f8
Show file tree
Hide file tree
Showing 20 changed files with 416 additions and 523 deletions.
10 changes: 4 additions & 6 deletions .github/workflows/local-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -298,18 +298,16 @@ jobs:
# install java
sudo apt update -y && sudo apt install openjdk-11-jdk -y
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
if: false

- name: Run Minimum Test
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
run: |
git clone -b master --single-branch --depth=1 https://github.com/7br/gstest.git ${GS_TEST_DIR}
python3 -m pytest -s -v $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/tests/minitest
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
if: false

- name: Upload GIE log
if: failure()
uses: actions/upload-artifact@v3
Expand Down
4 changes: 2 additions & 2 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
from gscoordinator.dag_manager import GSEngine
from gscoordinator.kubernetes_launcher import KubernetesClusterLauncher
from gscoordinator.monitor import Monitor
from gscoordinator.object_manager import InteractiveQueryManager
from gscoordinator.object_manager import InteractiveInstanceManager
from gscoordinator.object_manager import LearningInstanceManager
from gscoordinator.object_manager import ObjectManager
from gscoordinator.op_executor import OperationExecutor
Expand Down Expand Up @@ -454,7 +454,7 @@ def _match_frontend_endpoint(pattern, lines):
proc = self._launcher.create_interactive_instance(
object_id, schema_path, params
)
gie_manager = InteractiveQueryManager(object_id)
gie_manager = InteractiveInstanceManager(object_id)
# Put it to object_manager to ensure it could be killed during coordinator cleanup
# If coordinator is shutdown by force when creating interactive instance
self._object_manager.put(object_id, gie_manager)
Expand Down
2 changes: 0 additions & 2 deletions coordinator/gscoordinator/dag_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ class DAGManager(object):

_interactive_engine_split_op = [
types_pb2.SUBGRAPH,
types_pb2.GREMLIN_QUERY,
types_pb2.FETCH_GREMLIN_RESULT,
]

_learning_engine_split_op = []
Expand Down
2 changes: 0 additions & 2 deletions coordinator/gscoordinator/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@
22: "INDUCE_SUBGRAPH",
23: "UNLOAD_CONTEXT",
32: "SUBGRAPH",
33: "GREMLIN_QUERY",
34: "FETCH_GREMLIN_RESULT",
46: "DATA_SOURCE",
47: "DATA_SINK",
50: "CONTEXT_TO_NUMPY",
Expand Down
15 changes: 3 additions & 12 deletions coordinator/gscoordinator/object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ def __init__(self, key, object_id, graph_def, schema_path=None):
self.schema_path = schema_path


class InteractiveQueryManager(object):
def __init__(self, object_id, endpoint=None):
class InteractiveInstanceManager(object):
def __init__(self, object_id):
self.type = "gie_manager"
# graph object id in vineyard
self.object_id = object_id
self.endpoint = endpoint
self.endpoint = None
self.client = None

def set_endpoint(self, endpoint):
Expand All @@ -51,7 +50,6 @@ def __del__(self):
try:
self.client.close()
except Exception:
# TODO(siyuan): throws no event loop exception with tornado 5.1.1
pass

def submit(self, message, bindings=None, request_options=None):
Expand All @@ -62,13 +60,6 @@ def submit(self, message, bindings=None, request_options=None):
return self.client.submit(message, bindings, request_options)


class GremlinResultSet(object):
def __init__(self, key, result_set):
self.key = key
self.type = "result_set"
self.result_set = result_set


class LearningInstanceManager(object):
def __init__(self, object_id):
self.type = "gle_manager"
Expand Down
40 changes: 1 addition & 39 deletions coordinator/gscoordinator/op_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

from gscoordinator.monitor import Monitor
from gscoordinator.object_manager import GraphMeta
from gscoordinator.object_manager import GremlinResultSet
from gscoordinator.object_manager import LibMeta
from gscoordinator.utils import ANALYTICAL_BUILTIN_SPACE
from gscoordinator.utils import ANALYTICAL_ENGINE_JAVA_INIT_CLASS_PATH
Expand Down Expand Up @@ -441,11 +440,7 @@ def run_on_interactive_engine(self, dag_def: op_def_pb2.DagDef):
for op in dag_def.op:
self._key_to_op[op.key] = op
op_pre_process(op, self._op_result_pool, self._key_to_op)
if op.op == types_pb2.GREMLIN_QUERY:
op_result = self._execute_gremlin_query(op)
elif op.op == types_pb2.FETCH_GREMLIN_RESULT:
op_result = self._fetch_gremlin_result(op)
elif op.op == types_pb2.SUBGRAPH:
if op.op == types_pb2.SUBGRAPH:
op_result = self._gremlin_to_subgraph(op)
else:
raise RuntimeError("Unsupported op type: " + str(op.op))
Expand All @@ -454,39 +449,6 @@ def run_on_interactive_engine(self, dag_def: op_def_pb2.DagDef):
self._op_result_pool[op.key] = op_result
return message_pb2.RunStepResponse(head=response_head), []

def _execute_gremlin_query(self, op: op_def_pb2.OpDef):
logger.debug("execute gremlin query")
message = op.attr[types_pb2.GIE_GREMLIN_QUERY_MESSAGE].s.decode()
request_options = None
if types_pb2.GIE_GREMLIN_REQUEST_OPTIONS in op.attr:
request_options = json.loads(
op.attr[types_pb2.GIE_GREMLIN_REQUEST_OPTIONS].s.decode()
)
object_id = op.attr[types_pb2.VINEYARD_ID].i
gremlin_client = self._object_manager.get(object_id)
rlt = gremlin_client.submit(message, request_options=request_options)
logger.debug("put %s, client %s", op.key, gremlin_client)
self._object_manager.put(op.key, GremlinResultSet(op.key, rlt))
return op_def_pb2.OpResult(code=OK, key=op.key)

def _fetch_gremlin_result(self, op: op_def_pb2.OpDef):
fetch_result_type = op.attr[types_pb2.GIE_GREMLIN_FETCH_RESULT_TYPE].s.decode()
key_of_parent_op = op.parents[0]
result_set = self._object_manager.get(key_of_parent_op).result_set
if fetch_result_type == "one":
rlt = result_set.one()
elif fetch_result_type == "all":
rlt = result_set.all().result()
else:
raise RuntimeError("Not supported fetch result type: " + fetch_result_type)
# Large data should be fetched use gremlin pagination
# meta = op_def_pb2.OpResult.Meta(has_large_result=True)
return op_def_pb2.OpResult(
code=OK,
key=op.key,
result=pickle.dumps(rlt),
)

def _gremlin_to_subgraph(self, op: op_def_pb2.OpDef):
gremlin_script = op.attr[types_pb2.GIE_GREMLIN_QUERY_MESSAGE].s.decode()
oid_type = op.attr[types_pb2.OID_TYPE].s.decode()
Expand Down
15 changes: 11 additions & 4 deletions docs/interactive_engine/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,33 @@ graph = load_modern_graph()
g = gs.gremlin(graph)
# then `execute` any supported gremlin query.
q1 = g.execute('g.V().count()')
print(q1.all()) # should print [6]
print(q1.all().result()) # should print [6]

q2 = g.execute('g.V().hasLabel(\'person\')')
print(q2.all()) # should print [[v[2], v[3], v[0], v[1]]]
print(q2.all().result()) # should print [[v[2], v[3], v[0], v[1]]]
```

You may see something like:
```Shell
...
... [INFO][coordinator:453]: Built interactive frontend xxx.xxx.xxx.xxx:pppp for graph xxx
... [INFO][op_executor:455]: execute gremlin query
[6]
...
... [INFO][op_executor:455]: execute gremlin query
[v[2], v[3], v[0], v[1]]
...
```
The number 6 is printed, which is the number of vertices in modern graph.
### Retrieve the gremlin client
The `g` returned by `gs.gremlin()` is a wrapper around `Client` of `gremlinpython`, you could get the `Client` by
```python
client = g.gremlin_client
print(client.submit('g.V()').all().result())
```
### Customize Configurations for GIE instance
You could pass additional key-value pairs to customize the startup configuration of GIE, for example:
Expand Down
Loading

0 comments on commit 25496f8

Please sign in to comment.