diff --git a/docker/demo/config/test-suite/cow-spark-long-running.yaml b/docker/demo/config/test-suite/cow-spark-long-running.yaml
index c25b95c8da457..795a4a5f60709 100644
--- a/docker/demo/config/test-suite/cow-spark-long-running.yaml
+++ b/docker/demo/config/test-suite/cow-spark-long-running.yaml
@@ -25,11 +25,6 @@ dag_content:
       num_records_insert: 10000
     type: SparkInsertNode
     deps: none
-  first_validate:
-    config:
-      validate_hive: false
-    type: ValidateDatasetNode
-    deps: first_insert
   first_upsert:
     config:
       record_size: 200
@@ -39,7 +34,7 @@ dag_content:
       num_records_upsert: 3000
       num_partitions_upsert: 50
     type: SparkUpsertNode
-    deps: first_validate
+    deps: first_insert
   first_delete:
     config:
       num_partitions_delete: 50
@@ -48,6 +43,7 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
     type: ValidateDatasetNode
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
index 68d14a02dc54b..09dd6168c985e 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
@@ -47,11 +47,6 @@ dag_content:
       engine: "mr"
     type: HiveSyncNode
     deps: third_insert
-  first_validate:
-    config:
-      validate_hive: false
-    type: ValidateDatasetNode
-    deps: first_hive_sync
   first_upsert:
     config:
       record_size: 1000
@@ -61,7 +56,7 @@ dag_content:
       num_records_upsert: 100
       num_partitions_upsert: 1
     type: UpsertNode
-    deps: first_validate
+    deps: first_hive_sync
   first_delete:
     config:
       num_partitions_delete: 50
@@ -76,6 +71,7 @@ dag_content:
     deps: first_delete
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: true
       delete_input_data: true
     type: ValidateDatasetNode
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
index 0212fdf43c512..b2ab525b1af65 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
@@ -59,6 +59,7 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
     type: ValidateDatasetNode
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
index d7b11194736e9..b8f2b686066c3 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
@@ -59,6 +59,7 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
     type: ValidateDatasetNode
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
index 77898640ea144..a20870f262d8b 100644
--- a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
@@ -62,6 +62,7 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: false
     type: ValidateDatasetNode
diff --git a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
index 4b2ee7ad13cc4..1c2f44b060036 100644
--- a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
+++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
@@ -41,11 +41,6 @@ dag_content:
       num_records_insert: 300
     deps: second_insert
     type: InsertNode
-  first_validate:
-    config:
-      validate_hive: false
-    type: ValidateDatasetNode
-    deps: third_insert
   first_upsert:
     config:
       record_size: 1000
@@ -55,7 +50,7 @@ dag_content:
       num_records_upsert: 100
       num_partitions_upsert: 1
     type: UpsertNode
-    deps: first_validate
+    deps: third_insert
   first_delete:
     config:
       num_partitions_delete: 1
@@ -64,6 +59,7 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
     type: ValidateDatasetNode
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 2c39f5f93a52c..d7280402d2d5d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -89,6 +89,7 @@ public static class Config {
     private static String START_PARTITION = "start_partition";
     private static String DELETE_INPUT_DATA = "delete_input_data";
     private static String VALIDATE_HIVE = "validate_hive";
+    private static String VALIDATE_ONCE_EVERY_ITR = "validate_once_every_itr";
     private static String EXECUTE_ITR_COUNT = "execute_itr_count";
     private static String VALIDATE_ARCHIVAL = "validate_archival";
     private static String VALIDATE_CLEAN = "validate_clean";
@@ -216,6 +217,10 @@ public boolean isValidateHive() {
       return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
     }
 
+    public int validateOnceEveryIteration() {
+      return Integer.valueOf(configsMap.getOrDefault(VALIDATE_ONCE_EVERY_ITR, 1).toString());
+    }
+
     public boolean isValidateFullData() {
       return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString());
     }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
index 09d44d986e183..de58bf6a1f205 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
@@ -74,81 +74,84 @@ public abstract Dataset getDatasetToValidate(SparkSession session, Executio
 
   @Override
   public void execute(ExecutionContext context, int curItrCount) throws Exception {
-
-    SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
-
-    // todo: Fix partitioning schemes. For now, assumes data based partitioning.
-    String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
-    log.warn("Validation using data from input path " + inputPath);
-    // listing batches to be validated
-    String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
-    if (log.isDebugEnabled()) {
-      FileSystem fs = new Path(inputPathStr)
-          .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
-      FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
-      log.info("fileStatuses length: " + fileStatuses.length);
-      for (FileStatus fileStatus : fileStatuses) {
-        log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
-      }
-    }
-
-    Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
-
-    // read from hudi and remove meta columns.
-    Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
-    if (config.isValidateFullData()) {
-      log.debug("Validating full dataset");
-      Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
-      Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
-      long exceptInputCount = exceptInputDf.count();
-      long exceptHudiCount = exceptHudiDf.count();
-      log.debug("Except input df count " + exceptInputDf + ", except hudi count " + exceptHudiCount);
-      if (exceptInputCount != 0 || exceptHudiCount != 0) {
-        log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
-            + ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
-        throw new AssertionError("Hudi contents does not match contents input data. ");
-      }
-    } else {
-      Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
-      long inputCount = inputSnapshotDf.count();
-      long outputCount = trimmedHudiDf.count();
-      log.debug("Input count: " + inputCount + "; output count: " + outputCount);
-      // the intersected df should be same as inputDf. if not, there is some mismatch.
-      if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
-        log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
-        throw new AssertionError("Hudi contents does not match contents input data. ");
+    int validateOnceEveryItr = config.validateOnceEveryIteration();
+    int itrCountToExecute = config.getIterationCountToExecute();
+    if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
+        (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
+      SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
+      // todo: Fix partitioning schemes. For now, assumes data based partitioning.
+      String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
+      log.warn("Validation using data from input path " + inputPath);
+      // listing batches to be validated
+      String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+      if (log.isDebugEnabled()) {
+        FileSystem fs = new Path(inputPathStr)
+            .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+        FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+        log.info("fileStatuses length: " + fileStatuses.length);
+        for (FileStatus fileStatus : fileStatuses) {
+          log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
+        }
+      }
+
+      Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
+
+      // read from hudi and remove meta columns.
+      Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
+      if (config.isValidateFullData()) {
+        log.debug("Validating full dataset");
+        Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
+        Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
+        long exceptInputCount = exceptInputDf.count();
+        long exceptHudiCount = exceptHudiDf.count();
+        log.debug("Except input df count " + exceptInputDf + ", except hudi count " + exceptHudiCount);
+        if (exceptInputCount != 0 || exceptHudiCount != 0) {
+          log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
+              + ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
+          throw new AssertionError("Hudi contents does not match contents input data. ");
+        }
+      } else {
+        Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
+        long inputCount = inputSnapshotDf.count();
+        long outputCount = trimmedHudiDf.count();
+        log.debug("Input count: " + inputCount + "; output count: " + outputCount);
+        // the intersected df should be same as inputDf. if not, there is some mismatch.
+        if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
+          log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
+          throw new AssertionError("Hudi contents does not match contents input data. ");
-      }
-      if (config.isValidateHive()) {
-        String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
-        String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
-        log.warn("Validating hive table with db : " + database + " and table : " + tableName);
-        session.sql("REFRESH TABLE " + database + "." + tableName);
-        Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
-            "test_suite_source_ordering_field FROM " + database + "." + tableName);
-        Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
-            "_hoodie_is_deleted", "test_suite_source_ordering_field");
-
-        Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
-        outputCount = trimmedHudiDf.count();
-        log.warn("Input count: " + inputCount + "; output count: " + outputCount);
-        // the intersected df should be same as inputDf. if not, there is some mismatch.
-        if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
-          log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
-          throw new AssertionError("Hudi hive table contents does not match contents input data. ");
-        }
-      }
-    }
-
-    // if delete input data is enabled, erase input data.
-    if (config.isDeleteInputData()) {
-      // clean up input data for current group of writes.
-      inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
-      FileSystem fs = new Path(inputPathStr)
-          .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
-      FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
-      for (FileStatus fileStatus : fileStatuses) {
-        log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
-        fs.delete(fileStatus.getPath(), true);
-      }
-    }
-  }
+        }
+        if (config.isValidateHive()) {
+          String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
+          String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
+          log.warn("Validating hive table with db : " + database + " and table : " + tableName);
+          session.sql("REFRESH TABLE " + database + "." + tableName);
+          Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
+              "test_suite_source_ordering_field FROM " + database + "." + tableName);
+          Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
+              "_hoodie_is_deleted", "test_suite_source_ordering_field");
+
+          Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
+          outputCount = trimmedHudiDf.count();
+          log.warn("Input count: " + inputCount + "; output count: " + outputCount);
+          // the intersected df should be same as inputDf. if not, there is some mismatch.
+          if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
+            log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
+            throw new AssertionError("Hudi hive table contents does not match contents input data. ");
+          }
+        }
+      }
+
+      // if delete input data is enabled, erase input data.
+      if (config.isDeleteInputData()) {
+        // clean up input data for current group of writes.
+        inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+        FileSystem fs = new Path(inputPathStr)
+            .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+        FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+        for (FileStatus fileStatus : fileStatuses) {
+          log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
+          fs.delete(fileStatus.getPath(), true);
+        }
+      }
+    }
+  }
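
Reviewer note (not part of the patch): a minimal, self-contained sketch of the iteration gate introduced in BaseValidateDatasetNode#execute above. The class name ValidationGateSketch, the method shouldValidate, and the sample values are hypothetical and exist only to illustrate when validation fires; they are not Hudi APIs.

public class ValidationGateSketch {

  // Mirrors the condition added to BaseValidateDatasetNode#execute:
  // run validation when itrCountToExecute (execute_itr_count) is not -1 and equals the
  // current iteration; otherwise run it on every validateOnceEveryItr-th iteration
  // (validate_once_every_itr, which DeltaConfig defaults to 1, i.e. every iteration).
  static boolean shouldValidate(int curItrCount, int itrCountToExecute, int validateOnceEveryItr) {
    return (itrCountToExecute != -1 && itrCountToExecute == curItrCount)
        || (itrCountToExecute == -1 && (curItrCount % validateOnceEveryItr) == 0);
  }

  public static void main(String[] args) {
    // With validate_once_every_itr : 5 and itrCountToExecute passed as -1 (illustrative
    // stand-in for execute_itr_count not being set), the gate opens only on iterations 5 and 10.
    for (int itr = 1; itr <= 10; itr++) {
      System.out.println("itr " + itr + " -> validate = " + shouldValidate(itr, -1, 5));
    }
  }
}

With the yamls above setting validate_once_every_itr : 5, the second_validate node performs its dataset comparison only on every fifth iteration of the long-running DAG rather than on every round, which appears to be why the per-round first_validate nodes were dropped and their dependents rewired to the preceding insert/hive-sync nodes.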