diff --git a/docker/demo/config/test-suite/deltastreamer-hive-sync-presto.yaml b/docker/demo/config/test-suite/deltastreamer-hive-sync-presto.yaml
new file mode 100644
index 0000000000000..61ea13c18e566
--- /dev/null
+++ b/docker/demo/config/test-suite/deltastreamer-hive-sync-presto.yaml
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
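+# This DAG runs three rounds of inserts and one upsert (400 unique records in total),
+# syncs the table to Hive, and then validates the record counts through Presto.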
+dag_name: unit-test-cow-dag
+dag_rounds: 1
+dag_intermittent_delay_mins: 10
+dag_content:
+  first_insert:
+    config:
+      record_size: 70000
+      num_partitions_insert: 1
+      repeat_count: 2
+      num_records_insert: 100
+    type: InsertNode
+    deps: none
+  second_insert:
+    config:
+      record_size: 70000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 100
+    type: InsertNode
+    deps: first_insert
+  third_insert:
+    config:
+      record_size: 70000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 100
+    type: InsertNode
+    deps: second_insert
+  first_upsert:
+    config:
+      record_size: 70000
+      num_partitions_upsert: 1
+      repeat_count: 1
+      num_records_upsert: 100
+    type: UpsertNode
+    deps: third_insert
+  first_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: first_upsert
+  first_presto_query:
+    config:
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 400
+        query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: first_hive_sync
+#  first_trino_query:
+#    config:
+#      trino_queries:
+#        query1: "select count(*) from testdb1.table1"
+#        result1: 300
+#        query2: "select count(*) from testdb1.table1 group by _row_key having count(*) > 1"
+#        result2: 0
+#    type: TrinoQueryNode
+#    deps: first_presto_query
\ No newline at end of file
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index b3a599f9bfa1d..1b371efaa839c 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -394,6 +394,16 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-jdbc</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-jdbc</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 5e2f9812ba529..8adea6b179804 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -62,6 +62,7 @@
import java.util.Map;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
/**
* This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency
@@ -317,5 +318,27 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {
@Parameter(names = {"--test-continuous-mode"}, description = "Tests continuous mode in deltastreamer.")
public Boolean testContinousMode = false;
+
+ @Parameter(names = {"--presto-jdbc-url"}, description = "Presto JDBC URL in the format jdbc:presto://:// "
+ + "e.g. URL to connect to Presto running on localhost port 8080 with the catalog `hive` and the schema `sales`: "
+ + "jdbc:presto://localhost:8080/hive/sales")
+ public String prestoJdbcUrl = EMPTY_STRING;
+
+ @Parameter(names = {"--presto-jdbc-username"}, description = "Username to use for authentication")
+ public String prestoUsername = "test";
+
+ @Parameter(names = {"--presto-jdbc-password"}, description = "Password corresponding to the username to use for authentication")
+ public String prestoPassword;
+
+ @Parameter(names = {"--trino-jdbc-url"}, description = "Trino JDBC URL in the format jdbc:trino://:// "
+ + "e.g. URL to connect to Trino running on localhost port 8080 with the catalog `hive` and the schema `sales`: "
+ + "jdbc:trino://localhost:8080/hive/sales")
+ public String trinoJdbcUrl = EMPTY_STRING;
+
+ @Parameter(names = {"--trino-jdbc-username"}, description = "Username to use for authentication")
+ public String trinoUsername = "test";
+
+ @Parameter(names = {"--trino-jdbc-password"}, description = "Password corresponding to the username to use for authentication")
+ public String trinoPassword;
}
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index a781d19cb78c5..03182d2784b25 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -43,7 +43,7 @@ public class DeltaConfig implements Serializable {
private final SerializableConfiguration configuration;
public DeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
- SerializableConfiguration configuration) {
+ SerializableConfiguration configuration) {
this.deltaOutputMode = deltaOutputMode;
this.deltaInputType = deltaInputType;
this.configuration = configuration;
@@ -74,6 +74,10 @@ public static class Config {
public static final String CHILDREN = "children";
public static final String HIVE_QUERIES = "hive_queries";
public static final String HIVE_PROPERTIES = "hive_props";
+ public static final String PRESTO_QUERIES = "presto_queries";
+ public static final String PRESTO_PROPERTIES = "presto_props";
+ public static final String TRINO_QUERIES = "trino_queries";
+ public static final String TRINO_PROPERTIES = "trino_props";
private static String NUM_RECORDS_INSERT = "num_records_insert";
private static String NUM_RECORDS_UPSERT = "num_records_upsert";
private static String NUM_RECORDS_DELETE = "num_records_delete";
@@ -283,7 +287,7 @@ public Option<String> getPreCombineField() {
public Option<String> getPartitionField() {
return !configsMap.containsKey(PARTITION_FIELD) ? Option.empty()
- : Option.of(configsMap.get(PARTITION_FIELD).toString());
+ : Option.of(configsMap.get(PARTITION_FIELD).toString());
}
public String getMergeCondition() {
@@ -319,7 +323,7 @@ public Map<String, Object> getOtherConfigs() {
public List<Pair<String, Integer>> getHiveQueries() {
try {
- return (List<Pair<String, Integer>>) this.configsMap.getOrDefault("hive_queries", new ArrayList<>());
+ return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(HIVE_QUERIES, new ArrayList<>());
} catch (Exception e) {
throw new RuntimeException("unable to get hive queries from configs");
}
@@ -333,6 +337,30 @@ public List<String> getHiveProperties() {
return (List<String>) this.configsMap.getOrDefault(HIVE_PROPERTIES, new ArrayList<>());
}
+ public List<String> getPrestoProperties() {
+ return (List<String>) this.configsMap.getOrDefault(PRESTO_PROPERTIES, new ArrayList<>());
+ }
+
+ public List<String> getTrinoProperties() {
+ return (List<String>) this.configsMap.getOrDefault(TRINO_PROPERTIES, new ArrayList<>());
+ }
+
+ public List<Pair<String, Integer>> getPrestoQueries() {
+ try {
+ return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(PRESTO_QUERIES, new ArrayList<>());
+ } catch (Exception e) {
+ throw new RuntimeException("unable to get presto queries from configs");
+ }
+ }
+
+ public List<Pair<String, Integer>> getTrinoQueries() {
+ try {
+ return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(TRINO_QUERIES, new ArrayList<>());
+ } catch (Exception e) {
+ throw new RuntimeException("unable to get trino queries from configs");
+ }
+ }
+
@Override
public String toString() {
try {
@@ -449,6 +477,26 @@ public Builder withHiveProperties(List<String> hiveProperties) {
return this;
}
+ public Builder withPrestoProperties(List<String> prestoProperties) {
+ this.configsMap.put(PRESTO_PROPERTIES, prestoProperties);
+ return this;
+ }
+
+ public Builder withTrinoProperties(List<String> trinoProperties) {
+ this.configsMap.put(TRINO_PROPERTIES, trinoProperties);
+ return this;
+ }
+
+ public Builder withPrestoQueryAndResults(List<Pair<String, Integer>> prestoQueries) {
+ this.configsMap.put(PRESTO_QUERIES, prestoQueries);
+ return this;
+ }
+
+ public Builder withTrinoQueryAndResults(List<Pair<String, Integer>> trinoQueries) {
+ this.configsMap.put(TRINO_QUERIES, trinoQueries);
+ return this;
+ }
+
public Builder withConfigsMap(Map<String, Object> configsMap) {
this.configsMap = configsMap;
return this;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
index 789d7e3423466..1d78d0fdbba8a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
@@ -53,7 +53,13 @@
import java.util.stream.Collectors;
import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.CONFIG_NAME;
+import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.HIVE_PROPERTIES;
+import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.HIVE_QUERIES;
import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.NO_DEPENDENCY_VALUE;
+import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.PRESTO_PROPERTIES;
+import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.PRESTO_QUERIES;
+import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.TRINO_PROPERTIES;
+import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.TRINO_QUERIES;
/**
* Utility class to SerDe workflow dag.
@@ -172,7 +178,8 @@ private static DagNode convertJsonToDagNode(JsonNode node, String type, String n
DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node))
.withName(name).build();
return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config);
- } catch (ClassNotFoundException e) {
+ }
+ catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@@ -192,11 +199,23 @@ private static Map<String, Object> convertJsonNodeToMap(JsonNode node) {
while (itr.hasNext()) {
Entry<String, JsonNode> entry = itr.next();
switch (entry.getKey()) {
- case DeltaConfig.Config.HIVE_QUERIES:
- configsMap.put(DeltaConfig.Config.HIVE_QUERIES, getHiveQueries(entry));
+ case HIVE_QUERIES:
+ configsMap.put(HIVE_QUERIES, getQueries(entry));
+ break;
+ case HIVE_PROPERTIES:
+ configsMap.put(HIVE_PROPERTIES, getQuerySessionProperties(entry));
+ break;
+ case PRESTO_QUERIES:
+ configsMap.put(PRESTO_QUERIES, getQueries(entry));
+ break;
+ case PRESTO_PROPERTIES:
+ configsMap.put(PRESTO_PROPERTIES, getQuerySessionProperties(entry));
+ break;
+ case TRINO_QUERIES:
+ configsMap.put(TRINO_QUERIES, getQueries(entry));
break;
- case DeltaConfig.Config.HIVE_PROPERTIES:
- configsMap.put(DeltaConfig.Config.HIVE_PROPERTIES, getProperties(entry));
+ case TRINO_PROPERTIES:
+ configsMap.put(TRINO_PROPERTIES, getQuerySessionProperties(entry));
break;
default:
configsMap.put(entry.getKey(), getValue(entry.getValue()));
@@ -206,25 +225,27 @@ private static Map convertJsonNodeToMap(JsonNode node) {
return configsMap;
}
- private static List<Pair<String, Integer>> getHiveQueries(Entry<String, JsonNode> entry) {
+ private static List<Pair<String, Integer>> getQueries(Entry<String, JsonNode> entry) {
List<Pair<String, Integer>> queries = new ArrayList<>();
try {
List<JsonNode> flattened = new ArrayList<>();
flattened.add(entry.getValue());
- queries = (List<Pair<String, Integer>>)getHiveQueryMapper().readValue(flattened.toString(), List.class);
- } catch (Exception e) {
+ queries = (List<Pair<String, Integer>>) getQueryMapper().readValue(flattened.toString(), List.class);
+ }
+ catch (Exception e) {
e.printStackTrace();
}
return queries;
}
- private static List<String> getProperties(Entry<String, JsonNode> entry) {
+ private static List<String> getQuerySessionProperties(Entry<String, JsonNode> entry) {
List<String> properties = new ArrayList<>();
try {
List<JsonNode> flattened = new ArrayList<>();
flattened.add(entry.getValue());
- properties = (List<String>)getHivePropertyMapper().readValue(flattened.toString(), List.class);
- } catch (Exception e) {
+ properties = (List<String>) getQueryEnginePropertyMapper().readValue(flattened.toString(), List.class);
+ }
+ catch (Exception e) {
e.printStackTrace();
}
return properties;
@@ -233,15 +254,20 @@ private static List<String> getProperties(Entry<String, JsonNode> entry) {
private static Object getValue(JsonNode node) {
if (node.isInt()) {
return node.asInt();
- } else if (node.isLong()) {
+ }
+ else if (node.isLong()) {
return node.asLong();
- } else if (node.isShort()) {
+ }
+ else if (node.isShort()) {
return node.asInt();
- } else if (node.isBoolean()) {
+ }
+ else if (node.isBoolean()) {
return node.asBoolean();
- } else if (node.isDouble()) {
+ }
+ else if (node.isDouble()) {
return node.asDouble();
- } else if (node.isFloat()) {
+ }
+ else if (node.isFloat()) {
return node.asDouble();
}
return node.textValue();
@@ -254,13 +280,29 @@ private static JsonNode createJsonNode(DagNode node, String type) throws IOExcep
while (itr.hasNext()) {
Entry<String, JsonNode> entry = itr.next();
switch (entry.getKey()) {
- case DeltaConfig.Config.HIVE_QUERIES:
- ((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_QUERIES,
- MAPPER.readTree(getHiveQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
+ case HIVE_QUERIES:
+ ((ObjectNode) configNode).put(HIVE_QUERIES,
+ MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
+ break;
+ case HIVE_PROPERTIES:
+ ((ObjectNode) configNode).put(HIVE_PROPERTIES,
+ MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
+ break;
+ case PRESTO_QUERIES:
+ ((ObjectNode) configNode).put(PRESTO_QUERIES,
+ MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getPrestoQueries())));
+ break;
+ case PRESTO_PROPERTIES:
+ ((ObjectNode) configNode).put(PRESTO_PROPERTIES,
+ MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getPrestoProperties())));
+ break;
- case DeltaConfig.Config.HIVE_PROPERTIES:
- ((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_PROPERTIES,
- MAPPER.readTree(getHivePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
+ case TRINO_QUERIES:
+ ((ObjectNode) configNode).put(TRINO_QUERIES,
+ MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getTrinoQueries())));
+ break;
+ case TRINO_PROPERTIES:
+ ((ObjectNode) configNode).put(TRINO_PROPERTIES,
+ MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getTrinoProperties())));
break;
default:
break;
@@ -293,21 +334,22 @@ public static String toString(InputStream inputStream) throws IOException {
return result.toString("utf-8");
}
- private static ObjectMapper getHiveQueryMapper() {
+ private static ObjectMapper getQueryMapper() {
SimpleModule module = new SimpleModule();
ObjectMapper queryMapper = new ObjectMapper();
- module.addSerializer(List.class, new HiveQuerySerializer());
- module.addDeserializer(List.class, new HiveQueryDeserializer());
+ module.addSerializer(List.class, new QuerySerializer());
+ module.addDeserializer(List.class, new QueryDeserializer());
queryMapper.registerModule(module);
return queryMapper;
}
- private static final class HiveQuerySerializer extends JsonSerializer<List> {
+ private static final class QuerySerializer extends JsonSerializer<List> {
Integer index = 0;
+
@Override
public void serialize(List pairs, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeStartObject();
- for (Pair pair : (List<Pair>)pairs) {
+ for (Pair pair : (List<Pair>) pairs) {
gen.writeStringField("query" + index, pair.getLeft().toString());
gen.writeNumberField("result" + index, Integer.parseInt(pair.getRight().toString()));
index++;
@@ -316,7 +358,7 @@ public void serialize(List pairs, JsonGenerator gen, SerializerProvider serializ
}
}
- private static final class HiveQueryDeserializer extends JsonDeserializer<List> {
+ private static final class QueryDeserializer extends JsonDeserializer<List> {
@Override
public List deserialize(JsonParser parser, DeserializationContext context) throws IOException {
List<Pair<String, Integer>> pairs = new ArrayList<>();
@@ -334,7 +376,8 @@ public List deserialize(JsonParser parser, DeserializationContext context) throw
if (fieldName.contains("query")) {
query = parser.getValueAsString();
- } else if (fieldName.contains("result")) {
+ }
+ else if (fieldName.contains("result")) {
result = parser.getValueAsInt();
pairs.add(Pair.of(query, result));
}
@@ -344,21 +387,22 @@ public List deserialize(JsonParser parser, DeserializationContext context) throw
}
}
- private static ObjectMapper getHivePropertyMapper() {
+ private static ObjectMapper getQueryEnginePropertyMapper() {
SimpleModule module = new SimpleModule();
ObjectMapper propMapper = new ObjectMapper();
- module.addSerializer(List.class, new HivePropertySerializer());
- module.addDeserializer(List.class, new HivePropertyDeserializer());
+ module.addSerializer(List.class, new QueryEnginePropertySerializer());
+ module.addDeserializer(List.class, new QueryEnginePropertyDeserializer());
propMapper.registerModule(module);
return propMapper;
}
- private static final class HivePropertySerializer extends JsonSerializer<List> {
+ private static final class QueryEnginePropertySerializer extends JsonSerializer<List> {
Integer index = 0;
+
@Override
public void serialize(List props, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeStartObject();
- for (String prop : (List<String>)props) {
+ for (String prop : (List<String>) props) {
gen.writeStringField("prop" + index, prop);
index++;
}
@@ -366,7 +410,7 @@ public void serialize(List props, JsonGenerator gen, SerializerProvider serializ
}
}
- private static final class HivePropertyDeserializer extends JsonDeserializer<List> {
+ private static final class QueryEnginePropertyDeserializer extends JsonDeserializer<List> {
@Override
public List deserialize(JsonParser parser, DeserializationContext context) throws IOException {
List<String> props = new ArrayList<>();
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java
new file mode 100644
index 0000000000000..c1a3d23791bd3
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
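+/**
+ * Base class for query nodes (Hive, Presto, Trino) that execute SQL statements over JDBC
+ * and validate each configured query by comparing the first column of the first result row
+ * against the expected value defined in the DAG.
+ */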
+public abstract class BaseQueryNode extends DagNode<Boolean> {
+
+ public void setSessionProperties(List<String> properties, Statement stmt) throws SQLException {
+ for (String prop : properties) {
+ executeStatement(prop, stmt);
+ }
+ }
+
+ public void executeAndValidateQueries(List<Pair<String, Integer>> queriesWithResult, Statement stmt) throws SQLException {
+ for (Pair<String, Integer> queryAndResult : queriesWithResult) {
+ log.info("Running {}", queryAndResult.getLeft());
+ ResultSet res = stmt.executeQuery(queryAndResult.getLeft());
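+ // an empty result set is validated against an expected result of 0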
+ if (!res.next()) {
+ log.info("res.next() was False - typically this means the query returned no rows.");
+ assert 0 == queryAndResult.getRight();
+ }
+ else {
+ Integer result = res.getInt(1);
+ if (!queryAndResult.getRight().equals(result)) {
+ throw new AssertionError(
+ "QUERY: " + queryAndResult.getLeft()
+ + " | EXPECTED RESULT = " + queryAndResult.getRight()
+ + " | ACTUAL RESULT = " + result
+ );
+ }
+ }
+ log.info("Successfully validated query!");
+ }
+ }
+
+ private void executeStatement(String query, Statement stmt) throws SQLException {
+ log.info("Executing statement {}", stmt.toString());
+ stmt.execute(query);
+ }
+}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
index 2da5c558d2cb2..fd04cc34c6866 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
@@ -19,7 +19,7 @@
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
@@ -28,8 +28,6 @@
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
import java.sql.Statement;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
@@ -39,7 +37,7 @@
/**
* A hive query node in the DAG of operations for a workflow. used to perform a hive query with given config.
*/
-public class HiveQueryNode extends DagNode<Boolean> {
+public class HiveQueryNode extends BaseQueryNode {
private HiveServiceProvider hiveServiceProvider;
@@ -61,37 +59,17 @@ public void execute(ExecutionContext executionContext, int curItrCount) throws E
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(properties);
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
- Connection con = DriverManager.getConnection(hiveSyncConfig.getString(HIVE_URL),
- hiveSyncConfig.getString(HIVE_USER), hiveSyncConfig.getString(HIVE_PASS));
- Statement stmt = con.createStatement();
- stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
- for (String hiveProperty : this.config.getHiveProperties()) {
- executeStatement(hiveProperty, stmt);
+ try (Connection con = DriverManager.getConnection(hiveSyncConfig.getString(HIVE_URL),
+ hiveSyncConfig.getString(HIVE_USER), hiveSyncConfig.getString(HIVE_PASS))) {
+ Statement stmt = con.createStatement();
+ stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
+ setSessionProperties(this.config.getHiveProperties(), stmt);
+ executeAndValidateQueries(this.config.getHiveQueries(), stmt);
+ stmt.close();
+ this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
}
- for (Pair<String, Integer> queryAndResult : this.config.getHiveQueries()) {
- log.info("Running {}", queryAndResult.getLeft());
- ResultSet res = stmt.executeQuery(queryAndResult.getLeft());
- if (!res.next()) {
- log.info("res.next() was False - typically this means the query returned no rows.");
- assert 0 == queryAndResult.getRight();
- } else {
- Integer result = res.getInt(1);
- if (!queryAndResult.getRight().equals(result)) {
- throw new AssertionError(
- "QUERY: " + queryAndResult.getLeft()
- + " | EXPECTED RESULT = " + queryAndResult.getRight()
- + " | ACTUAL RESULT = " + result
- );
- }
- }
- log.info("Successfully validated query!");
+ catch (Exception e) {
+ throw new HoodieValidationException("Hive query validation failed due to " + e.getMessage(), e);
}
- this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
}
-
- private void executeStatement(String query, Statement stmt) throws SQLException {
- log.info("Executing statement {}", stmt.toString());
- stmt.execute(query);
- }
-
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
new file mode 100644
index 0000000000000..9a9bafcf6fb89
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
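+/**
+ * A presto query node in the DAG of operations for a workflow. Used to execute presto queries
+ * over JDBC and validate their results; requires --presto-jdbc-url to be set.
+ */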
+public class PrestoQueryNode extends BaseQueryNode {
+
+ public PrestoQueryNode(DeltaConfig.Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public void execute(ExecutionContext context, int curItrCount) throws Exception {
+ log.info("Executing presto query node {}", this.getName());
+ String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl;
+ if (StringUtils.isNullOrEmpty(url)) {
+ throw new IllegalArgumentException("Presto JDBC connection url not provided. Please set --presto-jdbc-url.");
+ }
+ String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername;
+ String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword;
+ try {
+ Class.forName("com.facebook.presto.jdbc.PrestoDriver");
+ } catch (ClassNotFoundException e) {
+ throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
+ }
+ try (Connection connection = DriverManager.getConnection(url, user, pass)) {
+ Statement stmt = connection.createStatement();
+ setSessionProperties(this.config.getPrestoProperties(), stmt);
+ executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
+ stmt.close();
+ }
+ catch (Exception e) {
+ throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
+ }
+ }
+}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java
new file mode 100644
index 0000000000000..ffcc901f67e09
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
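+/**
+ * A trino query node in the DAG of operations for a workflow. Used to execute trino queries
+ * over JDBC and validate their results; requires --trino-jdbc-url to be set.
+ */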
+public class TrinoQueryNode extends BaseQueryNode {
+
+ public TrinoQueryNode(DeltaConfig.Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public void execute(ExecutionContext context, int curItrCount) throws Exception {
+ log.info("Executing trino query node {}", this.getName());
+ String url = context.getHoodieTestSuiteWriter().getCfg().trinoJdbcUrl;
+ if (StringUtils.isNullOrEmpty(url)) {
+ throw new IllegalArgumentException("Trino JDBC connection url not provided. Please set --trino-jdbc-url.");
+ }
+ String user = context.getHoodieTestSuiteWriter().getCfg().trinoUsername;
+ String pass = context.getHoodieTestSuiteWriter().getCfg().trinoPassword;
+ try {
+ Class.forName("io.trino.jdbc.TrinoDriver");
+ } catch (ClassNotFoundException e) {
+ throw new HoodieValidationException("Trino query validation failed due to " + e.getMessage(), e);
+ }
+ try (Connection connection = DriverManager.getConnection(url, user, pass)) {
+ Statement stmt = connection.createStatement();
+ setSessionProperties(this.config.getTrinoProperties(), stmt);
+ executeAndValidateQueries(this.config.getTrinoQueries(), stmt);
+ stmt.close();
+ }
+ catch (Exception e) {
+ throw new HoodieValidationException("Trino query validation failed due to " + e.getMessage(), e);
+ }
+ }
+}
diff --git a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
index 23691659cab24..8228c53e54786 100644
--- a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
+++ b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
@@ -71,4 +71,24 @@ dag_content:
        query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1"
        result2: 0
    type: HiveQueryNode
-    deps: first_hive_sync
\ No newline at end of file
+    deps: first_hive_sync
+  first_presto_query:
+    config:
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb1.table1"
+        result1: 300
+        query2: "select count(*) from testdb1.table1 group by _row_key having count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: first_hive_query
+  first_trino_query:
+    config:
+      trino_queries:
+        query1: "select count(*) from testdb1.table1"
+        result1: 300
+        query2: "select count(*) from testdb1.table1 group by _row_key having count(*) > 1"
+        result2: 0
+    type: TrinoQueryNode
+    deps: first_presto_query
\ No newline at end of file
diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml
index 1db1813b38cb2..9f9193b5eb347 100644
--- a/packaging/hudi-integ-test-bundle/pom.xml
+++ b/packaging/hudi-integ-test-bundle/pom.xml
@@ -100,6 +100,8 @@
<include>org.apache.hbase.thirdparty:hbase-shaded-protobuf</include>
<include>org.apache.htrace:htrace-core4</include>
<include>commons-io:commons-io</include>
+ <include>com.facebook.presto:presto-jdbc</include>
+ <include>io.trino:trino-jdbc</include>
<include>org.jetbrains.kotlin:kotlin-stdlib-jdk8</include>
<include>org.jetbrains.kotlin:kotlin-stdlib</include>
diff --git a/pom.xml b/pom.xml
index ac10a4e872813..681e7ddb573f5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,8 @@
2.10.1
org.apache.hive
2.3.1
+ <presto.version>0.273</presto.version>
+ <trino.version>390</trino.version>
core
4.1.1
1.6.0
@@ -1069,6 +1071,18 @@
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-jdbc</artifactId>
+ <version>${presto.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-jdbc</artifactId>
+ <version>${trino.version}</version>
+ </dependency>
+
<groupId>org.apache.curator</groupId>