diff --git a/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml b/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml index 4903e3650c144..a19617ef135c5 100644 --- a/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml +++ b/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml @@ -45,9 +45,21 @@ dag_content: delete_input_data: false type: ValidateDatasetNode deps: first_insert + first_presto_query: + config: + execute_itr_count: 5 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 30000 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate last_validate: config: execute_itr_count: 5 delete_input_data: true type: ValidateAsyncOperations - deps: second_validate \ No newline at end of file + deps: first_presto_query \ No newline at end of file diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml index 8b82415982f90..6e94b05a698ae 100644 --- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml +++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
dag_name: deltastreamer-long-running-multi-partitions.yaml -dag_rounds: 50 +dag_rounds: 20 dag_intermittent_delay_mins: 1 dag_content: first_insert: @@ -71,7 +71,7 @@ dag_content: deps: first_delete second_validate: config: - validate_once_every_itr : 5 + execute_itr_count: 20 validate_hive: true delete_input_data: true max_wait_time_for_deltastreamer_catch_up_ms: 600000 @@ -79,7 +79,7 @@ dag_content: deps: second_hive_sync last_validate: config: - execute_itr_count: 50 + execute_itr_count: 20 max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations deps: second_validate diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml index 031664cd15c99..9ba6993e1d500 100644 --- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml +++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
dag_name: deltastreamer-long-running-multi-partitions.yaml -dag_rounds: 30 +dag_rounds: 20 dag_intermittent_delay_mins: 1 dag_content: first_insert: @@ -65,9 +65,21 @@ dag_content: max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete + first_presto_query: + config: + execute_itr_count: 20 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 7600 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate last_validate: config: - execute_itr_count: 30 + execute_itr_count: 20 max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations - deps: second_validate + deps: first_presto_query diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml index c23775b2ce546..9ba6993e1d500 100644 --- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml +++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
dag_name: deltastreamer-long-running-multi-partitions.yaml -dag_rounds: 50 +dag_rounds: 20 dag_intermittent_delay_mins: 1 dag_content: first_insert: @@ -65,9 +65,21 @@ dag_content: max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete + first_presto_query: + config: + execute_itr_count: 20 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 7600 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate last_validate: config: - execute_itr_count: 50 + execute_itr_count: 20 max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations - deps: second_validate + deps: first_presto_query diff --git a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml index 2fc68596d84a4..b0207920320c9 100644 --- a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml +++ b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml @@ -17,7 +17,7 @@ # to be used with test-aggressive-clean-archival.properties dag_name: deltastreamer-medium-clustering.yaml -dag_rounds: 20 +dag_rounds: 15 dag_intermittent_delay_mins: 1 dag_content: first_insert: @@ -62,14 +62,27 @@ dag_content: deps: first_upsert second_validate: config: + validate_once_every_itr: 3 validate_hive: false delete_input_data: true max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete + first_presto_query: + config: + execute_itr_count: 15 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 3600 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate 
last_validate: config: - execute_itr_count: 20 + execute_itr_count: 15 max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations - deps: second_validate + deps: first_presto_query diff --git a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml index db7edb8f8f28c..563640299144e 100644 --- a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml +++ b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml @@ -17,7 +17,7 @@ # to be used with test-aggressive-clean-archival.properties dag_name: deltastreamer-long-running-multi-partitions.yaml -dag_rounds: 20 +dag_rounds: 15 dag_intermittent_delay_mins: 1 dag_content: first_insert: @@ -68,9 +68,21 @@ dag_content: max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete + first_presto_query: + config: + execute_itr_count: 15 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 3600 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate last_validate: config: - execute_itr_count: 20 + execute_itr_count: 15 max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations - deps: second_validate + deps: first_presto_query diff --git a/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml b/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml index a8be72e108136..8d42eea877b85 100644 --- a/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml +++ b/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml @@ -56,8 +56,20 @@ dag_content: delete_input_data: true type: ValidateDatasetNode deps: first_delete + first_presto_query: + config: + execute_itr_count: 6 + 
presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 11000 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate last_validate: config: execute_itr_count: 6 type: ValidateAsyncOperations - deps: second_validate + deps: first_presto_query diff --git a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml index 102807ec435be..4fefcc497d32b 100644 --- a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml +++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. dag_name: detlastreamer-long-running-example.yaml -dag_rounds: 50 +dag_rounds: 20 dag_intermittent_delay_mins: 1 dag_content: first_insert: @@ -65,9 +65,21 @@ dag_content: max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete + first_presto_query: + config: + execute_itr_count: 20 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 3600 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate last_validate: config: - execute_itr_count: 50 + execute_itr_count: 20 max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations - deps: second_validate + deps: first_presto_query diff --git a/docker/demo/config/test-suite/simple-clustering.yaml b/docker/demo/config/test-suite/simple-clustering.yaml index 4ede6394cf752..96f741ecc56a5 100644 --- a/docker/demo/config/test-suite/simple-clustering.yaml +++ 
b/docker/demo/config/test-suite/simple-clustering.yaml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. dag_name: simple-clustering.yaml -dag_rounds: 30 +dag_rounds: 15 dag_intermittent_delay_mins: 0 dag_content: first_insert: @@ -54,7 +54,7 @@ dag_content: deps: first_delete first_cluster: config: - execute_itr_count: 25 + execute_itr_count: 10 type: ClusteringNode deps: first_validate second_validate: @@ -62,3 +62,15 @@ dag_content: validate_hive: false type: ValidateDatasetNode deps: first_cluster + first_presto_query: + config: + validate_once_every_itr: 5 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 8300 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate diff --git a/docker/demo/config/test-suite/simple-deltastreamer.yaml b/docker/demo/config/test-suite/simple-deltastreamer.yaml index 11b7f17d34a4d..1215b337c83a0 100644 --- a/docker/demo/config/test-suite/simple-deltastreamer.yaml +++ b/docker/demo/config/test-suite/simple-deltastreamer.yaml @@ -68,3 +68,15 @@ dag_content: delete_input_data: true type: ValidateDatasetNode deps: first_delete + first_presto_query: + config: + validate_once_every_itr: 3 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 9600 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate diff --git a/docker/demo/config/test-suite/spark-immutable-dataset.yaml b/docker/demo/config/test-suite/spark-immutable-dataset.yaml index d6cbf1b244de5..b609f3dc0886d 100644 --- a/docker/demo/config/test-suite/spark-immutable-dataset.yaml +++ 
b/docker/demo/config/test-suite/spark-immutable-dataset.yaml @@ -45,9 +45,21 @@ dag_content: delete_input_data: false type: ValidateDatasetNode deps: first_insert + first_presto_query: + config: + execute_itr_count: 5 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 48000 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate last_validate: config: execute_itr_count: 5 delete_input_data: true type: ValidateAsyncOperations - deps: second_validate \ No newline at end of file + deps: first_presto_query \ No newline at end of file diff --git a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml index 947bbdab86b43..693d7bf22710a 100644 --- a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml +++ b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml @@ -48,6 +48,18 @@ dag_content: max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete + first_presto_query: + config: + execute_itr_count: 6 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 6000 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate last_validate: config: execute_itr_count: 6 diff --git a/docker/demo/config/test-suite/spark-long-running.yaml b/docker/demo/config/test-suite/spark-long-running.yaml index 2ffef557815c7..52aeb92a7f3e7 100644 --- a/docker/demo/config/test-suite/spark-long-running.yaml +++ b/docker/demo/config/test-suite/spark-long-running.yaml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under 
the License. dag_name: cow-spark-deltastreamer-long-running-multi-partitions.yaml -dag_rounds: 30 +dag_rounds: 20 dag_intermittent_delay_mins: 0 dag_content: first_insert: @@ -49,9 +49,21 @@ dag_content: max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete + first_presto_query: + config: + execute_itr_count: 20 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 189000 # TODO(review): value was computed for 30 rounds; verify expected count for dag_rounds 20 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate last_validate: config: - execute_itr_count: 30 + execute_itr_count: 20 max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations - deps: second_validate + deps: first_presto_query diff --git a/docker/demo/config/test-suite/spark-medium-clustering.yaml b/docker/demo/config/test-suite/spark-medium-clustering.yaml index 09537a23d553e..3045f7c4b9542 100644 --- a/docker/demo/config/test-suite/spark-medium-clustering.yaml +++ b/docker/demo/config/test-suite/spark-medium-clustering.yaml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
dag_name: spark-medium-clustering.yaml -dag_rounds: 20 +dag_rounds: 15 dag_intermittent_delay_mins: 0 dag_content: first_insert: @@ -52,8 +52,20 @@ dag_content: delete_input_data: true type: ValidateDatasetNode deps: first_delete + first_presto_query: + config: + execute_itr_count: 15 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 146000 # TODO(review): value was computed for 20 rounds; verify expected count for dag_rounds 15 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate last_validate: config: - execute_itr_count: 20 + execute_itr_count: 15 type: ValidateAsyncOperations - deps: second_validate + deps: first_presto_query diff --git a/docker/demo/config/test-suite/spark-simple.yaml b/docker/demo/config/test-suite/spark-simple.yaml index 192adcf377dc0..ebd1cd2d4d3ca 100644 --- a/docker/demo/config/test-suite/spark-simple.yaml +++ b/docker/demo/config/test-suite/spark-simple.yaml @@ -51,4 +51,16 @@ dag_content: validate_hive: false delete_input_data: false type: ValidateDatasetNode - deps: first_delete \ No newline at end of file + deps: first_delete + first_presto_query: + config: + execute_itr_count: 1 + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 120 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: second_validate \ No newline at end of file diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index e7e28d3dec28b..b911f116d3794 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -20,6 +20,7 @@ import 
org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -319,6 +320,9 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config { @Parameter(names = {"--test-continuous-mode"}, description = "Tests continuous mode in deltastreamer.") public Boolean testContinousMode = false; + @Parameter(names = {"--enable-presto-validation"}, description = "Enables presto validation") + public Boolean enablePrestoValidation = false; + @Parameter(names = {"--presto-jdbc-url"}, description = "Presto JDBC URL in the format jdbc:presto://:// " + "e.g. URL to connect to Presto running on localhost port 8080 with the catalog `hive` and the schema `sales`: " + "jdbc:presto://localhost:8080/hive/sales") @@ -343,5 +347,8 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config { @Parameter(names = {"--index-type"}, description = "Index type to use for writes") public String indexType = "SIMPLE"; + + @Parameter(names = {"--enable-metadata-on-read"}, description = "Enables metadata for queries") + public Boolean enableMetadataOnRead = HoodieMetadataConfig.ENABLE.defaultValue(); } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java index 94f0a51a4dba9..45f087717cd8a 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java @@ -35,25 +35,33 @@ public PrestoQueryNode(DeltaConfig.Config config) { @Override public void execute(ExecutionContext context, int curItrCount) throws 
Exception { - log.info("Executing presto query node {}", this.getName()); - String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl; - if (StringUtils.isNullOrEmpty(url)) { - throw new IllegalArgumentException("Presto JDBC connection url not provided. Please set --presto-jdbc-url."); + if (!context.getHoodieTestSuiteWriter().getCfg().enablePrestoValidation) { + return; } - String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername; - String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword; - try { - Class.forName("com.facebook.presto.jdbc.PrestoDriver"); - } catch (ClassNotFoundException e) { - throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e); - } - try (Connection connection = DriverManager.getConnection(url, user, pass)) { - Statement stmt = connection.createStatement(); - setSessionProperties(this.config.getPrestoProperties(), stmt); - executeAndValidateQueries(this.config.getPrestoQueries(), stmt); - stmt.close(); - } catch (Exception e) { - throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e); + int validateOnceEveryItr = config.validateOnceEveryIteration(); + int itrCountToExecute = config.getIterationCountToExecute(); + if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) + || (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) { + log.info("Executing presto query node {}", this.getName()); + String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl; + if (StringUtils.isNullOrEmpty(url)) { + throw new IllegalArgumentException("Presto JDBC connection url not provided. 
Please set --presto-jdbc-url."); + } + String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername; + String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword; + try { + Class.forName("com.facebook.presto.jdbc.PrestoDriver"); + } catch (ClassNotFoundException e) { + throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e); + } + try (Connection connection = DriverManager.getConnection(url, user, pass)) { + Statement stmt = connection.createStatement(); + setSessionProperties(this.config.getPrestoProperties(), stmt); + executeAndValidateQueries(this.config.getPrestoQueries(), stmt); + stmt.close(); + } catch (Exception e) { + throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e); + } } } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java index 358abb36f9cdc..bd50616d142a4 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java @@ -18,11 +18,12 @@ package org.apache.hudi.integ.testsuite.dag.nodes; +import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -35,15 +36,15 @@ */ public class ValidateDatasetNode extends BaseValidateDatasetNode { - private static Logger log = 
LoggerFactory.getLogger(ValidateDatasetNode.class); + private static final Logger LOG = LoggerFactory.getLogger(ValidateDatasetNode.class); - public ValidateDatasetNode(Config config) { + public ValidateDatasetNode(DeltaConfig.Config config) { super(config); } @Override public Logger getLogger() { - return log; + return LOG; } @Override @@ -51,7 +52,7 @@ public Dataset getDatasetToValidate(SparkSession session, ExecutionContext StructType inputSchema) { String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key()); String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + (partitionPathField.isEmpty() ? "/" : "/*/*/*"); - Dataset hudiDf = session.read().option(HoodieMetadataConfig.ENABLE.key(), String.valueOf(config.isEnableMetadataValidate())) + Dataset hudiDf = session.read().option(HoodieMetadataConfig.ENABLE.key(), String.valueOf(context.getHoodieTestSuiteWriter().getCfg().enableMetadataOnRead)) .format("hudi").load(hudiPath); return hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD) .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);