Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GIE Compiler] Unify execution client which is used to send request to remote engine service #2818

Merged
merged 17 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/local-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ jobs:

- name: Build Wheels
run: |
# sleep infinity
sleep infinity
git submodule update --init
cd ${GITHUB_WORKSPACE}/learning_engine/graph-learn && git submodule update --init third_party/pybind11

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.common.client;

import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;

/**
* client to submit request to remote engine service
* @param <C>
*/
public abstract class ExecutionClient<C> {
protected final ChannelFetcher<C> channelFetcher;

public ExecutionClient(ChannelFetcher<C> channelFetcher) {
this.channelFetcher = channelFetcher;
}

public abstract void submit(ExecutionRequest request, ExecutionResponseListener listener)
throws Exception;

public abstract void close() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.common.client;

import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.HQPSConfig;
import com.alibaba.graphscope.gaia.proto.Hqps;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* http client to send request to hqps engine service
*/
public class HttpExecutionClient extends ExecutionClient<URI> {
private static final Logger logger = LoggerFactory.getLogger(HttpExecutionClient.class);
private static final String CONTENT_TYPE = "Content-Type";
private static final String TEXT_PLAIN = "text/plain;charset=UTF-8";
private final HttpClient httpClient;

public HttpExecutionClient(Configs graphConfig, ChannelFetcher<URI> channelFetcher) {
super(channelFetcher);
this.httpClient =
HttpClient.newBuilder()
.connectTimeout(
Duration.ofMillis(HQPSConfig.HQPS_HTTP_TIMEOUT.get(graphConfig)))
.build();
}

@Override
public void submit(ExecutionRequest request, ExecutionResponseListener listener)
throws Exception {
List<CompletableFuture> responseFutures = Lists.newArrayList();
for (URI httpURI : channelFetcher.fetch()) {
HttpRequest httpRequest =
HttpRequest.newBuilder()
.uri(httpURI)
.headers(CONTENT_TYPE, TEXT_PLAIN)
.POST(
HttpRequest.BodyPublishers.ofByteArray(
(byte[]) request.getRequestPhysical().build()))
.build();
CompletableFuture<HttpResponse<byte[]>> responseFuture =
httpClient
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.whenComplete(
(bytes, exception) -> {
if (exception != null) {
listener.onError(exception);
}
try {
Hqps.HighQPSResults results =
Hqps.HighQPSResults.parseFrom(bytes.body());
for (IrResult.Results irResult :
results.getResultsList()) {
listener.onNext(irResult.getRecord());
}
} catch (InvalidProtocolBufferException e) {
listener.onError(e);
}
});
responseFutures.add(responseFuture);
}
CompletableFuture<Void> joinFuture =
CompletableFuture.runAsync(
() -> {
try {
CompletableFuture.allOf(
responseFutures.toArray(new CompletableFuture[0]))
.get();
listener.onCompleted();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
joinFuture.whenComplete(
(aVoid, exception) -> {
if (exception != null) {
listener.onError(exception);
}
});
}

@Override
public void close() throws Exception {}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.common.client;

import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.PegasusConfig;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.pegasus.RpcChannel;
import com.alibaba.pegasus.RpcClient;
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;

import io.grpc.Status;

/**
* rpc client to send request to pegasus engine service
*/
public class RpcExecutionClient extends ExecutionClient<RpcChannel> {
private final Configs graphConfig;
private final RpcClient rpcClient;

public RpcExecutionClient(Configs graphConfig, ChannelFetcher<RpcChannel> channelFetcher) {
super(channelFetcher);
this.graphConfig = graphConfig;
this.rpcClient =
new RpcClient(
PegasusConfig.PEGASUS_GRPC_TIMEOUT.get(graphConfig),
channelFetcher.fetch());
}

@Override
public void submit(ExecutionRequest request, ExecutionResponseListener listener)
throws Exception {
PegasusClient.JobRequest jobRequest =
PegasusClient.JobRequest.parseFrom((byte[]) request.getRequestPhysical().build());
PegasusClient.JobConfig jobConfig =
PegasusClient.JobConfig.newBuilder()
.setJobId(request.getRequestId())
.setJobName(request.getRequestName())
.setWorkers(PegasusConfig.PEGASUS_WORKER_NUM.get(graphConfig))
.setBatchSize(PegasusConfig.PEGASUS_BATCH_SIZE.get(graphConfig))
.setMemoryLimit(PegasusConfig.PEGASUS_MEMORY_LIMIT.get(graphConfig))
.setBatchCapacity(PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(graphConfig))
.setTimeLimit(PegasusConfig.PEGASUS_TIMEOUT.get(graphConfig))
.setAll(
com.alibaba.pegasus.service.protocol.PegasusClient.Empty
.newBuilder()
.build())
.build();
jobRequest = jobRequest.toBuilder().setConf(jobConfig).build();
this.rpcClient.submit(
jobRequest,
new ResultProcessor() {
@Override
public void process(PegasusClient.JobResponse jobResponse) {
try {
listener.onNext(
IrResult.Results.parseFrom(jobResponse.getResp()).getRecord());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void finish() {
listener.onCompleted();
}

@Override
public void error(Status status) {
listener.onError(status.asException());
}
});
}

@Override
public void close() throws Exception {
if (rpcClient != null) {
this.rpcClient.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
* limitations under the License.
*/

package com.alibaba.graphscope.common.client;

import com.alibaba.pegasus.RpcChannel;
package com.alibaba.graphscope.common.client.channel;

import java.util.List;

public interface RpcChannelFetcher {
List<RpcChannel> fetch();

// dynamic channel need update with the changes of host access url
boolean isDynamic();
/**
* fetch channel from remote
* @param <T>
*/
public interface ChannelFetcher<T> {
List<T> fetch();
}
Loading