diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index a3f9b639af812..8ea6c2adf895f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -344,8 +344,8 @@ public void testTurnOffMetadataTableAfterEnable() throws Exception {
     HoodieCommitMetadata hoodieCommitMetadata = doWriteOperationWithMeta(testTable, instant1, INSERT);
 
     // Simulate the complete data directory including ".hoodie_partition_metadata" file
-    File metaForP1 = new File(metaClient.getBasePath() + "/p1",".hoodie_partition_metadata");
-    File metaForP2 = new File(metaClient.getBasePath() + "/p2",".hoodie_partition_metadata");
+    File metaForP1 = new File(metaClient.getBasePath() + "/p1", ".hoodie_partition_metadata");
+    File metaForP2 = new File(metaClient.getBasePath() + "/p2", ".hoodie_partition_metadata");
     metaForP1.createNewFile();
     metaForP2.createNewFile();
 
@@ -1716,8 +1716,8 @@ public void testMultiWriterForDoubleLocking() throws Exception {
 
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
-        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4)
-        .build())
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4)
+            .build())
         .withAutoCommit(false)
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
@@ -1818,6 +1818,29 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
     validateMetadata(client);
   }
 
+  @Test
+  public void testMetadataReadWithNoCompletedCommits() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    List<HoodieRecord> records;
+    List<WriteStatus> writeStatuses;
+    String[] commitTimestamps = {HoodieActiveTimeline.createNewInstantTime(), HoodieActiveTimeline.createNewInstantTime()};
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      records = dataGen.generateInserts(commitTimestamps[0], 5);
+      client.startCommitWithTime(commitTimestamps[0]);
+      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commitTimestamps[0]).collect();
+      assertNoWriteErrors(writeStatuses);
+
+      // Make all commits in the metadata table inflight. Reads should still go through; they just may not return any data.
+      FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/", commitTimestamps[0]);
+      FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/", HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP);
+      assertEquals(getAllFiles(metadata(client)).stream().map(p -> p.getName()).map(n -> FSUtils.getCommitTime(n)).collect(Collectors.toSet()).size(), 0);
+    }
+  }
+
+
   /**
    * Ensure that the reader only reads completed instants.
   *
@@ -2050,7 +2073,7 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
-        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
        .withProperties(properties)
@@ -2078,7 +2101,7 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
     // set hoodie.table.version to 2 in hoodie.properties file
     changeTableVersion(HoodieTableVersion.TWO);
     writeConfig = getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).withCleanConfig(HoodieCleanConfig.newBuilder()
-    .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
         .withProperties(properties)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 1fb872f6835e1..4216f11048aa7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1052,8 +1052,12 @@ private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient meta
     HoodieTableFileSystemView fsView = fileSystemView.orElse(getFileSystemView(metaClient));
     Stream<FileSlice> fileSliceStream;
     if (mergeFileSlices) {
-      fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
-          partition, metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+      if (metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) {
+        fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
+            partition, metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+      } else {
+        return Collections.EMPTY_LIST;
+      }
     } else {
       fileSliceStream = fsView.getLatestFileSlices(partition);
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 4e158aaa86796..06c27019326ea 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.HoodieMetadataPayload
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
@@ -74,7 +74,7 @@ case class HoodieFileIndex(spark: SparkSession,
     spark = spark,
     metaClient = metaClient,
     schemaSpec = schemaSpec,
-    configProperties = getConfigProperties(spark, options),
+    configProperties = getConfigProperties(spark, options, metaClient),
     queryPaths = HoodieFileIndex.getQueryPaths(options),
     specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
     fileStatusCache = fileStatusCache
@@ -254,9 +254,8 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def sizeInBytes: Long = cachedFileSize
 
-  private def isDataSkippingEnabled: Boolean =
-    options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
-      spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
+  private def isDataSkippingEnabled: Boolean = HoodieFileIndex.getBooleanConfigValue(options, spark.sessionState.conf, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
+    "false")
 
   private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
   private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled
@@ -267,10 +266,15 @@ case class HoodieFileIndex(spark: SparkSession,
       s"(isMetadataTableEnabled = $isMetadataTableEnabled, isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled")
     }
   }
+
 }
 
 object HoodieFileIndex extends Logging {
 
+  def getBooleanConfigValue(options: Map[String, String], sqlConf: SQLConf, configKey: String, defaultValue: String) : Boolean = {
+    options.getOrElse(configKey, sqlConf.getConfString(configKey, defaultValue)).toBoolean
+  }
+
   object DataSkippingFailureMode extends Enumeration {
     val configName = "hoodie.fileIndex.dataSkippingFailureMode"
 
@@ -293,16 +297,20 @@ object HoodieFileIndex extends Logging {
     schema.fieldNames.filter { colName => refs.exists(r => resolver.apply(colName, r.name)) }
   }
 
-  def getConfigProperties(spark: SparkSession, options: Map[String, String]) = {
+  private def isFilesPartitionAvailable(metaClient: HoodieTableMetaClient): Boolean = {
+    metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_FILES)
+  }
+
+  def getConfigProperties(spark: SparkSession, options: Map[String, String], metaClient: HoodieTableMetaClient) = {
     val sqlConf: SQLConf = spark.sessionState.conf
     val properties = new TypedProperties()
 
     // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users
     // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing.
-    properties.setProperty(HoodieMetadataConfig.ENABLE.key(),
-      sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(),
-        HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString))
+    val isMetadataFilesPartitionAvailable = isFilesPartitionAvailable(metaClient) &&
+      getBooleanConfigValue(options, sqlConf, HoodieMetadataConfig.ENABLE.key(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)
     properties.putAll(options.filter(p => p._2 != null).asJava)
+    properties.setProperty(HoodieMetadataConfig.ENABLE.key(), String.valueOf(isMetadataFilesPartitionAvailable))
 
     properties
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 49d5e19b1662d..38f33bc29eefa 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -98,7 +98,7 @@ class HoodieCDCRDD(
     metaClient.getTableConfig.cdcSupplementalLoggingMode
   )
 
-  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
+  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty, metaClient)
 
   protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField)
     .map { preCombineField =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 19027a47bfabc..9d7400fada156 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -394,6 +394,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
     val props = Map[String, String](
       "path" -> basePath,
       QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
+      HoodieMetadataConfig.ENABLE.key -> testCase.enableMetadata.toString,
      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString,
      HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE.key -> testCase.columnStatsProcessingModeOverride
    ) ++ readMetadataOpts
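
Note: a minimal usage sketch (Scala) of the read path the HoodieFileIndex change affects. This is illustrative only and not part of the patch; the base path and the in-scope SparkSession `spark` are assumed. With this change, requesting metadata-table listing via hoodie.metadata.enable only takes effect when the table's "files" metadata partition is recorded as available in the table config (hoodie.table.metadata.partitions); otherwise the patched getConfigProperties() resolves the flag to false and the reader falls back to direct file-system listing.

    import org.apache.hudi.DataSourceReadOptions
    import org.apache.hudi.common.config.HoodieMetadataConfig

    // Hypothetical read: ask for metadata-based listing and data skipping.
    // If the metadata table's files partition is not yet initialized, the flag
    // is quietly turned off for this query instead of failing the read.
    val df = spark.read
      .format("hudi")
      .option(HoodieMetadataConfig.ENABLE.key, "true")
      .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
      .load("/tmp/hudi/trips") // hypothetical base path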