Skip to content

Commit 01bf517

Browse files
Support for query retry on different cluster via router and planchecker
1 parent 3c526ac commit 01bf517

File tree

9 files changed

+154
-20
lines changed

9 files changed

+154
-20
lines changed

presto-client/src/main/java/com/facebook/presto/client/PrestoHeaders.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public final class PrestoHeaders
5252
public static final String PRESTO_PAGE_NEXT_TOKEN = "X-Presto-Page-End-Sequence-Id";
5353
public static final String PRESTO_BUFFER_COMPLETE = "X-Presto-Buffer-Complete";
5454
public static final String PRESTO_PREFIX_URL = "X-Presto-Prefix-Url";
55+
public static final String PRESTO_RETRY_QUERY = "X-Presto-Retry-Query";
5556

5657
private PrestoHeaders() {}
5758
}

presto-client/src/main/java/com/facebook/presto/client/StatementClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.presto.common.type.TimeZoneKey;
1717
import com.facebook.presto.spi.security.SelectedRole;
1818
import jakarta.annotation.Nullable;
19+
import okhttp3.Headers;
1920

2021
import java.io.Closeable;
2122
import java.util.Map;
@@ -59,6 +60,8 @@ public interface StatementClient
5960

6061
Set<String> getDeallocatedPreparedStatements();
6162

63+
Headers getResponseHeaders();
64+
6265
@Nullable
6366
String getStartedTransactionId();
6467

presto-client/src/main/java/com/facebook/presto/client/StatementClientV1.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class StatementClientV1
114114
private final Map<String, String> addedSessionFunctions = new ConcurrentHashMap<>();
115115
private final Set<String> removedSessionFunctions = newConcurrentHashSet();
116116
private final boolean validateNextUriSource;
117+
private final Headers responseHeaders;
117118

118119
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
119120

@@ -140,6 +141,7 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
140141
}
141142

142143
processResponse(response.getHeaders(), response.getValue());
144+
this.responseHeaders = response.getHeaders();
143145
}
144146

145147
private Request buildQueryRequest(ClientSession session, String query)
@@ -216,6 +218,11 @@ private Request buildQueryRequest(ClientSession session, String query)
216218
return builder.build();
217219
}
218220

221+
public Headers getResponseHeaders()
222+
{
223+
return responseHeaders;
224+
}
225+
219226
@Override
220227
public String getQuery()
221228
{

presto-native-tests/src/test/java/com/facebook/presto/nativetests/TestPlanCheckerRouterPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ private static File getPluginSchedulerConfigFile(PlanCheckerRouterPluginConfig p
260260
String javaClusterURI = format("router-java-url=%s", planCheckerRouterConfig.getJavaRouterURI());
261261
String nativeClusterURI = format("router-native-url=%s", planCheckerRouterConfig.getNativeRouterURI());
262262
String javaClusterFallbackEnabled = format("enable-java-cluster-fallback=%s", planCheckerRouterConfig.isJavaClusterFallbackEnabled());
263-
Files.write(tempPluginSchedulerConfigFile, ImmutableList.of(schedulerName, planCheckerClusterURIs, javaClusterURI, nativeClusterURI, javaClusterFallbackEnabled));
263+
String javaClusterQueryRetryEnabled = format("enable-java-cluster-query-retry=%s", planCheckerRouterConfig.isJavaClusterQueryRetryEnabled());
264+
Files.write(tempPluginSchedulerConfigFile, ImmutableList.of(schedulerName, planCheckerClusterURIs, javaClusterURI, nativeClusterURI, javaClusterFallbackEnabled, javaClusterQueryRetryEnabled));
264265
return tempPluginSchedulerConfigFile.toFile();
265266
}
266267

presto-plan-checker-router-plugin/README.md

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,12 @@ Set the scheduler name to `CUSTOM_PLUGIN_SCHEDULER` in `etc/router-config.json`.
2020
## Configuration:
2121
The following configuration properties must be set in `etc/router-config/router-scheduler.properties`:
2222

23-
| Property Name | Type | Description |
24-
|------------------------------|---------|--------------------------------------------------------------------------------------------------------------------|
25-
| router-scheduler.name | String | The name of the custom scheduler factory |
26-
| | | Example: `router-scheduler.name=plan-checker` |
27-
| plan-check-clusters-uris | String | The URIs of the plan checker clusters. | |
28-
| router-java-url | String | The router URI dedicated to java clusters. |
29-
| router-native-url | String | The router URI dedicated to native clusters. |
30-
| client-request-timeout | String | The maximum time the client will wait for a response before timing out. |
31-
| | | Default : `2 minutes` |
32-
| enable-java-cluster-fallback | boolean | Enables fallback to the Java clusters when the plan checker clusters are unavailable or fail to process a request. |
33-
| | | Default : `false` |
23+
| Property Name | Type | Description |
24+
|---------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
25+
| router-scheduler.name | String | The name of the custom scheduler factory. <br> Example: `router-scheduler.name=plan-checker` |
26+
| plan-check-clusters-uris | String | The URIs of the plan checker clusters. |
27+
| router-java-url | String | The router URI dedicated to java clusters. |
28+
| router-native-url | String | The router URI dedicated to native clusters. |
29+
| client-request-timeout | String | The maximum time the client will wait for a response before timing out.<br> Default : `2 minutes` |
30+
| enable-java-cluster-fallback | boolean | Enables fallback to the Java clusters when the plan checker clusters are unavailable or fail to process a request.<br> Default : `false` |
31+
| enable-java-cluster-query-retry | boolean | Enables cross-cluster query retry.<br>When set to `true`, if a query fails on the native cluster, the system will automatically retry executing the query on the Java cluster.<br>Default: `false` |

presto-plan-checker-router-plugin/src/main/java/com/facebook/presto/router/scheduler/PlanCheckerRouterPluginConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class PlanCheckerRouterPluginConfig
3434
private URI nativeRouterURI;
3535
private Duration clientRequestTimeout = new Duration(2, MINUTES);
3636
private boolean javaClusterFallbackEnabled;
37+
private boolean javaClusterQueryRetryEnabled;
3738

3839
@Config("plan-check-clusters-uris")
3940
public PlanCheckerRouterPluginConfig setPlanCheckClustersURIs(String uris)
@@ -102,4 +103,16 @@ public PlanCheckerRouterPluginConfig setJavaClusterFallbackEnabled(boolean javaC
102103
this.javaClusterFallbackEnabled = javaClusterFallbackEnabled;
103104
return this;
104105
}
106+
107+
public boolean isJavaClusterQueryRetryEnabled()
108+
{
109+
return javaClusterQueryRetryEnabled;
110+
}
111+
112+
@Config("enable-java-cluster-query-retry")
113+
public PlanCheckerRouterPluginConfig setJavaClusterQueryRetryEnabled(boolean javaClusterQueryRetryEnabled)
114+
{
115+
this.javaClusterQueryRetryEnabled = javaClusterQueryRetryEnabled;
116+
return this;
117+
}
105118
}

presto-plan-checker-router-plugin/src/main/java/com/facebook/presto/router/scheduler/PlanCheckerRouterPluginPrestoClient.java

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
import com.facebook.airlift.units.Duration;
1919
import com.facebook.presto.client.ClientSession;
2020
import com.facebook.presto.client.QueryError;
21+
import com.facebook.presto.client.QueryStatusInfo;
2122
import com.facebook.presto.client.StatementClient;
2223
import com.facebook.presto.sql.parser.SqlParserOptions;
2324
import com.google.common.collect.ImmutableMap;
2425
import jakarta.inject.Inject;
26+
import okhttp3.Headers;
27+
import okhttp3.HttpUrl;
2528
import okhttp3.OkHttpClient;
2629
import org.weakref.jmx.Managed;
2730
import org.weakref.jmx.Nested;
@@ -32,13 +35,18 @@
3235
import java.util.Locale;
3336
import java.util.Map;
3437
import java.util.Optional;
38+
import java.util.OptionalLong;
3539
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.regex.Matcher;
41+
import java.util.regex.Pattern;
3642

43+
import static com.facebook.presto.client.PrestoHeaders.PRESTO_RETRY_QUERY;
3744
import static com.facebook.presto.client.PrestoHeaders.PRESTO_TRANSACTION_ID;
3845
import static com.facebook.presto.client.StatementClientFactory.newStatementClient;
3946
import static com.facebook.presto.router.scheduler.HttpRequestSessionContext.getResourceEstimates;
4047
import static com.facebook.presto.router.scheduler.HttpRequestSessionContext.getSerializedSessionFunctions;
4148
import static com.google.common.base.Verify.verify;
49+
import static java.util.Collections.singletonList;
4250
import static java.util.Objects.requireNonNull;
4351

4452
public class PlanCheckerRouterPluginPrestoClient
@@ -55,6 +63,7 @@ public class PlanCheckerRouterPluginPrestoClient
5563
private final URI nativeRouterURI;
5664
private final Duration clientRequestTimeout;
5765
private final boolean javaClusterFallbackEnabled;
66+
private final boolean javaClusterQueryRetryEnabled;
5867

5968
@Inject
6069
public PlanCheckerRouterPluginPrestoClient(PlanCheckerRouterPluginConfig planCheckerRouterPluginConfig)
@@ -68,12 +77,13 @@ public PlanCheckerRouterPluginPrestoClient(PlanCheckerRouterPluginConfig planChe
6877
requireNonNull(planCheckerRouterPluginConfig.getNativeRouterURI(), "nativeRouterURI is null");
6978
this.clientRequestTimeout = planCheckerRouterPluginConfig.getClientRequestTimeout();
7079
this.javaClusterFallbackEnabled = planCheckerRouterPluginConfig.isJavaClusterFallbackEnabled();
80+
this.javaClusterQueryRetryEnabled = planCheckerRouterPluginConfig.isJavaClusterQueryRetryEnabled();
7181
}
7282

7383
public Optional<URI> getCompatibleClusterURI(Map<String, List<String>> headers, String statement, Principal principal)
7484
{
7585
String newSql = ANALYZE_CALL + statement;
76-
ClientSession clientSession = parseHeadersToClientSession(headers, principal);
86+
ClientSession clientSession = parseHeadersToClientSession(headers, principal, getPlanCheckerClusterDestination());
7787
boolean isNativeCompatible = true;
7888
// submit initial query
7989
try (StatementClient client = newStatementClient(httpClient, clientSession, newSql)) {
@@ -118,13 +128,92 @@ public Optional<URI> getCompatibleClusterURI(Map<String, List<String>> headers,
118128
if (isNativeCompatible) {
119129
log.debug("Native compatible, routing to native-clusters router: [%s]", nativeRouterURI);
120130
nativeClusterRedirectRequests.update(1L);
131+
if (javaClusterQueryRetryEnabled) {
132+
return buildNativeRedirectURI(headers, principal, statement);
133+
}
121134
return Optional.of(nativeRouterURI);
122135
}
123136
log.debug("Native incompatible, routing to java-clusters router: [%s]", javaRouterURI);
124137
javaClusterRedirectRequests.update(1L);
125138
return Optional.of(javaRouterURI);
126139
}
127140

141+
private Optional<URI> buildNativeRedirectURI(Map<String, List<String>> headers, Principal principal, String statement)
142+
{
143+
ClientSession javaSession = parseHeadersToClientSession(prepareHeadersForJavaCluster(headers), principal, javaRouterURI);
144+
try (StatementClient client = newStatementClient(httpClient, javaSession, statement)) {
145+
Optional<URI> redirectUri = postQueryOnJavaAndGetRedirectUri(client);
146+
return Optional.of(redirectUri.orElse(nativeRouterURI));
147+
}
148+
catch (Exception e) {
149+
log.error("Error submitting query for redirect URI: {%s}", e.getMessage(), e);
150+
return Optional.of(nativeRouterURI);
151+
}
152+
}
153+
154+
public Optional<URI> postQueryOnJavaAndGetRedirectUri(StatementClient client)
155+
{
156+
QueryStatusInfo statusInfo = client.currentStatusInfo();
157+
if (statusInfo == null || statusInfo.getNextUri() == null) {
158+
return Optional.empty();
159+
}
160+
161+
URI retryUri = statusInfo.getNextUri();
162+
Headers headers = client.getResponseHeaders();
163+
OptionalLong maxAgeSeconds = extractMaxAgeInSeconds(headers);
164+
165+
String retryUriBase = retryUri.getScheme() + "://" + retryUri.getAuthority();
166+
String queryId = statusInfo.getId();
167+
String retryUrl = retryUriBase + "/v1/statement/queued/retry/" + queryId;
168+
169+
HttpUrl.Builder redirectBuilder = HttpUrl.get(nativeRouterURI)
170+
.newBuilder()
171+
.addQueryParameter("retryUrl", retryUrl);
172+
173+
maxAgeSeconds.ifPresent(expiration ->
174+
redirectBuilder.addQueryParameter("retryExpirationInSeconds", Long.toString(expiration)));
175+
176+
URI redirect = redirectBuilder.build().uri();
177+
log.info("Redirecting to combined native URI: {%s}", redirect);
178+
179+
return Optional.of(redirect);
180+
}
181+
182+
private static OptionalLong extractMaxAgeInSeconds(Headers headers)
183+
{
184+
if (headers == null) {
185+
return OptionalLong.empty();
186+
}
187+
188+
List<String> cacheControlList = headers.values("Cache-Control");
189+
if (cacheControlList == null) {
190+
return OptionalLong.empty();
191+
}
192+
193+
Pattern maxAgePattern = Pattern.compile("max-age=(\\d+)");
194+
for (String headerValue : cacheControlList) {
195+
Matcher matcher = maxAgePattern.matcher(headerValue);
196+
if (matcher.find()) {
197+
return OptionalLong.of(Long.parseLong(matcher.group(1)));
198+
}
199+
}
200+
return OptionalLong.empty();
201+
}
202+
203+
private static Map<String, List<String>> prepareHeadersForJavaCluster(Map<String, List<String>> headers)
204+
{
205+
ImmutableMap.Builder<String, List<String>> builder = ImmutableMap.builder();
206+
207+
headers.forEach((key, value) -> {
208+
if (!key.equalsIgnoreCase("Host")) {
209+
builder.put(key, value);
210+
}
211+
});
212+
builder.put(PRESTO_RETRY_QUERY, singletonList("true"));
213+
214+
return builder.build();
215+
}
216+
128217
@Managed
129218
@Nested
130219
public CounterStat getJavaClusterRedirectRequests()
@@ -146,16 +235,22 @@ public CounterStat getFallBackToJavaClusterRedirectRequests()
146235
return fallBackToJavaClusterRedirectRequests;
147236
}
148237

149-
private ClientSession parseHeadersToClientSession(Map<String, List<String>> headers, Principal principal)
238+
private ClientSession parseHeadersToClientSession(Map<String, List<String>> headers, Principal principal, URI destinationOverride)
150239
{
240+
ImmutableMap<String, String> customHeaders = headers.entrySet().stream()
241+
.filter(entry -> entry.getKey().equalsIgnoreCase(PRESTO_RETRY_QUERY))
242+
.collect(ImmutableMap.toImmutableMap(
243+
Map.Entry::getKey,
244+
e -> e.getValue().get(0)));
245+
151246
HttpRequestSessionContext sessionContext =
152247
new HttpRequestSessionContext(
153248
headers,
154249
new SqlParserOptions(),
155250
principal);
156251

157252
return new ClientSession(
158-
getPlanCheckerClusterDestination(),
253+
destinationOverride,
159254
sessionContext.getIdentity().getUser(),
160255
sessionContext.getSource(),
161256
Optional.empty(),
@@ -174,7 +269,7 @@ private ClientSession parseHeadersToClientSession(Map<String, List<String>> head
174269
clientRequestTimeout,
175270
true,
176271
getSerializedSessionFunctions(sessionContext),
177-
ImmutableMap.of(), // todo: do we need custom headers?
272+
customHeaders,
178273
true);
179274
}
180275

presto-plan-checker-router-plugin/src/test/java/com/facebook/presto/router/scheduler/TestPlanCheckerProviderRouterPluginConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ public void testDefault()
3636
.setNativeRouterURI(null)
3737
.setPlanCheckClustersURIs(null)
3838
.setClientRequestTimeout(new Duration(2, MINUTES))
39-
.setJavaClusterFallbackEnabled(false));
39+
.setJavaClusterFallbackEnabled(false)
40+
.setJavaClusterQueryRetryEnabled(false));
4041
}
4142

4243
@Test
@@ -49,13 +50,15 @@ public void testExplicitPropertyMappings()
4950
.put("plan-check-clusters-uris", "192.168.0.3, 192.168.0.4")
5051
.put("client-request-timeout", "5m")
5152
.put("enable-java-cluster-fallback", "true")
53+
.put("enable-java-cluster-query-retry", "true")
5254
.build();
5355
PlanCheckerRouterPluginConfig expected = new PlanCheckerRouterPluginConfig()
5456
.setJavaRouterURI(new URI("192.168.0.1"))
5557
.setNativeRouterURI(new URI("192.168.0.2"))
5658
.setPlanCheckClustersURIs("192.168.0.3, 192.168.0.4")
5759
.setClientRequestTimeout(new Duration(5, MINUTES))
58-
.setJavaClusterFallbackEnabled(true);
60+
.setJavaClusterFallbackEnabled(true)
61+
.setJavaClusterQueryRetryEnabled(true);
5962
assertFullMapping(properties, expected);
6063
}
6164
}

presto-router/src/main/java/com/facebook/presto/router/RouterResource.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.facebook.presto.router;
1515

16+
import com.facebook.airlift.http.client.HttpUriBuilder;
1617
import com.facebook.airlift.log.Logger;
1718
import com.facebook.airlift.stats.CounterStat;
1819
import com.facebook.presto.router.cluster.ClusterManager;
@@ -24,6 +25,7 @@
2425
import jakarta.ws.rs.POST;
2526
import jakarta.ws.rs.Path;
2627
import jakarta.ws.rs.Produces;
28+
import jakarta.ws.rs.QueryParam;
2729
import jakarta.ws.rs.WebApplicationException;
2830
import jakarta.ws.rs.core.Context;
2931
import jakarta.ws.rs.core.Response;
@@ -56,12 +58,23 @@ public RouterResource(ClusterManager clusterManager)
5658
@POST
5759
@Path("/v1/statement")
5860
@Produces(APPLICATION_JSON)
59-
public Response routeQuery(String statement, @Context HttpServletRequest servletRequest)
61+
public Response routeQuery(
62+
String statement,
63+
@Context HttpServletRequest servletRequest,
64+
@QueryParam("retryUrl") String retryUrl,
65+
@QueryParam("retryExpirationInSeconds") String retryExpirationInSeconds)
6066
{
6167
RequestInfo requestInfo = new RequestInfo(servletRequest, statement);
6268
URI coordinatorUri = clusterManager.getDestination(requestInfo).orElseThrow(() -> badRequest(BAD_GATEWAY, "No Presto cluster available"));
63-
URI statementUri = uriBuilderFrom(coordinatorUri).replacePath("/v1/statement").build();
69+
HttpUriBuilder statementUriBuilder = uriBuilderFrom(coordinatorUri).replacePath("/v1/statement");
6470
successRedirectRequests.update(1);
71+
if (retryUrl != null && !retryUrl.isEmpty()) {
72+
statementUriBuilder.addParameter("retryUrl", retryUrl);
73+
}
74+
if (retryExpirationInSeconds != null && !retryExpirationInSeconds.isEmpty()) {
75+
statementUriBuilder.addParameter("retryExpirationInSeconds", retryExpirationInSeconds);
76+
}
77+
URI statementUri = statementUriBuilder.build();
6578
log.info("route query to %s", statementUri);
6679
return Response.temporaryRedirect(statementUri).build();
6780
}

0 commit comments

Comments
 (0)