Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ public Optional<DispatchInfo> getDispatchInfo(QueryId queryId)
});
}

public long getDurationUntilExpirationInMillis(QueryId queryId)
{
return queryTracker.getQuery(queryId).getDurationUntilExpirationInMillis();
}

/**
* Check if a given queryId exists in query tracker
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ public QueryInfo getFullQueryInfo(QueryId queryId)
throw new UnsupportedOperationException();
}

@Override
public long getDurationUntilExpirationInMillis(QueryId queryId)
{
throw new UnsupportedOperationException();
}

@Override
public Session getQuerySession(QueryId queryId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ BasicQueryInfo getQueryInfo(QueryId queryId)
QueryInfo getFullQueryInfo(QueryId queryId)
throws NoSuchElementException;

/**
* @throws NoSuchElementException if query does not exist
*/
long getDurationUntilExpirationInMillis(QueryId queryId)
throws NoSuchElementException;

/**
* @throws NoSuchElementException if query does not exist
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,13 @@ public interface TrackedQuery

long getEndTimeInMillis();

default long getDurationUntilExpirationInMillis()
{
Duration queryClientTimeout = getQueryClientTimeout(getSession());
long expireTime = getLastHeartbeatInMillis() + queryClientTimeout.toMillis();
return Math.max(0, expireTime - currentTimeMillis());
}

Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits();

void fail(Throwable cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ Optional<ListenableFuture<Response>> waitForExecutingResponse(
DataSize targetResultSize,
boolean compressionEnabled,
boolean nestedDataSerializationEnabled,
boolean binaryResults);
boolean binaryResults,
long durationUntilExpirationMs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.airlift.units.Duration;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
Expand Down Expand Up @@ -100,7 +101,7 @@ public final class QueryResourceUtil

private QueryResourceUtil() {}

public static Response toResponse(Query query, QueryResults queryResults, boolean compressionEnabled)
public static Response toResponse(Query query, QueryResults queryResults, boolean compressionEnabled, long durationUntilExpirationMs)
{
Response.ResponseBuilder response = Response.ok(queryResults);

Expand Down Expand Up @@ -158,10 +159,18 @@ public static Response toResponse(Query query, QueryResults queryResults, boolea
response.header(PRESTO_REMOVED_SESSION_FUNCTION, urlEncode(SQL_FUNCTION_ID_JSON_CODEC.toJson(signature)));
}

response.cacheControl(getCacheControlMaxAge(durationUntilExpirationMs));

return response.build();
}

public static Response toResponse(Query query, QueryResults queryResults, String xPrestoPrefixUri, boolean compressionEnabled, boolean nestedDataSerializationEnabled)
public static Response toResponse(
Query query,
QueryResults queryResults,
String xPrestoPrefixUri,
boolean compressionEnabled,
boolean nestedDataSerializationEnabled,
long durationUntilExpirationMs)
{
Iterable<List<Object>> queryResultsData = queryResults.getData();
if (nestedDataSerializationEnabled) {
Expand All @@ -181,7 +190,12 @@ public static Response toResponse(Query query, QueryResults queryResults, String
queryResults.getUpdateType(),
queryResults.getUpdateCount());

return toResponse(query, resultsClone, compressionEnabled);
return toResponse(query, resultsClone, compressionEnabled, durationUntilExpirationMs);
}

public static CacheControl getCacheControlMaxAge(long durationUntilExpirationMs)
{
return CacheControl.valueOf("max-age=" + MILLISECONDS.toSeconds(durationUntilExpirationMs));
}

public static void abortIfPrefixUrlInvalid(String xPrestoPrefixUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public QueryInfo getFullQueryInfo(QueryId queryId)
return null;
}

@Override
public long getDurationUntilExpirationInMillis(QueryId queryId)
{
return 0;
}

public Session getQuerySession(QueryId queryId)
{
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,13 @@ public QueryInfo getFullQueryInfo(QueryId queryId)
return queryTracker.getQuery(queryId).getQueryInfo();
}

@Override
public long getDurationUntilExpirationInMillis(QueryId queryId)
throws NoSuchElementException
{
return queryTracker.getQuery(queryId).getDurationUntilExpirationInMillis();
}

@Override
public Session getQuerySession(QueryId queryId)
throws NoSuchElementException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.client.QueryResults;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.server.ForStatementResource;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.QueryId;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class ExecutingStatementResource

private final BoundedExecutor responseExecutor;
private final LocalQueryProvider queryProvider;
private final QueryManager queryManager;
private final boolean compressionEnabled;
private final boolean nestedDataSerializationEnabled;
private final QueryBlockingRateLimiter queryRateLimiter;
Expand All @@ -76,11 +78,13 @@ public class ExecutingStatementResource
public ExecutingStatementResource(
@ForStatementResource BoundedExecutor responseExecutor,
LocalQueryProvider queryProvider,
QueryManager queryManager,
ServerConfig serverConfig,
QueryBlockingRateLimiter queryRateLimiter)
{
this.responseExecutor = requireNonNull(responseExecutor, "responseExecutor is null");
this.queryProvider = requireNonNull(queryProvider, "queryProvider is null");
this.queryManager = requireNonNull(queryManager, "queryManager is null");
this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();
this.nestedDataSerializationEnabled = requireNonNull(serverConfig, "serverConfig is null").isNestedDataSerializationEnabled();
this.queryRateLimiter = requireNonNull(queryRateLimiter, "queryRateLimiter is null");
Expand Down Expand Up @@ -132,9 +136,10 @@ public void getQueryResults(
return query.waitForResults(token, uriInfo, effectiveFinalProto, wait, effectiveFinalTargetResultSize, binaryResults);
},
responseExecutor);
long durationUntilExpirationMs = queryManager.getDurationUntilExpirationInMillis(queryId);
ListenableFuture<Response> queryResultsFuture = transform(
waitForResultsAsync,
results -> toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled),
results -> toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled, durationUntilExpirationMs),
directExecutor());
bindAsyncResponse(asyncResponse, queryResultsFuture, responseExecutor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.Optional;

import static com.facebook.presto.server.protocol.QueryResourceUtil.toResponse;
import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -53,7 +54,8 @@ public Optional<ListenableFuture<Response>> waitForExecutingResponse(
DataSize targetResultSize,
boolean compressionEnabled,
boolean nestedDataSerializationEnabled,
boolean binaryResults)
boolean binaryResults,
long durationUntilExpirationMs)
{
Query query;
try {
Expand All @@ -64,7 +66,7 @@ public Optional<ListenableFuture<Response>> waitForExecutingResponse(
}
return Optional.of(transform(
query.waitForResults(0, uriInfo, scheme, maxWait, targetResultSize, binaryResults),
results -> QueryResourceUtil.toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled),
results -> toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled, durationUntilExpirationMs),
directExecutor()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
Expand All @@ -77,6 +78,7 @@
import static com.facebook.presto.server.protocol.QueryResourceUtil.NO_DURATION;
import static com.facebook.presto.server.protocol.QueryResourceUtil.abortIfPrefixUrlInvalid;
import static com.facebook.presto.server.protocol.QueryResourceUtil.createQueuedQueryResults;
import static com.facebook.presto.server.protocol.QueryResourceUtil.getCacheControlMaxAge;
import static com.facebook.presto.server.protocol.QueryResourceUtil.getQueuedUri;
import static com.facebook.presto.server.protocol.QueryResourceUtil.getScheme;
import static com.facebook.presto.server.security.RoleType.USER;
Expand All @@ -91,6 +93,7 @@
import static com.google.common.util.concurrent.Futures.transformAsync;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.System.currentTimeMillis;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
Expand Down Expand Up @@ -221,7 +224,9 @@ public Response postStatement(
Query query = new Query(statement, sessionContext, dispatchManager, executingQueryResponseProvider, 0);
queries.put(query.getQueryId(), query);

return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled)
.cacheControl(query.getDefaultCacheControl())
.build();
}

/**
Expand Down Expand Up @@ -273,7 +278,9 @@ public Response putStatement(
throw badRequest(CONFLICT, "Query already exists");
}

return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled)
.cacheControl(query.getDefaultCacheControl())
.build();
}

/**
Expand Down Expand Up @@ -322,7 +329,9 @@ public Response retryFailedQuery(
}
}

return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled)
.cacheControl(query.getDefaultCacheControl())
.build();
}

/**
Expand Down Expand Up @@ -435,6 +444,7 @@ private static Response.ResponseBuilder withCompressionConfiguration(Response.Re

private static final class Query
{
private static final int CACHE_CONTROL_MAX_AGE_SEC = 60;
private final String query;
private final SessionContext sessionContext;
private final DispatchManager dispatchManager;
Expand All @@ -443,6 +453,7 @@ private static final class Query
private final String slug;
private final AtomicLong lastToken = new AtomicLong();
private final int retryCount;
private final long expirationTime;

@GuardedBy("this")
private ListenableFuture<?> querySubmissionFuture;
Expand All @@ -468,6 +479,7 @@ public Query(
this.retryCount = retryCount;
this.queryId = requireNonNull(queryId, "queryId is null");
this.slug = requireNonNull(slug, "slug is null");
this.expirationTime = currentTimeMillis() + SECONDS.toMillis(CACHE_CONTROL_MAX_AGE_SEC);
}

/**
Expand Down Expand Up @@ -518,6 +530,15 @@ public int getRetryCount()
return retryCount;
}

/**
* Returns a cache control with the default max age value
*/
public CacheControl getDefaultCacheControl()
{
long maxAgeMillis = Math.max(0, expirationTime - currentTimeMillis());
return CacheControl.valueOf("max-age=" + MILLISECONDS.toSeconds(maxAgeMillis));
}

/**
* Checks whether the query has been processed by the dispatchManager
*/
Expand Down Expand Up @@ -591,7 +612,10 @@ public ListenableFuture<Response> toResponse(
xPrestoPrefixUrl,
DispatchInfo.waitingForPrerequisites(NO_DURATION, NO_DURATION),
binaryResults);
return immediateFuture(withCompressionConfiguration(Response.ok(queryResults), compressionEnabled).build());

return immediateFuture(withCompressionConfiguration(Response.ok(queryResults), compressionEnabled)
.cacheControl(getDefaultCacheControl())
.build());
}
}

Expand All @@ -602,6 +626,7 @@ public ListenableFuture<Response> toResponse(
.status(NOT_FOUND)
.build()));
}
long durationUntilExpirationMs = dispatchManager.getDurationUntilExpirationInMillis(queryId);

if (waitForDispatched().isDone()) {
Optional<ListenableFuture<Response>> executingQueryResponse = executingQueryResponseProvider.waitForExecutingResponse(
Expand All @@ -615,7 +640,8 @@ public ListenableFuture<Response> toResponse(
TARGET_RESULT_SIZE,
compressionEnabled,
nestedDataSerializationEnabled,
binaryResults);
binaryResults,
durationUntilExpirationMs);

if (executingQueryResponse.isPresent()) {
return executingQueryResponse.get();
Expand All @@ -624,6 +650,7 @@ public ListenableFuture<Response> toResponse(

return immediateFuture(withCompressionConfiguration(Response.ok(
createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get(), binaryResults)), compressionEnabled)
.cacheControl(getCacheControlMaxAge(durationUntilExpirationMs))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,17 @@
import static com.facebook.presto.server.TestHttpRequestSessionContext.urlEncode;
import static com.facebook.presto.spi.StandardErrorCode.INCOMPATIBLE_CLIENT;
import static com.facebook.presto.spi.page.PagesSerdeUtil.readSerializedPage;
import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static javax.ws.rs.core.HttpHeaders.CACHE_CONTROL;
import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static javax.ws.rs.core.Response.Status.OK;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

@Test(singleThreaded = true)
Expand Down Expand Up @@ -425,6 +428,38 @@ public void testStatusPing()
assertEquals(response.getHeader(CONTENT_TYPE), APPLICATION_JSON, "Content Type");
}

@Test
public void testCacheControlHeaderExists()
{
Request request = preparePost()
.setUri(uriFor("/v1/statement"))
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
.setHeader(PRESTO_USER, "user")
.build();

JsonResponse<QueryResults> initResponse = client.execute(request, createFullJsonResponseHandler(QUERY_RESULTS_CODEC));

String initHeader = initResponse.getHeader(CACHE_CONTROL);
assertNotNull(initHeader);
assertTrue(initHeader.contains("max-age"));

int initAge = parseInt(initHeader.substring(initHeader.indexOf("=") + 1));
assertTrue(initAge >= 0);

JsonResponse<QueryResults> queryResults = initResponse;
while (queryResults.getValue().getNextUri() != null) {
URI nextUri = queryResults.getValue().getNextUri();
queryResults = client.execute(prepareGet().setUri(nextUri).build(), createFullJsonResponseHandler(QUERY_RESULTS_CODEC));

String header = queryResults.getHeader(CACHE_CONTROL);
assertNotNull(header);
assertTrue(header.contains("max-age"));

int maxAge = parseInt(header.substring(header.indexOf("=") + 1));
assertTrue(maxAge >= 0);
}
}

public URI uriFor(String path)
{
return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl()).replacePath(path).build();
Expand Down
Loading
Loading