Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;

/**
Expand Down Expand Up @@ -377,7 +378,25 @@ protected <T extends SpecificRecordBase> void initializeIfNeeded(HoodieTableMeta
if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
}
return;
}

// if metadata table exists, then check if any of the enabled partition types needs to be initialized
Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
List<MetadataPartitionType> partitionsToInit = this.enabledPartitionTypes.stream()
.filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p))
.collect(Collectors.toList());

// if there are no partitions to initialize or there is a pending operation, then don't initialize in this round
if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
return;
}

String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
initTableMetadata(); // re-init certain flags in BaseTableMetadata
initializeEnabledFileGroups(dataMetaClient, createInstantTime, partitionsToInit);
initialCommit(createInstantTime, partitionsToInit);
updateInitializedPartitionsInTableConfig(partitionsToInit);
}

private <T extends SpecificRecordBase> boolean metadataTableExists(HoodieTableMetaClient dataMetaClient,
Expand Down Expand Up @@ -502,26 +521,11 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(
*/
private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient,
Option<String> inflightInstantTimestamp) throws IOException {
ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");

// We can only initialize if there are no pending operations on the dataset
List<HoodieInstant> pendingDataInstant = dataMetaClient.getActiveTimeline()
.getInstants().filter(i -> !i.isCompleted())
.filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get()))
.collect(Collectors.toList());

if (!pendingDataInstant.isEmpty()) {
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: "
+ Arrays.toString(pendingDataInstant.toArray()));
if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
return false;
}

// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants()
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
String createInstantTime = getInitialCommitInstantTime(dataMetaClient);

initializeMetaClient(dataWriteConfig.getMetadataConfig().populateMetaFields());
initTableMetadata();
Expand All @@ -535,15 +539,38 @@ private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient,
enabledPartitionTypes = this.enabledPartitionTypes;
}
initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes);

// During cold startup, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out
// of these large number of files and calling the existing update(HoodieCommitMetadata) function does not scale
// well. Hence, we have a special commit just for the initialization scenario.
initialCommit(createInstantTime, enabledPartitionTypes);
updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
return true;
}

/**
 * Determines the instant time to use for the initial commit into the metadata table.
 *
 * <p>If the data table has at least one completed instant on its active timeline, the timestamp of the
 * latest completed instant is used; otherwise {@code SOLO_COMMIT_TIMESTAMP} is used.
 *
 * @param dataMetaClient meta client of the data table whose active timeline is inspected
 * @return the instant time at which the metadata table (or its partitions) will be initialized
 */
private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) {
  // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
  // Otherwise, we use the timestamp of the latest completed action.
  String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants()
      .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
  // This helper is called both when the metadata table is created from scratch and when only additional
  // partitions are initialized on an already existing metadata table, so the message must be valid for both.
  LOG.info("Initializing metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
  return createInstantTime;
}

/**
 * Tells whether the data table has pending (non-completed) instants that block metadata initialization.
 *
 * <p>The instant identified by {@code inflightInstantTimestamp}, when present, is not counted as pending.
 * When blocking instants are found, the {@code BOOTSTRAP_ERR_STR} metric is updated and a warning
 * listing those instants is logged.
 *
 * @param dataMetaClient           meta client of the data table whose active timeline is inspected
 * @param inflightInstantTimestamp timestamp of an instant to ignore while looking for pending instants
 * @return {@code true} if at least one other non-completed instant exists, {@code false} otherwise
 */
private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) {
  ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");

  // Initialization is only possible when nothing else is in progress on the dataset, so gather every
  // non-completed instant except the one the caller asked us to ignore.
  List<HoodieInstant> blockingInstants = dataMetaClient.getActiveTimeline().getInstants()
      .filter(instant -> !instant.isCompleted()
          && !(inflightInstantTimestamp.isPresent()
          && instant.getTimestamp().equals(inflightInstantTimestamp.get())))
      .collect(Collectors.toList());

  if (blockingInstants.isEmpty()) {
    return false;
  }

  metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
  LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: "
      + Arrays.toString(blockingInstants.toArray()));
  return true;
}

private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
Set<String> completedPartitions = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
Expand Down Expand Up @@ -973,8 +1000,12 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan
}

/**
* This is invoked to initialize metadata table for a dataset. Bootstrap Commit has special handling mechanism due to its scale compared to
* other regular commits.
* This is invoked to initialize metadata table for a dataset.
* Initial commit has special handling mechanism due to its scale compared to other regular commits.
* During cold startup, the list of files to be committed can be huge.
* So creating a HoodieCommitMetadata out of these large number of files,
* and calling the existing update(HoodieCommitMetadata) function does not scale well.
* Hence, we have a special commit just for the initialization scenario.
*/
private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) {
// List all partitions in the basePath of the containing dataset
Expand All @@ -992,18 +1023,17 @@ private void initialCommit(String createInstantTime, List<MetadataPartitionType>
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();

// Record which saves the list of all partitions
HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
if (partitions.isEmpty()) {
// in case of initializing of a fresh table, there won't be any partitions, but we need to make a boostrap commit
final HoodieData<HoodieRecord> allPartitionRecordsRDD = engineContext.parallelize(
Collections.singletonList(allPartitionRecord), 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD);
commit(createInstantTime, partitionToRecordsMap, false);
return;
}

if (partitionTypes.contains(MetadataPartitionType.FILES)) {
// Record which saves the list of all partitions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I know why we need this if condition? Can you help clarify?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just saves some duplicate effort; there was no correctness issue in the absence of this if condition. For example, when column stats is re-enabled, we reuse this method and don't really need to redo the files partition, hence this if condition. Without it, HoodieMetadataPayload.createPartitionListRecord(partitions) would have been called every time.

HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
if (partitions.isEmpty()) {
// in case of initializing of a fresh table, there won't be any partitions, but we need to make a boostrap commit
final HoodieData<HoodieRecord> allPartitionRecordsRDD = engineContext.parallelize(
Collections.singletonList(allPartitionRecord), 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD);
commit(createInstantTime, partitionToRecordsMap, false);
return;
}
HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.action.HoodieWriteMetadata;
Expand Down Expand Up @@ -99,6 +98,13 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;

/**
* Abstract implementation of a HoodieTable.
*
Expand Down Expand Up @@ -814,14 +820,60 @@ public void maybeDeleteMetadataTable() {
if (shouldExecuteMetadataTableDeletion()) {
try {
LOG.info("Deleting metadata table because it is disabled in writer.");
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
clearMetadataTablePartitionsConfig();
deleteMetadataTable(config.getBasePath(), context);
clearMetadataTablePartitionsConfig(Option.empty(), true);
} catch (HoodieMetadataException e) {
throw new HoodieException("Failed to delete metadata table.", e);
}
}
}

/**
 * Walks over every {@link MetadataPartitionType} and removes the ones whose index has been
 * disabled in the writer config.
 *
 * <p>For each partition type selected by {@code shouldDeleteMetadataPartition}, the partition is
 * deleted if it exists, and the partition is then removed from the metadata partitions recorded in
 * the table config.
 *
 * @throws HoodieException if deleting a metadata partition fails with a {@code HoodieMetadataException}
 */
public void deleteMetadataIndexIfNecessary() {
  for (MetadataPartitionType partitionType : MetadataPartitionType.values()) {
    if (!shouldDeleteMetadataPartition(partitionType)) {
      // index is still enabled (or was never completed), nothing to clean up
      continue;
    }
    try {
      LOG.info("Deleting metadata partition because it is disabled in writer: " + partitionType.name());
      boolean presentOnStorage = metadataPartitionExists(metaClient.getBasePath(), context, partitionType);
      if (presentOnStorage) {
        deleteMetadataPartition(metaClient.getBasePath(), context, partitionType);
      }
      // the table config entry is dropped regardless of whether the partition was found
      clearMetadataTablePartitionsConfig(Option.of(partitionType), false);
    } catch (HoodieMetadataException e) {
      throw new HoodieException("Failed to delete metadata partition: " + partitionType.name(), e);
    }
  }
}

/**
 * Decides whether the given metadata partition should be deleted.
 *
 * <p>A partition is deleted only when all of the following hold:
 * (1) this table is a data table, and the metadata table is enabled in the write config;
 * (2) the index backing this partition is disabled in the write config;
 * (3) the partition is listed among the completed metadata partitions in the table config.
 * Inflight metadata partitions are not considered, as they could be inflight due to the async indexer.
 *
 * @param partitionType metadata partition type to check
 * @return {@code true} if the partition should be deleted, {@code false} otherwise
 */
private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionType) {
  boolean isMetadataTable = HoodieTableMetadata.isMetadataTable(metaClient.getBasePath());
  if (isMetadataTable || !config.isMetadataTableEnabled()) {
    return false;
  }

  // NOTE: FILES partition type is always considered in sync with hoodie.metadata.enable.
  //       It cannot be the case that metadata is enabled but FILES is disabled, so only the
  //       optional indexes are looked at here.
  final boolean indexEnabled;
  if (partitionType == MetadataPartitionType.COLUMN_STATS) {
    indexEnabled = config.isMetadataColumnStatsIndexEnabled();
  } else if (partitionType == MetadataPartitionType.BLOOM_FILTERS) {
    indexEnabled = config.isMetadataBloomFilterIndexEnabled();
  } else {
    LOG.debug("Not a valid metadata partition type: " + partitionType.name());
    return false;
  }

  if (indexEnabled) {
    return false;
  }
  // Index is disabled: delete only if the partition had been fully built earlier.
  return getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(partitionType.getPartitionPath());
}

private boolean shouldExecuteMetadataTableDeletion() {
// Only execute metadata table deletion when all the following conditions are met
// (1) This is data table
Expand All @@ -831,17 +883,23 @@ private boolean shouldExecuteMetadataTableDeletion() {
// partitions are ready to use
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
&& !config.isMetadataTableEnabled()
&& (!metaClient.getTableConfig().contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)
&& (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS)
|| !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions()));
}

/**
* Clears hoodie.table.metadata.partitions in hoodie.properties
*/
private void clearMetadataTablePartitionsConfig() {
LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
metaClient.getTableConfig().setValue(
HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), StringUtils.EMPTY_STRING);
private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
  // Work out the new value of hoodie.table.metadata.partitions first, then write it once.
  final String updatedPartitions;
  if (clearAll) {
    // partitionType is not consulted on this path; the whole entry is blanked out
    LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
    updatedPartitions = EMPTY_STRING;
  } else {
    // drop only the given partition from the completed set and keep the rest
    Set<String> remaining = getCompletedMetadataPartitions(metaClient.getTableConfig());
    remaining.remove(partitionType.get().getPartitionPath());
    updatedPartitions = String.join(",", remaining);
  }
  metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), updatedPartitions);
  // persist the modified properties back to the table's meta path
  HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) {
if (config.isMetadataTableEnabled()) {
// even with metadata enabled, some index could have been disabled
// delete metadata partitions corresponding to such indexes
deleteMetadataIndexIfNecessary();
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
context, actionMetadata, Option.of(triggeringInstantTimestamp)));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetad
// existence after the creation is needed.
final HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
context.getHadoopConf().get(), config, context, actionMetadata, Option.of(triggeringInstantTimestamp));
// even with metadata enabled, some index could have been disabled
// delete metadata partitions corresponding to such indexes
deleteMetadataIndexIfNecessary();
try {
if (isMetadataTableExists || metaClient.getFs().exists(new Path(
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
Expand Down
Loading