Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.operator;

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.drift.client.DriftClient;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.memory.context.LocalMemoryContext;
Expand All @@ -41,7 +40,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -91,7 +89,6 @@ public class ExchangeClient
private final HttpClient httpClient;
private final DriftClient<ThriftTaskClient> driftClient;
private final ScheduledExecutorService scheduler;
private boolean asyncPageTransportEnabled;

@GuardedBy("this")
private boolean noMoreLocations;
Expand Down Expand Up @@ -133,7 +130,6 @@ public ExchangeClient(
int concurrentRequestMultiplier,
Duration maxErrorDuration,
boolean acknowledgePages,
boolean asyncPageTransportEnabled,
double responseSizeExponentialMovingAverageDecayingAlpha,
HttpClient httpClient,
DriftClient<ThriftTaskClient> driftClient,
Expand All @@ -147,7 +143,6 @@ public ExchangeClient(
this.concurrentRequestMultiplier = concurrentRequestMultiplier;
this.maxErrorDuration = maxErrorDuration;
this.acknowledgePages = acknowledgePages;
this.asyncPageTransportEnabled = asyncPageTransportEnabled;
this.httpClient = httpClient;
this.driftClient = driftClient;
this.scheduler = scheduler;
Expand Down Expand Up @@ -199,11 +194,10 @@ public synchronized void addLocation(URI location, TaskId remoteSourceTaskId)
checkState(!noMoreLocations, "No more locations already set");

RpcShuffleClient resultClient;
Optional<URI> asyncPageTransportLocation = getAsyncPageTransportLocation(location, asyncPageTransportEnabled);
switch (location.getScheme().toLowerCase(Locale.ENGLISH)) {
case "http":
case "https":
resultClient = new HttpRpcShuffleClient(httpClient, location, asyncPageTransportLocation);
resultClient = new HttpRpcShuffleClient(httpClient, location);
break;
case "thrift":
resultClient = new ThriftRpcShuffleClient(driftClient, location);
Expand All @@ -217,7 +211,6 @@ public synchronized void addLocation(URI location, TaskId remoteSourceTaskId)
maxErrorDuration,
acknowledgePages,
location,
asyncPageTransportLocation,
new ExchangeClientCallback(),
scheduler,
pageBufferClientCallbackExecutor);
Expand Down Expand Up @@ -555,19 +548,6 @@ private static void closeQuietly(PageBufferClient client)
}
}

private static Optional<URI> getAsyncPageTransportLocation(URI location, boolean asyncPageTransportEnabled)
{
if (asyncPageTransportEnabled) {
// rewrite location for http request to get task results in async mode
// new URL cannot replace v1/task completely, v1/task/async is only used to get task results
String path = location.getPath().replace("v1/task", "v1/task/async");
return Optional.of(HttpUriBuilder.uriBuilderFrom(location).replacePath(path).build());
}
else {
return Optional.empty();
}
}

private static class ExponentialMovingAverage
{
private final double alpha;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.operator;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.DefunctConfig;
import com.facebook.airlift.http.client.HttpClientConfig;
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;
Expand All @@ -26,6 +27,7 @@

import java.util.concurrent.TimeUnit;

@DefunctConfig("exchange.async-page-transport-enabled")
public class ExchangeClientConfig
{
private DataSize maxBufferSize = new DataSize(32, Unit.MEGABYTE);
Expand All @@ -38,7 +40,6 @@ public class ExchangeClientConfig
private int pageBufferClientMaxCallbackThreads = 25;
private boolean acknowledgePages = true;
private double responseSizeExponentialMovingAverageDecayingAlpha = 0.1;
private boolean asyncPageTransportEnabled = true;

@NotNull
public DataSize getMaxBufferSize()
Expand Down Expand Up @@ -170,16 +171,4 @@ public double getResponseSizeExponentialMovingAverageDecayingAlpha()
{
return responseSizeExponentialMovingAverageDecayingAlpha;
}

public boolean isAsyncPageTransportEnabled()
{
return asyncPageTransportEnabled;
}

@Config("exchange.async-page-transport-enabled")
public ExchangeClientConfig setAsyncPageTransportEnabled(boolean asyncPageTransportEnabled)
{
this.asyncPageTransportEnabled = asyncPageTransportEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public class ExchangeClientFactory
private final DriftClient<ThriftTaskClient> driftClient;
private final DataSize maxResponseSize;
private final boolean acknowledgePages;
private final boolean asyncPageTransportEnabled;
private final double responseSizeExponentialMovingAverageDecayingAlpha;
private final ScheduledExecutorService scheduler;
private final ThreadPoolExecutorMBean executorMBean;
Expand All @@ -65,7 +64,6 @@ public ExchangeClientFactory(
config.getConcurrentRequestMultiplier(),
config.getMaxErrorDuration(),
config.isAcknowledgePages(),
config.isAsyncPageTransportEnabled(),
config.getPageBufferClientMaxCallbackThreads(),
config.getResponseSizeExponentialMovingAverageDecayingAlpha(),
httpClient,
Expand All @@ -79,7 +77,6 @@ public ExchangeClientFactory(
int concurrentRequestMultiplier,
Duration maxErrorDuration,
boolean acknowledgePages,
boolean asyncPageTransportEnabled,
int pageBufferClientMaxCallbackThreads,
double responseSizeExponentialMovingAverageDecayingAlpha,
HttpClient httpClient,
Expand All @@ -90,7 +87,6 @@ public ExchangeClientFactory(
this.concurrentRequestMultiplier = concurrentRequestMultiplier;
this.maxErrorDuration = requireNonNull(maxErrorDuration, "maxErrorDuration is null");
this.acknowledgePages = acknowledgePages;
this.asyncPageTransportEnabled = asyncPageTransportEnabled;
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.driftClient = requireNonNull(driftClient, "driftClient is null");

Expand Down Expand Up @@ -135,7 +131,6 @@ public ExchangeClient get(LocalMemoryContext systemMemoryContext)
concurrentRequestMultiplier,
maxErrorDuration,
acknowledgePages,
asyncPageTransportEnabled,
responseSizeExponentialMovingAverageDecayingAlpha,
httpClient,
driftClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.io.InputStreamReader;
import java.net.URI;
import java.util.List;
import java.util.Optional;

import static com.facebook.airlift.http.client.HttpStatus.familyForStatusCode;
import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
Expand Down Expand Up @@ -67,25 +66,17 @@ public final class HttpRpcShuffleClient

private final HttpClient httpClient;
private final URI location;
private final Optional<URI> asyncPageTransportLocation;

public HttpRpcShuffleClient(HttpClient httpClient, URI location)
{
this(httpClient, location, Optional.empty());
}

public HttpRpcShuffleClient(HttpClient httpClient, URI location, Optional<URI> asyncPageTransportLocation)
{
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.location = requireNonNull(location, "location is null");
this.asyncPageTransportLocation = requireNonNull(asyncPageTransportLocation, "asyncPageTransportLocation is null");
}

@Override
public ListenableFuture<PagesResponse> getResults(long token, DataSize maxResponseSize)
{
URI uriBase = asyncPageTransportLocation.orElse(location);
URI uri = uriBuilderFrom(uriBase).appendPath(String.valueOf(token)).build();
URI uri = uriBuilderFrom(location).appendPath(String.valueOf(token)).build();
return httpClient.executeAsync(
prepareGet()
.setHeader(PRESTO_MAX_SIZE, maxResponseSize.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.io.Closeable;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -87,7 +86,6 @@ public interface ClientCallback
private final RpcShuffleClient resultClient;
private final boolean acknowledgePages;
private final URI location;
private final Optional<URI> asyncPageTransportLocation;
private final ClientCallback clientCallback;
private final ScheduledExecutorService scheduler;
private final Backoff backoff;
Expand Down Expand Up @@ -124,20 +122,18 @@ public PageBufferClient(
Duration maxErrorDuration,
boolean acknowledgePages,
URI location,
Optional<URI> asyncPageTransportLocation,
ClientCallback clientCallback,
ScheduledExecutorService scheduler,
Executor pageBufferClientCallbackExecutor)
{
this(resultClient, maxErrorDuration, acknowledgePages, location, asyncPageTransportLocation, clientCallback, scheduler, Ticker.systemTicker(), pageBufferClientCallbackExecutor);
this(resultClient, maxErrorDuration, acknowledgePages, location, clientCallback, scheduler, Ticker.systemTicker(), pageBufferClientCallbackExecutor);
}

public PageBufferClient(
RpcShuffleClient resultClient,
Duration maxErrorDuration,
boolean acknowledgePages,
URI location,
Optional<URI> asyncPageTransportLocation,
ClientCallback clientCallback,
ScheduledExecutorService scheduler,
Ticker ticker,
Expand All @@ -146,7 +142,6 @@ public PageBufferClient(
this.resultClient = requireNonNull(resultClient, "resultClient is null");
this.acknowledgePages = acknowledgePages;
this.location = requireNonNull(location, "location is null");
this.asyncPageTransportLocation = requireNonNull(asyncPageTransportLocation, "asyncPageTransportLocation is null");
this.clientCallback = requireNonNull(clientCallback, "clientCallback is null");
this.scheduler = requireNonNull(scheduler, "scheduler is null");
this.pageBufferClientCallbackExecutor = requireNonNull(pageBufferClientCallbackExecutor, "pageBufferClientCallbackExecutor is null");
Expand Down Expand Up @@ -268,8 +263,7 @@ private synchronized void initiateRequest(DataSize maxResponseSize)

private synchronized void sendGetResults(DataSize maxResponseSize)
{
URI uriBase = asyncPageTransportLocation.orElse(location);
URI uri = HttpUriBuilder.uriBuilderFrom(uriBase).appendPath(String.valueOf(token)).build();
URI uri = HttpUriBuilder.uriBuilderFrom(location).appendPath(String.valueOf(token)).build();

ListenableFuture<PagesResponse> resultFuture = resultClient.getResults(token, maxResponseSize);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;

import java.io.IOException;
import java.util.regex.Pattern;

/**
* It is not possible to map the {@link AsyncPageTransportServlet} to "/v1/task/<taskid>/results/*" directly.
* Servlet pattern matching doesn't allow arbitrary globbing (e.g.: /v1/task/*\/results/*).
* Only prefix or suffix matching is allowed (e.g.: /v1/task/*, *\/suffix).
* Hence a more nuanced matching and a forward is required.
*/
public class AsyncPageTransportForwardFilter
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just implement the rename in one shot, as opposed to forwarding /v1/task to /v1/task/async? i.e. why not just use /v1/task and remove this filter?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then all the requests starting with /v1/task will be routed to the custom servlet, while requests other than /v1/task*/results/* have to be routed to TaskResource

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of a better way of doing it.

implements Filter
{
private static final String GET_RESULTS_URL_PREFIX = "/v1/task";
private static final Pattern GET_RESULTS_URL_PATTERN = Pattern.compile("/v1/task/[^/]+/results/[^/]+/[^/]+/?");
private static final String GET_RESULTS_METHOD = "GET";
private static final String GET_RESULTS_URL_FORWARD_PREFIX = "/v1/task/async";

@Override
public void init(FilterConfig filterConfig) {}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException
{
HttpServletRequest httpRequest = (HttpServletRequest) request;
String requestUri = httpRequest.getRequestURI();
if (isGetResultsRequest(httpRequest.getRequestURI(), httpRequest.getMethod())) {
String forwardUri = GET_RESULTS_URL_FORWARD_PREFIX + requestUri.substring(GET_RESULTS_URL_PREFIX.length());
httpRequest.getRequestDispatcher(forwardUri).forward(request, response);
}
else {
chain.doFilter(request, response);
}
}

private boolean isGetResultsRequest(String uri, String method)
{
if (!GET_RESULTS_METHOD.equals(method)) {
return false;
}
if (!uri.startsWith(GET_RESULTS_URL_PREFIX)) {
return false;
}
return GET_RESULTS_URL_PATTERN.matcher(uri).matches();
}

@Override
public void destroy() {}
}
Loading