Skip to content

Commit e01c85a

Browse files
Support for query retry on different cluster via router and planchecker
1 parent b553f71 commit e01c85a

File tree

11 files changed

+459
-161
lines changed

11 files changed

+459
-161
lines changed

presto-client/src/main/java/com/facebook/presto/client/StatementClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.presto.common.type.TimeZoneKey;
1717
import com.facebook.presto.spi.security.SelectedRole;
1818
import jakarta.annotation.Nullable;
19+
import okhttp3.Headers;
1920

2021
import java.io.Closeable;
2122
import java.util.Map;
@@ -59,6 +60,8 @@ public interface StatementClient
5960

6061
Set<String> getDeallocatedPreparedStatements();
6162

63+
Headers getResponseHeaders();
64+
6265
@Nullable
6366
String getStartedTransactionId();
6467

presto-client/src/main/java/com/facebook/presto/client/StatementClientV1.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class StatementClientV1
114114
private final Map<String, String> addedSessionFunctions = new ConcurrentHashMap<>();
115115
private final Set<String> removedSessionFunctions = newConcurrentHashSet();
116116
private final boolean validateNextUriSource;
117+
private final Headers responseHeaders;
117118

118119
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
119120

@@ -140,6 +141,7 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
140141
}
141142

142143
processResponse(response.getHeaders(), response.getValue());
144+
this.responseHeaders = response.getHeaders();
143145
}
144146

145147
private Request buildQueryRequest(ClientSession session, String query)
@@ -216,6 +218,11 @@ private Request buildQueryRequest(ClientSession session, String query)
216218
return builder.build();
217219
}
218220

221+
public Headers getResponseHeaders()
222+
{
223+
return responseHeaders;
224+
}
225+
219226
@Override
220227
public String getQuery()
221228
{
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.nativetests;
15+
16+
import com.facebook.airlift.bootstrap.Bootstrap;
17+
import com.facebook.airlift.http.server.HttpServerInfo;
18+
import com.facebook.airlift.http.server.testing.TestingHttpServerModule;
19+
import com.facebook.airlift.jaxrs.JaxrsModule;
20+
import com.facebook.airlift.json.JsonCodec;
21+
import com.facebook.airlift.json.JsonModule;
22+
import com.facebook.airlift.log.Logging;
23+
import com.facebook.airlift.node.testing.TestingNodeModule;
24+
import com.facebook.presto.router.RouterModule;
25+
import com.facebook.presto.router.scheduler.CustomSchedulerManager;
26+
import com.facebook.presto.router.scheduler.PlanCheckerRouterPluginConfig;
27+
import com.facebook.presto.router.scheduler.PlanCheckerRouterPluginPrestoClient;
28+
import com.facebook.presto.router.scheduler.PlanCheckerRouterPluginSchedulerFactory;
29+
import com.facebook.presto.router.security.RouterSecurityModule;
30+
import com.facebook.presto.router.spec.GroupSpec;
31+
import com.facebook.presto.router.spec.RouterSpec;
32+
import com.facebook.presto.router.spec.SelectorRuleSpec;
33+
import com.facebook.presto.testing.QueryRunner;
34+
import com.facebook.presto.tests.AbstractTestQueryFramework;
35+
import com.facebook.presto.tests.DistributedQueryRunner;
36+
import com.google.common.collect.ImmutableList;
37+
import com.google.inject.Injector;
38+
import org.testng.annotations.BeforeClass;
39+
40+
import java.io.File;
41+
import java.io.IOException;
42+
import java.net.URI;
43+
import java.nio.file.Files;
44+
import java.nio.file.Path;
45+
import java.sql.Connection;
46+
import java.sql.SQLException;
47+
import java.sql.Statement;
48+
import java.util.List;
49+
import java.util.Optional;
50+
51+
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
52+
import static com.facebook.presto.router.scheduler.SchedulerType.CUSTOM_PLUGIN_SCHEDULER;
53+
import static java.lang.String.format;
54+
import static java.sql.DriverManager.getConnection;
55+
import static java.util.Collections.singletonList;
56+
import static org.testng.Assert.assertEquals;
57+
58+
public abstract class BasePlanCheckerTest
59+
extends AbstractTestQueryFramework
60+
{
61+
protected URI httpServerUri;
62+
protected String storageFormat;
63+
protected boolean sidecarEnabled;
64+
protected PlanCheckerRouterPluginPrestoClient planCheckerRouterPluginPrestoClient;
65+
66+
@BeforeClass
67+
public void init() throws Exception
68+
{
69+
storageFormat = System.getProperty("storageFormat", "PARQUET");
70+
sidecarEnabled = Boolean.parseBoolean(System.getProperty("sidecarEnabled", "true"));
71+
super.init();
72+
Logging.initialize();
73+
74+
URI nativeClusterURI = ((DistributedQueryRunner) getQueryRunner()).getCoordinator().getBaseUrl();
75+
URI javaClusterURI = ((DistributedQueryRunner) getExpectedQueryRunner()).getCoordinator().getBaseUrl();
76+
77+
PlanCheckerRouterPluginConfig planCheckerRouterConfig = new PlanCheckerRouterPluginConfig()
78+
.setPlanCheckClustersURIs(nativeClusterURI.toString())
79+
.setJavaRouterURI(javaClusterURI)
80+
.setNativeRouterURI(nativeClusterURI)
81+
.setJavaClusterFallbackEnabled(true)
82+
.setJavaClusterQueryRetryEnabled(true);
83+
84+
planCheckerRouterPluginPrestoClient = new PlanCheckerRouterPluginPrestoClient(planCheckerRouterConfig);
85+
86+
Path tempFile = Files.createTempFile("temp-config", ".json");
87+
File configFile = getConfigFile(singletonList(planCheckerRouterConfig.getNativeRouterURI()), tempFile.toFile());
88+
89+
Bootstrap app = new Bootstrap(
90+
new TestingNodeModule("test"),
91+
new TestingHttpServerModule(),
92+
new JsonModule(),
93+
new JaxrsModule(true),
94+
new RouterSecurityModule(),
95+
new RouterModule(Optional.of(
96+
new CustomSchedulerManager(
97+
ImmutableList.of(new PlanCheckerRouterPluginSchedulerFactory()),
98+
getPluginSchedulerConfigFile(planCheckerRouterConfig)))));
99+
100+
Injector injector = app.doNotInitializeLogging()
101+
.setRequiredConfigurationProperty("router.config-file", configFile.getAbsolutePath())
102+
.setRequiredConfigurationProperty("presto.version", "test")
103+
.quiet().initialize();
104+
105+
httpServerUri = injector.getInstance(HttpServerInfo.class).getHttpUri();
106+
}
107+
108+
protected abstract QueryRunner createQueryRunner() throws Exception;
109+
110+
protected abstract QueryRunner createExpectedQueryRunner() throws Exception;
111+
112+
@Override
113+
protected void createTables()
114+
{
115+
NativeTestsUtils.createTables(storageFormat);
116+
}
117+
118+
protected static File getConfigFile(List<URI> serverURIs, File tempFile)
119+
throws IOException
120+
{
121+
RouterSpec spec = new RouterSpec(
122+
ImmutableList.of(new GroupSpec("plan-checkers", serverURIs, Optional.empty(), Optional.empty())),
123+
ImmutableList.of(new SelectorRuleSpec(Optional.empty(), Optional.empty(), Optional.empty(), "plan-checkers")),
124+
Optional.of(CUSTOM_PLUGIN_SCHEDULER),
125+
Optional.empty(),
126+
Optional.empty());
127+
JsonCodec<RouterSpec> codec = jsonCodec(RouterSpec.class);
128+
Files.write(tempFile.toPath(), codec.toBytes(spec));
129+
return tempFile;
130+
}
131+
132+
protected static File getPluginSchedulerConfigFile(PlanCheckerRouterPluginConfig planCheckerRouterConfig)
133+
throws IOException
134+
{
135+
Path tempPluginSchedulerConfigFile = Files.createTempFile("router-scheduler-plan-checker", ".properties");
136+
Files.write(tempPluginSchedulerConfigFile, ImmutableList.of(
137+
"router-scheduler.name=plan-checker",
138+
"plan-check-clusters-uris=" + planCheckerRouterConfig.getPlanCheckClustersURIs().get(0),
139+
"router-java-url=" + planCheckerRouterConfig.getJavaRouterURI(),
140+
"router-native-url=" + planCheckerRouterConfig.getNativeRouterURI(),
141+
"enable-java-cluster-fallback=" + planCheckerRouterConfig.isJavaClusterFallbackEnabled(),
142+
"enable-java-cluster-query-retry=" + planCheckerRouterConfig.isJavaClusterQueryRetryEnabled()));
143+
return tempPluginSchedulerConfigFile.toFile();
144+
}
145+
146+
protected static Connection createConnection(URI uri)
147+
throws SQLException
148+
{
149+
String url = format("jdbc:presto://%s:%s", uri.getHost(), uri.getPort());
150+
return getConnection(url, "user", null);
151+
}
152+
153+
protected static void runQuery(String sql, URI uri)
154+
throws SQLException
155+
{
156+
runQuery(sql, uri, Optional.empty());
157+
}
158+
159+
protected static void runQuery(String sql, URI uri, Optional<String> exceptionMessage)
160+
throws SQLException
161+
{
162+
try (Connection connection = createConnection(uri);
163+
Statement statement = connection.createStatement()) {
164+
statement.executeQuery(sql);
165+
}
166+
catch (SQLException e) {
167+
if (exceptionMessage.isPresent()) {
168+
assertEquals(e.getCause().getMessage(), exceptionMessage.get());
169+
return;
170+
}
171+
throw e;
172+
}
173+
}
174+
}

presto-native-tests/src/test/java/com/facebook/presto/nativetests/NativeTestsUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
import com.facebook.presto.nativeworker.NativeQueryRunnerUtils;
1717
import com.facebook.presto.testing.QueryRunner;
18+
import com.google.common.collect.ImmutableMap;
19+
20+
import java.util.Map;
1821

1922
import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.javaHiveQueryRunnerBuilder;
2023
import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.nativeHiveQueryRunnerBuilder;
@@ -26,13 +29,27 @@ private NativeTestsUtils() {}
2629

2730
public static QueryRunner createNativeQueryRunner(String storageFormat, boolean charNToVarcharImplicitCast, boolean sidecarEnabled)
2831
throws Exception
32+
{
33+
return createNativeQueryRunner(storageFormat, charNToVarcharImplicitCast, sidecarEnabled, ImmutableMap.of(), ImmutableMap.of());
34+
}
35+
36+
public static QueryRunner createNativeQueryRunner(String storageFormat, boolean sidecarEnabled, Map<String, String> coordinatorProperties, Map<String, String> extraProperties)
37+
throws Exception
38+
{
39+
return createNativeQueryRunner(storageFormat, false, sidecarEnabled, ImmutableMap.of(), ImmutableMap.of());
40+
}
41+
42+
public static QueryRunner createNativeQueryRunner(String storageFormat, boolean charNToVarcharImplicitCast, boolean sidecarEnabled, Map<String, String> coordinatorProperties, Map<String, String> extraProperties)
43+
throws Exception
2944
{
3045
QueryRunner queryRunner = nativeHiveQueryRunnerBuilder()
3146
.setStorageFormat(storageFormat)
3247
.setAddStorageFormatToPath(true)
3348
.setUseThrift(true)
3449
.setImplicitCastCharNToVarchar(charNToVarcharImplicitCast)
3550
.setCoordinatorSidecarEnabled(sidecarEnabled)
51+
.setExtraCoordinatorProperties(coordinatorProperties)
52+
.setExtraProperties(extraProperties)
3653
.build();
3754
if (sidecarEnabled) {
3855
setupNativeSidecarPlugin(queryRunner);

0 commit comments

Comments
 (0)