diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index b64d8ec090008..483e9e6f2249e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -95,6 +95,7 @@ import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions; /** @@ -377,7 +378,25 @@ protected void initializeIfNeeded(HoodieTableMeta if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); } + return; } + + // if metadata table exists, then check if any of the enabled partition types needs to be initialized + Set inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); + List partitionsToInit = this.enabledPartitionTypes.stream() + .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) + .collect(Collectors.toList()); + + // if there are no partitions to initialize or there is a pending operation, then don't initialize in this round + if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { + return; + } + + String createInstantTime = getInitialCommitInstantTime(dataMetaClient); + 
initTableMetadata(); // re-init certain flags in BaseTableMetadata + initializeEnabledFileGroups(dataMetaClient, createInstantTime, partitionsToInit); + initialCommit(createInstantTime, partitionsToInit); + updateInitializedPartitionsInTableConfig(partitionsToInit); } private boolean metadataTableExists(HoodieTableMetaClient dataMetaClient, @@ -502,26 +521,11 @@ private boolean isCommitRevertedByInFlightAction( */ private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, Option inflightInstantTimestamp) throws IOException { - ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled"); - - // We can only initialize if there are no pending operations on the dataset - List pendingDataInstant = dataMetaClient.getActiveTimeline() - .getInstants().filter(i -> !i.isCompleted()) - .filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get())) - .collect(Collectors.toList()); - - if (!pendingDataInstant.isEmpty()) { - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1)); - LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: " - + Arrays.toString(pendingDataInstant.toArray())); + if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } - // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit - // Otherwise, we use the timestamp of the latest completed action. 
- String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants() - .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); - LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime); + String createInstantTime = getInitialCommitInstantTime(dataMetaClient); initializeMetaClient(dataWriteConfig.getMetadataConfig().populateMetaFields()); initTableMetadata(); @@ -535,15 +539,38 @@ private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, enabledPartitionTypes = this.enabledPartitionTypes; } initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes); - - // During cold startup, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out - // of these large number of files and calling the existing update(HoodieCommitMetadata) function does not scale - // well. Hence, we have a special commit just for the initialization scenario. initialCommit(createInstantTime, enabledPartitionTypes); updateInitializedPartitionsInTableConfig(enabledPartitionTypes); return true; } + private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) { + // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit + // Otherwise, we use the timestamp of the latest completed action. 
+ String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants() + .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); + LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime); + return createInstantTime; + } + + private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option inflightInstantTimestamp) { + ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled"); + + // We can only initialize if there are no pending operations on the dataset + List pendingDataInstant = dataMetaClient.getActiveTimeline() + .getInstants().filter(i -> !i.isCompleted()) + .filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get())) + .collect(Collectors.toList()); + + if (!pendingDataInstant.isEmpty()) { + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1)); + LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: " + + Arrays.toString(pendingDataInstant.toArray())); + return true; + } + return false; + } + private void updateInitializedPartitionsInTableConfig(List partitionTypes) { Set completedPartitions = getCompletedMetadataPartitions(dataMetaClient.getTableConfig()); completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet())); @@ -973,8 +1000,12 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan } /** - * This is invoked to initialize metadata table for a dataset. Bootstrap Commit has special handling mechanism due to its scale compared to - * other regular commits. + * This is invoked to initialize metadata table for a dataset. + * Initial commit has special handling mechanism due to its scale compared to other regular commits. 
+ * During cold startup, the list of files to be committed can be huge. + * So creating a HoodieCommitMetadata out of this large number of files, + * and calling the existing update(HoodieCommitMetadata) function does not scale well. + * Hence, we have a special commit just for the initialization scenario. */ private void initialCommit(String createInstantTime, List partitionTypes) { // List all partitions in the basePath of the containing dataset @@ -992,18 +1023,17 @@ private void initialCommit(String createInstantTime, List }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); final Map> partitionToRecordsMap = new HashMap<>(); - // Record which saves the list of all partitions - HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); - if (partitions.isEmpty()) { - // in case of initializing of a fresh table, there won't be any partitions, but we need to make a boostrap commit - final HoodieData allPartitionRecordsRDD = engineContext.parallelize( - Collections.singletonList(allPartitionRecord), 1); - partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD); - commit(createInstantTime, partitionToRecordsMap, false); - return; - } - if (partitionTypes.contains(MetadataPartitionType.FILES)) { + // Record which saves the list of all partitions + HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); + if (partitions.isEmpty()) { + // in case of initializing of a fresh table, there won't be any partitions, but we need to make a bootstrap commit + final HoodieData allPartitionRecordsRDD = engineContext.parallelize( + Collections.singletonList(allPartitionRecord), 1); + partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD); + commit(createInstantTime, partitionToRecordsMap, false); + return; + } HoodieData filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord); 
ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 49c289d2bb430..3cebdc27513f4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -70,7 +70,6 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metadata.HoodieTableMetadata; -import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -99,6 +98,13 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS; +import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists; + /** * Abstract implementation of a HoodieTable. 
* @@ -814,14 +820,60 @@ public void maybeDeleteMetadataTable() { if (shouldExecuteMetadataTableDeletion()) { try { LOG.info("Deleting metadata table because it is disabled in writer."); - HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context); - clearMetadataTablePartitionsConfig(); + deleteMetadataTable(config.getBasePath(), context); + clearMetadataTablePartitionsConfig(Option.empty(), true); } catch (HoodieMetadataException e) { throw new HoodieException("Failed to delete metadata table.", e); } } } + /** + * Deletes the metadata partition if the writer disables any metadata index. + */ + public void deleteMetadataIndexIfNecessary() { + Stream.of(MetadataPartitionType.values()).forEach(partitionType -> { + if (shouldDeleteMetadataPartition(partitionType)) { + try { + LOG.info("Deleting metadata partition because it is disabled in writer: " + partitionType.name()); + if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType)) { + deleteMetadataPartition(metaClient.getBasePath(), context, partitionType); + } + clearMetadataTablePartitionsConfig(Option.of(partitionType), false); + } catch (HoodieMetadataException e) { + throw new HoodieException("Failed to delete metadata partition: " + partitionType.name(), e); + } + } + }); + } + + private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionType) { + // Only delete metadata table partition when all the following conditions are met: + // (1) This is data table. + // (2) Index corresponding to this metadata partition is disabled in HoodieWriteConfig. + // (3) The completed metadata partitions in table config contains this partition. + // NOTE: Inflight metadata partitions are not considered as they could have been inflight due to async indexer. 
+ if (HoodieTableMetadata.isMetadataTable(metaClient.getBasePath()) || !config.isMetadataTableEnabled()) { + return false; + } + boolean metadataIndexDisabled; + switch (partitionType) { + // NOTE: FILES partition type is always considered in sync with hoodie.metadata.enable. + // It cannot be the case that metadata is enabled but FILES is disabled. + case COLUMN_STATS: + metadataIndexDisabled = !config.isMetadataColumnStatsIndexEnabled(); + break; + case BLOOM_FILTERS: + metadataIndexDisabled = !config.isMetadataBloomFilterIndexEnabled(); + break; + default: + LOG.debug("Not a valid metadata partition type: " + partitionType.name()); + return false; + } + return metadataIndexDisabled + && getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(partitionType.getPartitionPath()); + } + private boolean shouldExecuteMetadataTableDeletion() { // Only execute metadata table deletion when all the following conditions are met // (1) This is data table @@ -831,17 +883,23 @@ private boolean shouldExecuteMetadataTableDeletion() { // partitions are ready to use return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath()) && !config.isMetadataTableEnabled() - && (!metaClient.getTableConfig().contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS) + && (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS) || !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions())); } /** * Clears hoodie.table.metadata.partitions in hoodie.properties */ - private void clearMetadataTablePartitionsConfig() { - LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties"); - metaClient.getTableConfig().setValue( - HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), StringUtils.EMPTY_STRING); + private void clearMetadataTablePartitionsConfig(Option partitionType, boolean clearAll) { + if (clearAll) { + LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties"); + 
metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING); + HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps()); + return; + } + Set completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig()); + completedPartitions.remove(partitionType.get().getPartitionPath()); + metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions)); HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps()); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index f749ce490ea51..f1e43b9d30d42 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -105,6 +105,9 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con public Option getMetadataWriter(String triggeringInstantTimestamp, Option actionMetadata) { if (config.isMetadataTableEnabled()) { + // even with metadata enabled, some index could have been disabled + // delete metadata partitions corresponding to such indexes + deleteMetadataIndexIfNecessary(); return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, actionMetadata, Option.of(triggeringInstantTimestamp))); } else { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index 9e4bb14a4cf99..71efe89a055e1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -113,6 +113,9 @@ public Option getMetad // existence after the creation is needed. final HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create( context.getHadoopConf().get(), config, context, actionMetadata, Option.of(triggeringInstantTimestamp)); + // even with metadata enabled, some index could have been disabled + // delete metadata partitions corresponding to such indexes + deleteMetadataIndexIfNecessary(); try { if (isMetadataTableExists || metaClient.getFs().exists(new Path( HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 61c2775f988a9..35ae6da1cb0ce 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -146,6 +146,10 @@ import static org.apache.hudi.common.model.WriteOperationType.INSERT; import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions; +import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; +import static org.apache.hudi.metadata.MetadataPartitionType.FILES; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -202,6 +206,119 @@ public void 
testMetadataTableBootstrap(HoodieTableType tableType, boolean addRol validateMetadata(testTable, true); } + @Test + public void testTurnOffMetadataIndexAfterEnable() throws Exception { + initPath(); + HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER) + .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()) + .build(); + init(COPY_ON_WRITE); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + // metadata enabled with only FILES partition + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfg)) { + // Insert + String commitTime = "0000001"; + List records = dataGen.generateInserts(commitTime, 20); + client.startCommitWithTime(commitTime); + List writeStatuses = client.insert(jsc.parallelize(records, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses); + + // Upsert + commitTime = "0000002"; + client.startCommitWithTime(commitTime); + records = dataGen.generateUniqueUpdates(commitTime, 10); + writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses); + validateMetadata(client); + } + // check table config + HoodieTableMetaClient.reload(metaClient); + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + assertFalse(tableConfig.getMetadataPartitions().isEmpty()); + assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath())); + assertFalse(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath())); + assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath())); + + // enable column stats and run 1 upserts + HoodieWriteConfig 
cfgWithColStatsEnabled = HoodieWriteConfig.newBuilder() + .withProperties(cfg.getProps()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withProperties(cfg.getMetadataConfig().getProps()) + .withMetadataIndexColumnStats(true) + .build()) + .build(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfgWithColStatsEnabled)) { + // Upsert + String commitTime = "0000003"; + client.startCommitWithTime(commitTime); + List records = dataGen.generateUniqueUpdates(commitTime, 10); + List writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses); + validateMetadata(client); + } + // check table config + HoodieTableMetaClient.reload(metaClient); + tableConfig = metaClient.getTableConfig(); + assertFalse(tableConfig.getMetadataPartitions().isEmpty()); + assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath())); + assertTrue(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath())); + assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath())); + + // disable column stats and run 1 upsert + HoodieWriteConfig cfgWithColStatsDisabled = HoodieWriteConfig.newBuilder() + .withProperties(cfg.getProps()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withProperties(cfg.getMetadataConfig().getProps()) + .withMetadataIndexColumnStats(false) + .build()) + .build(); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfgWithColStatsDisabled)) { + // Upsert + String commitTime = "0000004"; + client.startCommitWithTime(commitTime); + List records = dataGen.generateUniqueUpdates(commitTime, 10); + List writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses); + validateMetadata(client); + } + // check table config + HoodieTableMetaClient.reload(metaClient); + tableConfig = 
metaClient.getTableConfig(); + assertFalse(tableConfig.getMetadataPartitions().isEmpty()); + assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath())); + assertFalse(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath())); + assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath())); + + // enable bloom filter as well as column stats and run 1 upsert + HoodieWriteConfig cfgWithBloomFilterEnabled = HoodieWriteConfig.newBuilder() + .withProperties(cfgWithColStatsEnabled.getProps()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withProperties(cfgWithColStatsEnabled.getMetadataConfig().getProps()) + .withMetadataIndexBloomFilter(true) + .build()) + .build(); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfgWithBloomFilterEnabled)) { + // Upsert + String commitTime = "0000005"; + client.startCommitWithTime(commitTime); + List records = dataGen.generateUniqueUpdates(commitTime, 10); + List writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses); + validateMetadata(client); + } + // check table config + HoodieTableMetaClient.reload(metaClient); + tableConfig = metaClient.getTableConfig(); + assertFalse(tableConfig.getMetadataPartitions().isEmpty()); + assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath())); + assertTrue(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath())); + assertTrue(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath())); + } + @Test public void testTurnOffMetadataTableAfterEnable() throws Exception { init(COPY_ON_WRITE, true); @@ -549,13 +666,13 @@ public void testMetadataTableDeletePartition(HoodieTableType tableType) throws I // metadata writer to delete column_stats partition HoodieBackedTableMetadataWriter metadataWriter = 
metadataWriter(client); assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); - metadataWriter.deletePartitions("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS)); + metadataWriter.deletePartitions("0000003", Arrays.asList(COLUMN_STATS)); HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); List metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false); // partition should be physically deleted assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size()); - assertFalse(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath())); + assertFalse(metadataTablePartitions.contains(COLUMN_STATS.getPartitionPath())); Option completedReplaceInstant = metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant(); assertTrue(completedReplaceInstant.isPresent()); @@ -566,7 +683,7 @@ public void testMetadataTableDeletePartition(HoodieTableType tableType) throws I HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline()); metadataTablePartitions.forEach(partition -> { List latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList()); - if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) { + if (COLUMN_STATS.getPartitionPath().equals(partition)) { // there should not be any file slice in column_stats partition assertTrue(latestSlices.isEmpty()); } else { @@ -819,7 +936,7 @@ private void verifyMetadataRecordKeyExcludeFromPayloadLogFiles(HoodieTable table // Compaction should not be triggered yet. Let's verify no base file // and few log files available. 
List fileSlices = table.getSliceView() - .getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList()); + .getLatestFileSlices(FILES.getPartitionPath()).collect(Collectors.toList()); if (fileSlices.isEmpty()) { throw new IllegalStateException("LogFile slices are not available!"); } @@ -912,7 +1029,7 @@ private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClien .withBasePath(metadataMetaClient.getBasePath()) .withLogFilePaths(logFilePaths) .withLatestInstantTime(latestCommitTimestamp) - .withPartition(MetadataPartitionType.FILES.getPartitionPath()) + .withPartition(FILES.getPartitionPath()) .withReaderSchema(schema) .withMaxMemorySizeInBytes(100000L) .withBufferSize(4096) @@ -942,7 +1059,7 @@ private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClien private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable table, boolean enableMetaFields) throws IOException { table.getHoodieView().sync(); List fileSlices = table.getSliceView() - .getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList()); + .getLatestFileSlices(FILES.getPartitionPath()).collect(Collectors.toList()); if (!fileSlices.get(0).getBaseFile().isPresent()) { throw new IllegalStateException("Base file not available!"); } @@ -2005,7 +2122,7 @@ public void testMetadataMetrics() throws Exception { assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count")); assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration")); assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L); - final String prefix = MetadataPartitionType.FILES.getPartitionPath() + "."; + final String prefix = FILES.getPartitionPath() + "."; assertTrue(metricsRegistry.getAllCounts().containsKey(prefix + HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)); 
assertTrue(metricsRegistry.getAllCounts().containsKey(prefix + HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)); assertTrue(metricsRegistry.getAllCounts().containsKey(prefix + HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)); @@ -2218,10 +2335,10 @@ private void validateMetadata(SparkRDDWriteClient testClient) throws IOException + numFileVersions + " per file group, but was " + latestSlices.size()); List logFiles = latestSlices.get(0).getLogFiles().collect(Collectors.toList()); try { - if (MetadataPartitionType.FILES.getPartitionPath().equals(partition)) { + if (FILES.getPartitionPath().equals(partition)) { verifyMetadataRawRecords(table, logFiles, false); } - if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) { + if (COLUMN_STATS.getPartitionPath().equals(partition)) { verifyMetadataColumnStatsRecords(logFiles); } } catch (IOException e) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 93d4ac5916191..2e387be54452a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -95,6 +95,10 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable) throws init(tableType, enableMetadataTable, true, false, false); } + public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableColumnStats) throws IOException { + init(tableType, enableMetadataTable, true, false, false); + } + public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics, boolean validateMetadataPayloadStateConsistency) throws IOException { init(tableType, Option.empty(), enableMetadataTable, enableFullScan, enableMetrics, diff 
--git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index d20f63bac7356..9ff3fd57d2812 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -444,6 +444,11 @@ public Builder withEngineType(EngineType engineType) { return this; } + public Builder withProperties(Properties properties) { + this.metadataConfig.getProps().putAll(properties); + return this; + } + public HoodieMetadataConfig build() { metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType)); metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());