diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 058e7c357594d..b28c10a812ec7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -122,6 +122,10 @@ object DataSourceReadOptions {
     .withDocumentation("Enables data-skipping allowing queries to leverage indexes to reduce the search space by " +
       "skipping over files")
+  val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
+    .defaultValue("false")
+    .withDocumentation("When doing an incremental query, whether we should fall back to a full table scan if the files being queried no longer exist.")
 
   /** @deprecated Use {@link QUERY_TYPE} and its methods instead */
   @Deprecated
   val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 19071080312bc..14df0cb38de64 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -18,16 +18,17 @@
 package org.apache.hudi
 
 import org.apache.avro.Schema
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 
 import java.util.stream.Collectors
-import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hadoop.fs.{GlobPattern, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.util.HoodieTimer
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
-import org.apache.hadoop.fs.GlobPattern
-import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
@@ -40,11 +41,11 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 /**
-  * Relation, that implements the Hoodie incremental view.
-  *
-  * Implemented for Copy_on_write storage.
-  *
-  */
+ * Relation, that implements the Hoodie incremental view.
+ *
+ * Implemented for Copy_on_write storage.
+ *
+ */
 class IncrementalRelation(val sqlContext: SQLContext,
                           val optParams: Map[String, String],
                           val userSchema: StructType,
@@ -85,7 +86,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
     log.info("Inferring schema..")
     val schemaResolver = new TableSchemaResolver(metaClient)
     val tableSchema = if (useEndInstantSchema) {
-      if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else
+      if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else
         schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last)
     } else {
       schemaResolver.getTableAvroSchemaWithoutMetadataFields()
@@ -165,26 +166,63 @@ class IncrementalRelation(val sqlContext: SQLContext,
       if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
         sqlContext.sparkContext.emptyRDD[Row]
       } else {
-        log.info("Additional Filters to be applied to incremental source are :" + filters)
+        log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))
         var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
 
-        if (metaBootstrapFileIdToFullPath.nonEmpty) {
-          df = sqlContext.sparkSession.read
-            .format("hudi")
-            .schema(usedSchema)
-            .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
-            .load()
+        val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+          DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
+        var doFullTableScan = false
+
+        if (fallbackToFullTableScan) {
+          val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+          val timer = new HoodieTimer().startTimer();
+
+          val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
+          val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
+          val timeTaken = timer.endTimer()
+          log.info("Checking if paths exist took " + timeTaken + "ms")
+
+          val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+          val isInstantArchived = optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // true if optStartTs < activeTimeline.first
+
+          if (isInstantArchived || firstNotFoundPath.isDefined) {
+            doFullTableScan = true
+            log.info("Falling back to full table scan")
+          }
         }
 
-        if (regularFileIdToFullPath.nonEmpty) {
-          df = df.union(sqlContext.read.options(sOpts)
+        if (doFullTableScan) {
+          val hudiDF = sqlContext.read
+            .format("hudi")
             .schema(usedSchema)
-            .parquet(filteredRegularFullPaths.toList: _*)
-            .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              commitsToReturn.head.getTimestamp))
+            .load(basePath)
+            .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, // note: > instead of >= because we filter on the BEGIN_INSTANTTIME value itself rather than on the first commit after it
+              optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)))
             .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              commitsToReturn.last.getTimestamp)))
+              commitsToReturn.last.getTimestamp))
+          // schema enforcement does not happen in the spark.read above, hence select explicitly to get the right column order
+          val fieldNames: Array[String] = df.schema.fields.map(field => field.name)
+          df = df.union(hudiDF.select(fieldNames.head, fieldNames.tail: _*))
+        } else {
+          if (metaBootstrapFileIdToFullPath.nonEmpty) {
+            df = sqlContext.sparkSession.read
+              .format("hudi")
+              .schema(usedSchema)
+              .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
+              .load()
+          }
+
+          if (regularFileIdToFullPath.nonEmpty) {
+            df = df.union(sqlContext.read.options(sOpts)
+              .schema(usedSchema)
+              .parquet(filteredRegularFullPaths.toList: _*)
+              .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                commitsToReturn.head.getTimestamp))
+              .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                commitsToReturn.last.getTimestamp)))
+          }
         }
 
         filters.foldLeft(df)((e, f) => e.filter(f)).rdd
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index e8b179804dfca..ad6d40e00a31b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -34,7 +34,8 @@ import org.apache.spark.sql.functions.{col, concat, lit, udf}
 import org.apache.spark.sql.types._
 import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
@@ -708,6 +709,90 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
   }
 
+  @Test def testFailEarlyForIncrViewQueryForNonExistingFiles(): Unit = {
+    // Create 10 commits
+    for (i <- 1 to 10) {
+      val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
+      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+      inputDF.write.format("org.apache.hudi")
+        .options(commonOpts)
+        .option("hoodie.cleaner.commits.retained", "3")
+        .option("hoodie.keep.min.commits", "4")
+        .option("hoodie.keep.max.commits", "5")
+        .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+        .option(HoodieMetadataConfig.ENABLE.key(), value = false)
+        .mode(SaveMode.Append)
+        .save(basePath)
+    }
+
+    val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
+    /**
+      * State of the timeline after 10 commits:
+      * +------------------+--------------------------------------+
+      * |     Archived     |           Active Timeline            |
+      * +------------------+--------------+-----------------------+
+      * | C0   C1   C2  C3 |    C4   C5   |  C6    C7   C8   C9   |
+      * +------------------+--------------+-----------------------+
+      * |     Data cleaned                |  Data exists in table |
+      * +---------------------------------+-----------------------+
+      */
+
+    val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
+    // C4 and C5 are still valid instants in the active timeline, but the cleaner has already removed their data files
+    var startTs = completedCommits.nthInstant(0).get().getTimestamp // C4
+    var endTs = completedCommits.nthInstant(1).get().getTimestamp // C5
+
+    // Without the fallback enabled, the query should fail with 'Path does not exist'
+    var hoodieIncViewDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
+      .load(basePath)
+
+    val msg = "Should fail with Path does not exist"
+    assertThrows(classOf[AnalysisException], new Executable {
+      override def execute(): Unit = {
+        hoodieIncViewDF.count()
+      }
+    }, msg)
+
+    // Should work with the fallback enabled
+    hoodieIncViewDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
+      .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
+      .load(basePath)
+    assertEquals(100, hoodieIncViewDF.count())
+
+    // Test with archived commits
+    val archivedInstants = hoodieMetaClient.getArchivedTimeline.getInstants.distinct().toArray
+    startTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp // C0
+    endTs = completedCommits.nthInstant(1).get().getTimestamp // C5
+
+    // Without the fallback enabled, the query should fail with 'Path does not exist'
+    hoodieIncViewDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
+      .load(basePath)
+
+    assertThrows(classOf[AnalysisException], new Executable {
+      override def execute(): Unit = {
+        hoodieIncViewDF.count()
+      }
+    }, msg)
+
+    // Should work with the fallback enabled
+    hoodieIncViewDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
+      .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
+      .load(basePath)
+    assertEquals(500, hoodieIncViewDF.count())
+  }
+
   def copyOnWriteTableSelect(enableDropPartitionColumns: Boolean): Boolean = {
     val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 3)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index ebb359390be0c..0eed937635b22 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -135,7 +135,10 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
     DataFrameReader reader = sparkSession.read().format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
-        .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
+        .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight())
+        .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+            props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+                DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()));
 
     Dataset<Row> source = reader.load(srcPath);
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index d63bce6584d90..7fabcaeb6f720 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.functional;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -1739,6 +1740,54 @@ public void testJdbcSourceIncrementalFetchInContinuousMode() {
     }
   }
 
+  @Test
+  public void testHoodieIncrFallback() throws Exception {
+    String tableBasePath = dfsBasePath + "/incr_test_table";
+    String downstreamTableBasePath = dfsBasePath + "/incr_test_downstream_table";
+
+    insertInTable(tableBasePath, 1, WriteOperationType.BULK_INSERT);
+    HoodieDeltaStreamer.Config downstreamCfg =
+        TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
+            WriteOperationType.BULK_INSERT, true, null);
+    new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+
+    insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
+    // No change, as this sync fails with a 'Path does not exist' error
+    assertThrows(org.apache.spark.sql.AnalysisException.class, () -> new HoodieDeltaStreamer(downstreamCfg, jsc).sync());
+    TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*", sqlContext);
+
+    if (downstreamCfg.configs == null) {
+      downstreamCfg.configs = new ArrayList<>();
+    }
+
+    downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key() + "=true");
+    // Adding this conf to make testing easier :)
+    downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
+    downstreamCfg.operation = WriteOperationType.UPSERT;
+    new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+    new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+
+    long baseTableRecords = sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").count();
+    long downStreamTableRecords = sqlContext.read().format("org.apache.hudi").load(downstreamTableBasePath + "/*/*.parquet").count();
+    assertEquals(baseTableRecords, downStreamTableRecords);
+  }
+
+  private void insertInTable(String tableBasePath, int count, WriteOperationType operationType) throws Exception {
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, operationType,
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false);
+    if (cfg.configs == null) {
+      cfg.configs = new ArrayList<>();
+    }
+    cfg.configs.add("hoodie.cleaner.commits.retained=3");
+    cfg.configs.add("hoodie.keep.min.commits=4");
cfg.configs.add("hoodie.keep.max.commits=5"); + cfg.configs.add("hoodie.test.source.generate.inserts=true"); + + for (int i = 0; i < count; i++) { + new HoodieDeltaStreamer(cfg, jsc).sync(); + } + } + @Test public void testInsertOverwrite() throws Exception { testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite", WriteOperationType.INSERT_OVERWRITE);