From 34427d0e522bec7eee731644080bd0b5d20570dc Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Wed, 28 Sep 2022 18:50:39 -0700
Subject: [PATCH 1/5] Fixing reading from the metadata table when there are no completed commits

---
 .../functional/TestHoodieBackedMetadata.java  | 35 +++++++++++++++----
 .../metadata/HoodieTableMetadataUtil.java     |  8 +++--
 2 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index a3f9b639af812..8ea6c2adf895f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -344,8 +344,8 @@ public void testTurnOffMetadataTableAfterEnable() throws Exception {
     HoodieCommitMetadata hoodieCommitMetadata = doWriteOperationWithMeta(testTable, instant1, INSERT);
 
     // Simulate the complete data directory including ".hoodie_partition_metadata" file
-    File metaForP1 = new File(metaClient.getBasePath() + "/p1",".hoodie_partition_metadata");
-    File metaForP2 = new File(metaClient.getBasePath() + "/p2",".hoodie_partition_metadata");
+    File metaForP1 = new File(metaClient.getBasePath() + "/p1", ".hoodie_partition_metadata");
+    File metaForP2 = new File(metaClient.getBasePath() + "/p2", ".hoodie_partition_metadata");
     metaForP1.createNewFile();
     metaForP2.createNewFile();

@@ -1716,8 +1716,8 @@ public void testMultiWriterForDoubleLocking() throws Exception {
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
-                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4)
-                .build())
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4)
+            .build())
         .withAutoCommit(false)
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
@@ -1818,6 +1818,29 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
     validateMetadata(client);
   }
 
+  @Test
+  public void testMetadataReadWithNoCompletedCommits() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    List<HoodieRecord> records;
+    List<WriteStatus> writeStatuses;
+    String[] commitTimestamps = {HoodieActiveTimeline.createNewInstantTime(), HoodieActiveTimeline.createNewInstantTime()};
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      records = dataGen.generateInserts(commitTimestamps[0], 5);
+      client.startCommitWithTime(commitTimestamps[0]);
+      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commitTimestamps[0]).collect();
+      assertNoWriteErrors(writeStatuses);
+
+      // Make all commits in the metadata table inflight. Reads should still go through; they just may not return any data.
+      FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/", commitTimestamps[0]);
+      FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/", HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP);
+      assertEquals(0, getAllFiles(metadata(client)).stream().map(p -> p.getName()).map(n -> FSUtils.getCommitTime(n)).collect(Collectors.toSet()).size());
+    }
+  }
+
   /**
    * Ensure that the reader only reads completed instants.
    *
@@ -2050,7 +2073,7 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
-                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
         .withProperties(properties)
@@ -2078,7 +2101,7 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
     // set hoodie.table.version to 2 in hoodie.properties file
     changeTableVersion(HoodieTableVersion.TWO);
     writeConfig = getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).withCleanConfig(HoodieCleanConfig.newBuilder()
-                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
         .withProperties(properties)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 1fb872f6835e1..4216f11048aa7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1052,8 +1052,12 @@ private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient meta
     HoodieTableFileSystemView fsView = fileSystemView.orElse(getFileSystemView(metaClient));
     Stream<FileSlice> fileSliceStream;
     if (mergeFileSlices) {
-      fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
-          partition, metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+      if (metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) {
+        fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
+            partition, metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+      } else {
+        return Collections.emptyList();
+      }
     } else {
       fileSliceStream = fsView.getLatestFileSlices(partition);
     }
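The gist of patch 1: when every commit in the metadata table is still inflight, filterCompletedInstants() yields an empty timeline, and the unconditional lastInstant().get() in getPartitionFileSlices would throw a NoSuchElementException instead of returning an empty listing. A minimal sketch of the guard in Scala (the actual fix above is Java; the method name latestMergedSlices is hypothetical, and Hudi's common classes are assumed on the classpath):

    import scala.collection.JavaConverters._
    import org.apache.hudi.common.model.FileSlice
    import org.apache.hudi.common.table.timeline.HoodieTimeline
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView

    def latestMergedSlices(fsView: HoodieTableFileSystemView,
                           timeline: HoodieTimeline,
                           partition: String): List[FileSlice] = {
      val lastCompleted = timeline.filterCompletedInstants().lastInstant()
      if (lastCompleted.isPresent) {
        // Safe: at least one completed instant exists to anchor the merged view on.
        fsView.getLatestMergedFileSlicesBeforeOrOn(partition, lastCompleted.get().getTimestamp)
          .iterator().asScala.toList
      } else {
        // No completed commits yet: an empty listing, not a NoSuchElementException.
        List.empty
      }
    }

The test above exercises exactly this path by deleting the completed deltacommit files under .hoodie/metadata/, leaving only inflight instants behind.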
From 23d923e6b8c75781053f3f7bbc811084141f7786 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Mon, 3 Oct 2022 15:59:04 -0700
Subject: [PATCH 2/5] Fixing reading from the metadata table if it is not fully built out

---
 .../org/apache/hudi/HoodieFileIndex.scala   | 18 +++++++++++-------
 .../org/apache/hudi/cdc/HoodieCDCRDD.scala  |  2 +-
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 4e158aaa86796..a2d6af6be4e90 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.HoodieMetadataPayload
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
@@ -74,7 +74,7 @@ case class HoodieFileIndex(spark: SparkSession,
       spark = spark,
       metaClient = metaClient,
       schemaSpec = schemaSpec,
-      configProperties = getConfigProperties(spark, options),
+      configProperties = getConfigProperties(spark, options, metaClient),
       queryPaths = HoodieFileIndex.getQueryPaths(options),
       specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
       fileStatusCache = fileStatusCache
@@ -293,16 +293,20 @@ object HoodieFileIndex extends Logging {
     schema.fieldNames.filter { colName => refs.exists(r => resolver.apply(colName, r.name)) }
   }
 
-  def getConfigProperties(spark: SparkSession, options: Map[String, String]) = {
+  private def isFilesPartitionAvailable(metaClient: HoodieTableMetaClient): Boolean = {
+    metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_FILES)
+  }
+
+  def getConfigProperties(spark: SparkSession, options: Map[String, String], metaClient: HoodieTableMetaClient) = {
     val sqlConf: SQLConf = spark.sessionState.conf
     val properties = new TypedProperties()
 
     // To support metadata listing via Spark SQL, we allow users to pass the config via the SQL conf in the Spark session. Users
     // would be able to run SET hoodie.metadata.enable=true in the Spark SQL session to enable metadata listing.
-    properties.setProperty(HoodieMetadataConfig.ENABLE.key(),
-      sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(),
-        HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString))
-    properties.putAll(options.filter(p => p._2 != null).asJava)
+    val isMetadataFilesPartitionAvailable = isFilesPartitionAvailable(metaClient) && sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(),
+      HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString).toBoolean
+    properties.setProperty(HoodieMetadataConfig.ENABLE.key(), String.valueOf(isMetadataFilesPartitionAvailable))
+    properties.putAll(options.filter(p => p._2 != null && !p._1.equals(HoodieMetadataConfig.ENABLE.key())).asJava)
     properties
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 49d5e19b1662d..38f33bc29eefa 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -98,7 +98,7 @@ class HoodieCDCRDD(
     metaClient.getTableConfig.cdcSupplementalLoggingMode
   )
 
-  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
+  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty, metaClient)
 
   protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField)
     .map { preCombineField =>
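Patch 2's central idea: treat hoodie.metadata.enable as effective only when the metadata table's FILES partition has actually been built out, so a reader never consults a half-initialized metadata table. A condensed sketch of that gate (assumes a Hudi HoodieTableMetaClient; the helper name effectiveMetadataEnable is hypothetical):

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.metadata.HoodieTableMetadataUtil

    // The user's requested setting is honored only if the "files" partition
    // is recorded as complete in the table config.
    def effectiveMetadataEnable(metaClient: HoodieTableMetaClient, requested: Boolean): Boolean =
      requested &&
        metaClient.getTableConfig.getMetadataPartitions
          .contains(HoodieTableMetadataUtil.PARTITION_NAME_FILES)

Threading metaClient into getConfigProperties is what makes this check possible at file-index construction time, which is also why HoodieCDCRDD now passes its metaClient along.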
From e246d65957362860b850f1af9ef973b85bf1a4eb Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Wed, 5 Oct 2022 19:30:49 -0700
Subject: [PATCH 3/5] addressing minor comments

---
 .../src/main/scala/org/apache/hudi/HoodieFileIndex.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index a2d6af6be4e90..1cf4c31378226 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -305,8 +305,8 @@ object HoodieFileIndex extends Logging {
     // To support metadata listing via Spark SQL, we allow users to pass the config via the SQL conf in the Spark session. Users
     // would be able to run SET hoodie.metadata.enable=true in the Spark SQL session to enable metadata listing.
     val isMetadataFilesPartitionAvailable = isFilesPartitionAvailable(metaClient) && sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(),
       HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString).toBoolean
+    properties.putAll(options.filter(p => p._2 != null).asJava)
     properties.setProperty(HoodieMetadataConfig.ENABLE.key(), String.valueOf(isMetadataFilesPartitionAvailable))
-    properties.putAll(options.filter(p => p._2 != null && !p._1.equals(HoodieMetadataConfig.ENABLE.key())).asJava)
     properties
   }
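The reordering in patch 3 is subtle but load-bearing: java.util.Properties is last-write-wins, so copying the user options in first and writing the computed flag second lets the availability gate override a user-supplied hoodie.metadata.enable=true, without the explicit key filter the previous revision needed. A self-contained illustration of the behavior the reordering relies on:

    import java.util.Properties

    val props = new Properties()
    props.setProperty("hoodie.metadata.enable", "true")   // copied in from user options
    props.setProperty("hoodie.metadata.enable", "false")  // computed gate, written last
    assert(props.getProperty("hoodie.metadata.enable") == "false")  // the later write wins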
From d7fbaa4fed0c713ee0b0a8ba4b8900b11b89c433 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Fri, 7 Oct 2022 13:17:45 -0700
Subject: [PATCH 4/5] fixing sql conf and options interplay

---
 .../scala/org/apache/hudi/HoodieFileIndex.scala | 14 +++++++++-----
 .../org/apache/hudi/TestHoodieFileIndex.scala   |  3 ++-
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 1cf4c31378226..67500d221c0e1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -254,9 +254,8 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def sizeInBytes: Long = cachedFileSize
 
-  private def isDataSkippingEnabled: Boolean =
-    options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
-      spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
+  private def isDataSkippingEnabled: Boolean = HoodieFileIndex.getBooleanConfigValue(spark.sessionState.conf, options, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
+    "false")
 
   private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
   private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled
@@ -267,10 +266,15 @@ case class HoodieFileIndex(spark: SparkSession,
         s"(isMetadataTableEnabled = $isMetadataTableEnabled, isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled")
     }
   }
+
 }
 
 object HoodieFileIndex extends Logging {
 
+  def getBooleanConfigValue(sqlConf: SQLConf, options: Map[String, String], configKey: String, defaultValue: String): Boolean = {
+    options.getOrElse(configKey, sqlConf.getConfString(configKey, defaultValue)).toBoolean
+  }
+
   object DataSkippingFailureMode extends Enumeration {
     val configName = "hoodie.fileIndex.dataSkippingFailureMode"
 
@@ -303,8 +307,8 @@ object HoodieFileIndex extends Logging {
     // To support metadata listing via Spark SQL, we allow users to pass the config via the SQL conf in the Spark session. Users
     // would be able to run SET hoodie.metadata.enable=true in the Spark SQL session to enable metadata listing.
-    val isMetadataFilesPartitionAvailable = isFilesPartitionAvailable(metaClient) && sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(),
-      HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString).toBoolean
+    val isMetadataFilesPartitionAvailable = isFilesPartitionAvailable(metaClient) &&
+      getBooleanConfigValue(sqlConf, options, HoodieMetadataConfig.ENABLE.key(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)
     properties.putAll(options.filter(p => p._2 != null).asJava)
     properties.setProperty(HoodieMetadataConfig.ENABLE.key(), String.valueOf(isMetadataFilesPartitionAvailable))
     properties
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 19027a47bfabc..6ef260adf94c0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -394,6 +394,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
     val props = Map[String, String](
       "path" -> basePath,
       QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
+      HoodieMetadataConfig.ENABLE.key -> testCase.enableMetadata.toString,
       DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString,
       HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE.key -> testCase.columnStatsProcessingModeOverride
     ) ++ readMetadataOpts
@@ -403,7 +404,7 @@
     val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
     assertEquals(10, allFilesPartitions.head.files.length)
 
-    if (testCase.enableDataSkipping && testCase.enableMetadata) {
+    if (testCase.enableDataSkipping && testCase.enableMetadata && testCase.enableColumnStats) {
       // We're selecting a single file that contains the "id" == 1 row, of which there should be
       // strictly 1. Given that 1 is the minimal possible value, Data Skipping should be able to
      // truncate search space to just a single file
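Patch 4 centralizes one precedence order for boolean configs: a per-query option beats the session-level SQL conf, which beats the hard-coded default. A self-contained model of that lookup, with a plain Map standing in for Spark's SQLConf (an assumption for illustration only):

    def booleanConfig(options: Map[String, String],
                      sessionConf: Map[String, String],
                      key: String, default: String): Boolean =
      options.getOrElse(key, sessionConf.getOrElse(key, default)).toBoolean

    // A per-read option wins over a session-level SET:
    assert(!booleanConfig(
      options = Map("hoodie.metadata.enable" -> "false"),
      sessionConf = Map("hoodie.metadata.enable" -> "true"),
      key = "hoodie.metadata.enable", default = "true"))

Patch 5 below only swaps the parameter order so the declaration reads in the same options-first order the lookup actually uses.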
From 8f763c8eacdf249b7ab74c98b0a163eedd7a9aec Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Mon, 10 Oct 2022 16:47:41 -0700
Subject: [PATCH 5/5] addressing minor refactoring

---
 .../src/main/scala/org/apache/hudi/HoodieFileIndex.scala | 6 +++---
 .../test/scala/org/apache/hudi/TestHoodieFileIndex.scala | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 67500d221c0e1..06c27019326ea 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -254,7 +254,7 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def sizeInBytes: Long = cachedFileSize
 
-  private def isDataSkippingEnabled: Boolean = HoodieFileIndex.getBooleanConfigValue(spark.sessionState.conf, options, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
+  private def isDataSkippingEnabled: Boolean = HoodieFileIndex.getBooleanConfigValue(options, spark.sessionState.conf, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
     "false")
 
   private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
@@ -271,7 +271,7 @@ object HoodieFileIndex extends Logging {
 
-  def getBooleanConfigValue(sqlConf: SQLConf, options: Map[String, String], configKey: String, defaultValue: String): Boolean = {
+  def getBooleanConfigValue(options: Map[String, String], sqlConf: SQLConf, configKey: String, defaultValue: String): Boolean = {
     options.getOrElse(configKey, sqlConf.getConfString(configKey, defaultValue)).toBoolean
   }
 
@@ -308,7 +308,7 @@ object HoodieFileIndex extends Logging {
     // To support metadata listing via Spark SQL, we allow users to pass the config via the SQL conf in the Spark session. Users
     // would be able to run SET hoodie.metadata.enable=true in the Spark SQL session to enable metadata listing.
     val isMetadataFilesPartitionAvailable = isFilesPartitionAvailable(metaClient) &&
-      getBooleanConfigValue(sqlConf, options, HoodieMetadataConfig.ENABLE.key(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)
+      getBooleanConfigValue(options, sqlConf, HoodieMetadataConfig.ENABLE.key(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)
     properties.putAll(options.filter(p => p._2 != null).asJava)
     properties.setProperty(HoodieMetadataConfig.ENABLE.key(), String.valueOf(isMetadataFilesPartitionAvailable))
     properties
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 6ef260adf94c0..9d7400fada156 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -404,7 +404,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
     val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
     assertEquals(10, allFilesPartitions.head.files.length)
 
-    if (testCase.enableDataSkipping && testCase.enableMetadata && testCase.enableColumnStats) {
+    if (testCase.enableDataSkipping && testCase.enableMetadata) {
      // We're selecting a single file that contains the "id" == 1 row, of which there should be
       // strictly 1. Given that 1 is the minimal possible value, Data Skipping should be able to
       // truncate search space to just a single file
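Net effect of the series, as illustrative Spark shell usage (not part of the patch; basePath is a stand-in for a real Hudi table path):

    // Session-level toggle the comments above refer to:
    spark.sql("SET hoodie.metadata.enable=true")

    // Listing goes through the metadata table only if its "files" partition is
    // built out; a per-read option still overrides the session-level SET:
    val df = spark.read.format("hudi")
      .option("hoodie.metadata.enable", "false")
      .load(basePath)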