diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 339e95b9e08d4..182cf9450453f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -51,6 +51,7 @@
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
@@ -120,45 +121,64 @@ public Option<HoodieIndexCommitMetadata> execute() {
     if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
       throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
     }
+    boolean firstTimeInitializingMetadataTable = false;
+    HoodieIndexPartitionInfo fileIndexPartitionInfo = null;
+    if (indexPartitionInfos.size() == 1 && indexPartitionInfos.get(0).getMetadataPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
+      firstTimeInitializingMetadataTable = true;
+      fileIndexPartitionInfo = indexPartitionInfos.get(0);
+    }
     // ensure the metadata partitions for the requested indexes are not already available (or inflight)
     Set<String> indexesInflightOrCompleted = getInflightAndCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
     Set<String> requestedPartitions = indexPartitionInfos.stream()
         .map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet());
     requestedPartitions.retainAll(indexesInflightOrCompleted);
-    if (!requestedPartitions.isEmpty()) {
+    if (!firstTimeInitializingMetadataTable && !requestedPartitions.isEmpty()) {
       throw new HoodieIndexException(String.format("Following partitions already exist or inflight: %s", requestedPartitions));
     }
 
     // transition requested indexInstant to inflight
     table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
-    // start indexing for each partition
-    HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
-        .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
-    // this will only build index upto base instant as generated by the plan, we will be doing catchup later
-    String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
-    LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
-    metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
-
-    // get remaining instants to catchup
-    List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant);
-    LOG.info("Total remaining instants to index: " + instantsToCatchup.size());
-
-    // reconcile with metadata table timeline
-    String metadataBasePath = getMetadataTableBasePath(table.getMetaClient().getBasePath());
-    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
-    Set<String> metadataCompletedTimestamps = getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream()
-        .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
-
-    // index catchup for all remaining instants with a timeout
-    currentCaughtupInstant = indexUptoInstant;
-    catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps);
-    // save index commit metadata and update table config
-    List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = indexPartitionInfos.stream()
-        .map(info -> new HoodieIndexPartitionInfo(
-            info.getVersion(),
-            info.getMetadataPartitionPath(),
-            currentCaughtupInstant))
-        .collect(Collectors.toList());
+    List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = null;
+    if (!firstTimeInitializingMetadataTable) {
+      // start indexing for each partition
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      // this will only build index upto base instant as generated by the plan, we will be doing catchup later
+      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+      LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
+      metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
+
+      // get remaining instants to catchup
+      List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant);
+      LOG.info("Total remaining instants to index: " + instantsToCatchup.size());
+
+      // reconcile with metadata table timeline
+      String metadataBasePath = getMetadataTableBasePath(table.getMetaClient().getBasePath());
+      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+      Set<String> metadataCompletedTimestamps = getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream()
+          .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+      // index catchup for all remaining instants with a timeout
+      currentCaughtupInstant = indexUptoInstant;
+      catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps);
+      // save index commit metadata and update table config
+      finalIndexPartitionInfos = indexPartitionInfos.stream()
+          .map(info -> new HoodieIndexPartitionInfo(
+              info.getVersion(),
+              info.getMetadataPartitionPath(),
+              currentCaughtupInstant))
+          .collect(Collectors.toList());
+    } else {
+      String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
+      // save index commit metadata and update table config
+      finalIndexPartitionInfos = Collections.singletonList(fileIndexPartitionInfo).stream()
+          .map(info -> new HoodieIndexPartitionInfo(
+              info.getVersion(),
+              info.getMetadataPartitionPath(),
+              indexUptoInstant))
+          .collect(Collectors.toList());
+    }
+
     HoodieIndexCommitMetadata indexCommitMetadata = HoodieIndexCommitMetadata.newBuilder()
         .setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build();
     updateTableConfigAndTimeline(indexInstant, finalIndexPartitionInfos, indexCommitMetadata);
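Note: the new else branch handles first-time initialization of the metadata table. When the plan contains only the FILES partition, that partition was already bootstrapped synchronously by the scheduler, so the executor skips buildMetadataPartitions and the catch-up loop and only records index commit metadata up to the planned instant. The Collections.singletonList(...).stream() detour just builds a one-element list; a minimal, stream-free sketch of an equivalent form (same HoodieIndexPartitionInfo constructor as used above, assuming the same fileIndexPartitionInfo captured earlier) would be:

    // Sketch only: equivalent of the first-time-initialization branch without the stream.
    String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
    finalIndexPartitionInfos = Collections.singletonList(
        new HoodieIndexPartitionInfo(
            fileIndexPartitionInfo.getVersion(),
            fileIndexPartitionInfo.getMetadataPartitionPath(),
            indexUptoInstant));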
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
index cfd975c5098a4..c306f4f6f05b6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
@@ -105,7 +105,10 @@ public Option<HoodieIndexPlan> execute() {
     // in case FILES partition itself was not initialized before (i.e. metadata was never enabled), this will initialize synchronously
     HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
         .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to initialize filegroups for indexing for instant: %s", instantTime)));
-    metadataWriter.initializeMetadataPartitions(table.getMetaClient(), finalPartitionsToIndex, indexInstant.getTimestamp());
+    if (!finalPartitionsToIndex.get(0).getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
+      // initialize metadata partitions only when not indexing the FILES partition itself
+      metadataWriter.initializeMetadataPartitions(table.getMetaClient(), finalPartitionsToIndex, indexInstant.getTimestamp());
+    }
 
     // for each partitionToIndex add that time to the plan
     List<HoodieIndexPartitionInfo> indexPartitionInfos = finalPartitionsToIndex.stream()
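Note: the guard keys off finalPartitionsToIndex.get(0) only, which is safe today because the indexer schedules a single partition type per run (see the checkArgument in HoodieIndexer.doSchedule below). If multiple partition types were ever scheduled together, a more defensive formulation (hypothetical, not part of this patch) could initialize file groups for everything except FILES, which getMetadataWriter(instantTime) already bootstraps synchronously:

    // Hypothetical variant: filter out FILES and initialize file groups only for the remaining partitions.
    List<MetadataPartitionType> partitionsNeedingFileGroups = finalPartitionsToIndex.stream()
        .filter(p -> !MetadataPartitionType.FILES.getPartitionPath().equals(p.getPartitionPath()))
        .collect(Collectors.toList());
    if (!partitionsNeedingFileGroups.isEmpty()) {
      metadataWriter.initializeMetadataPartitions(table.getMetaClient(), partitionsNeedingFileGroups, indexInstant.getTimestamp());
    }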
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
index 96f6ce38cda48..6f78487cce04b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.metadata.MetadataPartitionType;
 
@@ -228,6 +229,9 @@ private Option<String> scheduleIndexing(JavaSparkContext jsc) throws Exception {
   private Option<String> doSchedule(SparkRDDWriteClient client) {
     List<MetadataPartitionType> partitionTypes = getRequestedPartitionTypes(cfg.indexTypes);
     checkArgument(partitionTypes.size() == 1, "Currently, only one index type can be scheduled at a time.");
+    if (!isMetadataInitialized() && !partitionTypes.contains(MetadataPartitionType.FILES)) {
+      throw new HoodieException("Metadata table is not yet initialized. Initialize FILES partition before any other partition " + Arrays.toString(partitionTypes.toArray()));
+    }
     if (indexExists(partitionTypes)) {
       return Option.empty();
     }
@@ -249,6 +253,11 @@ private boolean indexExists(List<MetadataPartitionType> partitionTypes) {
     return false;
   }
 
+  private boolean isMetadataInitialized() {
+    Set<String> indexedMetadataPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+    return !indexedMetadataPartitions.isEmpty();
+  }
+
   private int runIndexing(JavaSparkContext jsc) throws Exception {
     String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
     try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
@@ -318,8 +327,6 @@ List<MetadataPartitionType> getRequestedPartitionTypes(String indexTypes) {
     List<String> requestedIndexTypes = Arrays.asList(indexTypes.split(","));
     return requestedIndexTypes.stream()
        .map(p -> MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)))
-        // FILES partition is initialized synchronously while getting metadata writer
-        .filter(p -> !MetadataPartitionType.FILES.equals(p))
        .collect(Collectors.toList());
   }
 }
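Note: with the new guard in doSchedule, FILES is no longer filtered out of the requested partition types, and on a table whose metadata table was never enabled it must be built before any other partition; scheduling COLUMN_STATS or BLOOM_FILTERS alone fails with the HoodieException above. A minimal usage sketch mirroring the tests below (basePath, tableName, propsPath, and jsc are placeholders from the test setup):

    HoodieIndexer.Config config = new HoodieIndexer.Config();
    config.basePath = basePath;                 // placeholder: table base path
    config.tableName = tableName;               // placeholder: table name
    config.propsFilePath = propsPath;           // placeholder: indexer.properties
    config.runningMode = SCHEDULE_AND_EXECUTE;
    config.indexTypes = FILES.name();           // build FILES first to initialize the metadata table
    assertEquals(0, new HoodieIndexer(jsc, config).start(0));
    config.indexTypes = COLUMN_STATS.name();    // subsequent runs can add the other partitions
    assertEquals(0, new HoodieIndexer(jsc, config).start(0));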
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 9312a26b4f950..0f57e3b1b8402 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -49,8 +49,10 @@
 import java.io.IOException;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.apache.hudi.common.table.HoodieTableMetaClient.reload;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
@@ -106,7 +108,7 @@ public void testGetRequestedPartitionTypes() {
     config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS";
     HoodieIndexer indexer = new HoodieIndexer(jsc, config);
     List<MetadataPartitionType> partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes);
-    assertFalse(partitionTypes.contains(FILES));
+    assertTrue(partitionTypes.contains(FILES));
     assertTrue(partitionTypes.contains(BLOOM_FILTERS));
     assertTrue(partitionTypes.contains(COLUMN_STATS));
   }
@@ -134,11 +136,75 @@ public void testIsIndexBuiltForAllRequestedTypes() {
   @Test
   public void testIndexerWithNotAllIndexesEnabled() {
     initTestDataGenerator();
-    String tableName = "indexer_test";
-    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
-    HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
+    initializeWriteClient(metadataConfigBuilder.build());
+
+    // validate table config
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+
+    // build indexer config which has only column_stats enabled (files and bloom_filters are already enabled)
+    indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[]{FILES, BLOOM_FILTERS}), Collections.emptyList());
+  }
+
+  @Test
+  public void testIndexerWithFilesPartition() {
+    initTestDataGenerator();
+    tableName = "indexer_test";
+    // start with the metadata table disabled on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
+    initializeWriteClient(metadataConfigBuilder.build());
+
+    // validate table config
+    assertFalse(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+
+    // build indexer config which has only files enabled
+    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[]{COLUMN_STATS, BLOOM_FILTERS}));
+  }
+
+  /**
+   * If the first indexing run targets any partition other than FILES, an exception will be thrown, since the metadata table
+   * has not been initialized in the synchronous code path with regular writers.
+   */
+  @Test
+  public void testIndexerForExceptionWithNonFilesPartition() {
+    initTestDataGenerator();
+    tableName = "indexer_test";
+    // start with the metadata table disabled on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false);
+    initializeWriteClient(metadataConfigBuilder.build());
+    // validate table config
+    assertFalse(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+
+    // build indexer config which has only column_stats enabled; this is expected to throw an exception.
+    HoodieIndexer.Config config = new HoodieIndexer.Config();
+    String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
+    config.basePath = basePath;
+    config.tableName = tableName;
+    config.indexTypes = COLUMN_STATS.name();
+    config.runningMode = SCHEDULE_AND_EXECUTE;
+    config.propsFilePath = propsPath;
+    // start the indexer and validate that index building fails
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(-1, indexer.start(0));
+
+    // validate table config
+    metaClient = reload(metaClient);
+    assertFalse(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(FILES.getPartitionPath()));
+    assertFalse(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
+    assertFalse(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    // validate that the metadata partitions do not actually exist
+    assertFalse(metadataPartitionExists(basePath, context, FILES));
+
+    // trigger FILES partition and indexing should succeed
+    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[]{COLUMN_STATS, BLOOM_FILTERS}));
+  }
+
+  private void initializeWriteClient(HoodieMetadataConfig metadataConfig) {
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfig).build();
     // do one upsert with synchronous metadata update
     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
     String instant = "0001";
@@ -147,31 +213,30 @@ public void testIndexerWithNotAllIndexesEnabled() {
     JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
+  }
 
-    // validate table config
-    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
-
-    // build indexer config which has only column_stats enabled (files is enabled by default)
+  private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTypeToIndex, List<MetadataPartitionType> alreadyCompletedPartitions, List<MetadataPartitionType> nonExistantPartitions) {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
     config.basePath = basePath;
     config.tableName = tableName;
-    config.indexTypes = COLUMN_STATS.name();
+    config.indexTypes = partitionTypeToIndex.name();
     config.runningMode = SCHEDULE_AND_EXECUTE;
     config.propsFilePath = propsPath;
-    // start the indexer and validate column_stats index is also complete
+    // start the indexer and validate that the requested index is completely built out
     HoodieIndexer indexer = new HoodieIndexer(jsc, config);
     assertEquals(0, indexer.start(0));
 
     // validate table config
-    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
-    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
+    metaClient = reload(metaClient);
+    Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+    assertTrue(completedPartitions.contains(partitionTypeToIndex.getPartitionPath()));
+    alreadyCompletedPartitions.forEach(entry -> assertTrue(completedPartitions.contains(entry.getPartitionPath())));
+    nonExistantPartitions.forEach(entry -> assertFalse(completedPartitions.contains(entry.getPartitionPath())));
+
     // validate metadata partitions actually exist
-    assertTrue(metadataPartitionExists(basePath, context, FILES));
-    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
-    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+    assertTrue(metadataPartitionExists(basePath, context, partitionTypeToIndex));
+    alreadyCompletedPartitions.forEach(entry -> assertTrue(metadataPartitionExists(basePath, context, entry)));
   }
 
   @Test