Skip to content

Commit d2eb154

Browse files
committed
Add Cache-Control header to statement endpoints
1 parent 0a504d2 commit d2eb154

File tree

13 files changed

+165
-12
lines changed

13 files changed

+165
-12
lines changed

presto-main-base/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,11 @@ public Optional<DispatchInfo> getDispatchInfo(QueryId queryId)
412412
});
413413
}
414414

415+
public long getDurationUntilExpirationInMillis(QueryId queryId)
416+
{
417+
return queryTracker.getQuery(queryId).getDurationUntilExpirationInMillis();
418+
}
419+
415420
/**
416421
* Check if a given queryId exists in query tracker
417422
*

presto-main-base/src/main/java/com/facebook/presto/dispatcher/NoOpQueryManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ public QueryInfo getFullQueryInfo(QueryId queryId)
7575
throw new UnsupportedOperationException();
7676
}
7777

78+
@Override
79+
public long getDurationUntilExpirationInMillis(QueryId queryId)
80+
{
81+
throw new UnsupportedOperationException();
82+
}
83+
7884
@Override
7985
public Session getQuerySession(QueryId queryId)
8086
{

presto-main-base/src/main/java/com/facebook/presto/execution/QueryManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ BasicQueryInfo getQueryInfo(QueryId queryId)
6565
QueryInfo getFullQueryInfo(QueryId queryId)
6666
throws NoSuchElementException;
6767

68+
/**
69+
* @throws NoSuchElementException if query does not exist
70+
*/
71+
long getDurationUntilExpirationInMillis(QueryId queryId)
72+
throws NoSuchElementException;
73+
6874
/**
6975
* @throws NoSuchElementException if query does not exist
7076
*/

presto-main-base/src/main/java/com/facebook/presto/execution/QueryTracker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,13 @@ public interface TrackedQuery
405405

406406
long getEndTimeInMillis();
407407

408+
default long getDurationUntilExpirationInMillis()
409+
{
410+
Duration queryClientTimeout = getQueryClientTimeout(getSession());
411+
long expireTime = getLastHeartbeatInMillis() + queryClientTimeout.toMillis();
412+
return Math.max(0, expireTime - currentTimeMillis());
413+
}
414+
408415
Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits();
409416

410417
void fail(Throwable cause);

presto-main-base/src/main/java/com/facebook/presto/server/protocol/ExecutingQueryResponseProvider.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,6 @@ Optional<ListenableFuture<Response>> waitForExecutingResponse(
6060
DataSize targetResultSize,
6161
boolean compressionEnabled,
6262
boolean nestedDataSerializationEnabled,
63-
boolean binaryResults);
63+
boolean binaryResults,
64+
long durationUntilExpirationMs);
6465
}

presto-main-base/src/main/java/com/facebook/presto/server/protocol/QueryResourceUtil.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.airlift.units.Duration;
4040

4141
import javax.ws.rs.WebApplicationException;
42+
import javax.ws.rs.core.CacheControl;
4243
import javax.ws.rs.core.Context;
4344
import javax.ws.rs.core.Response;
4445
import javax.ws.rs.core.UriBuilder;
@@ -100,7 +101,7 @@ public final class QueryResourceUtil
100101

101102
private QueryResourceUtil() {}
102103

103-
public static Response toResponse(Query query, QueryResults queryResults, boolean compressionEnabled)
104+
public static Response toResponse(Query query, QueryResults queryResults, boolean compressionEnabled, long durationUntilExpirationMs)
104105
{
105106
Response.ResponseBuilder response = Response.ok(queryResults);
106107

@@ -158,10 +159,18 @@ public static Response toResponse(Query query, QueryResults queryResults, boolea
158159
response.header(PRESTO_REMOVED_SESSION_FUNCTION, urlEncode(SQL_FUNCTION_ID_JSON_CODEC.toJson(signature)));
159160
}
160161

162+
response.cacheControl(getCacheControlMaxAge(durationUntilExpirationMs));
163+
161164
return response.build();
162165
}
163166

164-
public static Response toResponse(Query query, QueryResults queryResults, String xPrestoPrefixUri, boolean compressionEnabled, boolean nestedDataSerializationEnabled)
167+
public static Response toResponse(
168+
Query query,
169+
QueryResults queryResults,
170+
String xPrestoPrefixUri,
171+
boolean compressionEnabled,
172+
boolean nestedDataSerializationEnabled,
173+
long durationUntilExpirationMs)
165174
{
166175
Iterable<List<Object>> queryResultsData = queryResults.getData();
167176
if (nestedDataSerializationEnabled) {
@@ -181,7 +190,12 @@ public static Response toResponse(Query query, QueryResults queryResults, String
181190
queryResults.getUpdateType(),
182191
queryResults.getUpdateCount());
183192

184-
return toResponse(query, resultsClone, compressionEnabled);
193+
return toResponse(query, resultsClone, compressionEnabled, durationUntilExpirationMs);
194+
}
195+
196+
public static CacheControl getCacheControlMaxAge(long durationUntilExpirationMs)
197+
{
198+
return CacheControl.valueOf("max-age=" + MILLISECONDS.toSeconds(durationUntilExpirationMs));
185199
}
186200

187201
public static void abortIfPrefixUrlInvalid(String xPrestoPrefixUrl)

presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ public QueryInfo getFullQueryInfo(QueryId queryId)
5353
return null;
5454
}
5555

56+
@Override
57+
public long getDurationUntilExpirationInMillis(QueryId queryId)
58+
{
59+
return 0;
60+
}
61+
5662
public Session getQuerySession(QueryId queryId)
5763
{
5864
return null;

presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,13 @@ public QueryInfo getFullQueryInfo(QueryId queryId)
252252
return queryTracker.getQuery(queryId).getQueryInfo();
253253
}
254254

255+
@Override
256+
public long getDurationUntilExpirationInMillis(QueryId queryId)
257+
throws NoSuchElementException
258+
{
259+
return queryTracker.getQuery(queryId).getDurationUntilExpirationInMillis();
260+
}
261+
255262
@Override
256263
public Session getQuerySession(QueryId queryId)
257264
throws NoSuchElementException

presto-main/src/main/java/com/facebook/presto/server/protocol/ExecutingStatementResource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.airlift.concurrent.BoundedExecutor;
1717
import com.facebook.airlift.stats.TimeStat;
1818
import com.facebook.presto.client.QueryResults;
19+
import com.facebook.presto.execution.QueryManager;
1920
import com.facebook.presto.server.ForStatementResource;
2021
import com.facebook.presto.server.ServerConfig;
2122
import com.facebook.presto.spi.QueryId;
@@ -68,6 +69,7 @@ public class ExecutingStatementResource
6869

6970
private final BoundedExecutor responseExecutor;
7071
private final LocalQueryProvider queryProvider;
72+
private final QueryManager queryManager;
7173
private final boolean compressionEnabled;
7274
private final boolean nestedDataSerializationEnabled;
7375
private final QueryBlockingRateLimiter queryRateLimiter;
@@ -76,11 +78,13 @@ public class ExecutingStatementResource
7678
public ExecutingStatementResource(
7779
@ForStatementResource BoundedExecutor responseExecutor,
7880
LocalQueryProvider queryProvider,
81+
QueryManager queryManager,
7982
ServerConfig serverConfig,
8083
QueryBlockingRateLimiter queryRateLimiter)
8184
{
8285
this.responseExecutor = requireNonNull(responseExecutor, "responseExecutor is null");
8386
this.queryProvider = requireNonNull(queryProvider, "queryProvider is null");
87+
this.queryManager = requireNonNull(queryManager, "queryManager is null");
8488
this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();
8589
this.nestedDataSerializationEnabled = requireNonNull(serverConfig, "serverConfig is null").isNestedDataSerializationEnabled();
8690
this.queryRateLimiter = requireNonNull(queryRateLimiter, "queryRateLimiter is null");
@@ -132,9 +136,10 @@ public void getQueryResults(
132136
return query.waitForResults(token, uriInfo, effectiveFinalProto, wait, effectiveFinalTargetResultSize, binaryResults);
133137
},
134138
responseExecutor);
139+
long durationUntilExpirationMs = queryManager.getDurationUntilExpirationInMillis(queryId);
135140
ListenableFuture<Response> queryResultsFuture = transform(
136141
waitForResultsAsync,
137-
results -> toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled),
142+
results -> toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled, durationUntilExpirationMs),
138143
directExecutor());
139144
bindAsyncResponse(asyncResponse, queryResultsFuture, responseExecutor);
140145
}

presto-main/src/main/java/com/facebook/presto/server/protocol/LocalExecutingQueryResponseProvider.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.util.Optional;
2828

29+
import static com.facebook.presto.server.protocol.QueryResourceUtil.toResponse;
2930
import static com.google.common.util.concurrent.Futures.transform;
3031
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
3132
import static java.util.Objects.requireNonNull;
@@ -53,7 +54,8 @@ public Optional<ListenableFuture<Response>> waitForExecutingResponse(
5354
DataSize targetResultSize,
5455
boolean compressionEnabled,
5556
boolean nestedDataSerializationEnabled,
56-
boolean binaryResults)
57+
boolean binaryResults,
58+
long durationUntilExpirationMs)
5759
{
5860
Query query;
5961
try {
@@ -64,7 +66,7 @@ public Optional<ListenableFuture<Response>> waitForExecutingResponse(
6466
}
6567
return Optional.of(transform(
6668
query.waitForResults(0, uriInfo, scheme, maxWait, targetResultSize, binaryResults),
67-
results -> QueryResourceUtil.toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled),
69+
results -> toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled, durationUntilExpirationMs),
6870
directExecutor()));
6971
}
7072
}

0 commit comments

Comments
 (0)