From 4a1a71ce1450241fdd6e0d31bcf451e31e9befdb Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Wed, 24 Jan 2024 08:56:37 -0800 Subject: [PATCH 1/2] Expose getResults metrics in AsyncPageTransportServlet Same as in TaskResource --- .../server/AsyncPageTransportServlet.java | 25 +++++++++++++++++++ .../presto/server/ServerMainModule.java | 2 ++ 2 files changed, 27 insertions(+) diff --git a/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java b/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java index 81d3a670c90d6..7b70388025ebd 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java +++ b/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java @@ -15,6 +15,7 @@ import com.facebook.airlift.concurrent.BoundedExecutor; import com.facebook.airlift.log.Logger; +import com.facebook.airlift.stats.TimeStat; import com.facebook.presto.execution.TaskId; import com.facebook.presto.execution.TaskManager; import com.facebook.presto.execution.buffer.BufferResult; @@ -26,6 +27,8 @@ import com.google.common.util.concurrent.ListenableFuture; import io.airlift.units.DataSize; import io.airlift.units.Duration; +import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; import javax.annotation.security.RolesAllowed; import javax.inject.Inject; @@ -56,6 +59,7 @@ import static com.google.common.net.HttpHeaders.CONTENT_LENGTH; import static com.google.common.net.HttpHeaders.CONTENT_TYPE; import static com.google.common.util.concurrent.Futures.addCallback; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.lang.Long.parseLong; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -74,6 +78,9 @@ public class AsyncPageTransportServlet private final Executor responseExecutor; private final ScheduledExecutorService timeoutExecutor; + private final TimeStat readFromOutputBufferTime = new TimeStat(); + private final TimeStat resultsRequestTime = new TimeStat(); + @Inject public AsyncPageTransportServlet( TaskManager taskManager, @@ -150,6 +157,7 @@ protected void processRequest( HttpServletRequest request, HttpServletResponse response) throws IOException { + long start = System.nanoTime(); DataSize maxSize = DataSize.valueOf(request.getHeader(PRESTO_MAX_SIZE)); AsyncContext asyncContext = request.startAsync(request, response); @@ -162,6 +170,7 @@ protected void processRequest( { public void onComplete(AsyncEvent event) { + resultsRequestTime.add(Duration.nanosSince(start)); } public void onError(AsyncEvent event) @@ -192,6 +201,8 @@ public void onTimeout(AsyncEvent event) waitTime, timeoutExecutor); + bufferResultFuture.addListener(() -> readFromOutputBufferTime.add(Duration.nanosSince(start)), directExecutor()); + ServletOutputStream out = response.getOutputStream(); addCallback(bufferResultFuture, new FutureCallback() { @@ -239,4 +250,18 @@ public void onFailure(Throwable thrown) public void destroy() { } + + @Managed + @Nested + public TimeStat getReadFromOutputBufferTime() + { + return readFromOutputBufferTime; + } + + @Managed + @Nested + public TimeStat getResultsRequestTime() + { + return resultsRequestTime; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 31bb1a15a12ac..9bd3ebb57c571 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -760,6 +760,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon driftServerBinder(binder).bindService(ThriftServerInfoService.class); // Async page transport + binder.bind(AsyncPageTransportServlet.class).in(Scopes.SINGLETON); + newExporter(binder).export(AsyncPageTransportServlet.class).withGeneratedName(); newMapBinder(binder, String.class, Servlet.class, TheServlet.class) .addBinding("/v1/task/async/*") .to(AsyncPageTransportServlet.class) From 16f963c77bcd5cf83f2b27bc17e56e8b68476b69 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Wed, 24 Jan 2024 08:59:42 -0800 Subject: [PATCH 2/2] Remove blocking implementation of /v1/task//results Point the original endpoint to /v1/task/async --- .../presto/operator/ExchangeClient.java | 22 +---- .../presto/operator/ExchangeClientConfig.java | 15 +-- .../operator/ExchangeClientFactory.java | 5 - .../presto/operator/HttpRpcShuffleClient.java | 11 +-- .../presto/operator/PageBufferClient.java | 10 +- .../AsyncPageTransportForwardFilter.java | 72 ++++++++++++++ .../presto/server/PagesResponseWriter.java | 95 ------------------ .../presto/server/ServerMainModule.java | 4 +- .../facebook/presto/server/TaskResource.java | 96 ------------------- .../presto/operator/TestExchangeClient.java | 1 - .../operator/TestExchangeClientConfig.java | 7 +- .../presto/operator/TestExchangeOperator.java | 1 - .../presto/operator/TestPageBufferClient.java | 6 -- .../TestAsyncPageTransportQueries.java | 58 ----------- 14 files changed, 83 insertions(+), 320 deletions(-) create mode 100644 presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportForwardFilter.java delete mode 100644 presto-main/src/main/java/com/facebook/presto/server/PagesResponseWriter.java delete mode 100644 presto-tests/src/test/java/com/facebook/presto/execution/TestAsyncPageTransportQueries.java diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java index f253da3caa147..65b4d67907c7e 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClient.java @@ -14,7 +14,6 @@ package com.facebook.presto.operator; import com.facebook.airlift.http.client.HttpClient; -import com.facebook.airlift.http.client.HttpUriBuilder; import com.facebook.drift.client.DriftClient; import com.facebook.presto.execution.TaskId; import com.facebook.presto.memory.context.LocalMemoryContext; @@ -41,7 +40,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Locale; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -91,7 +89,6 @@ public class ExchangeClient private final HttpClient httpClient; private final DriftClient driftClient; private final ScheduledExecutorService scheduler; - private boolean asyncPageTransportEnabled; @GuardedBy("this") private boolean noMoreLocations; @@ -133,7 +130,6 @@ public ExchangeClient( int concurrentRequestMultiplier, Duration maxErrorDuration, boolean acknowledgePages, - boolean asyncPageTransportEnabled, double responseSizeExponentialMovingAverageDecayingAlpha, HttpClient httpClient, DriftClient driftClient, @@ -147,7 +143,6 @@ public ExchangeClient( this.concurrentRequestMultiplier = concurrentRequestMultiplier; this.maxErrorDuration = maxErrorDuration; this.acknowledgePages = acknowledgePages; - this.asyncPageTransportEnabled = asyncPageTransportEnabled; this.httpClient = httpClient; this.driftClient = driftClient; this.scheduler = scheduler; @@ -199,11 +194,10 @@ public synchronized void addLocation(URI location, TaskId remoteSourceTaskId) checkState(!noMoreLocations, "No more locations already set"); RpcShuffleClient resultClient; - Optional asyncPageTransportLocation = getAsyncPageTransportLocation(location, asyncPageTransportEnabled); switch (location.getScheme().toLowerCase(Locale.ENGLISH)) { case "http": case "https": - resultClient = new HttpRpcShuffleClient(httpClient, location, asyncPageTransportLocation); + resultClient = new HttpRpcShuffleClient(httpClient, location); break; case "thrift": resultClient = new ThriftRpcShuffleClient(driftClient, location); @@ -217,7 +211,6 @@ public synchronized void addLocation(URI location, TaskId remoteSourceTaskId) maxErrorDuration, acknowledgePages, location, - asyncPageTransportLocation, new ExchangeClientCallback(), scheduler, pageBufferClientCallbackExecutor); @@ -555,19 +548,6 @@ private static void closeQuietly(PageBufferClient client) } } - private static Optional getAsyncPageTransportLocation(URI location, boolean asyncPageTransportEnabled) - { - if (asyncPageTransportEnabled) { - // rewrite location for http request to get task results in async mode - // new URL cannot replace v1/task completely, v1/task/async is only used to get task results - String path = location.getPath().replace("v1/task", "v1/task/async"); - return Optional.of(HttpUriBuilder.uriBuilderFrom(location).replacePath(path).build()); - } - else { - return Optional.empty(); - } - } - private static class ExponentialMovingAverage { private final double alpha; diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientConfig.java b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientConfig.java index 3adeaadc38024..af3cf22c9308d 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientConfig.java @@ -14,6 +14,7 @@ package com.facebook.presto.operator; import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.DefunctConfig; import com.facebook.airlift.http.client.HttpClientConfig; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; @@ -26,6 +27,7 @@ import java.util.concurrent.TimeUnit; +@DefunctConfig("exchange.async-page-transport-enabled") public class ExchangeClientConfig { private DataSize maxBufferSize = new DataSize(32, Unit.MEGABYTE); @@ -38,7 +40,6 @@ public class ExchangeClientConfig private int pageBufferClientMaxCallbackThreads = 25; private boolean acknowledgePages = true; private double responseSizeExponentialMovingAverageDecayingAlpha = 0.1; - private boolean asyncPageTransportEnabled = true; @NotNull public DataSize getMaxBufferSize() @@ -170,16 +171,4 @@ public double getResponseSizeExponentialMovingAverageDecayingAlpha() { return responseSizeExponentialMovingAverageDecayingAlpha; } - - public boolean isAsyncPageTransportEnabled() - { - return asyncPageTransportEnabled; - } - - @Config("exchange.async-page-transport-enabled") - public ExchangeClientConfig setAsyncPageTransportEnabled(boolean asyncPageTransportEnabled) - { - this.asyncPageTransportEnabled = asyncPageTransportEnabled; - return this; - } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientFactory.java b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientFactory.java index 34a8e062dd6be..e4dc7032c010f 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/ExchangeClientFactory.java @@ -46,7 +46,6 @@ public class ExchangeClientFactory private final DriftClient driftClient; private final DataSize maxResponseSize; private final boolean acknowledgePages; - private final boolean asyncPageTransportEnabled; private final double responseSizeExponentialMovingAverageDecayingAlpha; private final ScheduledExecutorService scheduler; private final ThreadPoolExecutorMBean executorMBean; @@ -65,7 +64,6 @@ public ExchangeClientFactory( config.getConcurrentRequestMultiplier(), config.getMaxErrorDuration(), config.isAcknowledgePages(), - config.isAsyncPageTransportEnabled(), config.getPageBufferClientMaxCallbackThreads(), config.getResponseSizeExponentialMovingAverageDecayingAlpha(), httpClient, @@ -79,7 +77,6 @@ public ExchangeClientFactory( int concurrentRequestMultiplier, Duration maxErrorDuration, boolean acknowledgePages, - boolean asyncPageTransportEnabled, int pageBufferClientMaxCallbackThreads, double responseSizeExponentialMovingAverageDecayingAlpha, HttpClient httpClient, @@ -90,7 +87,6 @@ public ExchangeClientFactory( this.concurrentRequestMultiplier = concurrentRequestMultiplier; this.maxErrorDuration = requireNonNull(maxErrorDuration, "maxErrorDuration is null"); this.acknowledgePages = acknowledgePages; - this.asyncPageTransportEnabled = asyncPageTransportEnabled; this.httpClient = requireNonNull(httpClient, "httpClient is null"); this.driftClient = requireNonNull(driftClient, "driftClient is null"); @@ -135,7 +131,6 @@ public ExchangeClient get(LocalMemoryContext systemMemoryContext) concurrentRequestMultiplier, maxErrorDuration, acknowledgePages, - asyncPageTransportEnabled, responseSizeExponentialMovingAverageDecayingAlpha, httpClient, driftClient, diff --git a/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java b/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java index 1791c595373af..676cb111c18e0 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java @@ -37,7 +37,6 @@ import java.io.InputStreamReader; import java.net.URI; import java.util.List; -import java.util.Optional; import static com.facebook.airlift.http.client.HttpStatus.familyForStatusCode; import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom; @@ -67,25 +66,17 @@ public final class HttpRpcShuffleClient private final HttpClient httpClient; private final URI location; - private final Optional asyncPageTransportLocation; public HttpRpcShuffleClient(HttpClient httpClient, URI location) - { - this(httpClient, location, Optional.empty()); - } - - public HttpRpcShuffleClient(HttpClient httpClient, URI location, Optional asyncPageTransportLocation) { this.httpClient = requireNonNull(httpClient, "httpClient is null"); this.location = requireNonNull(location, "location is null"); - this.asyncPageTransportLocation = requireNonNull(asyncPageTransportLocation, "asyncPageTransportLocation is null"); } @Override public ListenableFuture getResults(long token, DataSize maxResponseSize) { - URI uriBase = asyncPageTransportLocation.orElse(location); - URI uri = uriBuilderFrom(uriBase).appendPath(String.valueOf(token)).build(); + URI uri = uriBuilderFrom(location).appendPath(String.valueOf(token)).build(); return httpClient.executeAsync( prepareGet() .setHeader(PRESTO_MAX_SIZE, maxResponseSize.toString()) diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java b/presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java index 80d9a7b19909f..0d7a39fdb2dab 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java @@ -35,7 +35,6 @@ import java.io.Closeable; import java.net.URI; import java.util.List; -import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; import java.util.concurrent.Executor; @@ -87,7 +86,6 @@ public interface ClientCallback private final RpcShuffleClient resultClient; private final boolean acknowledgePages; private final URI location; - private final Optional asyncPageTransportLocation; private final ClientCallback clientCallback; private final ScheduledExecutorService scheduler; private final Backoff backoff; @@ -124,12 +122,11 @@ public PageBufferClient( Duration maxErrorDuration, boolean acknowledgePages, URI location, - Optional asyncPageTransportLocation, ClientCallback clientCallback, ScheduledExecutorService scheduler, Executor pageBufferClientCallbackExecutor) { - this(resultClient, maxErrorDuration, acknowledgePages, location, asyncPageTransportLocation, clientCallback, scheduler, Ticker.systemTicker(), pageBufferClientCallbackExecutor); + this(resultClient, maxErrorDuration, acknowledgePages, location, clientCallback, scheduler, Ticker.systemTicker(), pageBufferClientCallbackExecutor); } public PageBufferClient( @@ -137,7 +134,6 @@ public PageBufferClient( Duration maxErrorDuration, boolean acknowledgePages, URI location, - Optional asyncPageTransportLocation, ClientCallback clientCallback, ScheduledExecutorService scheduler, Ticker ticker, @@ -146,7 +142,6 @@ public PageBufferClient( this.resultClient = requireNonNull(resultClient, "resultClient is null"); this.acknowledgePages = acknowledgePages; this.location = requireNonNull(location, "location is null"); - this.asyncPageTransportLocation = requireNonNull(asyncPageTransportLocation, "asyncPageTransportLocation is null"); this.clientCallback = requireNonNull(clientCallback, "clientCallback is null"); this.scheduler = requireNonNull(scheduler, "scheduler is null"); this.pageBufferClientCallbackExecutor = requireNonNull(pageBufferClientCallbackExecutor, "pageBufferClientCallbackExecutor is null"); @@ -268,8 +263,7 @@ private synchronized void initiateRequest(DataSize maxResponseSize) private synchronized void sendGetResults(DataSize maxResponseSize) { - URI uriBase = asyncPageTransportLocation.orElse(location); - URI uri = HttpUriBuilder.uriBuilderFrom(uriBase).appendPath(String.valueOf(token)).build(); + URI uri = HttpUriBuilder.uriBuilderFrom(location).appendPath(String.valueOf(token)).build(); ListenableFuture resultFuture = resultClient.getResults(token, maxResponseSize); diff --git a/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportForwardFilter.java b/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportForwardFilter.java new file mode 100644 index 0000000000000..3e307e91152f8 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportForwardFilter.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; + +import java.io.IOException; +import java.util.regex.Pattern; + +/** + * It is not possible to map the {@link AsyncPageTransportServlet} to "/v1/task//results/*" directly. + * Servlet pattern matching doesn't allow arbitrary globbing (e.g.: /v1/task/*\/results/*). + * Only prefix or suffix matching is allowed (e.g.: /v1/task/*, *\/suffix). + * Hence a more nuanced matching and a forward is required. + */ +public class AsyncPageTransportForwardFilter + implements Filter +{ + private static final String GET_RESULTS_URL_PREFIX = "/v1/task"; + private static final Pattern GET_RESULTS_URL_PATTERN = Pattern.compile("/v1/task/[^/]+/results/[^/]+/[^/]+/?"); + private static final String GET_RESULTS_METHOD = "GET"; + private static final String GET_RESULTS_URL_FORWARD_PREFIX = "/v1/task/async"; + + @Override + public void init(FilterConfig filterConfig) {} + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException + { + HttpServletRequest httpRequest = (HttpServletRequest) request; + String requestUri = httpRequest.getRequestURI(); + if (isGetResultsRequest(httpRequest.getRequestURI(), httpRequest.getMethod())) { + String forwardUri = GET_RESULTS_URL_FORWARD_PREFIX + requestUri.substring(GET_RESULTS_URL_PREFIX.length()); + httpRequest.getRequestDispatcher(forwardUri).forward(request, response); + } + else { + chain.doFilter(request, response); + } + } + + private boolean isGetResultsRequest(String uri, String method) + { + if (!GET_RESULTS_METHOD.equals(method)) { + return false; + } + if (!uri.startsWith(GET_RESULTS_URL_PREFIX)) { + return false; + } + return GET_RESULTS_URL_PATTERN.matcher(uri).matches(); + } + + @Override + public void destroy() {} +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/PagesResponseWriter.java b/presto-main/src/main/java/com/facebook/presto/server/PagesResponseWriter.java deleted file mode 100644 index 9d363bf5c86c5..0000000000000 --- a/presto-main/src/main/java/com/facebook/presto/server/PagesResponseWriter.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.server; - -import com.facebook.presto.common.Page; -import com.facebook.presto.spi.page.SerializedPage; -import com.google.common.reflect.TypeToken; -import io.airlift.slice.OutputStreamSliceOutput; -import io.airlift.slice.SliceOutput; - -import javax.ws.rs.Produces; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyWriter; -import javax.ws.rs.ext.Provider; - -import java.io.EOFException; -import java.io.IOException; -import java.io.OutputStream; -import java.io.UncheckedIOException; -import java.lang.annotation.Annotation; -import java.lang.reflect.Type; -import java.util.List; - -import static com.facebook.presto.PrestoMediaTypes.PRESTO_PAGES; -import static com.facebook.presto.spi.page.PagesSerdeUtil.writeSerializedPages; - -@Provider -@Produces(PRESTO_PAGES) -public class PagesResponseWriter - implements MessageBodyWriter> -{ - private static final MediaType PRESTO_PAGES_TYPE = MediaType.valueOf(PRESTO_PAGES); - private static final Type LIST_GENERIC_TOKEN; - - static { - try { - LIST_GENERIC_TOKEN = List.class.getMethod("get", int.class).getGenericReturnType(); - } - catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean isWriteable(Class type, Type genericType, Annotation[] annotations, MediaType mediaType) - { - return List.class.isAssignableFrom(type) && - TypeToken.of(genericType).resolveType(LIST_GENERIC_TOKEN).getRawType().equals(Page.class) && - mediaType.isCompatible(PRESTO_PAGES_TYPE); - } - - @Override - public long getSize(List serializedPages, Class type, Type genericType, Annotation[] annotations, MediaType mediaType) - { - return -1; - } - - @Override - public void writeTo(List serializedPages, - Class type, - Type genericType, - Annotation[] annotations, - MediaType mediaType, - MultivaluedMap httpHeaders, - OutputStream output) - throws IOException, WebApplicationException - { - try { - SliceOutput sliceOutput = new OutputStreamSliceOutput(output); - writeSerializedPages(sliceOutput, serializedPages); - // We use flush instead of close, because the underlying stream would be closed and that is not allowed. - sliceOutput.flush(); - } - catch (UncheckedIOException e) { - // EOF exception occurs when the client disconnects while writing data - // This is not a "server" problem so we don't want to log this - if (!(e.getCause() instanceof EOFException)) { - throw e; - } - } - } -} diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 9bd3ebb57c571..07584e5018b48 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -226,6 +226,7 @@ import javax.annotation.PreDestroy; import javax.inject.Singleton; +import javax.servlet.Filter; import javax.servlet.Servlet; import java.util.List; @@ -535,7 +536,6 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon smileCodecBinder(binder).bindSmileCodec(TaskInfo.class); thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class); thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class); - jaxrsBinder(binder).bind(PagesResponseWriter.class); // exchange client binder.bind(ExchangeClientSupplier.class).to(ExchangeClientFactory.class).in(Scopes.SINGLETON); @@ -760,6 +760,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon driftServerBinder(binder).bindService(ThriftServerInfoService.class); // Async page transport + newSetBinder(binder, Filter.class, TheServlet.class).addBinding() + .to(AsyncPageTransportForwardFilter.class).in(Scopes.SINGLETON); binder.bind(AsyncPageTransportServlet.class).in(Scopes.SINGLETON); newExporter(binder).export(AsyncPageTransportServlet.class).withGeneratedName(); newMapBinder(binder, String.class, Servlet.class, TheServlet.class) diff --git a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java index e4678ed48711c..96d90f008aef1 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java @@ -16,31 +16,22 @@ import com.facebook.airlift.concurrent.BoundedExecutor; import com.facebook.airlift.json.Codec; import com.facebook.airlift.json.JsonCodec; -import com.facebook.airlift.json.smile.SmileCodec; -import com.facebook.airlift.stats.TimeStat; import com.facebook.presto.Session; -import com.facebook.presto.common.Page; import com.facebook.presto.connector.ConnectorTypeSerdeManager; import com.facebook.presto.execution.TaskId; import com.facebook.presto.execution.TaskInfo; import com.facebook.presto.execution.TaskManager; import com.facebook.presto.execution.TaskState; import com.facebook.presto.execution.TaskStatus; -import com.facebook.presto.execution.buffer.BufferResult; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.metadata.HandleResolver; import com.facebook.presto.metadata.MetadataUpdates; import com.facebook.presto.metadata.SessionPropertyManager; -import com.facebook.presto.spi.page.SerializedPage; import com.facebook.presto.sql.planner.PlanFragment; import com.google.common.collect.ImmutableList; -import com.google.common.reflect.TypeToken; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.units.DataSize; import io.airlift.units.Duration; -import org.weakref.jmx.Managed; -import org.weakref.jmx.Nested; import javax.annotation.security.RolesAllowed; import javax.inject.Inject; @@ -55,13 +46,10 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.container.CompletionCallback; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.Context; -import javax.ws.rs.core.GenericEntity; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriInfo; import java.util.List; @@ -74,18 +62,11 @@ import static com.facebook.airlift.http.client.thrift.ThriftRequestUtils.APPLICATION_THRIFT_FB_COMPACT; import static com.facebook.airlift.http.server.AsyncResponseHandler.bindAsyncResponse; import static com.facebook.presto.PrestoMediaTypes.APPLICATION_JACKSON_SMILE; -import static com.facebook.presto.PrestoMediaTypes.PRESTO_PAGES; -import static com.facebook.presto.client.PrestoHeaders.PRESTO_BUFFER_COMPLETE; import static com.facebook.presto.client.PrestoHeaders.PRESTO_CURRENT_STATE; -import static com.facebook.presto.client.PrestoHeaders.PRESTO_MAX_SIZE; import static com.facebook.presto.client.PrestoHeaders.PRESTO_MAX_WAIT; -import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_NEXT_TOKEN; -import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_TOKEN; -import static com.facebook.presto.client.PrestoHeaders.PRESTO_TASK_INSTANCE_ID; import static com.facebook.presto.server.TaskResourceUtils.convertToThriftTaskInfo; import static com.facebook.presto.server.TaskResourceUtils.isThriftRequest; import static com.facebook.presto.server.security.RoleType.INTERNAL; -import static com.facebook.presto.util.TaskUtils.DEFAULT_MAX_WAIT_TIME; import static com.facebook.presto.util.TaskUtils.randomizeWaitTime; import static com.google.common.collect.Iterables.transform; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; @@ -107,8 +88,6 @@ public class TaskResource private final SessionPropertyManager sessionPropertyManager; private final Executor responseExecutor; private final ScheduledExecutorService timeoutExecutor; - private final TimeStat readFromOutputBufferTime = new TimeStat(); - private final TimeStat resultsRequestTime = new TimeStat(); private final Codec planFragmentCodec; private final HandleResolver handleResolver; private final ConnectorTypeSerdeManager connectorTypeSerdeManager; @@ -120,8 +99,6 @@ public TaskResource( @ForAsyncRpc BoundedExecutor responseExecutor, @ForAsyncRpc ScheduledExecutorService timeoutExecutor, JsonCodec planFragmentJsonCodec, - SmileCodec planFragmentSmileCodec, - InternalCommunicationConfig communicationConfig, HandleResolver handleResolver, ConnectorTypeSerdeManager connectorTypeSerdeManager) { @@ -298,65 +275,6 @@ public TaskInfo deleteTask( return taskInfo; } - @GET - @Path("{taskId}/results/{bufferId}/{token}") - @Produces(PRESTO_PAGES) - public void getResults( - @PathParam("taskId") TaskId taskId, - @PathParam("bufferId") OutputBufferId bufferId, - @PathParam("token") final long token, - @HeaderParam(PRESTO_MAX_SIZE) DataSize maxSize, - @Suspended AsyncResponse asyncResponse) - { - requireNonNull(taskId, "taskId is null"); - requireNonNull(bufferId, "bufferId is null"); - - long start = System.nanoTime(); - ListenableFuture bufferResultFuture = taskManager.getTaskResults(taskId, bufferId, token, maxSize); - Duration waitTime = randomizeWaitTime(DEFAULT_MAX_WAIT_TIME); - bufferResultFuture = addTimeout( - bufferResultFuture, - () -> BufferResult.emptyResults(taskManager.getTaskInstanceId(taskId), token, false), - waitTime, - timeoutExecutor); - - ListenableFuture responseFuture = Futures.transform(bufferResultFuture, result -> { - List serializedPages = result.getSerializedPages(); - - GenericEntity entity = null; - Status status; - if (serializedPages.isEmpty()) { - status = Status.NO_CONTENT; - } - else { - entity = new GenericEntity<>(serializedPages, new TypeToken>() {}.getType()); - status = Status.OK; - } - - return Response.status(status) - .entity(entity) - .header(PRESTO_TASK_INSTANCE_ID, result.getTaskInstanceId()) - .header(PRESTO_PAGE_TOKEN, result.getToken()) - .header(PRESTO_PAGE_NEXT_TOKEN, result.getNextToken()) - .header(PRESTO_BUFFER_COMPLETE, result.isBufferComplete()) - .build(); - }, directExecutor()); - - // For hard timeout, add an additional time to max wait for thread scheduling contention and GC - Duration timeout = new Duration(waitTime.toMillis() + ADDITIONAL_WAIT_TIME.toMillis(), MILLISECONDS); - bindAsyncResponse(asyncResponse, responseFuture, responseExecutor) - .withTimeout(timeout, - Response.status(Status.NO_CONTENT) - .header(PRESTO_TASK_INSTANCE_ID, taskManager.getTaskInstanceId(taskId)) - .header(PRESTO_PAGE_TOKEN, token) - .header(PRESTO_PAGE_NEXT_TOKEN, token) - .header(PRESTO_BUFFER_COMPLETE, false) - .build()); - - responseFuture.addListener(() -> readFromOutputBufferTime.add(Duration.nanosSince(start)), directExecutor()); - asyncResponse.register((CompletionCallback) throwable -> resultsRequestTime.add(Duration.nanosSince(start))); - } - @GET @Path("{taskId}/results/{bufferId}/{token}/acknowledge") public void acknowledgeResults( @@ -391,20 +309,6 @@ public void removeRemoteSource(@PathParam("taskId") TaskId taskId, @PathParam("r taskManager.removeRemoteSource(taskId, remoteSourceTaskId); } - @Managed - @Nested - public TimeStat getReadFromOutputBufferTime() - { - return readFromOutputBufferTime; - } - - @Managed - @Nested - public TimeStat getResultsRequestTime() - { - return resultsRequestTime; - } - private static boolean shouldSummarize(UriInfo uriInfo) { return uriInfo.getQueryParameters().containsKey("summarize"); diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeClient.java b/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeClient.java index abc4967296150..04448a23546d1 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeClient.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeClient.java @@ -558,7 +558,6 @@ private ExchangeClient createExchangeClient(MockExchangeRequestProcessor process 1, new Duration(1, MINUTES), true, - false, 0.2, new TestingHttpClient(processor, testingHttpClientExecutor), new TestingDriftClient<>(), diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeClientConfig.java b/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeClientConfig.java index 45951c16f0f59..c7b4187eae171 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeClientConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeClientConfig.java @@ -42,8 +42,7 @@ public void testDefaults() .setPageBufferClientMaxCallbackThreads(25) .setClientThreads(25) .setAcknowledgePages(true) - .setResponseSizeExponentialMovingAverageDecayingAlpha(0.1) - .setAsyncPageTransportEnabled(true)); + .setResponseSizeExponentialMovingAverageDecayingAlpha(0.1)); } @Test @@ -60,7 +59,6 @@ public void testExplicitPropertyMappings() .put("exchange.page-buffer-client.max-callback-threads", "16") .put("exchange.acknowledge-pages", "false") .put("exchange.response-size-exponential-moving-average-decaying-alpha", "0.42") - .put("exchange.async-page-transport-enabled", "false") .build(); ExchangeClientConfig expected = new ExchangeClientConfig() @@ -73,8 +71,7 @@ public void testExplicitPropertyMappings() .setClientThreads(2) .setPageBufferClientMaxCallbackThreads(16) .setAcknowledgePages(false) - .setResponseSizeExponentialMovingAverageDecayingAlpha(0.42) - .setAsyncPageTransportEnabled(false); + .setResponseSizeExponentialMovingAverageDecayingAlpha(0.42); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeOperator.java index 9e8057f1d31b1..7a54169209212 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestExchangeOperator.java @@ -91,7 +91,6 @@ public void setUp() 3, new Duration(1, TimeUnit.MINUTES), true, - false, 0.2, httpClient, new TestingDriftClient<>(), diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestPageBufferClient.java b/presto-main/src/test/java/com/facebook/presto/operator/TestPageBufferClient.java index a1050360bc35e..11893a5d98b57 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestPageBufferClient.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestPageBufferClient.java @@ -36,7 +36,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; @@ -107,7 +106,6 @@ public void testHappyPath() new Duration(1, TimeUnit.MINUTES), true, location, - Optional.empty(), callback, scheduler, pageBufferClientCallbackExecutor); @@ -194,7 +192,6 @@ public void testLifecycle() new Duration(1, TimeUnit.MINUTES), true, location, - Optional.empty(), callback, scheduler, pageBufferClientCallbackExecutor); @@ -236,7 +233,6 @@ public void testInvalidResponses() new Duration(1, TimeUnit.MINUTES), true, location, - Optional.empty(), callback, scheduler, pageBufferClientCallbackExecutor); @@ -306,7 +302,6 @@ public void testCloseDuringPendingRequest() new Duration(1, TimeUnit.MINUTES), true, location, - Optional.empty(), callback, scheduler, pageBufferClientCallbackExecutor); @@ -362,7 +357,6 @@ public void testExceptionFromResponseHandler() new Duration(30, TimeUnit.SECONDS), true, location, - Optional.empty(), callback, scheduler, ticker, diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TestAsyncPageTransportQueries.java b/presto-tests/src/test/java/com/facebook/presto/execution/TestAsyncPageTransportQueries.java deleted file mode 100644 index 97faf28f865cb..0000000000000 --- a/presto-tests/src/test/java/com/facebook/presto/execution/TestAsyncPageTransportQueries.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.execution; - -import com.facebook.presto.Session; -import com.facebook.presto.testing.QueryRunner; -import com.facebook.presto.tests.AbstractTestQueryFramework; -import com.facebook.presto.tests.DistributedQueryRunner; -import com.facebook.presto.tpch.TpchPlugin; -import com.google.common.collect.ImmutableMap; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import static com.facebook.presto.testing.TestingSession.testSessionBuilder; -import static com.facebook.presto.tpch.TpchMetadata.TINY_SCHEMA_NAME; - -public class TestAsyncPageTransportQueries - extends AbstractTestQueryFramework -{ - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - Session session = testSessionBuilder() - .setCatalog("tpch") - .setSchema(TINY_SCHEMA_NAME) - .build(); - return new DistributedQueryRunner( - session, - 3, - ImmutableMap.of( - "exchange.async-page-transport-enabled", "true")); - } - - @BeforeClass - public void setUp() - { - getQueryRunner().installPlugin(new TpchPlugin()); - getQueryRunner().createCatalog("tpch", "tpch", ImmutableMap.of()); - } - - @Test - public void testAsyncPageTransport() - { - assertQuery("SELECT custkey, orderstatus, COUNT(DISTINCT orderkey) FROM orders GROUP BY custkey, orderstatus"); - } -}