From 6fcf68d3d2eddf5602abf408d98610fadbebbb35 Mon Sep 17 00:00:00 2001 From: shirly121 Date: Mon, 5 Jun 2023 15:59:50 +0800 Subject: [PATCH 01/12] [GIE Compiler] unify execution client which is used to send request to engine service --- .../common/client/ExecutionClient.java | 38 ++++++ .../common/client/HttpExecutionClient.java | 114 ++++++++++++++++++ .../common/client/RpcBroadcastProcessor.java | 75 ------------ .../common/client/RpcExecutionClient.java | 99 +++++++++++++++ .../ChannelFetcher.java} | 15 ++- .../client/channel/HostURIChannelFetcher.java | 45 +++++++ .../HostsRpcChannelFetcher.java} | 14 +-- .../common/client/type/ExecutionRequest.java | 46 +++++++ .../type/ExecutionResponseListener.java | 30 +++++ .../graphscope/common/config/HQPSConfig.java | 25 ++++ .../common/config/PegasusConfig.java | 3 + .../common/ir/runtime/PhysicalBuilder.java | 61 ++++++++++ .../common/ir/tools/LogicalPlan.java | 64 ++++++++++ .../common/result/RecordParser.java | 34 ++++++ .../{client => result}/ResultParser.java | 6 +- .../processor/IrTestOpProcessor.java | 9 +- .../processor/IrStandardOpProcessor.java | 29 +++-- .../gremlin/result/GremlinResultParser.java | 2 +- .../result/parser/CypherResultParser.java | 2 +- .../processor/AbstractResultProcessor.java | 6 +- .../gremlin/service/GraphServiceMain.java | 6 +- .../gremlin/service/IrGremlinServer.java | 4 +- .../com/alibaba/pegasus/ClientExample.java | 75 ++++++------ .../java/com/alibaba/pegasus/RpcClient.java | 72 +++++------ .../pegasus/common/StreamIterator.java | 15 +-- .../executor/ir/proto/hqps.proto | 24 ++++ interactive_engine/pom.xml | 6 +- 27 files changed, 711 insertions(+), 208 deletions(-) create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java delete mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcBroadcastProcessor.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java rename interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/{RpcChannelFetcher.java => channel/ChannelFetcher.java} (71%) create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java rename interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/{HostsChannelFetcher.java => channel/HostsRpcChannelFetcher.java} (84%) create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionRequest.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionResponseListener.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/PhysicalBuilder.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/LogicalPlan.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/result/RecordParser.java rename interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/{client => result}/ResultParser.java (83%) create mode 100644 interactive_engine/executor/ir/proto/hqps.proto diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java new file mode 100644 index 000000000000..fc591b25c4d6 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.client; + +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; +import com.alibaba.graphscope.common.client.type.ExecutionRequest; +import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; + +/** + * client to submit request to remote engine service + * @param + */ +public abstract class ExecutionClient { + protected final ChannelFetcher channelFetcher; + + public ExecutionClient(ChannelFetcher channelFetcher) { + this.channelFetcher = channelFetcher; + } + + public abstract void submit(ExecutionRequest request, ExecutionResponseListener listener) + throws Exception; + + public abstract void close() throws Exception; +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java new file mode 100644 index 000000000000..9c1be4fe6336 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java @@ -0,0 +1,114 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.client; + +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; +import com.alibaba.graphscope.common.client.type.ExecutionRequest; +import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; +import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.config.HQPSConfig; +import com.alibaba.graphscope.gaia.proto.Hqps; +import com.alibaba.graphscope.gaia.proto.IrResult; +import com.google.common.collect.Lists; +import com.google.protobuf.InvalidProtocolBufferException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * http client to send request to hqps engine service + */ +public class HttpExecutionClient extends ExecutionClient { + private static final Logger logger = LoggerFactory.getLogger(HttpExecutionClient.class); + private static final String CONTENT_TYPE = "Content-Type"; + private static final String TEXT_PLAIN = "text/plain;charset=UTF-8"; + private final HttpClient httpClient; + + public HttpExecutionClient(Configs graphConfig, ChannelFetcher channelFetcher) { + super(channelFetcher); + this.httpClient = + HttpClient.newBuilder() + .connectTimeout( + Duration.ofMillis(HQPSConfig.HQPS_HTTP_TIMEOUT.get(graphConfig))) + .build(); + } + + @Override + public void submit(ExecutionRequest request, ExecutionResponseListener listener) + throws Exception { + List responseFutures = Lists.newArrayList(); + for (URI httpURI : channelFetcher.fetch()) { + HttpRequest httpRequest = + HttpRequest.newBuilder() + .uri(httpURI) + .headers(CONTENT_TYPE, TEXT_PLAIN) + .POST( + HttpRequest.BodyPublishers.ofByteArray( + (byte[]) request.getRequestPhysical().build())) + .build(); + CompletableFuture> responseFuture = + httpClient + .sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray()) + .whenComplete( + (bytes, exception) -> { + if (exception != null) { + listener.onError(exception); + } + try { + Hqps.HighQPSResults results = + Hqps.HighQPSResults.parseFrom(bytes.body()); + for (IrResult.Results irResult : + results.getResultsList()) { + listener.onNext(irResult.getRecord()); + } + } catch (InvalidProtocolBufferException e) { + listener.onError(e); + } + }); + responseFutures.add(responseFuture); + } + CompletableFuture joinFuture = + CompletableFuture.runAsync( + () -> { + try { + CompletableFuture.allOf( + responseFutures.toArray(new CompletableFuture[0])) + .get(); + listener.onCompleted(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + joinFuture.whenComplete( + (aVoid, exception) -> { + if (exception != null) { + listener.onError(exception); + } + }); + } + + @Override + public void close() throws Exception {} +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcBroadcastProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcBroadcastProcessor.java deleted file mode 100644 index 6b31baa7f7ae..000000000000 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcBroadcastProcessor.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.graphscope.common.client; - -import com.alibaba.pegasus.RpcClient; -import com.alibaba.pegasus.intf.CloseableIterator; -import com.alibaba.pegasus.intf.ResultProcessor; -import com.alibaba.pegasus.service.protocol.PegasusClient; - -import io.grpc.Status; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class RpcBroadcastProcessor implements AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(RpcBroadcastProcessor.class); - - protected RpcClient rpcClient; - protected RpcChannelFetcher fetcher; - - public RpcBroadcastProcessor(RpcChannelFetcher fetcher) { - this.fetcher = fetcher; - if (!fetcher.isDynamic()) { - this.rpcClient = new RpcClient(fetcher.fetch()); - } - } - - public void broadcast(PegasusClient.JobRequest request, ResultProcessor processor) { - CloseableIterator iterator = null; - try { - if (fetcher.isDynamic()) { - this.rpcClient = new RpcClient(fetcher.fetch()); - } - iterator = rpcClient.submit(request); - // process response - while (iterator.hasNext()) { - PegasusClient.JobResponse response = iterator.next(); - processor.process(response); - } - processor.finish(); - } catch (Throwable e) { - logger.error("get result from grpc returns error {}", e); - processor.error(Status.fromThrowable(e)); - } finally { - if (iterator != null) { - try { - iterator.close(); - } catch (IOException ioe) { - logger.error("iterator close fail {}", ioe); - } - } - } - } - - @Override - public void close() throws Exception { - this.rpcClient.shutdown(); - } -} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java new file mode 100644 index 000000000000..06989c81d070 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java @@ -0,0 +1,99 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.client; + +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; +import com.alibaba.graphscope.common.client.type.ExecutionRequest; +import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; +import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.config.PegasusConfig; +import com.alibaba.graphscope.gaia.proto.IrResult; +import com.alibaba.pegasus.RpcChannel; +import com.alibaba.pegasus.RpcClient; +import com.alibaba.pegasus.intf.ResultProcessor; +import com.alibaba.pegasus.service.protocol.PegasusClient; + +import io.grpc.Status; + +/** + * rpc client to send request to pegasus engine service + */ +public class RpcExecutionClient extends ExecutionClient { + private final Configs graphConfig; + private final RpcClient rpcClient; + + public RpcExecutionClient(Configs graphConfig, ChannelFetcher channelFetcher) { + super(channelFetcher); + this.graphConfig = graphConfig; + this.rpcClient = + new RpcClient( + PegasusConfig.PEGASUS_GRPC_TIMEOUT.get(graphConfig), + channelFetcher.fetch()); + } + + @Override + public void submit(ExecutionRequest request, ExecutionResponseListener listener) + throws Exception { + PegasusClient.JobRequest jobRequest = + PegasusClient.JobRequest.parseFrom((byte[]) request.getRequestPhysical().build()); + PegasusClient.JobConfig jobConfig = + PegasusClient.JobConfig.newBuilder() + .setJobId(request.getRequestId()) + .setJobName(request.getRequestName()) + .setWorkers(PegasusConfig.PEGASUS_WORKER_NUM.get(graphConfig)) + .setBatchSize(PegasusConfig.PEGASUS_BATCH_SIZE.get(graphConfig)) + .setMemoryLimit(PegasusConfig.PEGASUS_MEMORY_LIMIT.get(graphConfig)) + .setBatchCapacity(PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(graphConfig)) + .setTimeLimit(PegasusConfig.PEGASUS_TIMEOUT.get(graphConfig)) + .setAll( + com.alibaba.pegasus.service.protocol.PegasusClient.Empty + .newBuilder() + .build()) + .build(); + jobRequest = jobRequest.toBuilder().setConf(jobConfig).build(); + this.rpcClient.submit( + jobRequest, + new ResultProcessor() { + @Override + public void process(PegasusClient.JobResponse jobResponse) { + try { + listener.onNext( + IrResult.Results.parseFrom(jobResponse.getResp()).getRecord()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void finish() { + listener.onCompleted(); + } + + @Override + public void error(Status status) { + listener.onError(status.asException()); + } + }); + } + + @Override + public void close() throws Exception { + if (rpcClient != null) { + this.rpcClient.shutdown(); + } + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcChannelFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/ChannelFetcher.java similarity index 71% rename from interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcChannelFetcher.java rename to interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/ChannelFetcher.java index daf79cbb7895..0e88d4398917 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcChannelFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/ChannelFetcher.java @@ -14,15 +14,14 @@ * limitations under the License. */ -package com.alibaba.graphscope.common.client; - -import com.alibaba.pegasus.RpcChannel; +package com.alibaba.graphscope.common.client.channel; import java.util.List; -public interface RpcChannelFetcher { - List fetch(); - - // dynamic channel need update with the changes of host access url - boolean isDynamic(); +/** + * fetch channel from remote + * @param + */ +public interface ChannelFetcher { + List fetch(); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java new file mode 100644 index 000000000000..89b101e7a0e9 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.client.channel; + +import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.config.HQPSConfig; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * http implementation of {@link ChannelFetcher}, init http from local config + */ +public class HostURIChannelFetcher implements ChannelFetcher { + private Configs graphConfig; + + public HostURIChannelFetcher(Configs graphConfig) { + this.graphConfig = graphConfig; + } + + @Override + public List fetch() { + String hosts = HQPSConfig.HQPS_URIS.get(graphConfig); + String[] hostsArr = hosts.split(","); + return Arrays.asList(hostsArr).stream() + .map(k -> URI.create(k)) + .collect(Collectors.toList()); + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HostsChannelFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java similarity index 84% rename from interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HostsChannelFetcher.java rename to interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java index 3e69bfb5033b..dc069e57b33c 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HostsChannelFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.alibaba.graphscope.common.client; +package com.alibaba.graphscope.common.client.channel; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.PegasusConfig; @@ -24,10 +24,13 @@ import java.util.Arrays; import java.util.List; -public class HostsChannelFetcher implements RpcChannelFetcher { +/** + * rpc implementation of {@link ChannelFetcher}, init rpc from local config + */ +public class HostsRpcChannelFetcher implements ChannelFetcher { private Configs config; - public HostsChannelFetcher(Configs config) { + public HostsRpcChannelFetcher(Configs config) { this.config = config; } @@ -44,9 +47,4 @@ public List fetch() { }); return rpcChannels; } - - @Override - public boolean isDynamic() { - return false; - } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionRequest.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionRequest.java new file mode 100644 index 000000000000..5321c2515f4d --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionRequest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.client.type; + +import com.alibaba.graphscope.common.ir.runtime.PhysicalBuilder; + +/** + * request to submit to remote engine service + */ +public class ExecutionRequest { + private final long requestId; + private final String requestName; + private final PhysicalBuilder requestPhysical; + + public ExecutionRequest(long requestId, String requestName, PhysicalBuilder requestPhysical) { + this.requestId = requestId; + this.requestName = requestName; + this.requestPhysical = requestPhysical; + } + + public long getRequestId() { + return requestId; + } + + public String getRequestName() { + return requestName; + } + + public PhysicalBuilder getRequestPhysical() { + return requestPhysical; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionResponseListener.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionResponseListener.java new file mode 100644 index 000000000000..6570aebb385f --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionResponseListener.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.client.type; + +import com.alibaba.graphscope.gaia.proto.IrResult; + +/** + * listener to handle response from remote engine service + */ +public interface ExecutionResponseListener { + void onNext(IrResult.Record record); + + void onCompleted(); + + void onError(Throwable t); +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java new file mode 100644 index 000000000000..4794eff864b2 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.config; + +public class HQPSConfig { + public static final Config HQPS_URIS = + Config.stringConfig("hqps.uris", "http://localhost:8080"); + + public static final Config HQPS_HTTP_TIMEOUT = + Config.longConfig("hqps.http.timeout", 6000000l); +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PegasusConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PegasusConfig.java index 9de06e24a815..4a32b122bb84 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PegasusConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PegasusConfig.java @@ -34,4 +34,7 @@ public class PegasusConfig { public static final Config PEGASUS_HOSTS = Config.stringConfig("pegasus.hosts", "localhost:8080"); + + public static final Config PEGASUS_GRPC_TIMEOUT = + Config.longConfig("pegasus.grpc.timeout", 6000000l); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/PhysicalBuilder.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/PhysicalBuilder.java new file mode 100644 index 000000000000..16b6eeac87c1 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/PhysicalBuilder.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.ir.runtime; + +import com.alibaba.graphscope.common.ir.tools.LogicalPlan; + +import org.apache.commons.lang3.StringUtils; + +/** + * build physical plan from logical plan + * @param is the actual type of physical plan + */ +public abstract class PhysicalBuilder implements AutoCloseable { + protected final LogicalPlan logicalPlan; + + protected PhysicalBuilder(LogicalPlan logicalPlan) { + this.logicalPlan = logicalPlan; + } + + /** + * print physical plan + */ + public abstract String explain(); + + /** + * build physical plan + * @return + */ + public abstract R build(); + + public static final PhysicalBuilder createEmpty(LogicalPlan logicalPlan) { + return new PhysicalBuilder(logicalPlan) { + @Override + public String explain() { + return StringUtils.EMPTY; + } + + @Override + public Object build() { + return null; + } + + @Override + public void close() throws Exception {} + }; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/LogicalPlan.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/LogicalPlan.java new file mode 100644 index 000000000000..bf0c0dd66a2f --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/LogicalPlan.java @@ -0,0 +1,64 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.ir.tools; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.commons.lang3.StringUtils; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.Objects; + +/** + * logical plan for a query which can be a regular query or a procedure call + */ +public class LogicalPlan { + private @Nullable RelNode regularQuery; + private @Nullable RexNode procedureCall; + private boolean returnEmpty; + + public LogicalPlan(RelNode regularQuery, boolean returnEmpty) { + this.regularQuery = Objects.requireNonNull(regularQuery); + this.returnEmpty = returnEmpty; + } + + public LogicalPlan(RexNode procedureCall) { + this.procedureCall = Objects.requireNonNull(procedureCall); + } + + public @Nullable RelNode getRegularQuery() { + return regularQuery; + } + + public @Nullable RexNode getProcedureCall() { + return procedureCall; + } + + public boolean isReturnEmpty() { + return returnEmpty; + } + + public String explain() { + if (this.regularQuery != null) { + return this.regularQuery.explain(); + } else if (this.procedureCall != null) { + return this.procedureCall.toString(); + } else { + return StringUtils.EMPTY; + } + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/result/RecordParser.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/result/RecordParser.java new file mode 100644 index 000000000000..ce6340ddc4f8 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/result/RecordParser.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.result; + +import com.alibaba.graphscope.gaia.proto.IrResult; + +import org.apache.calcite.rel.type.RelDataType; + +import java.util.List; + +/** + * parse record returning from remote engine service + * @param + */ +public interface RecordParser { + List parseFrom(IrResult.Record record); + + // schema of returning data + RelDataType schema(); +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ResultParser.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/result/ResultParser.java similarity index 83% rename from interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ResultParser.java rename to interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/result/ResultParser.java index 3207d6263a25..7ccfa4bdb0aa 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ResultParser.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/result/ResultParser.java @@ -14,12 +14,12 @@ * limitations under the License. */ -package com.alibaba.graphscope.common.client; +package com.alibaba.graphscope.common.result; import com.alibaba.pegasus.service.protocol.PegasusClient; import java.util.List; -public interface ResultParser { - List parseFrom(PegasusClient.JobResponse response); +public interface ResultParser { + List parseFrom(PegasusClient.JobResponse response); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java index 870dfdfc9472..f108d242a7aa 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.graphscope.gremlin.integration.processor; -import com.alibaba.graphscope.common.client.RpcChannelFetcher; +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; import com.alibaba.graphscope.common.store.IrMeta; @@ -58,7 +58,7 @@ public class IrTestOpProcessor extends IrStandardOpProcessor { public IrTestOpProcessor( Configs configs, IrMetaFetcher irMetaFetcher, - RpcChannelFetcher fetcher, + ChannelFetcher fetcher, IrMetaQueryCallback metaQueryCallback, Graph graph, GraphTraversalSource g, @@ -117,11 +117,6 @@ public ThrowingConsumer select(Context ctx) { } } - @Override - public void close() throws Exception { - this.broadcastProcessor.close(); - } - private String getScript(Bytecode byteCode) { String script = GroovyTranslator.of("g").translate(byteCode).getScript(); // remove type cast from original script, g.V().has("age",P.gt((int) 30)) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java index 82586f4f85c6..d052ef9bda10 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java @@ -26,7 +26,7 @@ package com.alibaba.graphscope.gremlin.plugin.processor; import com.alibaba.graphscope.common.IrPlan; -import com.alibaba.graphscope.common.client.*; +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.PegasusConfig; import com.alibaba.graphscope.common.config.PlannerConfig; @@ -52,12 +52,12 @@ import com.alibaba.graphscope.gremlin.plugin.strategy.ScanFusionStepStrategy; import com.alibaba.graphscope.gremlin.result.processor.CypherResultProcessor; import com.alibaba.graphscope.gremlin.result.processor.GremlinResultProcessor; +import com.alibaba.pegasus.RpcClient; import com.alibaba.pegasus.intf.ResultProcessor; import com.alibaba.pegasus.service.protocol.PegasusClient; import com.google.common.collect.ImmutableList; import com.google.protobuf.InvalidProtocolBufferException; import com.sun.jna.Pointer; - import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.plan.GraphOptCluster; import org.apache.calcite.plan.RelOptPlanner; @@ -88,6 +88,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.script.SimpleBindings; import java.io.IOException; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -97,8 +98,6 @@ import java.util.function.Function; import java.util.function.Supplier; -import javax.script.SimpleBindings; - public class IrStandardOpProcessor extends StandardOpProcessor { private static Logger metricLogger = LoggerFactory.getLogger("MetricLog"); private static Logger logger = LoggerFactory.getLogger(IrStandardOpProcessor.class); @@ -108,7 +107,11 @@ public class IrStandardOpProcessor extends StandardOpProcessor { protected GraphTraversalSource g; protected Configs configs; protected PlannerConfig plannerConfig; - protected RpcBroadcastProcessor broadcastProcessor; + /** + * todo: replace with {@link com.alibaba.graphscope.common.client.ExecutionClient} after unifying Gremlin into the Calcite stack + */ + protected RpcClient rpcClient; + protected IrMetaFetcher irMetaFetcher; protected IrMetaQueryCallback metaQueryCallback; @@ -117,7 +120,7 @@ public class IrStandardOpProcessor extends StandardOpProcessor { public IrStandardOpProcessor( Configs configs, IrMetaFetcher irMetaFetcher, - RpcChannelFetcher fetcher, + ChannelFetcher fetcher, IrMetaQueryCallback metaQueryCallback, Graph graph, GraphTraversalSource g) { @@ -126,7 +129,8 @@ public IrStandardOpProcessor( this.configs = configs; this.plannerConfig = PlannerConfig.create(this.configs); this.irMetaFetcher = irMetaFetcher; - this.broadcastProcessor = new RpcBroadcastProcessor(fetcher); + this.rpcClient = + new RpcClient(PegasusConfig.PEGASUS_GRPC_TIMEOUT.get(configs), fetcher.fetch()); this.metaQueryCallback = metaQueryCallback; RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl()); @@ -430,7 +434,7 @@ protected void processTraversal( .setAll(PegasusClient.Empty.newBuilder().build()) .build(); request = request.toBuilder().setConf(jobConfig).build(); - broadcastProcessor.broadcast(request, resultProcessor); + this.rpcClient.submit(request, resultProcessor); } protected void processRelNode( @@ -482,7 +486,7 @@ protected void processRelNode( .setAll(PegasusClient.Empty.newBuilder().build()) .build(); request = request.toBuilder().setConf(jobConfig).build(); - broadcastProcessor.broadcast(request, resultProcessor); + this.rpcClient.submit(request, resultProcessor); } } } @@ -542,4 +546,11 @@ private RelOptPlanner getRelOptPlanner() { return new HepPlanner(HepProgram.builder().build()); } } + + @Override + public void close() throws Exception { + if (this.rpcClient != null) { + this.rpcClient.shutdown(); + } + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/GremlinResultParser.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/GremlinResultParser.java index 960e8dba8798..c364d058b244 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/GremlinResultParser.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/GremlinResultParser.java @@ -16,7 +16,7 @@ package com.alibaba.graphscope.gremlin.result; -import com.alibaba.graphscope.common.client.ResultParser; +import com.alibaba.graphscope.common.result.ResultParser; import com.alibaba.graphscope.gaia.proto.IrResult; import com.alibaba.graphscope.gremlin.exception.GremlinResultParserException; import com.alibaba.pegasus.service.protocol.PegasusClient; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/parser/CypherResultParser.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/parser/CypherResultParser.java index 273b7d541dfb..8260815e69b9 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/parser/CypherResultParser.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/parser/CypherResultParser.java @@ -16,11 +16,11 @@ package com.alibaba.graphscope.gremlin.result.parser; -import com.alibaba.graphscope.common.client.ResultParser; import com.alibaba.graphscope.common.ir.type.GraphLabelType; import com.alibaba.graphscope.common.ir.type.GraphPxdElementType; import com.alibaba.graphscope.common.ir.type.GraphSchemaType; import com.alibaba.graphscope.common.ir.type.GraphSchemaTypeList; +import com.alibaba.graphscope.common.result.ResultParser; import com.alibaba.graphscope.gaia.proto.Common; import com.alibaba.graphscope.gaia.proto.IrResult; import com.alibaba.graphscope.gremlin.exception.GremlinResultParserException; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java index b6497739b569..56630bebf540 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.graphscope.gremlin.result.processor; -import com.alibaba.graphscope.common.client.ResultParser; +import com.alibaba.graphscope.common.result.ResultParser; import com.alibaba.pegasus.intf.ResultProcessor; import com.alibaba.pegasus.service.protocol.PegasusClient; @@ -36,7 +36,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; public abstract class AbstractResultProcessor extends StandardOpProcessor diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/GraphServiceMain.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/GraphServiceMain.java index 424886cdb6dd..0aa6c49f7529 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/GraphServiceMain.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/GraphServiceMain.java @@ -11,8 +11,8 @@ package com.alibaba.graphscope.gremlin.service; -import com.alibaba.graphscope.common.client.HostsChannelFetcher; -import com.alibaba.graphscope.common.client.RpcChannelFetcher; +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; +import com.alibaba.graphscope.common.client.channel.HostsRpcChannelFetcher; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.FileLoadType; import com.alibaba.graphscope.common.config.GraphConfig; @@ -28,7 +28,7 @@ public class GraphServiceMain { public static void main(String[] args) throws Exception { Configs configs = new Configs("conf/ir.compiler.properties", FileLoadType.RELATIVE_PATH); IrMetaFetcher irMetaFetcher = new ExperimentalMetaFetcher(configs); - RpcChannelFetcher fetcher = new HostsChannelFetcher(configs); + ChannelFetcher fetcher = new HostsRpcChannelFetcher(configs); IrGremlinServer server = new IrGremlinServer(); String storeType = GraphConfig.GRAPH_STORE.get(configs); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java index 70ccc9aedd5e..656f4e1440e4 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java @@ -16,7 +16,7 @@ package com.alibaba.graphscope.gremlin.service; -import com.alibaba.graphscope.common.client.RpcChannelFetcher; +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; import com.alibaba.graphscope.common.store.IrMetaFetcher; @@ -69,7 +69,7 @@ public IrGremlinServer(int gremlinPort) { public void start( Configs configs, IrMetaFetcher irMetaFetcher, - RpcChannelFetcher fetcher, + ChannelFetcher fetcher, IrMetaQueryCallback metaQueryCallback, GraphProperties testGraph) throws Exception { diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/ClientExample.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/ClientExample.java index 56c8c5f31734..5e05a311f697 100644 --- a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/ClientExample.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/ClientExample.java @@ -1,22 +1,24 @@ -/** +/* * Copyright 2020 Alibaba Group Holding Limited. - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.pegasus; import com.alibaba.pegasus.builder.JobBuilder; -import com.alibaba.pegasus.intf.CloseableIterator; +import com.alibaba.pegasus.common.StreamIterator; +import com.alibaba.pegasus.intf.ResultProcessor; import com.alibaba.pegasus.service.protocol.PegasusClient; import com.alibaba.pegasus.service.protocol.PegasusClient.JobConfig; import com.alibaba.pegasus.service.protocol.PegasusClient.JobRequest; @@ -28,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -36,20 +37,6 @@ public class ClientExample { private static final Logger logger = LoggerFactory.getLogger(ClientExample.class); - private static void process(JobResponse response) { - ByteString data = response.getResp(); - ArrayList res = toLongArray(data.toByteArray(), data.size()); - logger.info( - "got one response: job id {}, array size {}, job data {}", - response.getJobId(), - res.size(), - res.toString()); - } - - private static void finish() { - logger.info("finish process"); - } - private static void error(Status status) { logger.error("on error {}", status.toString()); } @@ -112,7 +99,7 @@ public static void main(String[] args) throws Exception { List channels = new ArrayList<>(); channels.add(rpcChannel0); channels.add(rpcChannel1); - RpcClient rpcClient = new RpcClient(channels); + RpcClient rpcClient = new RpcClient(600000, channels); logger.info("Will try to send request"); JobConfig confPb = @@ -134,26 +121,34 @@ public static void main(String[] args) throws Exception { .sink(getSink()); JobRequest req = jobBuilder.build(); - CloseableIterator iterator = rpcClient.submit(req); - // process response - try { - while (iterator.hasNext()) { - JobResponse response = iterator.next(); - process(response); - } - } catch (Exception e) { - if (iterator != null) { - try { - iterator.close(); - } catch (IOException ioe) { - // Ignore - } - } - error(Status.fromThrowable(e)); - throw e; - } - finish(); + StreamIterator resultIterator = new StreamIterator(); + rpcClient.submit( + req, + new ResultProcessor() { + @Override + public void process(JobResponse response) { + try { + resultIterator.putData(response); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void finish() { + try { + resultIterator.finish(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void error(Status status) { + resultIterator.fail(status.getCause()); + } + }); rpcClient.shutdown(); } } diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java index bc99fc9f3e27..088552af7cbd 100644 --- a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java @@ -1,22 +1,22 @@ -/** +/* * Copyright 2020 Alibaba Group Holding Limited. - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.pegasus; -import com.alibaba.pegasus.common.StreamIterator; -import com.alibaba.pegasus.intf.CloseableIterator; +import com.alibaba.pegasus.intf.ResultProcessor; import com.alibaba.pegasus.service.protocol.JobServiceGrpc; import com.alibaba.pegasus.service.protocol.JobServiceGrpc.JobServiceStub; import com.alibaba.pegasus.service.protocol.PegasusClient.JobRequest; @@ -29,34 +29,38 @@ import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; public class RpcClient { private static final Logger logger = LoggerFactory.getLogger(RpcClient.class); + private final List channels; + private final List serviceStubs; + private final long rpcTimeout; - private List channels; - - public RpcClient(List channels) { - this.channels = channels; + public RpcClient(long grpcTimeout, List channels) { + this.rpcTimeout = grpcTimeout; + this.channels = Objects.requireNonNull(channels); + this.serviceStubs = + channels.stream() + .map(k -> JobServiceGrpc.newStub(k.getChannel())) + .collect(Collectors.toList()); } - public CloseableIterator submit(JobRequest jobRequest) - throws InterruptedException { - StreamIterator responseIterator = new StreamIterator<>(); + public void submit(JobRequest jobRequest, ResultProcessor processor) { AtomicInteger counter = new AtomicInteger(this.channels.size()); AtomicBoolean finished = new AtomicBoolean(false); - for (RpcChannel rpcChannel : channels) { - JobServiceStub asyncStub = JobServiceGrpc.newStub(rpcChannel.getChannel()); - // todo: make timeout configurable - asyncStub - .withDeadlineAfter(600000, TimeUnit.MILLISECONDS) - .submit( - jobRequest, - new JobResponseObserver(responseIterator, finished, counter)); - } - return responseIterator; + serviceStubs.forEach( + asyncStub -> { + asyncStub + .withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS) + .submit( + jobRequest, + new JobResponseObserver(processor, finished, counter)); + }); } public void shutdown() throws InterruptedException { @@ -66,15 +70,13 @@ public void shutdown() throws InterruptedException { } private static class JobResponseObserver implements StreamObserver { - private final StreamIterator iterator; + private final ResultProcessor processor; private final AtomicBoolean finished; private final AtomicInteger counter; public JobResponseObserver( - StreamIterator iterator, - AtomicBoolean finished, - AtomicInteger counter) { - this.iterator = iterator; + ResultProcessor processor, AtomicBoolean finished, AtomicInteger counter) { + this.processor = processor; this.finished = finished; this.counter = counter; } @@ -84,11 +86,7 @@ public void onNext(JobResponse jobResponse) { if (finished.get()) { return; } - try { - this.iterator.putData(jobResponse); - } catch (InterruptedException e) { - onError(e); - } + processor.process(jobResponse); } @Override @@ -98,7 +96,7 @@ public void onError(Throwable throwable) { } Status status = Status.fromThrowable(throwable); logger.error("get job response error: {}", status); - this.iterator.fail(throwable); + processor.error(status); } @Override @@ -106,11 +104,7 @@ public void onCompleted() { logger.info("finish get job response from one server"); if (counter.decrementAndGet() == 0) { logger.info("finish get job response from all servers"); - try { - this.iterator.finish(); - } catch (InterruptedException e) { - onError(e); - } + processor.finish(); } } } diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/common/StreamIterator.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/common/StreamIterator.java index 6fea23c22a52..26e764891abd 100644 --- a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/common/StreamIterator.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/common/StreamIterator.java @@ -1,18 +1,19 @@ -/** +/* * Copyright 2020 Alibaba Group Holding Limited. - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.pegasus.common; import com.alibaba.pegasus.intf.CloseableIterator; @@ -52,9 +53,9 @@ public boolean hasNext() { } catch (InterruptedException ie) { throw new RuntimeException(ie); } - if (head == PILL) { - return false; - } + } + if (head == PILL) { + return false; } return true; } diff --git a/interactive_engine/executor/ir/proto/hqps.proto b/interactive_engine/executor/ir/proto/hqps.proto new file mode 100644 index 000000000000..cb3b341a0c3c --- /dev/null +++ b/interactive_engine/executor/ir/proto/hqps.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; +package hqps; +option java_package = "com.alibaba.graphscope.gaia.proto"; +option java_outer_classname = "Hqps"; + +import "common.proto"; +import "results.proto"; + +message Argument { + string param_name = 1; // param name + int32 param_ind = 2; // index of param + common.Value value = 3; // real value +} + +message HighQPSQuery { + common.NameOrId query_name = 1; + repeated Argument arguments = 2; +} + +message HighQPSResults { + // may be add schema here. + repeated results.Results results = 1; +} + diff --git a/interactive_engine/pom.xml b/interactive_engine/pom.xml index 783080d1cf7b..574fd62c8ff4 100644 --- a/interactive_engine/pom.xml +++ b/interactive_engine/pom.xml @@ -572,9 +572,9 @@ maven-compiler-plugin ${maven.compile.version} - 1.8 - 1.8 - + 11 + 11 + maven-surefire-plugin From edd3ea93740f9a3addf4896f77ee9e20bcc425d8 Mon Sep 17 00:00:00 2001 From: shirly121 Date: Tue, 6 Jun 2023 09:57:38 +0800 Subject: [PATCH 02/12] format java codes --- .../gremlin/plugin/processor/IrStandardOpProcessor.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java index d052ef9bda10..58cb8d66e5bc 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java @@ -58,6 +58,7 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.InvalidProtocolBufferException; import com.sun.jna.Pointer; + import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.plan.GraphOptCluster; import org.apache.calcite.plan.RelOptPlanner; @@ -88,7 +89,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.script.SimpleBindings; import java.io.IOException; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -98,6 +98,8 @@ import java.util.function.Function; import java.util.function.Supplier; +import javax.script.SimpleBindings; + public class IrStandardOpProcessor extends StandardOpProcessor { private static Logger metricLogger = LoggerFactory.getLogger("MetricLog"); private static Logger logger = LoggerFactory.getLogger(IrStandardOpProcessor.class); From 10daf0dad3780348bdf7379dcab6854444287962 Mon Sep 17 00:00:00 2001 From: shirly121 Date: Tue, 6 Jun 2023 13:17:49 +0800 Subject: [PATCH 03/12] [GIE Compiler] fix compile error --- .../java/com/alibaba/graphscope/frontend/Frontend.java | 6 +++--- .../graphscope/groot/servers/ir/IrServiceProducer.java | 4 ++-- .../groot/servers/ir/RpcChannelManagerFetcher.java | 9 ++------- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/interactive_engine/frontend/src/main/java/com/alibaba/graphscope/frontend/Frontend.java b/interactive_engine/frontend/src/main/java/com/alibaba/graphscope/frontend/Frontend.java index 89f671dac201..85a3b8da8e02 100644 --- a/interactive_engine/frontend/src/main/java/com/alibaba/graphscope/frontend/Frontend.java +++ b/interactive_engine/frontend/src/main/java/com/alibaba/graphscope/frontend/Frontend.java @@ -1,7 +1,7 @@ package com.alibaba.graphscope.frontend; -import com.alibaba.graphscope.common.client.HostsChannelFetcher; -import com.alibaba.graphscope.common.client.RpcChannelFetcher; +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; +import com.alibaba.graphscope.common.client.channel.HostsRpcChannelFetcher; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.FrontendConfig; import com.alibaba.graphscope.common.config.GraphConfig; @@ -34,7 +34,7 @@ public void start() throws Exception { String vineyardSchemaPath = GraphConfig.GRAPH_SCHEMA.get(configs); logger.info("Read schema from vineyard schema file {}", vineyardSchemaPath); IrMetaFetcher irMetaFetcher = new VineyardMetaFetcher(vineyardSchemaPath); - RpcChannelFetcher channelFetcher = new HostsChannelFetcher(configs); + ChannelFetcher channelFetcher = new HostsRpcChannelFetcher(configs); int port = FrontendConfig.FRONTEND_SERVICE_PORT.get(configs); IrMetaQueryCallback queryCallback = new IrMetaQueryCallback(irMetaFetcher); server = new IrGremlinServer(port); diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java index ef910307cd5c..c636dfbd05d1 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java @@ -16,7 +16,7 @@ package com.alibaba.graphscope.groot.servers.ir; -import com.alibaba.graphscope.common.client.RpcChannelFetcher; +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; import com.alibaba.graphscope.common.config.PegasusConfig; import com.alibaba.graphscope.common.store.IrMetaFetcher; import com.alibaba.graphscope.compiler.api.schema.SchemaFetcher; @@ -55,7 +55,7 @@ public AbstractService makeGraphService( SchemaFetcher schemaFetcher, ChannelManager channelManager) { int executorCount = CommonConfig.STORE_NODE_COUNT.get(configs); int port = GremlinConfig.GREMLIN_PORT.get(configs); - RpcChannelFetcher channelFetcher = + ChannelFetcher channelFetcher = new RpcChannelManagerFetcher(channelManager, executorCount, RoleType.GAIA_RPC); com.alibaba.graphscope.common.config.Configs irConfigs = getConfigs(); IrMetaFetcher irMetaFetcher = new GrootMetaFetcher(schemaFetcher); diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/RpcChannelManagerFetcher.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/RpcChannelManagerFetcher.java index 230c14071652..e97c860c8d87 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/RpcChannelManagerFetcher.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/RpcChannelManagerFetcher.java @@ -16,7 +16,7 @@ package com.alibaba.graphscope.groot.servers.ir; -import com.alibaba.graphscope.common.client.RpcChannelFetcher; +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; import com.alibaba.graphscope.groot.common.RoleType; import com.alibaba.graphscope.groot.rpc.ChannelManager; import com.alibaba.pegasus.RpcChannel; @@ -24,7 +24,7 @@ import java.util.ArrayList; import java.util.List; -public class RpcChannelManagerFetcher implements RpcChannelFetcher { +public class RpcChannelManagerFetcher implements ChannelFetcher { private ChannelManager manager; private int nodeCount; private RoleType targetRole; @@ -44,9 +44,4 @@ public List fetch() { } return channels; } - - @Override - public boolean isDynamic() { - return true; - } } From e247a3567d4555e9e7e97c6773e2c356833eba0a Mon Sep 17 00:00:00 2001 From: shirly121 Date: Tue, 6 Jun 2023 13:33:44 +0800 Subject: [PATCH 04/12] [GIE Compiler] refine codes according to code refactor --- .../java/com/alibaba/graphscope/common/config/HQPSConfig.java | 2 +- .../com/alibaba/graphscope/common/config/PegasusConfig.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java index 4794eff864b2..971f7a3f7e7e 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java @@ -21,5 +21,5 @@ public class HQPSConfig { Config.stringConfig("hqps.uris", "http://localhost:8080"); public static final Config HQPS_HTTP_TIMEOUT = - Config.longConfig("hqps.http.timeout", 6000000l); + Config.longConfig("hqps.http.timeout", 6000000L); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PegasusConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PegasusConfig.java index 4a32b122bb84..06f45d7a13e4 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PegasusConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PegasusConfig.java @@ -36,5 +36,5 @@ public class PegasusConfig { Config.stringConfig("pegasus.hosts", "localhost:8080"); public static final Config PEGASUS_GRPC_TIMEOUT = - Config.longConfig("pegasus.grpc.timeout", 6000000l); + Config.longConfig("pegasus.grpc.timeout", 6000000L); } From 29ca5d32b37dfef78024df7435958ede42c0ff72 Mon Sep 17 00:00:00 2001 From: shirly121 Date: Tue, 6 Jun 2023 15:18:20 +0800 Subject: [PATCH 05/12] [GIE Compiler] for debug --- .github/workflows/local-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/local-ci.yml b/.github/workflows/local-ci.yml index 961ad18c8e50..11b2e3879d69 100644 --- a/.github/workflows/local-ci.yml +++ b/.github/workflows/local-ci.yml @@ -235,7 +235,7 @@ jobs: - name: Build Wheels run: | - # sleep infinity + sleep infinity git submodule update --init cd ${GITHUB_WORKSPACE}/learning_engine/graph-learn && git submodule update --init third_party/pybind11 From 13db72b9c89fb521158f7f19cc2fd18627582ccd Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Tue, 6 Jun 2023 17:33:33 +0800 Subject: [PATCH 06/12] install openjdk 11 --- .github/workflows/local-ci.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/local-ci.yml b/.github/workflows/local-ci.yml index 11b2e3879d69..aa2820b87272 100644 --- a/.github/workflows/local-ci.yml +++ b/.github/workflows/local-ci.yml @@ -233,9 +233,14 @@ jobs: restore-keys: | ${{ runner.os }}-pip- + - name: Install java 11 (TODO(siyuan) Remove after newer wheel image are released) + run: | + sudo yum install java-11-openjdk + sudo yum remove java-1.8.0-openjdk-devel java-1.8.0-openjdk java-1.8.0-openjdk-headless -y + - name: Build Wheels run: | - sleep infinity + #sleep infinity git submodule update --init cd ${GITHUB_WORKSPACE}/learning_engine/graph-learn && git submodule update --init third_party/pybind11 From 22880819c6bce492ab7f523d5ce226f39aef6cbe Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Tue, 6 Jun 2023 18:00:31 +0800 Subject: [PATCH 07/12] update --- .github/workflows/local-ci.yml | 5 ----- k8s/internal/Makefile | 3 +++ 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.github/workflows/local-ci.yml b/.github/workflows/local-ci.yml index aa2820b87272..52d1d33e1d28 100644 --- a/.github/workflows/local-ci.yml +++ b/.github/workflows/local-ci.yml @@ -233,11 +233,6 @@ jobs: restore-keys: | ${{ runner.os }}-pip- - - name: Install java 11 (TODO(siyuan) Remove after newer wheel image are released) - run: | - sudo yum install java-11-openjdk - sudo yum remove java-1.8.0-openjdk-devel java-1.8.0-openjdk java-1.8.0-openjdk-headless -y - - name: Build Wheels run: | #sleep infinity diff --git a/k8s/internal/Makefile b/k8s/internal/Makefile index 247c7b5133d0..832223b7be12 100644 --- a/k8s/internal/Makefile +++ b/k8s/internal/Makefile @@ -101,6 +101,9 @@ graphscope-darwin-py3: done graphscope-manylinux2014-py3-nodocker: + # TODO(siyuan) Remove after newer wheel image are released + sudo yum install java-11-openjdk-devel -y && \ + sudo yum remove java-1.8.0-openjdk-devel java-1.8.0-openjdk java-1.8.0-openjdk-headless -y && \ cd $(WORKING_DIR)/../.. && \ if [[ "${ARCH}" == "aarch64" ]]; then python3 -m pip install grpcio==1.49.1 --no-binary grpcio; \ export AUDITWHEEL_PLAT=manylinux2014_${ARCH}; \ From 739fe49cfbf25c133550dc6a0c7e823188050184 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Tue, 6 Jun 2023 19:15:44 +0800 Subject: [PATCH 08/12] use tab --- k8s/internal/Makefile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/k8s/internal/Makefile b/k8s/internal/Makefile index 832223b7be12..bce71eb40fe0 100644 --- a/k8s/internal/Makefile +++ b/k8s/internal/Makefile @@ -101,9 +101,9 @@ graphscope-darwin-py3: done graphscope-manylinux2014-py3-nodocker: - # TODO(siyuan) Remove after newer wheel image are released - sudo yum install java-11-openjdk-devel -y && \ - sudo yum remove java-1.8.0-openjdk-devel java-1.8.0-openjdk java-1.8.0-openjdk-headless -y && \ + # TODO(siyuan) Remove after newer wheel image are released + sudo yum install java-11-openjdk-devel -y && \ + sudo yum remove java-1.8.0-openjdk-devel java-1.8.0-openjdk java-1.8.0-openjdk-headless -y && \ cd $(WORKING_DIR)/../.. && \ if [[ "${ARCH}" == "aarch64" ]]; then python3 -m pip install grpcio==1.49.1 --no-binary grpcio; \ export AUDITWHEEL_PLAT=manylinux2014_${ARCH}; \ From 98d0387c601ec0d4f700047906b989db24f58104 Mon Sep 17 00:00:00 2001 From: shirly121 Date: Wed, 7 Jun 2023 11:46:30 +0800 Subject: [PATCH 09/12] [GIE Compiler] refine codes according to review --- .../common/client/HttpExecutionClient.java | 11 ++++++----- .../client/channel/HostURIChannelFetcher.java | 7 ++++--- .../config/{HQPSConfig.java => HiactorConfig.java} | 10 +++++----- .../ir/proto/{hqps.proto => stored_procedure.proto} | 13 ++++++------- 4 files changed, 21 insertions(+), 20 deletions(-) rename interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/{HQPSConfig.java => HiactorConfig.java} (71%) rename interactive_engine/executor/ir/proto/{hqps.proto => stored_procedure.proto} (63%) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java index 9c1be4fe6336..b9bb41df1a7f 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java @@ -20,9 +20,9 @@ import com.alibaba.graphscope.common.client.type.ExecutionRequest; import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; -import com.alibaba.graphscope.common.config.HQPSConfig; -import com.alibaba.graphscope.gaia.proto.Hqps; +import com.alibaba.graphscope.common.config.HiactorConfig; import com.alibaba.graphscope.gaia.proto.IrResult; +import com.alibaba.graphscope.gaia.proto.StoredProcedure; import com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; @@ -51,7 +51,7 @@ public HttpExecutionClient(Configs graphConfig, ChannelFetcher channelFetch this.httpClient = HttpClient.newBuilder() .connectTimeout( - Duration.ofMillis(HQPSConfig.HQPS_HTTP_TIMEOUT.get(graphConfig))) + Duration.ofMillis(HiactorConfig.HIACTOR_TIMEOUT.get(graphConfig))) .build(); } @@ -77,8 +77,9 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener) listener.onError(exception); } try { - Hqps.HighQPSResults results = - Hqps.HighQPSResults.parseFrom(bytes.body()); + StoredProcedure.StoredProcedureResults results = + StoredProcedure.StoredProcedureResults + .parseFrom(bytes.body()); for (IrResult.Results irResult : results.getResultsList()) { listener.onNext(irResult.getRecord()); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java index 89b101e7a0e9..dade9e1cfad5 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java @@ -17,7 +17,7 @@ package com.alibaba.graphscope.common.client.channel; import com.alibaba.graphscope.common.config.Configs; -import com.alibaba.graphscope.common.config.HQPSConfig; +import com.alibaba.graphscope.common.config.HiactorConfig; import java.net.URI; import java.util.Arrays; @@ -28,6 +28,7 @@ * http implementation of {@link ChannelFetcher}, init http from local config */ public class HostURIChannelFetcher implements ChannelFetcher { + private static final String schema = "http"; private Configs graphConfig; public HostURIChannelFetcher(Configs graphConfig) { @@ -36,10 +37,10 @@ public HostURIChannelFetcher(Configs graphConfig) { @Override public List fetch() { - String hosts = HQPSConfig.HQPS_URIS.get(graphConfig); + String hosts = HiactorConfig.HIACTOR_HOSTS.get(graphConfig); String[] hostsArr = hosts.split(","); return Arrays.asList(hostsArr).stream() - .map(k -> URI.create(k)) + .map(k -> URI.create(schema + "://" + k)) .collect(Collectors.toList()); } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HiactorConfig.java similarity index 71% rename from interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java rename to interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HiactorConfig.java index 971f7a3f7e7e..14a6326d1389 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HQPSConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/HiactorConfig.java @@ -16,10 +16,10 @@ package com.alibaba.graphscope.common.config; -public class HQPSConfig { - public static final Config HQPS_URIS = - Config.stringConfig("hqps.uris", "http://localhost:8080"); +public class HiactorConfig { + public static final Config HIACTOR_HOSTS = + Config.stringConfig("hiactor.hosts", "localhost:8080"); - public static final Config HQPS_HTTP_TIMEOUT = - Config.longConfig("hqps.http.timeout", 6000000L); + public static final Config HIACTOR_TIMEOUT = + Config.longConfig("hiactor.timeout", 6000000L); } diff --git a/interactive_engine/executor/ir/proto/hqps.proto b/interactive_engine/executor/ir/proto/stored_procedure.proto similarity index 63% rename from interactive_engine/executor/ir/proto/hqps.proto rename to interactive_engine/executor/ir/proto/stored_procedure.proto index cb3b341a0c3c..6130847cfb65 100644 --- a/interactive_engine/executor/ir/proto/hqps.proto +++ b/interactive_engine/executor/ir/proto/stored_procedure.proto @@ -1,24 +1,23 @@ syntax = "proto3"; -package hqps; +package hiactor; option java_package = "com.alibaba.graphscope.gaia.proto"; -option java_outer_classname = "Hqps"; +option java_outer_classname = "StoredProcedure"; import "common.proto"; import "results.proto"; -message Argument { +message StoredProcedureArgument { string param_name = 1; // param name int32 param_ind = 2; // index of param common.Value value = 3; // real value } -message HighQPSQuery { +message StoredProcedureQuery { common.NameOrId query_name = 1; - repeated Argument arguments = 2; + repeated StoredProcedureArgument arguments = 2; } -message HighQPSResults { +message StoredProcedureResults { // may be add schema here. repeated results.Results results = 1; } - From 664943823671a21efc53dc7a2d34a61a700dbcbe Mon Sep 17 00:00:00 2001 From: shirly121 Date: Wed, 7 Jun 2023 15:02:40 +0800 Subject: [PATCH 10/12] minor fix --- .../common/client/HttpExecutionClient.java | 16 +++++++++++++++- .../common/client/type/ExecutionRequest.java | 13 ++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java index b9bb41df1a7f..be5d96629ea2 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.HiactorConfig; +import com.alibaba.graphscope.common.ir.tools.LogicalPlan; import com.alibaba.graphscope.gaia.proto.IrResult; import com.alibaba.graphscope.gaia.proto.StoredProcedure; import com.google.common.collect.Lists; @@ -44,6 +45,8 @@ public class HttpExecutionClient extends ExecutionClient { private static final Logger logger = LoggerFactory.getLogger(HttpExecutionClient.class); private static final String CONTENT_TYPE = "Content-Type"; private static final String TEXT_PLAIN = "text/plain;charset=UTF-8"; + private static final String INTERACTIVE_QUERY_PATH = "/interactive/query"; + private static final String INTERACTIVE_ADHOC_QUERY_PATH = "/interactive/adhoc_query"; private final HttpClient httpClient; public HttpExecutionClient(Configs graphConfig, ChannelFetcher channelFetcher) { @@ -62,7 +65,7 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener) for (URI httpURI : channelFetcher.fetch()) { HttpRequest httpRequest = HttpRequest.newBuilder() - .uri(httpURI) + .uri(resolvePath(httpURI, request)) .headers(CONTENT_TYPE, TEXT_PLAIN) .POST( HttpRequest.BodyPublishers.ofByteArray( @@ -110,6 +113,17 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener) }); } + private URI resolvePath(URI original, ExecutionRequest request) { + LogicalPlan logicalPlan = request.getRequestLogical(); + if (logicalPlan.getRegularQuery() != null) { + return original.resolve(INTERACTIVE_ADHOC_QUERY_PATH); + } else if (logicalPlan.getProcedureCall() != null) { + return original.resolve(INTERACTIVE_QUERY_PATH); + } else { + throw new IllegalArgumentException("the request can not be sent to the remote service"); + } + } + @Override public void close() throws Exception {} } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionRequest.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionRequest.java index 5321c2515f4d..ed459a3c433d 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionRequest.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionRequest.java @@ -17,6 +17,7 @@ package com.alibaba.graphscope.common.client.type; import com.alibaba.graphscope.common.ir.runtime.PhysicalBuilder; +import com.alibaba.graphscope.common.ir.tools.LogicalPlan; /** * request to submit to remote engine service @@ -24,11 +25,17 @@ public class ExecutionRequest { private final long requestId; private final String requestName; + private final LogicalPlan requestLogical; private final PhysicalBuilder requestPhysical; - public ExecutionRequest(long requestId, String requestName, PhysicalBuilder requestPhysical) { + public ExecutionRequest( + long requestId, + String requestName, + LogicalPlan requestLogical, + PhysicalBuilder requestPhysical) { this.requestId = requestId; this.requestName = requestName; + this.requestLogical = requestLogical; this.requestPhysical = requestPhysical; } @@ -40,6 +47,10 @@ public String getRequestName() { return requestName; } + public LogicalPlan getRequestLogical() { + return requestLogical; + } + public PhysicalBuilder getRequestPhysical() { return requestPhysical; } From b84ac725c16f4b776bf34df124989bee9236f743 Mon Sep 17 00:00:00 2001 From: shirly121 Date: Wed, 7 Jun 2023 16:17:38 +0800 Subject: [PATCH 11/12] [GIE Compiler] update stored_procedure.proto --- .../common/client/HttpExecutionClient.java | 6 ++-- .../executor/ir/proto/results.proto | 7 ++++- .../executor/ir/proto/stored_procedure.proto | 30 ++++++++++++------- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java index be5d96629ea2..3ada4ec37319 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java @@ -23,7 +23,6 @@ import com.alibaba.graphscope.common.config.HiactorConfig; import com.alibaba.graphscope.common.ir.tools.LogicalPlan; import com.alibaba.graphscope.gaia.proto.IrResult; -import com.alibaba.graphscope.gaia.proto.StoredProcedure; import com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; @@ -80,9 +79,8 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener) listener.onError(exception); } try { - StoredProcedure.StoredProcedureResults results = - StoredProcedure.StoredProcedureResults - .parseFrom(bytes.body()); + IrResult.HiactorResults results = + IrResult.HiactorResults.parseFrom(bytes.body()); for (IrResult.Results irResult : results.getResultsList()) { listener.onNext(irResult.getRecord()); diff --git a/interactive_engine/executor/ir/proto/results.proto b/interactive_engine/executor/ir/proto/results.proto index 5fdba39f0a23..5643c6c31100 100644 --- a/interactive_engine/executor/ir/proto/results.proto +++ b/interactive_engine/executor/ir/proto/results.proto @@ -85,4 +85,9 @@ message Results { oneof inner { Record record = 1; } -} \ No newline at end of file +} + +message HiactorResults { + // may be add schema here. + repeated Results results = 1; +} diff --git a/interactive_engine/executor/ir/proto/stored_procedure.proto b/interactive_engine/executor/ir/proto/stored_procedure.proto index 6130847cfb65..fb0a44dbc71a 100644 --- a/interactive_engine/executor/ir/proto/stored_procedure.proto +++ b/interactive_engine/executor/ir/proto/stored_procedure.proto @@ -1,23 +1,33 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + syntax = "proto3"; -package hiactor; +package query; option java_package = "com.alibaba.graphscope.gaia.proto"; option java_outer_classname = "StoredProcedure"; import "common.proto"; -import "results.proto"; -message StoredProcedureArgument { +message Argument { string param_name = 1; // param name int32 param_ind = 2; // index of param common.Value value = 3; // real value } -message StoredProcedureQuery { +message Query { common.NameOrId query_name = 1; - repeated StoredProcedureArgument arguments = 2; -} - -message StoredProcedureResults { - // may be add schema here. - repeated results.Results results = 1; + repeated Argument arguments = 2; } From 9f41c3e6181fcca7bb98b2d7f3fdcee4ba46e6dd Mon Sep 17 00:00:00 2001 From: shirly121 Date: Wed, 7 Jun 2023 17:06:44 +0800 Subject: [PATCH 12/12] minor fix --- .../graphscope/common/client/HttpExecutionClient.java | 5 +++-- interactive_engine/executor/ir/proto/results.proto | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java index 3ada4ec37319..feaece1e25be 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java @@ -79,8 +79,9 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener) listener.onError(exception); } try { - IrResult.HiactorResults results = - IrResult.HiactorResults.parseFrom(bytes.body()); + IrResult.CollectiveResults results = + IrResult.CollectiveResults.parseFrom( + bytes.body()); for (IrResult.Results irResult : results.getResultsList()) { listener.onNext(irResult.getRecord()); diff --git a/interactive_engine/executor/ir/proto/results.proto b/interactive_engine/executor/ir/proto/results.proto index 5643c6c31100..7c032a6f25d6 100644 --- a/interactive_engine/executor/ir/proto/results.proto +++ b/interactive_engine/executor/ir/proto/results.proto @@ -87,7 +87,7 @@ message Results { } } -message HiactorResults { +message CollectiveResults { // may be add schema here. repeated Results results = 1; }