diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 80f7cebd3a2fd..ac60606a4bc4c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -209,7 +209,7 @@ object DataSourceReadOptions { " by carefully analyzing provided partition-column predicates and deducing corresponding partition-path prefix from " + " them (if possible).") - val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES: ConfigProperty[String] = ConfigProperty + val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.incr.fallback.fulltablescan.enable") .defaultValue("false") .markAdvanced() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index b02fecff3276e..227b585c9ef23 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -212,8 +212,8 @@ class IncrementalRelation(val sqlContext: SQLContext, // 1. the start commit is archived // 2. the end commit is archived // 3. there are files in metadata be deleted - val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key, - DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean + val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key, + DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.defaultValue).toBoolean val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path")) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index e37a1737d27ab..600feefce5c94 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -196,8 +196,8 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation { // 2. the end commit is archived // 3. there are files in metadata be deleted protected lazy val fullTableScan: Boolean = { - val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key, - DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean + val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key, + DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.defaultValue).toBoolean fallbackToFullTableScan && (startInstantArchived || endInstantArchived || affectedFilesInCommits.exists(fileStatus => !metaClient.getFs.exists(fileStatus.getPath))) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala index e6aa2b39fb019..f30a143de322a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala @@ -252,7 +252,7 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { // verify incremental query verifySQLQueries(numRecordsForFirstQuery, numRecordsForSecondQuery, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) - commonOpts = commonOpts + (DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key -> "true") + commonOpts = commonOpts + (DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key -> "true") // TODO: https://issues.apache.org/jira/browse/HUDI-6657 - Investigate why below assertions fail with full table scan enabled. //verifySQLQueries(numRecordsForFirstQuery, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala index 673468dfec8ba..75a7026ad3ce4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala @@ -164,7 +164,7 @@ class TestIncrementalReadWithFullTableScan extends HoodieSparkClientTestBase { .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs) .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs) - .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), fallBackFullTableScan) + .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key(), fallBackFullTableScan) .load(basePath) assertEquals(perBatchSize * batchNum, hoodieIncViewDF.count()) } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index aafd4c9e3b56e..1d302fa106b68 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -46,7 +46,7 @@ import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME; import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME; -import static org.apache.hudi.DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES; +import static org.apache.hudi.DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN; import static org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT; import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE; import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL; @@ -203,9 +203,9 @@ public Pair>, String> fetchNextBatch(Option lastCkpt .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL()) .option(BEGIN_INSTANTTIME().key(), queryInfo.getStartInstant()) .option(END_INSTANTTIME().key(), queryInfo.getEndInstant()) - .option(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(), - props.getString(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(), - INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue())) + .option(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), + props.getString(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), + INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue())) .option(INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key(), handlingMode.name()) .load(srcPath); } else { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java index 597c0195f5e80..ef903d7c647ed 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java @@ -88,9 +88,9 @@ public Dataset runIncrementalQuery(QueryInfo queryInfo) { .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()) .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant()) .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant()) - .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(), - props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(), - DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue())) + .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), + props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), + DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue())) .load(sourcePath); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 967ad45218263..f73ee936b601c 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -2358,7 +2358,7 @@ public void testHoodieIncrFallback() throws Exception { // Remove source.hoodieincr.num_instants config downstreamCfg.configs.remove(downstreamCfg.configs.size() - 1); - downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key() + "=true"); + downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key() + "=true"); //Adding this conf to make testing easier :) downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10"); downstreamCfg.operation = WriteOperationType.UPSERT;