Skip to content

Commit

Permalink
Support cypher query in python (#2952)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #2736

---------

Co-authored-by: Xiaoli Zhou <yihe.zxl@alibaba-inc.com>
  • Loading branch information
andydiwenzhu and shirly121 authored Jul 6, 2023
1 parent 6ed05f0 commit 209ff2e
Showing 18 changed files with 330 additions and 127 deletions.
7 changes: 5 additions & 2 deletions coordinator/gscoordinator/cluster_builder.py
Original file line number Diff line number Diff line change
@@ -577,10 +577,13 @@ def get_interactive_frontend_deployment(self, replicas=1):
self._namespace, name, deployment_spec, self._frontend_labels
)

def get_interactive_frontend_service(self, port):
def get_interactive_frontend_service(self, gremlin_port, cypher_port):
name = self.frontend_deployment_name
service_type = self._service_type
ports = [kube_client.V1ServicePort(name="gremlin", port=port)]
ports = [
kube_client.V1ServicePort(name="gremlin", port=gremlin_port),
kube_client.V1ServicePort(name="cypher", port=cypher_port),
]
service_spec = ResourceBuilder.get_service_spec(
service_type, ports, self._frontend_labels, None
)
51 changes: 40 additions & 11 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@
from gscoordinator.object_manager import ObjectManager
from gscoordinator.op_executor import OperationExecutor
from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH
from gscoordinator.utils import check_gremlin_server_ready
from gscoordinator.utils import check_server_ready
from gscoordinator.utils import create_single_op_dag
from gscoordinator.utils import str2bool
from gscoordinator.version import __version__
@@ -441,10 +441,16 @@ def _match_frontend_endpoint(pattern, lines):
return ""

# frontend endpoint pattern
FRONTEND_PATTERN = re.compile("(?<=FRONTEND_ENDPOINT:).*$")
FRONTEND_GREMLIN_PATTERN = re.compile("(?<=FRONTEND_GREMLIN_ENDPOINT:).*$")
FRONTEND_CYPHER_PATTERN = re.compile("(?<=FRONTEND_CYPHER_ENDPOINT:).*$")
# frontend external endpoint, for clients that are outside of cluster to connect
# only available in kubernetes mode, exposed by NodePort or LoadBalancer
FRONTEND_EXTERNAL_PATTERN = re.compile("(?<=FRONTEND_EXTERNAL_ENDPOINT:).*$")
FRONTEND_EXTERNAL_GREMLIN_PATTERN = re.compile(
"(?<=FRONTEND_EXTERNAL_GREMLIN_ENDPOINT:).*$"
)
FRONTEND_EXTERNAL_CYPHER_PATTERN = re.compile(
"(?<=FRONTEND_EXTERNAL_CYPHER_ENDPOINT:).*$"
)

# create instance
object_id = request.object_id
@@ -464,13 +470,22 @@ def _match_frontend_endpoint(pattern, lines):
return_code = proc.poll()
if return_code != 0:
raise RuntimeError(f"Error code: {return_code}, message {outs}")
# match frontend endpoint and check for ready
endpoint = _match_frontend_endpoint(FRONTEND_PATTERN, outs)
# match frontend endpoints and check for ready
gremlin_endpoint = _match_frontend_endpoint(FRONTEND_GREMLIN_PATTERN, outs)
cypher_endpoint = _match_frontend_endpoint(FRONTEND_CYPHER_PATTERN, outs)
logger.debug("Got endpoints: %s %s", gremlin_endpoint, cypher_endpoint)
# coordinator use internal endpoint
gie_manager.set_endpoint(endpoint)
if check_gremlin_server_ready(endpoint): # throws TimeoutError
gie_manager.set_endpoint(gremlin_endpoint)
if check_server_ready(
gremlin_endpoint, server="gremlin"
) and check_server_ready(
cypher_endpoint, server="cypher"
): # throws TimeoutError
logger.info(
"Built interactive frontend %s for graph %ld", endpoint, object_id
"Built interactive frontend gremlin: %s & cypher: %s for graph %ld",
gremlin_endpoint,
cypher_endpoint,
object_id,
)
except Exception as e:
context.set_code(grpc.StatusCode.ABORTED)
@@ -480,11 +495,25 @@ def _match_frontend_endpoint(pattern, lines):
self._launcher.close_interactive_instance(object_id)
self._object_manager.pop(object_id)
return message_pb2.CreateInteractiveInstanceResponse()
external_endpoint = _match_frontend_endpoint(FRONTEND_EXTERNAL_PATTERN, outs)
external_gremlin_endpoint = _match_frontend_endpoint(
FRONTEND_EXTERNAL_GREMLIN_PATTERN, outs
)
external_cypher_endpoint = _match_frontend_endpoint(
FRONTEND_EXTERNAL_CYPHER_PATTERN, outs
)
logger.debug(
"Got external endpoints: %s %s",
external_gremlin_endpoint,
external_cypher_endpoint,
)

# client use external endpoint (k8s mode), or internal endpoint (standalone mode)
endpoint = external_endpoint or endpoint
gremlin_endpoint = external_gremlin_endpoint or gremlin_endpoint
cypher_endpoint = external_cypher_endpoint or cypher_endpoint
return message_pb2.CreateInteractiveInstanceResponse(
gremlin_endpoint=endpoint, object_id=object_id
gremlin_endpoint=gremlin_endpoint,
cypher_endpoint=cypher_endpoint,
object_id=object_id,
)

def CreateLearningInstance(self, request, context):
7 changes: 4 additions & 3 deletions coordinator/gscoordinator/kubernetes_launcher.py
Original file line number Diff line number Diff line change
@@ -636,12 +636,13 @@ def _distribute_interactive_process(
container,
str(self._interactive_port), # executor port
str(self._interactive_port + 1), # executor rpc port
str(self._interactive_port + 2), # frontend port
str(self._interactive_port + 2), # frontend gremlin port
str(self._interactive_port + 3), # frontend cypher port
self._coordinator_name,
engine_selector,
params,
]
self._interactive_port += 3
self._interactive_port += 4
logger.info("Create GIE instance with command: %s", " ".join(cmd))
process = subprocess.Popen(
cmd,
@@ -947,7 +948,7 @@ def _create_frontend_deployment(self, name=None, owner_references=None):

def _create_frontend_service(self):
logger.info("Creating frontend service...")
service = self._engine_cluster.get_interactive_frontend_service(8233)
service = self._engine_cluster.get_interactive_frontend_service(8233, 7687)
service.metadata.owner_references = self._owner_references
response = self._core_api.create_namespaced_service(self._namespace, service)
self._resource_object.append(response)
5 changes: 3 additions & 2 deletions coordinator/gscoordinator/local_launcher.py
Original file line number Diff line number Diff line change
@@ -232,12 +232,13 @@ def create_interactive_instance(
str(num_workers), # server size
str(self._interactive_port), # executor port
str(self._interactive_port + 1), # executor rpc port
str(self._interactive_port + 2 * num_workers), # frontend port
str(self._interactive_port + 2 * num_workers), # frontend gremlin port
str(self._interactive_port + 2 * num_workers + 1), # frontend cypher port
self.vineyard_socket,
params,
]
logger.info("Create GIE instance with command: %s", " ".join(cmd))
self._interactive_port += 3
self._interactive_port += 2 * num_workers + 2
process = subprocess.Popen(
cmd,
start_new_session=True,
41 changes: 37 additions & 4 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
@@ -1996,33 +1996,64 @@ def check_argument(condition, message=None):
raise ValueError(f"Check failed: {message}")


def check_gremlin_server_ready(endpoint):
def _check_task(endpoint):
def check_server_ready(endpoint, server="gremlin"):
def _check_gremlin_task(endpoint):
from gremlin_python.driver.client import Client

if "MY_POD_NAME" in os.environ:
# inner kubernetes env
if endpoint == "localhost" or endpoint == "127.0.0.1":
# now, used in macOS with docker-desktop kubernetes cluster,
# which external ip is 'localhost' when service type is 'LoadBalancer'
logger.info("In kubernetes env, gremlin server is ready.")
return True

try:
client = Client(f"ws://{endpoint}/gremlin", "g")
# May throw
client.submit("g.V().limit(1)").all().result()
logger.info("Gremlin server is ready.")
finally:
try:
client.close()
except: # noqa: E722
pass
return True

def _check_cypher_task(endpoint):
from neo4j import GraphDatabase

if "MY_POD_NAME" in os.environ:
# inner kubernetes env
if endpoint == "localhost" or endpoint == "127.0.0.1":
logger.info("In kubernetes env, cypher server is ready.")
return True

try:
logger.debug("Try to connect to cypher server.")
driver = GraphDatabase.driver(f"neo4j://{endpoint}", auth=("", ""))
# May throw
driver.verify_connectivity()
logger.info("Checked connectivity to cypher server.")
finally:
try:
driver.close()
except: # noqa: E722
pass
return True

executor = ThreadPoolExecutor(max_workers=20)

begin_time = time.time()
while True:
t = executor.submit(_check_task, endpoint)
if server == "gremlin":
t = executor.submit(_check_gremlin_task, endpoint)
elif server == "cypher":
t = executor.submit(_check_cypher_task, endpoint)
else:
raise ValueError(
f"Unsupported server type: {server} other than 'gremlin' or 'cypher'"
)
try:
_ = t.result(timeout=30)
except Exception as e:
@@ -2034,4 +2065,6 @@ def _check_task(endpoint):
time.sleep(3)
if time.time() - begin_time > INTERACTIVE_INSTANCE_TIMEOUT_SECONDS:
executor.shutdown(wait=False)
raise TimeoutError(f"Gremlin check query failed: {error_message}")
raise TimeoutError(
f"{server.capitalize()} check query failed: {error_message}"
)
Loading

0 comments on commit 209ff2e

Please sign in to comment.