Changes from all commits
@@ -51,6 +51,7 @@
import org.apache.log4j.Logger;

import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
@@ -120,45 +121,64 @@ public Option<HoodieIndexCommitMetadata> execute() {
if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
}
+boolean firstTimeInitializingMetadataTable = false;
+HoodieIndexPartitionInfo fileIndexPartitionInfo = null;
+if (indexPartitionInfos.size() == 1 && indexPartitionInfos.get(0).getMetadataPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
+firstTimeInitializingMetadataTable = true;
+fileIndexPartitionInfo = indexPartitionInfos.get(0);
+}
// ensure the metadata partitions for the requested indexes are not already available (or inflight)
Set<String> indexesInflightOrCompleted = getInflightAndCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
Set<String> requestedPartitions = indexPartitionInfos.stream()
.map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet());
requestedPartitions.retainAll(indexesInflightOrCompleted);
-if (!requestedPartitions.isEmpty()) {
+if (!firstTimeInitializingMetadataTable && !requestedPartitions.isEmpty()) {
throw new HoodieIndexException(String.format("Following partitions already exist or inflight: %s", requestedPartitions));
}

// transition requested indexInstant to inflight
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
-// start indexing for each partition
-HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
-.orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
-// this will only build index upto base instant as generated by the plan, we will be doing catchup later
-String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
-LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
-metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
-
-// get remaining instants to catchup
-List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant);
-LOG.info("Total remaining instants to index: " + instantsToCatchup.size());
-
-// reconcile with metadata table timeline
-String metadataBasePath = getMetadataTableBasePath(table.getMetaClient().getBasePath());
-HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
-Set<String> metadataCompletedTimestamps = getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream()
-.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
-
-// index catchup for all remaining instants with a timeout
-currentCaughtupInstant = indexUptoInstant;
-catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps);
-// save index commit metadata and update table config
-List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = indexPartitionInfos.stream()
-.map(info -> new HoodieIndexPartitionInfo(
-info.getVersion(),
-info.getMetadataPartitionPath(),
-currentCaughtupInstant))
-.collect(Collectors.toList());
+List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = null;
+if (!firstTimeInitializingMetadataTable) {
+// start indexing for each partition
+HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+.orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+// this will only build index upto base instant as generated by the plan, we will be doing catchup later
+String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
+metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
+
+// get remaining instants to catchup
+List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant);
+LOG.info("Total remaining instants to index: " + instantsToCatchup.size());
+
+// reconcile with metadata table timeline
+String metadataBasePath = getMetadataTableBasePath(table.getMetaClient().getBasePath());
+HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+Set<String> metadataCompletedTimestamps = getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream()
+.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+// index catchup for all remaining instants with a timeout
+currentCaughtupInstant = indexUptoInstant;
+catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps);
+// save index commit metadata and update table config
+finalIndexPartitionInfos = indexPartitionInfos.stream()
+.map(info -> new HoodieIndexPartitionInfo(
+info.getVersion(),
+info.getMetadataPartitionPath(),
+currentCaughtupInstant))
+.collect(Collectors.toList());
+} else {
+String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
+// save index commit metadata and update table config
+finalIndexPartitionInfos = Collections.singletonList(fileIndexPartitionInfo).stream()
+.map(info -> new HoodieIndexPartitionInfo(
+info.getVersion(),
+info.getMetadataPartitionPath(),
+indexUptoInstant))
+.collect(Collectors.toList());
+}

HoodieIndexCommitMetadata indexCommitMetadata = HoodieIndexCommitMetadata.newBuilder()
.setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build();
updateTableConfigAndTimeline(indexInstant, finalIndexPartitionInfos, indexCommitMetadata);
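The net effect of the change above: when the index plan contains exactly one partition and it is FILES, the metadata table is being bootstrapped for the first time, so the executor skips the build-and-catch-up phase and records the plan's indexUptoInstant directly in the commit metadata. A minimal, self-contained sketch of that decision with stand-in types (not the Hudi classes):

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class FirstTimeInitSketch {

  // Stand-in for HoodieIndexPartitionInfo: a metadata partition path plus the instant indexed up to.
  static class PartitionInfo {
    final String partitionPath;
    final String indexUptoInstant;
    PartitionInfo(String partitionPath, String indexUptoInstant) {
      this.partitionPath = partitionPath;
      this.indexUptoInstant = indexUptoInstant;
    }
    @Override
    public String toString() {
      return partitionPath + "@" + indexUptoInstant;
    }
  }

  static final String FILES_PARTITION = "files";

  static List<PartitionInfo> finalizePartitionInfos(List<PartitionInfo> planned, String caughtUpInstant) {
    boolean firstTimeInitializingMetadataTable =
        planned.size() == 1 && planned.get(0).partitionPath.equals(FILES_PARTITION);
    if (firstTimeInitializingMetadataTable) {
      // FILES was already built synchronously when the metadata writer was obtained,
      // so the commit metadata just echoes the plan's indexUptoInstant.
      PartitionInfo files = planned.get(0);
      return Collections.singletonList(new PartitionInfo(files.partitionPath, files.indexUptoInstant));
    }
    // Normal path: build the partitions, catch up the remaining instants,
    // and stamp each partition with the instant the catch-up reached.
    return planned.stream()
        .map(p -> new PartitionInfo(p.partitionPath, caughtUpInstant))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(finalizePartitionInfos(
        Collections.singletonList(new PartitionInfo(FILES_PARTITION, "0001")), "0005"));
    System.out.println(finalizePartitionInfos(
        Collections.singletonList(new PartitionInfo("column_stats", "0001")), "0005"));
  }
}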
@@ -105,7 +105,10 @@ public Option<HoodieIndexPlan> execute() {
// in case FILES partition itself was not initialized before (i.e. metadata was never enabled), this will initialize synchronously
HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
.orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to initialize filegroups for indexing for instant: %s", instantTime)));
-metadataWriter.initializeMetadataPartitions(table.getMetaClient(), finalPartitionsToIndex, indexInstant.getTimestamp());
+if (!finalPartitionsToIndex.get(0).getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
+// initialize metadata partitions only when not indexing the FILES partition.
+metadataWriter.initializeMetadataPartitions(table.getMetaClient(), finalPartitionsToIndex, indexInstant.getTimestamp());
+}

// for each partitionToIndex add that time to the plan
List<HoodieIndexPartitionInfo> indexPartitionInfos = finalPartitionsToIndex.stream()
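The scheduler guard above relies on the comment two hunks earlier: obtaining the metadata writer already initializes the FILES partition synchronously when the metadata table does not exist yet, so an explicit initializeMetadataPartitions call is only needed for other partition types. A minimal sketch of that decision, using a hypothetical needsExplicitInitialization helper rather than the Hudi API:

import java.util.Arrays;
import java.util.List;

public class ScheduleGuardSketch {

  enum MetadataPartitionType { FILES, COLUMN_STATS, BLOOM_FILTERS }

  // Explicit file-group initialization is skipped for the FILES bootstrap, because getting the
  // metadata writer already initializes the FILES partition synchronously.
  static boolean needsExplicitInitialization(List<MetadataPartitionType> partitionsToIndex) {
    return !partitionsToIndex.get(0).equals(MetadataPartitionType.FILES);
  }

  public static void main(String[] args) {
    System.out.println(needsExplicitInitialization(Arrays.asList(MetadataPartitionType.FILES)));        // false
    System.out.println(needsExplicitInitialization(Arrays.asList(MetadataPartitionType.COLUMN_STATS))); // true
  }
}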
@@ -27,6 +27,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.metadata.MetadataPartitionType;

@@ -228,6 +229,9 @@ private Option<String> scheduleIndexing(JavaSparkContext jsc) throws Exception {
private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
List<MetadataPartitionType> partitionTypes = getRequestedPartitionTypes(cfg.indexTypes);
checkArgument(partitionTypes.size() == 1, "Currently, only one index type can be scheduled at a time.");
+if (!isMetadataInitialized() && !partitionTypes.contains(MetadataPartitionType.FILES)) {
+throw new HoodieException("Metadata table is not yet initialized. Initialize FILES partition before any other partition " + Arrays.toString(partitionTypes.toArray()));
+}
if (indexExists(partitionTypes)) {
return Option.empty();
}
@@ -249,6 +253,11 @@ private boolean indexExists(List<MetadataPartitionType> partitionTypes) {
return false;
}

+private boolean isMetadataInitialized() {
+Set<String> indexedMetadataPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+return !indexedMetadataPartitions.isEmpty();
+}

private int runIndexing(JavaSparkContext jsc) throws Exception {
String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
@@ -318,8 +327,6 @@ List<MetadataPartitionType> getRequestedPartitionTypes(String indexTypes) {
List<String> requestedIndexTypes = Arrays.asList(indexTypes.split(","));
return requestedIndexTypes.stream()
.map(p -> MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)))
-// FILES partition is initialized synchronously while getting metadata writer
-.filter(p -> !MetadataPartitionType.FILES.equals(p))
.collect(Collectors.toList());
}
}
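With the FILES filter removed from getRequestedPartitionTypes and the new pre-check in doSchedule, a fresh table has to be pointed at the FILES partition before any other index type can be scheduled. A usage sketch modeled on the tests in the next file; the Config fields and start(0) call are taken from that test code, while the import path, the running-mode string, and the properties-file path are assumptions:

import org.apache.hudi.utilities.HoodieIndexer;   // package assumed; adjust to your Hudi version
import org.apache.spark.api.java.JavaSparkContext;

public class IndexerBootstrapSketch {

  // Schedule-and-execute the FILES index first, then another metadata partition.
  static void buildIndexes(JavaSparkContext jsc, String basePath, String tableName, String propsPath) {
    if (runIndexer(jsc, basePath, tableName, propsPath, "FILES") == 0) {
      // Only after FILES has been built can other metadata partitions be scheduled.
      runIndexer(jsc, basePath, tableName, propsPath, "COLUMN_STATS");
    }
  }

  static int runIndexer(JavaSparkContext jsc, String basePath, String tableName, String propsPath, String indexTypes) {
    HoodieIndexer.Config config = new HoodieIndexer.Config();
    config.basePath = basePath;
    config.tableName = tableName;
    config.indexTypes = indexTypes;              // comma-separated MetadataPartitionType names
    config.runningMode = "scheduleandexecute";   // assumed value of the SCHEDULE_AND_EXECUTE constant used in the tests
    config.propsFilePath = propsPath;            // placeholder path to an indexer properties file
    return new HoodieIndexer(jsc, config).start(0);  // 0 retries, as in the tests below
  }
}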
@@ -49,8 +49,10 @@

import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Set;

import static org.apache.hudi.common.table.HoodieTableMetaClient.reload;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
@@ -106,7 +108,7 @@ public void testGetRequestedPartitionTypes() {
config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS";
HoodieIndexer indexer = new HoodieIndexer(jsc, config);
List<MetadataPartitionType> partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes);
-assertFalse(partitionTypes.contains(FILES));
+assertTrue(partitionTypes.contains(FILES));
assertTrue(partitionTypes.contains(BLOOM_FILTERS));
assertTrue(partitionTypes.contains(COLUMN_STATS));
}
@@ -134,11 +136,75 @@ public void testIsIndexBuiltForAllRequestedTypes() {
@Test
public void testIndexerWithNotAllIndexesEnabled() {
initTestDataGenerator();
String tableName = "indexer_test";
HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
tableName = "indexer_test";
// enable files and bloom_filters on the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
-HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
+initializeWriteClient(metadataConfigBuilder.build());

+// validate table config
+assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+
+// build indexer config which has only column_stats enabled (files and bloom_filters are already enabled)
+indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[]{FILES, BLOOM_FILTERS}), Collections.emptyList());
+}

+@Test
+public void testIndexerWithFilesPartition() {
+initTestDataGenerator();
+tableName = "indexer_test";
+// metadata table is not enabled on the regular write client (only the bloom_filters flag is set in the config)
+HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
+initializeWriteClient(metadataConfigBuilder.build());
+
+// validate table config
+assertFalse(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+
+// build indexer config which has only files enabled
+indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[]{COLUMN_STATS, BLOOM_FILTERS}));
+}
+
+/**
+ * If the first round of indexing targets any partition other than FILES, an exception is thrown,
+ * because the metadata table has not yet been initialized in the synchronous code path by regular writers.
+ */
+@Test
+public void testIndexerForExceptionWithNonFilesPartition() {
+initTestDataGenerator();
+tableName = "indexer_test";
+// metadata table is not enabled on the regular write client
+HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false);
+initializeWriteClient(metadataConfigBuilder.build());
+// validate table config
+assertFalse(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));

+// build indexer config with only column_stats enabled; expected to throw an exception.
+HoodieIndexer.Config config = new HoodieIndexer.Config();
+String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
+config.basePath = basePath;
+config.tableName = tableName;
+config.indexTypes = COLUMN_STATS.name();
+config.runningMode = SCHEDULE_AND_EXECUTE;
+config.propsFilePath = propsPath;
+// start the indexer and validate index building fails
+HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+assertEquals(-1, indexer.start(0));
+
+// validate table config
+metaClient = reload(metaClient);
+assertFalse(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(FILES.getPartitionPath()));
+assertFalse(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
+assertFalse(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+// validate the FILES metadata partition does not actually exist yet
+assertFalse(metadataPartitionExists(basePath, context, FILES));
+
+// trigger indexing for the FILES partition; it should now succeed.
+indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[]{COLUMN_STATS, BLOOM_FILTERS}));
+}
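The failure exercised above comes from the new doSchedule pre-check: when no metadata partition has completed yet, anything other than FILES is rejected, which is why indexer.start(0) returns -1 and the table config stays empty. A self-contained sketch of that precondition with stand-in types (not the Hudi classes):

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SchedulePrecheckSketch {

  // Mirrors the pre-check added to HoodieIndexer#doSchedule: completedPartitions comes from the
  // table config (getCompletedMetadataPartitions), requestedTypes from the configured index types.
  static void validateCanSchedule(Set<String> completedPartitions, List<String> requestedTypes) {
    boolean metadataInitialized = !completedPartitions.isEmpty();
    if (!metadataInitialized && !requestedTypes.contains("FILES")) {
      throw new IllegalStateException(
          "Metadata table is not yet initialized. Initialize FILES partition before " + requestedTypes);
    }
  }

  public static void main(String[] args) {
    validateCanSchedule(Collections.emptySet(), Arrays.asList("FILES"));                        // ok: bootstrap FILES first
    validateCanSchedule(new HashSet<>(Arrays.asList("files")), Arrays.asList("COLUMN_STATS"));  // ok: FILES already built
    validateCanSchedule(Collections.emptySet(), Arrays.asList("COLUMN_STATS"));                 // throws, as in the test above
  }
}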

+private void initializeWriteClient(HoodieMetadataConfig metadataConfig) {
+HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfig).build();
// do one upsert with synchronous metadata update
SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
String instant = "0001";
@@ -147,31 +213,30 @@ public void testIndexerWithNotAllIndexesEnabled() {
JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
List<WriteStatus> statuses = result.collect();
assertNoWriteErrors(statuses);
+}

-// validate table config
-assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));

-// build indexer config which has only column_stats enabled (files is enabled by default)
+private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTypeToIndex, List<MetadataPartitionType> alreadyCompletedPartitions, List<MetadataPartitionType> nonExistantPartitions) {
HoodieIndexer.Config config = new HoodieIndexer.Config();
String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
config.basePath = basePath;
config.tableName = tableName;
-config.indexTypes = COLUMN_STATS.name();
+config.indexTypes = partitionTypeToIndex.name();
config.runningMode = SCHEDULE_AND_EXECUTE;
config.propsFilePath = propsPath;
-// start the indexer and validate column_stats index is also complete
+// start the indexer and validate the requested index is completely built out
HoodieIndexer indexer = new HoodieIndexer(jsc, config);
assertEquals(0, indexer.start(0));

// validate table config
-assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
-assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
+metaClient = reload(metaClient);
+Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+assertTrue(completedPartitions.contains(partitionTypeToIndex.getPartitionPath()));
+alreadyCompletedPartitions.forEach(entry -> assertTrue(completedPartitions.contains(entry.getPartitionPath())));
+nonExistantPartitions.forEach(entry -> assertFalse(completedPartitions.contains(entry.getPartitionPath())));

// validate metadata partitions actually exist
-assertTrue(metadataPartitionExists(basePath, context, FILES));
-assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
-assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+assertTrue(metadataPartitionExists(basePath, context, partitionTypeToIndex));
+alreadyCompletedPartitions.forEach(entry -> assertTrue(metadataPartitionExists(basePath, context, entry)));
}

@Test