Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import jakarta.annotation.Nullable;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -59,6 +60,8 @@ public interface StatementClient

Set<String> getDeallocatedPreparedStatements();

Map<String, List<String>> getResponseHeaders();

@Nullable
String getStartedTransactionId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.spi.security.SelectedRole;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.errorprone.annotations.ThreadSafe;
Expand Down Expand Up @@ -114,7 +115,7 @@ class StatementClientV1
private final Map<String, String> addedSessionFunctions = new ConcurrentHashMap<>();
private final Set<String> removedSessionFunctions = newConcurrentHashSet();
private final boolean validateNextUriSource;

private final Map<String, List<String>> responseHeaders;
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query)
Expand All @@ -140,6 +141,7 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
}

processResponse(response.getHeaders(), response.getValue());
this.responseHeaders = toHeaderMap(response.getHeaders());
}

private Request buildQueryRequest(ClientSession session, String query)
Expand Down Expand Up @@ -216,6 +218,11 @@ private Request buildQueryRequest(ClientSession session, String query)
return builder.build();
}

public Map<String, List<String>> getResponseHeaders()
{
return responseHeaders;
}

@Override
public String getQuery()
{
Expand Down Expand Up @@ -438,6 +445,15 @@ private void validateNextUriSource(final URI nextUri, final URI infoUri)
throw new RuntimeException(format("Next URI host and port %s are different than current %s", nextUri.getHost(), infoUri.getHost()));
}

private static Map<String, List<String>> toHeaderMap(Headers headers)
{
ImmutableMap.Builder<String, List<String>> builder = ImmutableMap.builder();
for (String name : headers.names()) {
builder.put(name, ImmutableList.copyOf(headers.values(name)));
}
return builder.build();
}

private void processResponse(Headers headers, QueryResults results)
{
setCatalog.set(headers.get(PRESTO_SET_CATALOG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ private static File getPluginSchedulerConfigFile(PlanCheckerRouterPluginConfig p
String javaClusterURI = format("router-java-url=%s", planCheckerRouterConfig.getJavaRouterURI());
String nativeClusterURI = format("router-native-url=%s", planCheckerRouterConfig.getNativeRouterURI());
String javaClusterFallbackEnabled = format("enable-java-cluster-fallback=%s", planCheckerRouterConfig.isJavaClusterFallbackEnabled());
Files.write(tempPluginSchedulerConfigFile, ImmutableList.of(schedulerName, planCheckerClusterURIs, javaClusterURI, nativeClusterURI, javaClusterFallbackEnabled));
String javaClusterQueryRetryEnabled = format("enable-java-cluster-query-retry=%s", planCheckerRouterConfig.isJavaClusterQueryRetryEnabled());
Files.write(tempPluginSchedulerConfigFile, ImmutableList.of(schedulerName, planCheckerClusterURIs, javaClusterURI, nativeClusterURI, javaClusterFallbackEnabled, javaClusterQueryRetryEnabled));
return tempPluginSchedulerConfigFile.toFile();
}

Expand Down
20 changes: 9 additions & 11 deletions presto-plan-checker-router-plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ Set the scheduler name to `CUSTOM_PLUGIN_SCHEDULER` in `etc/router-config.json`.
## Configuration:
The following configuration properties must be set in `etc/router-config/router-scheduler.properties`:

| Property Name | Type | Description |
|------------------------------|---------|--------------------------------------------------------------------------------------------------------------------|
| router-scheduler.name | String | The name of the custom scheduler factory |
| | | Example: `router-scheduler.name=plan-checker` |
| plan-check-clusters-uris | String | The URIs of the plan checker clusters. | |
| router-java-url | String | The router URI dedicated to java clusters. |
| router-native-url | String | The router URI dedicated to native clusters. |
| client-request-timeout | String | The maximum time the client will wait for a response before timing out. |
| | | Default : `2 minutes` |
| enable-java-cluster-fallback | boolean | Enables fallback to the Java clusters when the plan checker clusters are unavailable or fail to process a request. |
| | | Default : `false` |
| Property Name | Type | Description |
|---------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| router-scheduler.name | String | The name of the custom scheduler factory. <br> Example: `router-scheduler.name=plan-checker` |
| plan-check-clusters-uris | String | The URIs of the plan checker clusters. |
| router-java-url | String | The router URI dedicated to Java clusters. |
| router-native-url | String | The router URI dedicated to native clusters. |
| client-request-timeout | String | The maximum time the client will wait for a response before timing out.<br> Default : `2 minutes` |
| enable-java-cluster-fallback | boolean | Enables fallback to the Java clusters when the plan checker clusters are unavailable or fail to process a request.<br> Default : `false` |
| enable-java-cluster-query-retry | boolean | Enables cross-cluster query retry. <br><br>When set to `true`, if a query fails on the native cluster, the system will automatically retry executing the query on the Java cluster. <br>**Default:** `false` <br><br>**Additional Details:** <br>When `enable-java-cluster-query-retry` is enabled, if the `presto-plan-checker-router-plugin` schedules a query on the native cluster (using the `router-native-url`) and the query execution fails, the query is automatically retried on a Java cluster (using the `router-java-url`). <br><br>The property `router-native-url` can point either to the **native coordinator URL** or to the **router URL** for the native cluster. Similarly, the property `router-java-url` can point either to the **Java coordinator URL** or to the **router URL** for the Java cluster. |
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class PlanCheckerRouterPluginConfig
private URI nativeRouterURI;
private Duration clientRequestTimeout = new Duration(2, MINUTES);
private boolean javaClusterFallbackEnabled;
private boolean javaClusterQueryRetryEnabled;

@Config("plan-check-clusters-uris")
public PlanCheckerRouterPluginConfig setPlanCheckClustersURIs(String uris)
Expand Down Expand Up @@ -102,4 +103,16 @@ public PlanCheckerRouterPluginConfig setJavaClusterFallbackEnabled(boolean javaC
this.javaClusterFallbackEnabled = javaClusterFallbackEnabled;
return this;
}

public boolean isJavaClusterQueryRetryEnabled()
{
return javaClusterQueryRetryEnabled;
}

@Config("enable-java-cluster-query-retry")
public PlanCheckerRouterPluginConfig setJavaClusterQueryRetryEnabled(boolean javaClusterQueryRetryEnabled)
{
this.javaClusterQueryRetryEnabled = javaClusterQueryRetryEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import com.facebook.airlift.units.Duration;
import com.facebook.presto.client.ClientSession;
import com.facebook.presto.client.QueryError;
import com.facebook.presto.client.QueryStatusInfo;
import com.facebook.presto.client.StatementClient;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.google.common.collect.ImmutableMap;
import jakarta.inject.Inject;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;
Expand All @@ -32,13 +34,18 @@
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.facebook.presto.client.PrestoHeaders.PRESTO_RETRY_QUERY;
import static com.facebook.presto.client.PrestoHeaders.PRESTO_TRANSACTION_ID;
import static com.facebook.presto.client.StatementClientFactory.newStatementClient;
import static com.facebook.presto.router.scheduler.HttpRequestSessionContext.getResourceEstimates;
import static com.facebook.presto.router.scheduler.HttpRequestSessionContext.getSerializedSessionFunctions;
import static com.google.common.base.Verify.verify;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;

public class PlanCheckerRouterPluginPrestoClient
Expand All @@ -55,6 +62,7 @@ public class PlanCheckerRouterPluginPrestoClient
private final URI nativeRouterURI;
private final Duration clientRequestTimeout;
private final boolean javaClusterFallbackEnabled;
private final boolean javaClusterQueryRetryEnabled;

@Inject
public PlanCheckerRouterPluginPrestoClient(PlanCheckerRouterPluginConfig planCheckerRouterPluginConfig)
Expand All @@ -68,12 +76,13 @@ public PlanCheckerRouterPluginPrestoClient(PlanCheckerRouterPluginConfig planChe
requireNonNull(planCheckerRouterPluginConfig.getNativeRouterURI(), "nativeRouterURI is null");
this.clientRequestTimeout = planCheckerRouterPluginConfig.getClientRequestTimeout();
this.javaClusterFallbackEnabled = planCheckerRouterPluginConfig.isJavaClusterFallbackEnabled();
this.javaClusterQueryRetryEnabled = planCheckerRouterPluginConfig.isJavaClusterQueryRetryEnabled();
}

public Optional<URI> getCompatibleClusterURI(Map<String, List<String>> headers, String statement, Principal principal)
{
String newSql = ANALYZE_CALL + statement;
ClientSession clientSession = parseHeadersToClientSession(headers, principal);
ClientSession clientSession = parseHeadersToClientSession(headers, principal, getPlanCheckerClusterDestination());
boolean isNativeCompatible = true;
// submit initial query
try (StatementClient client = newStatementClient(httpClient, clientSession, newSql)) {
Expand Down Expand Up @@ -118,13 +127,102 @@ public Optional<URI> getCompatibleClusterURI(Map<String, List<String>> headers,
if (isNativeCompatible) {
log.debug("Native compatible, routing to native-clusters router: [%s]", nativeRouterURI);
nativeClusterRedirectRequests.update(1L);
if (javaClusterQueryRetryEnabled) {
return buildNativeRedirectURI(headers, principal, statement);
}
return Optional.of(nativeRouterURI);
}
log.debug("Native incompatible, routing to java-clusters router: [%s]", javaRouterURI);
javaClusterRedirectRequests.update(1L);
return Optional.of(javaRouterURI);
}

private Optional<URI> buildNativeRedirectURI(Map<String, List<String>> headers, Principal principal, String statement)
{
ClientSession javaSession = parseHeadersToClientSession(prepareHeadersForJavaCluster(headers), principal, javaRouterURI);
try (StatementClient client = newStatementClient(httpClient, javaSession, statement)) {
Optional<URI> redirectUri = getRedirectUriFromPostQuery(client);
return Optional.of(redirectUri.orElse(nativeRouterURI));
}
catch (Exception e) {
log.error("Error submitting query for redirect URI: {%s}", e.getMessage(), e);
return Optional.of(nativeRouterURI);
}
}

public Optional<URI> getRedirectUriFromPostQuery(StatementClient client)
{
QueryStatusInfo statusInfo = client.currentStatusInfo();
if (statusInfo == null || statusInfo.getNextUri() == null) {
return Optional.empty();
}

URI retryUri = statusInfo.getNextUri();
Map<String, List<String>> headers = client.getResponseHeaders();
OptionalLong maxAgeSeconds = extractMaxAgeInSeconds(headers);

if (!maxAgeSeconds.isPresent()) {
log.warn("Missing retryExpirationInSeconds, skipping retry URI creation.");
return Optional.empty();
}

String retryUriBase = retryUri.getScheme() + "://" + retryUri.getAuthority();
String queryId = statusInfo.getId();
String retryUrl = retryUriBase + "/v1/statement/queued/retry/" + queryId;

if (retryUrl == null || retryUrl.isEmpty()) {
log.warn("Missing retryUrl, skipping retry URI creation.");
return Optional.empty();
}

HttpUrl.Builder redirectBuilder = HttpUrl.get(nativeRouterURI)
.newBuilder()
.addQueryParameter("retryUrl", retryUrl);

maxAgeSeconds.ifPresent(expiration ->
redirectBuilder.addQueryParameter("retryExpirationInSeconds", Long.toString(expiration)));

URI redirect = redirectBuilder.build().uri();
log.info("Redirecting to combined native URI: {%s}", redirect);

return Optional.of(redirect);
}

private static OptionalLong extractMaxAgeInSeconds(Map<String, List<String>> headers)
{
if (headers == null) {
return OptionalLong.empty();
}

List<String> cacheControlList = headers.get("Cache-Control");
if (cacheControlList == null) {
return OptionalLong.empty();
}

Pattern maxAgePattern = Pattern.compile("max-age=(\\d+)");
for (String headerValue : cacheControlList) {
Matcher matcher = maxAgePattern.matcher(headerValue);
if (matcher.find()) {
return OptionalLong.of(Long.parseLong(matcher.group(1)));
}
}
return OptionalLong.empty();
}

private static Map<String, List<String>> prepareHeadersForJavaCluster(Map<String, List<String>> headers)
{
ImmutableMap.Builder<String, List<String>> builder = ImmutableMap.builder();

headers.forEach((key, value) -> {
if (!key.equalsIgnoreCase("Host")) {
builder.put(key, value);
}
});
builder.put(PRESTO_RETRY_QUERY, singletonList("true"));

return builder.build();
}

@Managed
@Nested
public CounterStat getJavaClusterRedirectRequests()
Expand All @@ -146,16 +244,22 @@ public CounterStat getFallBackToJavaClusterRedirectRequests()
return fallBackToJavaClusterRedirectRequests;
}

private ClientSession parseHeadersToClientSession(Map<String, List<String>> headers, Principal principal)
private ClientSession parseHeadersToClientSession(Map<String, List<String>> headers, Principal principal, URI destinationOverride)
{
ImmutableMap<String, String> customHeaders = headers.entrySet().stream()
.filter(entry -> entry.getKey().equalsIgnoreCase(PRESTO_RETRY_QUERY))
.collect(ImmutableMap.toImmutableMap(
Map.Entry::getKey,
e -> e.getValue().get(0)));

HttpRequestSessionContext sessionContext =
new HttpRequestSessionContext(
headers,
new SqlParserOptions(),
principal);

return new ClientSession(
getPlanCheckerClusterDestination(),
destinationOverride,
sessionContext.getIdentity().getUser(),
sessionContext.getSource(),
Optional.empty(),
Expand All @@ -174,7 +278,7 @@ private ClientSession parseHeadersToClientSession(Map<String, List<String>> head
clientRequestTimeout,
true,
getSerializedSessionFunctions(sessionContext),
ImmutableMap.of(), // todo: do we need custom headers?
customHeaders,
true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public void testDefault()
.setNativeRouterURI(null)
.setPlanCheckClustersURIs(null)
.setClientRequestTimeout(new Duration(2, MINUTES))
.setJavaClusterFallbackEnabled(false));
.setJavaClusterFallbackEnabled(false)
.setJavaClusterQueryRetryEnabled(false));
}

@Test
Expand All @@ -49,13 +50,15 @@ public void testExplicitPropertyMappings()
.put("plan-check-clusters-uris", "192.168.0.3, 192.168.0.4")
.put("client-request-timeout", "5m")
.put("enable-java-cluster-fallback", "true")
.put("enable-java-cluster-query-retry", "true")
.build();
PlanCheckerRouterPluginConfig expected = new PlanCheckerRouterPluginConfig()
.setJavaRouterURI(new URI("192.168.0.1"))
.setNativeRouterURI(new URI("192.168.0.2"))
.setPlanCheckClustersURIs("192.168.0.3, 192.168.0.4")
.setClientRequestTimeout(new Duration(5, MINUTES))
.setJavaClusterFallbackEnabled(true);
.setJavaClusterFallbackEnabled(true)
.setJavaClusterQueryRetryEnabled(true);
assertFullMapping(properties, expected);
}
}
Loading
Loading