()
+ {
+ @Override
+ public void onSuccess(@Nullable Integer result)
+ {
+ lastUpdateNanos.set(System.nanoTime());
+ requestInflight.compareAndSet(true, false);
+ if (result != null) {
+ nodeState.set(Optional.of(NodeState.valueOf(result)));
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ lastUpdateNanos.set(System.nanoTime());
+ requestInflight.compareAndSet(true, false);
+ }
+ }, directExecutor());
+ }
+ }
+}
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java
index 516cb9f437794..134aa6ba6fcbd 100644
--- a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java
+++ b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java
@@ -14,12 +14,15 @@
package com.facebook.presto.operator;
import com.facebook.airlift.http.client.HttpClient;
+import com.facebook.drift.client.DriftClient;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.buffer.PageCodecMarker;
import com.facebook.presto.execution.buffer.SerializedPage;
import com.facebook.presto.memory.context.LocalMemoryContext;
-import com.facebook.presto.operator.HttpPageBufferClient.ClientCallback;
+import com.facebook.presto.operator.PageBufferClient.ClientCallback;
import com.facebook.presto.operator.WorkProcessor.ProcessState;
+import com.facebook.presto.server.thrift.ThriftTaskClient;
+import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -37,6 +40,7 @@
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -46,6 +50,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -60,7 +65,7 @@
/**
* {@link ExchangeClient} is the client on receiver side, used in operators requiring data exchange from other tasks,
* such as {@link ExchangeOperator} and {@link MergeOperator}.
- * For each sender that ExchangeClient receives data from, a {@link HttpPageBufferClient} is used in ExchangeClient to communicate with the sender, i.e.
+ * For each sender that ExchangeClient receives data from, a {@link PageBufferClient} is used in ExchangeClient to communicate with the sender, i.e.
*
*
* / HttpPageBufferClient_1 - - - Remote Source 1
@@ -82,20 +87,21 @@ public class ExchangeClient
private final Duration maxErrorDuration;
private final boolean acknowledgePages;
private final HttpClient httpClient;
+ private final DriftClient driftClient;
private final ScheduledExecutorService scheduler;
@GuardedBy("this")
private boolean noMoreLocations;
- private final ConcurrentMap allClients = new ConcurrentHashMap<>();
+ private final ConcurrentMap allClients = new ConcurrentHashMap<>();
private final ConcurrentMap taskIdToLocationMap = new ConcurrentHashMap<>();
private final Set removedRemoteSourceTaskIds = ConcurrentHashMap.newKeySet();
@GuardedBy("this")
- private final Deque queuedClients = new LinkedList<>();
+ private final Deque queuedClients = new LinkedList<>();
- private final Set completedClients = newConcurrentHashSet();
- private final Set removedClients = newConcurrentHashSet();
+ private final Set completedClients = newConcurrentHashSet();
+ private final Set removedClients = newConcurrentHashSet();
private final LinkedBlockingDeque pageBuffer = new LinkedBlockingDeque<>();
@GuardedBy("this")
@@ -126,6 +132,7 @@ public ExchangeClient(
boolean acknowledgePages,
double responseSizeExponentialMovingAverageDecayingAlpha,
HttpClient httpClient,
+ DriftClient driftClient,
ScheduledExecutorService scheduler,
LocalMemoryContext systemMemoryContext,
Executor pageBufferClientCallbackExecutor)
@@ -137,6 +144,7 @@ public ExchangeClient(
this.maxErrorDuration = maxErrorDuration;
this.acknowledgePages = acknowledgePages;
this.httpClient = httpClient;
+ this.driftClient = driftClient;
this.scheduler = scheduler;
this.systemMemoryContext = systemMemoryContext;
this.maxBufferRetainedSizeInBytes = Long.MIN_VALUE;
@@ -150,7 +158,7 @@ public ExchangeClientStatus getStatus()
// It does not guarantee a consistent view between different exchange clients.
// Guaranteeing a consistent view introduces significant lock contention.
ImmutableList.Builder pageBufferClientStatusBuilder = ImmutableList.builder();
- for (HttpPageBufferClient client : allClients.values()) {
+ for (PageBufferClient client : allClients.values()) {
pageBufferClientStatusBuilder.add(client.getStatus());
}
List pageBufferClientStatus = pageBufferClientStatusBuilder.build();
@@ -185,8 +193,21 @@ public synchronized void addLocation(URI location, TaskId remoteSourceTaskId)
checkState(!noMoreLocations, "No more locations already set");
- HttpPageBufferClient client = new HttpPageBufferClient(
- httpClient,
+ RpcShuffleClient resultClient;
+ switch (location.getScheme().toLowerCase(Locale.ENGLISH)) {
+ case "http":
+ case "https":
+ resultClient = new HttpRpcShuffleClient(httpClient, location);
+ break;
+ case "thrift":
+ resultClient = new ThriftRpcShuffleClient(driftClient, location);
+ break;
+ default:
+ throw new PrestoException(GENERIC_INTERNAL_ERROR, "unsupported task result client scheme " + location.getScheme());
+ }
+
+ PageBufferClient client = new PageBufferClient(
+ resultClient,
maxErrorDuration,
acknowledgePages,
location,
@@ -216,7 +237,7 @@ public synchronized void removeRemoteSource(TaskId sourceTaskId)
return;
}
- HttpPageBufferClient client = allClients.get(location);
+ PageBufferClient client = allClients.get(location);
if (client == null) {
return;
}
@@ -318,7 +339,7 @@ public synchronized void close()
return;
}
- for (HttpPageBufferClient client : allClients.values()) {
+ for (PageBufferClient client : allClients.values()) {
closeQuietly(client);
}
pageBuffer.clear();
@@ -360,7 +381,7 @@ public synchronized void scheduleRequestIfNecessary()
clientCount -= pendingClients;
for (int i = 0; i < clientCount; ) {
- HttpPageBufferClient client = queuedClients.poll();
+ PageBufferClient client = queuedClients.poll();
if (client == null) {
// no more clients available
return;
@@ -426,7 +447,7 @@ private synchronized void notifyBlockedCallers()
}
}
- private synchronized void requestComplete(HttpPageBufferClient client)
+ private synchronized void requestComplete(PageBufferClient client)
{
if (!queuedClients.contains(client)) {
queuedClients.add(client);
@@ -434,14 +455,14 @@ private synchronized void requestComplete(HttpPageBufferClient client)
scheduleRequestIfNecessary();
}
- private synchronized void clientFinished(HttpPageBufferClient client)
+ private synchronized void clientFinished(PageBufferClient client)
{
requireNonNull(client, "client is null");
completedClients.add(client);
scheduleRequestIfNecessary();
}
- private synchronized void clientFailed(HttpPageBufferClient client, Throwable cause)
+ private synchronized void clientFailed(PageBufferClient client, Throwable cause)
{
// ignore failure for removed clients
if (removedClients.contains(client)) {
@@ -474,7 +495,7 @@ private class ExchangeClientCallback
implements ClientCallback
{
@Override
- public boolean addPages(HttpPageBufferClient client, List pages)
+ public boolean addPages(PageBufferClient client, List pages)
{
requireNonNull(client, "client is null");
requireNonNull(pages, "pages is null");
@@ -482,20 +503,20 @@ public boolean addPages(HttpPageBufferClient client, List pages)
}
@Override
- public void requestComplete(HttpPageBufferClient client)
+ public void requestComplete(PageBufferClient client)
{
requireNonNull(client, "client is null");
ExchangeClient.this.requestComplete(client);
}
@Override
- public void clientFinished(HttpPageBufferClient client)
+ public void clientFinished(PageBufferClient client)
{
ExchangeClient.this.clientFinished(client);
}
@Override
- public void clientFailed(HttpPageBufferClient client, Throwable cause)
+ public void clientFailed(PageBufferClient client, Throwable cause)
{
requireNonNull(client, "client is null");
requireNonNull(cause, "cause is null");
@@ -504,7 +525,7 @@ public void clientFailed(HttpPageBufferClient client, Throwable cause)
}
}
- private static void closeQuietly(HttpPageBufferClient client)
+ private static void closeQuietly(PageBufferClient client)
{
try {
client.close();
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientFactory.java b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientFactory.java
index d3950865d0280..e4dc7032c010f 100644
--- a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientFactory.java
+++ b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientFactory.java
@@ -15,7 +15,9 @@
import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import com.facebook.airlift.http.client.HttpClient;
+import com.facebook.drift.client.DriftClient;
import com.facebook.presto.memory.context.LocalMemoryContext;
+import com.facebook.presto.server.thrift.ThriftTaskClient;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
@@ -41,6 +43,7 @@ public class ExchangeClientFactory
private final int concurrentRequestMultiplier;
private final Duration maxErrorDuration;
private final HttpClient httpClient;
+ private final DriftClient driftClient;
private final DataSize maxResponseSize;
private final boolean acknowledgePages;
private final double responseSizeExponentialMovingAverageDecayingAlpha;
@@ -52,6 +55,7 @@ public class ExchangeClientFactory
public ExchangeClientFactory(
ExchangeClientConfig config,
@ForExchange HttpClient httpClient,
+ @ForExchange DriftClient driftClient,
@ForExchange ScheduledExecutorService scheduler)
{
this(
@@ -63,6 +67,7 @@ public ExchangeClientFactory(
config.getPageBufferClientMaxCallbackThreads(),
config.getResponseSizeExponentialMovingAverageDecayingAlpha(),
httpClient,
+ driftClient,
scheduler);
}
@@ -75,6 +80,7 @@ public ExchangeClientFactory(
int pageBufferClientMaxCallbackThreads,
double responseSizeExponentialMovingAverageDecayingAlpha,
HttpClient httpClient,
+ DriftClient driftClient,
ScheduledExecutorService scheduler)
{
this.maxBufferedBytes = requireNonNull(maxBufferedBytes, "maxBufferedBytes is null");
@@ -82,6 +88,7 @@ public ExchangeClientFactory(
this.maxErrorDuration = requireNonNull(maxErrorDuration, "maxErrorDuration is null");
this.acknowledgePages = acknowledgePages;
this.httpClient = requireNonNull(httpClient, "httpClient is null");
+ this.driftClient = requireNonNull(driftClient, "driftClient is null");
// Use only 0.75 of the maxResponseSize to leave room for additional bytes from the encoding
// TODO figure out a better way to compute the size of data that will be transferred over the network
@@ -126,6 +133,7 @@ public ExchangeClient get(LocalMemoryContext systemMemoryContext)
acknowledgePages,
responseSizeExponentialMovingAverageDecayingAlpha,
httpClient,
+ driftClient,
scheduler,
systemMemoryContext,
pageBufferClientCallbackExecutor);
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java b/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java
new file mode 100644
index 0000000000000..788afc5d71921
--- /dev/null
+++ b/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.operator;
+
+import com.facebook.airlift.http.client.HttpClient;
+import com.facebook.airlift.http.client.HttpStatus;
+import com.facebook.airlift.http.client.Request;
+import com.facebook.airlift.http.client.Response;
+import com.facebook.airlift.http.client.ResponseHandler;
+import com.facebook.airlift.http.client.ResponseTooLargeException;
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.execution.buffer.SerializedPage;
+import com.facebook.presto.operator.PageBufferClient.PagesResponse;
+import com.google.common.collect.ImmutableList;
+import com.google.common.net.MediaType;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.slice.InputStreamSliceInput;
+import io.airlift.slice.SliceInput;
+import io.airlift.units.DataSize;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.List;
+
+import static com.facebook.airlift.http.client.HttpStatus.familyForStatusCode;
+import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
+import static com.facebook.airlift.http.client.Request.Builder.prepareDelete;
+import static com.facebook.airlift.http.client.Request.Builder.prepareGet;
+import static com.facebook.airlift.http.client.ResponseHandlerUtils.propagate;
+import static com.facebook.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
+import static com.facebook.presto.PrestoMediaTypes.PRESTO_PAGES_TYPE;
+import static com.facebook.presto.client.PrestoHeaders.PRESTO_BUFFER_COMPLETE;
+import static com.facebook.presto.client.PrestoHeaders.PRESTO_MAX_SIZE;
+import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_NEXT_TOKEN;
+import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_TOKEN;
+import static com.facebook.presto.client.PrestoHeaders.PRESTO_TASK_INSTANCE_ID;
+import static com.facebook.presto.execution.buffer.PagesSerdeUtil.readSerializedPages;
+import static com.facebook.presto.operator.PageBufferClient.PagesResponse.createEmptyPagesResponse;
+import static com.facebook.presto.operator.PageBufferClient.PagesResponse.createPagesResponse;
+import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+
+@ThreadSafe
+public final class HttpRpcShuffleClient
+ implements RpcShuffleClient
+{
+ private static final Logger log = Logger.get(HttpRpcShuffleClient.class);
+
+ private final HttpClient httpClient;
+ private final URI location;
+
+ public HttpRpcShuffleClient(HttpClient httpClient, URI location)
+ {
+ this.httpClient = requireNonNull(httpClient, "httpClient is null");
+ this.location = requireNonNull(location, "location is null");
+ }
+
+ @Override
+ public ListenableFuture getResults(long token, DataSize maxResponseSize)
+ {
+ URI uri = uriBuilderFrom(location).appendPath(String.valueOf(token)).build();
+ return httpClient.executeAsync(
+ prepareGet()
+ .setHeader(PRESTO_MAX_SIZE, maxResponseSize.toString())
+ .setUri(uri).build(),
+ new PageResponseHandler());
+ }
+
+ @Override
+ public void acknowledgeResultsAsync(long nextToken)
+ {
+ URI uri = uriBuilderFrom(location).appendPath(String.valueOf(nextToken)).appendPath("acknowledge").build();
+ httpClient.executeAsync(prepareGet().setUri(uri).build(), new ResponseHandler()
+ {
+ @Override
+ public Void handleException(Request request, Exception exception)
+ {
+ log.debug(exception, "Acknowledge request failed: %s", uri);
+ return null;
+ }
+
+ @Override
+ public Void handle(Request request, Response response)
+ {
+ if (familyForStatusCode(response.getStatusCode()) != HttpStatus.Family.SUCCESSFUL) {
+ log.debug("Unexpected acknowledge response code: %s", response.getStatusCode());
+ }
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture> abortResults()
+ {
+ return httpClient.executeAsync(prepareDelete().setUri(location).build(), createStatusResponseHandler());
+ }
+
+ @Override
+ public Throwable rewriteException(Throwable throwable)
+ {
+ if (throwable instanceof ResponseTooLargeException) {
+ return new PageTooLargeException(throwable);
+ }
+ return throwable;
+ }
+
+ public static class PageResponseHandler
+ implements ResponseHandler
+ {
+ @Override
+ public PagesResponse handleException(Request request, Exception exception)
+ {
+ throw propagate(request, exception);
+ }
+
+ @Override
+ public PagesResponse handle(Request request, Response response)
+ {
+ try {
+ // no content means no content was created within the wait period, but query is still ok
+ // if job is finished, complete is set in the response
+ if (response.getStatusCode() == HttpStatus.NO_CONTENT.code()) {
+ return createEmptyPagesResponse(getTaskInstanceId(response), getToken(response), getNextToken(response), getComplete(response));
+ }
+
+ // otherwise we must have gotten an OK response, everything else is considered fatal
+ if (response.getStatusCode() != HttpStatus.OK.code()) {
+ StringBuilder body = new StringBuilder();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getInputStream(), UTF_8))) {
+ // Get up to 1000 lines for debugging
+ for (int i = 0; i < 1000; i++) {
+ String line = reader.readLine();
+ // Don't output more than 100KB
+ if (line == null || body.length() + line.length() > 100 * 1024) {
+ break;
+ }
+ body.append(line + "\n");
+ }
+ }
+ catch (RuntimeException | IOException e) {
+ // Ignored. Just return whatever message we were able to decode
+ }
+ throw new PageTransportErrorException(format("Expected response code to be 200, but was %s %s:%n%s", response.getStatusCode(), response.getStatusMessage(), body.toString()));
+ }
+
+ // invalid content type can happen when an error page is returned, but is unlikely given the above 200
+ String contentType = response.getHeader(CONTENT_TYPE);
+ if (contentType == null) {
+ throw new PageTransportErrorException(format("%s header is not set: %s", CONTENT_TYPE, response));
+ }
+ if (!mediaTypeMatches(contentType, PRESTO_PAGES_TYPE)) {
+ throw new PageTransportErrorException(format("Expected %s response from server but got %s", PRESTO_PAGES_TYPE, contentType));
+ }
+
+ String taskInstanceId = getTaskInstanceId(response);
+ long token = getToken(response);
+ long nextToken = getNextToken(response);
+ boolean complete = getComplete(response);
+
+ try (SliceInput input = new InputStreamSliceInput(response.getInputStream())) {
+ List pages = ImmutableList.copyOf(readSerializedPages(input));
+ return createPagesResponse(taskInstanceId, token, nextToken, pages, complete);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ catch (PageTransportErrorException e) {
+ throw new PageTransportErrorException("Error fetching " + request.getUri().toASCIIString(), e);
+ }
+ }
+
+ private static String getTaskInstanceId(Response response)
+ {
+ String taskInstanceId = response.getHeader(PRESTO_TASK_INSTANCE_ID);
+ if (taskInstanceId == null) {
+ throw new PageTransportErrorException(format("Expected %s header", PRESTO_TASK_INSTANCE_ID));
+ }
+ return taskInstanceId;
+ }
+
+ private static long getToken(Response response)
+ {
+ String tokenHeader = response.getHeader(PRESTO_PAGE_TOKEN);
+ if (tokenHeader == null) {
+ throw new PageTransportErrorException(format("Expected %s header", PRESTO_PAGE_TOKEN));
+ }
+ return Long.parseLong(tokenHeader);
+ }
+
+ private static long getNextToken(Response response)
+ {
+ String nextTokenHeader = response.getHeader(PRESTO_PAGE_NEXT_TOKEN);
+ if (nextTokenHeader == null) {
+ throw new PageTransportErrorException(format("Expected %s header", PRESTO_PAGE_NEXT_TOKEN));
+ }
+ return Long.parseLong(nextTokenHeader);
+ }
+
+ private static boolean getComplete(Response response)
+ {
+ String bufferComplete = response.getHeader(PRESTO_BUFFER_COMPLETE);
+ if (bufferComplete == null) {
+ throw new PageTransportErrorException(format("Expected %s header", PRESTO_BUFFER_COMPLETE));
+ }
+ return Boolean.parseBoolean(bufferComplete);
+ }
+
+ private static boolean mediaTypeMatches(String value, MediaType range)
+ {
+ try {
+ return MediaType.parse(value).is(range);
+ }
+ catch (IllegalArgumentException | IllegalStateException e) {
+ return false;
+ }
+ }
+ }
+}
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/HttpPageBufferClient.java b/presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java
similarity index 60%
rename from presto-main/src/main/java/com/facebook/presto/operator/HttpPageBufferClient.java
rename to presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java
index a5446adf15629..b3d1ee9cf148c 100644
--- a/presto-main/src/main/java/com/facebook/presto/operator/HttpPageBufferClient.java
+++ b/presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java
@@ -13,25 +13,16 @@
*/
package com.facebook.presto.operator;
-import com.facebook.airlift.http.client.HttpClient;
-import com.facebook.airlift.http.client.HttpClient.HttpResponseFuture;
-import com.facebook.airlift.http.client.HttpStatus;
import com.facebook.airlift.http.client.HttpUriBuilder;
-import com.facebook.airlift.http.client.Request;
-import com.facebook.airlift.http.client.Response;
-import com.facebook.airlift.http.client.ResponseHandler;
-import com.facebook.airlift.http.client.ResponseTooLargeException;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.execution.buffer.SerializedPage;
import com.facebook.presto.server.remotetask.Backoff;
import com.facebook.presto.spi.PrestoException;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
-import com.google.common.net.MediaType;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import io.airlift.slice.InputStreamSliceInput;
-import io.airlift.slice.SliceInput;
+import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.joda.time.DateTime;
@@ -40,10 +31,7 @@
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
-import java.io.BufferedReader;
import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStreamReader;
import java.net.URI;
import java.util.List;
import java.util.OptionalInt;
@@ -54,21 +42,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import static com.facebook.airlift.http.client.HttpStatus.familyForStatusCode;
-import static com.facebook.airlift.http.client.Request.Builder.prepareDelete;
-import static com.facebook.airlift.http.client.Request.Builder.prepareGet;
-import static com.facebook.airlift.http.client.ResponseHandlerUtils.propagate;
-import static com.facebook.airlift.http.client.StatusResponseHandler.StatusResponse;
-import static com.facebook.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
-import static com.facebook.presto.PrestoMediaTypes.PRESTO_PAGES_TYPE;
-import static com.facebook.presto.client.PrestoHeaders.PRESTO_BUFFER_COMPLETE;
-import static com.facebook.presto.client.PrestoHeaders.PRESTO_MAX_SIZE;
-import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_NEXT_TOKEN;
-import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_TOKEN;
-import static com.facebook.presto.client.PrestoHeaders.PRESTO_TASK_INSTANCE_ID;
-import static com.facebook.presto.execution.buffer.PagesSerdeUtil.readSerializedPages;
-import static com.facebook.presto.operator.HttpPageBufferClient.PagesResponse.createEmptyPagesResponse;
-import static com.facebook.presto.operator.HttpPageBufferClient.PagesResponse.createPagesResponse;
import static com.facebook.presto.spi.HostAddress.fromUri;
import static com.facebook.presto.spi.StandardErrorCode.REMOTE_BUFFER_CLOSE_FAILED;
import static com.facebook.presto.spi.StandardErrorCode.REMOTE_TASK_MISMATCH;
@@ -77,18 +50,16 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
-import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static java.lang.String.format;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@ThreadSafe
-public final class HttpPageBufferClient
+public final class PageBufferClient
implements Closeable
{
- private static final Logger log = Logger.get(HttpPageBufferClient.class);
+ private static final Logger log = Logger.get(PageBufferClient.class);
/**
* For each request, the addPage method will be called zero or more times,
@@ -100,16 +71,16 @@ public final class HttpPageBufferClient
*/
public interface ClientCallback
{
- boolean addPages(HttpPageBufferClient client, List pages);
+ boolean addPages(PageBufferClient client, List pages);
- void requestComplete(HttpPageBufferClient client);
+ void requestComplete(PageBufferClient client);
- void clientFinished(HttpPageBufferClient client);
+ void clientFinished(PageBufferClient client);
- void clientFailed(HttpPageBufferClient client, Throwable cause);
+ void clientFailed(PageBufferClient client, Throwable cause);
}
- private final HttpClient httpClient;
+ private final RpcShuffleClient resultClient;
private final boolean acknowledgePages;
private final URI location;
private final ClientCallback clientCallback;
@@ -119,7 +90,7 @@ public interface ClientCallback
@GuardedBy("this")
private boolean closed;
@GuardedBy("this")
- private HttpResponseFuture> future;
+ private ListenableFuture> future;
@GuardedBy("this")
private DateTime lastUpdate = DateTime.now();
@GuardedBy("this")
@@ -143,8 +114,8 @@ public interface ClientCallback
private final Executor pageBufferClientCallbackExecutor;
- public HttpPageBufferClient(
- HttpClient httpClient,
+ public PageBufferClient(
+ RpcShuffleClient resultClient,
Duration maxErrorDuration,
boolean acknowledgePages,
URI location,
@@ -152,11 +123,11 @@ public HttpPageBufferClient(
ScheduledExecutorService scheduler,
Executor pageBufferClientCallbackExecutor)
{
- this(httpClient, maxErrorDuration, acknowledgePages, location, clientCallback, scheduler, Ticker.systemTicker(), pageBufferClientCallbackExecutor);
+ this(resultClient, maxErrorDuration, acknowledgePages, location, clientCallback, scheduler, Ticker.systemTicker(), pageBufferClientCallbackExecutor);
}
- public HttpPageBufferClient(
- HttpClient httpClient,
+ public PageBufferClient(
+ RpcShuffleClient resultClient,
Duration maxErrorDuration,
boolean acknowledgePages,
URI location,
@@ -165,7 +136,7 @@ public HttpPageBufferClient(
Ticker ticker,
Executor pageBufferClientCallbackExecutor)
{
- this.httpClient = requireNonNull(httpClient, "httpClient is null");
+ this.resultClient = requireNonNull(resultClient, "resultClient is null");
this.acknowledgePages = acknowledgePages;
this.location = requireNonNull(location, "location is null");
this.clientCallback = requireNonNull(clientCallback, "clientCallback is null");
@@ -194,10 +165,6 @@ else if (completed) {
else {
state = "queued";
}
- String httpRequestState = "not scheduled";
- if (future != null) {
- httpRequestState = future.getState();
- }
long rejectedRows = rowsRejected.get();
int rejectedPages = pagesRejected.get();
@@ -213,7 +180,7 @@ else if (completed) {
requestsScheduled.get(),
requestsCompleted.get(),
requestsFailed.get(),
- httpRequestState);
+ future == null ? "not scheduled" : "processing request");
}
public synchronized boolean isRunning()
@@ -265,7 +232,7 @@ public synchronized void scheduleRequest(DataSize maxResponseSize)
}
catch (Throwable t) {
// should not happen, but be safe and fail the operator
- clientCallback.clientFailed(HttpPageBufferClient.this, t);
+ clientCallback.clientFailed(PageBufferClient.this, t);
}
}, delayNanos, NANOSECONDS);
@@ -293,11 +260,7 @@ private synchronized void initiateRequest(DataSize maxResponseSize)
private synchronized void sendGetResults(DataSize maxResponseSize)
{
URI uri = HttpUriBuilder.uriBuilderFrom(location).appendPath(String.valueOf(token)).build();
- HttpResponseFuture resultFuture = httpClient.executeAsync(
- prepareGet()
- .setHeader(PRESTO_MAX_SIZE, maxResponseSize.toString())
- .setUri(uri).build(),
- new PageResponseHandler());
+ ListenableFuture resultFuture = resultClient.getResults(token, maxResponseSize);
future = resultFuture;
Futures.addCallback(resultFuture, new FutureCallback()
@@ -312,7 +275,7 @@ public void onSuccess(PagesResponse result)
List pages;
try {
boolean shouldAcknowledge = false;
- synchronized (HttpPageBufferClient.this) {
+ synchronized (PageBufferClient.this) {
if (taskInstanceId == null) {
taskInstanceId = result.getTaskInstanceId();
}
@@ -336,25 +299,7 @@ public void onSuccess(PagesResponse result)
// Acknowledge token without handling the response.
// The next request will also make sure the token is acknowledged.
// This is to fast release the pages on the buffer side.
- URI uri = HttpUriBuilder.uriBuilderFrom(location).appendPath(String.valueOf(result.getNextToken())).appendPath("acknowledge").build();
- httpClient.executeAsync(prepareGet().setUri(uri).build(), new ResponseHandler()
- {
- @Override
- public Void handleException(Request request, Exception exception)
- {
- log.debug(exception, "Acknowledge request failed: %s", uri);
- return null;
- }
-
- @Override
- public Void handle(Request request, Response response)
- {
- if (familyForStatusCode(response.getStatusCode()) != HttpStatus.Family.SUCCESSFUL) {
- log.debug("Unexpected acknowledge response code: %s", response.getStatusCode());
- }
- return null;
- }
- });
+ resultClient.acknowledgeResultsAsync(result.getNextToken());
}
}
catch (PrestoException e) {
@@ -367,7 +312,7 @@ public Void handle(Request request, Response response)
// clientCallback can keep stats of requests and responses. For example, it may
// keep track of how often a client returns empty response and adjust request
// frequency or buffer size.
- if (clientCallback.addPages(HttpPageBufferClient.this, pages)) {
+ if (clientCallback.addPages(PageBufferClient.this, pages)) {
pagesReceived.addAndGet(pages.size());
rowsReceived.addAndGet(pages.stream().mapToLong(SerializedPage::getPositionCount).sum());
}
@@ -376,7 +321,7 @@ public Void handle(Request request, Response response)
rowsRejected.addAndGet(pages.stream().mapToLong(SerializedPage::getPositionCount).sum());
}
- synchronized (HttpPageBufferClient.this) {
+ synchronized (PageBufferClient.this) {
// client is complete, acknowledge it by sending it a delete in the next request
if (result.isClientComplete()) {
completed = true;
@@ -387,7 +332,7 @@ public Void handle(Request request, Response response)
lastUpdate = DateTime.now();
}
requestsCompleted.incrementAndGet();
- clientCallback.requestComplete(HttpPageBufferClient.this);
+ clientCallback.requestComplete(PageBufferClient.this);
}
@Override
@@ -396,7 +341,7 @@ public void onFailure(Throwable t)
log.debug("Request to %s failed %s", uri, t);
checkNotHoldsLock(this);
- t = rewriteException(t);
+ t = resultClient.rewriteException(t);
if (!(t instanceof PrestoException) && backoff.failure()) {
String message = format("%s (%s - %s failures, failure duration %s, total failed request time %s)",
WORKER_NODE_ERROR,
@@ -413,16 +358,16 @@ public void onFailure(Throwable t)
private synchronized void sendDelete()
{
- HttpResponseFuture resultFuture = httpClient.executeAsync(prepareDelete().setUri(location).build(), createStatusResponseHandler());
+ ListenableFuture> resultFuture = resultClient.abortResults();
future = resultFuture;
- Futures.addCallback(resultFuture, new FutureCallback()
+ Futures.addCallback(resultFuture, new FutureCallback