Skip to content

Commit 1330919

Browse files
committed
Add Cache-Control header to query endpoints
1 parent 0a504d2 commit 1330919

File tree

11 files changed

+96
-9
lines changed

11 files changed

+96
-9
lines changed

presto-main-base/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,11 @@ public Optional<DispatchInfo> getDispatchInfo(QueryId queryId)
412412
});
413413
}
414414

415+
public DispatchQuery getDispatchQuery(QueryId queryId)
416+
{
417+
return queryTracker.getQuery(queryId);
418+
}
419+
415420
/**
416421
* Check if a given queryId exists in query tracker
417422
*

presto-main-base/src/main/java/com/facebook/presto/dispatcher/NoOpQueryManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ public QueryInfo getFullQueryInfo(QueryId queryId)
7575
throw new UnsupportedOperationException();
7676
}
7777

78+
@Override
79+
public QueryExecution getQueryExecution(QueryId queryId) throws NoSuchElementException
80+
{
81+
throw new UnsupportedOperationException();
82+
}
83+
7884
@Override
7985
public Session getQuerySession(QueryId queryId)
8086
{

presto-main-base/src/main/java/com/facebook/presto/execution/QueryManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ BasicQueryInfo getQueryInfo(QueryId queryId)
6565
QueryInfo getFullQueryInfo(QueryId queryId)
6666
throws NoSuchElementException;
6767

68+
/**
69+
* @throws NoSuchElementException if query does not exist
70+
*/
71+
QueryExecution getQueryExecution(QueryId queryId)
72+
throws NoSuchElementException;
73+
6874
/**
6975
* @throws NoSuchElementException if query does not exist
7076
*/

presto-main-base/src/main/java/com/facebook/presto/server/protocol/ExecutingQueryResponseProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.server.protocol;
1515

1616
import com.facebook.presto.dispatcher.DispatchInfo;
17+
import com.facebook.presto.execution.QueryTracker.TrackedQuery;
1718
import com.facebook.presto.spi.QueryId;
1819
import com.google.common.util.concurrent.ListenableFuture;
1920
import io.airlift.units.DataSize;
@@ -53,6 +54,7 @@ Optional<ListenableFuture<Response>> waitForExecutingResponse(
5354
QueryId queryId,
5455
String slug,
5556
DispatchInfo dispatchInfo,
57+
TrackedQuery trackedQuery,
5658
UriInfo uriInfo,
5759
String xPrestoPrefixUrl,
5860
String scheme,

presto-main-base/src/main/java/com/facebook/presto/server/protocol/QueryResourceUtil.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.facebook.presto.execution.QueryInfo;
2828
import com.facebook.presto.execution.QueryState;
2929
import com.facebook.presto.execution.QueryStats;
30+
import com.facebook.presto.execution.QueryTracker.TrackedQuery;
3031
import com.facebook.presto.execution.StageExecutionInfo;
3132
import com.facebook.presto.execution.StageExecutionStats;
3233
import com.facebook.presto.execution.StageInfo;
@@ -39,6 +40,7 @@
3940
import io.airlift.units.Duration;
4041

4142
import javax.ws.rs.WebApplicationException;
43+
import javax.ws.rs.core.CacheControl;
4244
import javax.ws.rs.core.Context;
4345
import javax.ws.rs.core.Response;
4446
import javax.ws.rs.core.UriBuilder;
@@ -61,6 +63,7 @@
6163
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
6264
import static com.facebook.airlift.json.JsonCodec.listJsonCodec;
6365
import static com.facebook.airlift.json.JsonCodec.mapJsonCodec;
66+
import static com.facebook.presto.SystemSessionProperties.getQueryClientTimeout;
6467
import static com.facebook.presto.client.PrestoHeaders.PRESTO_ADDED_PREPARE;
6568
import static com.facebook.presto.client.PrestoHeaders.PRESTO_ADDED_SESSION_FUNCTION;
6669
import static com.facebook.presto.client.PrestoHeaders.PRESTO_CLEAR_SESSION;
@@ -83,6 +86,7 @@
8386
import static com.google.common.base.Preconditions.checkArgument;
8487
import static com.google.common.base.Strings.isNullOrEmpty;
8588
import static java.lang.String.format;
89+
import static java.lang.System.currentTimeMillis;
8690
import static java.util.Collections.unmodifiableList;
8791
import static java.util.Objects.requireNonNull;
8892
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -100,7 +104,7 @@ public final class QueryResourceUtil
100104

101105
private QueryResourceUtil() {}
102106

103-
public static Response toResponse(Query query, QueryResults queryResults, boolean compressionEnabled)
107+
public static Response toResponse(Query query, QueryResults queryResults, TrackedQuery trackedQuery, boolean compressionEnabled)
104108
{
105109
Response.ResponseBuilder response = Response.ok(queryResults);
106110

@@ -158,10 +162,12 @@ public static Response toResponse(Query query, QueryResults queryResults, boolea
158162
response.header(PRESTO_REMOVED_SESSION_FUNCTION, urlEncode(SQL_FUNCTION_ID_JSON_CODEC.toJson(signature)));
159163
}
160164

165+
response.cacheControl(getCacheControlFromTracked(trackedQuery));
166+
161167
return response.build();
162168
}
163169

164-
public static Response toResponse(Query query, QueryResults queryResults, String xPrestoPrefixUri, boolean compressionEnabled, boolean nestedDataSerializationEnabled)
170+
public static Response toResponse(Query query, QueryResults queryResults, TrackedQuery trackedQuery, String xPrestoPrefixUri, boolean compressionEnabled, boolean nestedDataSerializationEnabled)
165171
{
166172
Iterable<List<Object>> queryResultsData = queryResults.getData();
167173
if (nestedDataSerializationEnabled) {
@@ -181,7 +187,15 @@ public static Response toResponse(Query query, QueryResults queryResults, String
181187
queryResults.getUpdateType(),
182188
queryResults.getUpdateCount());
183189

184-
return toResponse(query, resultsClone, compressionEnabled);
190+
return toResponse(query, resultsClone, trackedQuery, compressionEnabled);
191+
}
192+
193+
public static CacheControl getCacheControlFromTracked(TrackedQuery trackedQuery)
194+
{
195+
Duration queryClientTimeout = getQueryClientTimeout(trackedQuery.getSession());
196+
long expireTime = trackedQuery.getLastHeartbeatInMillis() + queryClientTimeout.toMillis();
197+
long ageInSeconds = MILLISECONDS.toSeconds(Math.max(0, expireTime - currentTimeMillis()));
198+
return CacheControl.valueOf("max-age=" + ageInSeconds);
185199
}
186200

187201
public static void abortIfPrefixUrlInvalid(String xPrestoPrefixUrl)

presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ public QueryInfo getFullQueryInfo(QueryId queryId)
5353
return null;
5454
}
5555

56+
@Override
57+
public QueryExecution getQueryExecution(QueryId queryId)
58+
{
59+
return null;
60+
}
61+
5662
public Session getQuerySession(QueryId queryId)
5763
{
5864
return null;

presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,12 @@ public QueryInfo getFullQueryInfo(QueryId queryId)
252252
return queryTracker.getQuery(queryId).getQueryInfo();
253253
}
254254

255+
@Override
256+
public QueryExecution getQueryExecution(QueryId queryId)
257+
{
258+
return queryTracker.getQuery(queryId);
259+
}
260+
255261
@Override
256262
public Session getQuerySession(QueryId queryId)
257263
throws NoSuchElementException

presto-main/src/main/java/com/facebook/presto/server/protocol/ExecutingStatementResource.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import com.facebook.airlift.concurrent.BoundedExecutor;
1717
import com.facebook.airlift.stats.TimeStat;
1818
import com.facebook.presto.client.QueryResults;
19+
import com.facebook.presto.execution.QueryExecution;
20+
import com.facebook.presto.execution.QueryManager;
1921
import com.facebook.presto.server.ForStatementResource;
2022
import com.facebook.presto.server.ServerConfig;
2123
import com.facebook.presto.spi.QueryId;
@@ -68,6 +70,7 @@ public class ExecutingStatementResource
6870

6971
private final BoundedExecutor responseExecutor;
7072
private final LocalQueryProvider queryProvider;
73+
private final QueryManager queryManager;
7174
private final boolean compressionEnabled;
7275
private final boolean nestedDataSerializationEnabled;
7376
private final QueryBlockingRateLimiter queryRateLimiter;
@@ -76,11 +79,13 @@ public class ExecutingStatementResource
7679
public ExecutingStatementResource(
7780
@ForStatementResource BoundedExecutor responseExecutor,
7881
LocalQueryProvider queryProvider,
82+
QueryManager queryManager,
7983
ServerConfig serverConfig,
8084
QueryBlockingRateLimiter queryRateLimiter)
8185
{
8286
this.responseExecutor = requireNonNull(responseExecutor, "responseExecutor is null");
8387
this.queryProvider = requireNonNull(queryProvider, "queryProvider is null");
88+
this.queryManager = requireNonNull(queryManager, "queryManager is null");
8489
this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();
8590
this.nestedDataSerializationEnabled = requireNonNull(serverConfig, "serverConfig is null").isNestedDataSerializationEnabled();
8691
this.queryRateLimiter = requireNonNull(queryRateLimiter, "queryRateLimiter is null");
@@ -132,9 +137,10 @@ public void getQueryResults(
132137
return query.waitForResults(token, uriInfo, effectiveFinalProto, wait, effectiveFinalTargetResultSize, binaryResults);
133138
},
134139
responseExecutor);
140+
QueryExecution trackedQuery = queryManager.getQueryExecution(queryId);
135141
ListenableFuture<Response> queryResultsFuture = transform(
136142
waitForResultsAsync,
137-
results -> toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled),
143+
results -> toResponse(query, results, trackedQuery, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled),
138144
directExecutor());
139145
bindAsyncResponse(asyncResponse, queryResultsFuture, responseExecutor);
140146
}

presto-main/src/main/java/com/facebook/presto/server/protocol/LocalExecutingQueryResponseProvider.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.server.protocol;
1515

1616
import com.facebook.presto.dispatcher.DispatchInfo;
17+
import com.facebook.presto.execution.QueryTracker.TrackedQuery;
1718
import com.facebook.presto.spi.QueryId;
1819
import com.google.common.util.concurrent.ListenableFuture;
1920
import io.airlift.units.DataSize;
@@ -46,6 +47,7 @@ public Optional<ListenableFuture<Response>> waitForExecutingResponse(
4647
QueryId queryId,
4748
String slug,
4849
DispatchInfo dispatchInfo,
50+
TrackedQuery trackedQuery,
4951
UriInfo uriInfo,
5052
String xPrestoPrefixUrl,
5153
String scheme,
@@ -64,7 +66,7 @@ public Optional<ListenableFuture<Response>> waitForExecutingResponse(
6466
}
6567
return Optional.of(transform(
6668
query.waitForResults(0, uriInfo, scheme, maxWait, targetResultSize, binaryResults),
67-
results -> QueryResourceUtil.toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled),
69+
results -> QueryResourceUtil.toResponse(query, results, trackedQuery, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled),
6870
directExecutor()));
6971
}
7072
}

presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.facebook.presto.dispatcher.DispatchExecutor;
2222
import com.facebook.presto.dispatcher.DispatchInfo;
2323
import com.facebook.presto.dispatcher.DispatchManager;
24+
import com.facebook.presto.dispatcher.DispatchQuery;
2425
import com.facebook.presto.execution.ExecutionFailureInfo;
2526
import com.facebook.presto.metadata.SessionPropertyManager;
2627
import com.facebook.presto.server.HttpRequestSessionContext;
@@ -56,6 +57,7 @@
5657
import javax.ws.rs.WebApplicationException;
5758
import javax.ws.rs.container.AsyncResponse;
5859
import javax.ws.rs.container.Suspended;
60+
import javax.ws.rs.core.CacheControl;
5961
import javax.ws.rs.core.Context;
6062
import javax.ws.rs.core.Response;
6163
import javax.ws.rs.core.Response.Status;
@@ -77,6 +79,7 @@
7779
import static com.facebook.presto.server.protocol.QueryResourceUtil.NO_DURATION;
7880
import static com.facebook.presto.server.protocol.QueryResourceUtil.abortIfPrefixUrlInvalid;
7981
import static com.facebook.presto.server.protocol.QueryResourceUtil.createQueuedQueryResults;
82+
import static com.facebook.presto.server.protocol.QueryResourceUtil.getCacheControlFromTracked;
8083
import static com.facebook.presto.server.protocol.QueryResourceUtil.getQueuedUri;
8184
import static com.facebook.presto.server.protocol.QueryResourceUtil.getScheme;
8285
import static com.facebook.presto.server.security.RoleType.USER;
@@ -91,6 +94,7 @@
9194
import static com.google.common.util.concurrent.Futures.transformAsync;
9295
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
9396
import static io.airlift.units.DataSize.Unit.MEGABYTE;
97+
import static java.lang.System.currentTimeMillis;
9498
import static java.util.Locale.ENGLISH;
9599
import static java.util.Objects.requireNonNull;
96100
import static java.util.UUID.randomUUID;
@@ -221,7 +225,9 @@ public Response postStatement(
221225
Query query = new Query(statement, sessionContext, dispatchManager, executingQueryResponseProvider, 0);
222226
queries.put(query.getQueryId(), query);
223227

224-
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
228+
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled)
229+
.cacheControl(query.getDefaultCacheControl())
230+
.build();
225231
}
226232

227233
/**
@@ -273,7 +279,9 @@ public Response putStatement(
273279
throw badRequest(CONFLICT, "Query already exists");
274280
}
275281

276-
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
282+
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled)
283+
.cacheControl(query.getDefaultCacheControl())
284+
.build();
277285
}
278286

279287
/**
@@ -322,7 +330,9 @@ public Response retryFailedQuery(
322330
}
323331
}
324332

325-
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
333+
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled)
334+
.cacheControl(query.getDefaultCacheControl())
335+
.build();
326336
}
327337

328338
/**
@@ -435,6 +445,7 @@ private static Response.ResponseBuilder withCompressionConfiguration(Response.Re
435445

436446
private static final class Query
437447
{
448+
private static final int CACHE_CONTROL_MAX_AGE_SEC = 60;
438449
private final String query;
439450
private final SessionContext sessionContext;
440451
private final DispatchManager dispatchManager;
@@ -443,6 +454,7 @@ private static final class Query
443454
private final String slug;
444455
private final AtomicLong lastToken = new AtomicLong();
445456
private final int retryCount;
457+
private final long expirationTime;
446458

447459
@GuardedBy("this")
448460
private ListenableFuture<?> querySubmissionFuture;
@@ -468,6 +480,7 @@ public Query(
468480
this.retryCount = retryCount;
469481
this.queryId = requireNonNull(queryId, "queryId is null");
470482
this.slug = requireNonNull(slug, "slug is null");
483+
this.expirationTime = currentTimeMillis() + SECONDS.toMillis(CACHE_CONTROL_MAX_AGE_SEC);
471484
}
472485

473486
/**
@@ -518,6 +531,15 @@ public int getRetryCount()
518531
return retryCount;
519532
}
520533

534+
/**
535+
* Returns a cache control with the default max age value
536+
*/
537+
public CacheControl getDefaultCacheControl()
538+
{
539+
long maxAgeMillis = Math.max(0, expirationTime - currentTimeMillis());
540+
return CacheControl.valueOf("max-age=" + MILLISECONDS.toSeconds(maxAgeMillis));
541+
}
542+
521543
/**
522544
* Checks whether the query has been processed by the dispatchManager
523545
*/
@@ -591,7 +613,10 @@ public ListenableFuture<Response> toResponse(
591613
xPrestoPrefixUrl,
592614
DispatchInfo.waitingForPrerequisites(NO_DURATION, NO_DURATION),
593615
binaryResults);
594-
return immediateFuture(withCompressionConfiguration(Response.ok(queryResults), compressionEnabled).build());
616+
617+
return immediateFuture(withCompressionConfiguration(Response.ok(queryResults), compressionEnabled)
618+
.cacheControl(getDefaultCacheControl())
619+
.build());
595620
}
596621
}
597622

@@ -602,12 +627,14 @@ public ListenableFuture<Response> toResponse(
602627
.status(NOT_FOUND)
603628
.build()));
604629
}
630+
DispatchQuery dispatchQuery = dispatchManager.getDispatchQuery(queryId);
605631

606632
if (waitForDispatched().isDone()) {
607633
Optional<ListenableFuture<Response>> executingQueryResponse = executingQueryResponseProvider.waitForExecutingResponse(
608634
queryId,
609635
slug,
610636
dispatchInfo.get(),
637+
dispatchQuery,
611638
uriInfo,
612639
xPrestoPrefixUrl,
613640
getScheme(xForwardedProto, uriInfo),
@@ -624,6 +651,7 @@ public ListenableFuture<Response> toResponse(
624651

625652
return immediateFuture(withCompressionConfiguration(Response.ok(
626653
createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get(), binaryResults)), compressionEnabled)
654+
.cacheControl(getCacheControlFromTracked(dispatchQuery))
627655
.build());
628656
}
629657

0 commit comments

Comments
 (0)