Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -885,24 +885,22 @@ private boolean shouldExecuteMetadataTableDeletion() {
// partitions are ready to use
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
&& !config.isMetadataTableEnabled()
&& (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS)
|| !metaClient.getTableConfig().getMetadataPartitions().isEmpty());
&& !metaClient.getTableConfig().getMetadataPartitions().isEmpty();
}

/**
* Clears hoodie.table.metadata.partitions in hoodie.properties
*/
private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
if (clearAll) {
Set<String> partitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
if (clearAll && partitions.size() > 0) {
LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
return;
} else if (partitions.remove(partitionType.get().getPartitionPath())) {
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions));
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
}
Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
completedPartitions.remove(partitionType.get().getPartitionPath());
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
}

public HoodieTableMetadata getMetadataTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
Expand Down Expand Up @@ -365,8 +364,7 @@ public void completeCompaction(
// commit to data table after committing to metadata table.
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
w -> ((HoodieTableMetadataWriter) w).update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,9 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) {
if (config.isMetadataTableEnabled()) {
// even with metadata enabled, some index could have been disabled
// delete metadata partitions corresponding to such indexes
deleteMetadataIndexIfNecessary();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codope can you please elaborate on the original intent here?

return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
context, actionMetadata, Option.of(triggeringInstantTimestamp)));
} else {
maybeDeleteMetadataTable();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codope and here

return Option.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ private static Properties getOrderedPropertiesWithTableChecksum(Properties props
* @throws IOException
*/
private static String storeProperties(Properties props, FSDataOutputStream outputStream) throws IOException {
String checksum;
if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) {
final String checksum;
if (isValidChecksum(props)) {
checksum = props.getProperty(TABLE_CHECKSUM.key());
props.store(outputStream, "Updated at " + Instant.now());
} else {
Expand All @@ -285,8 +285,8 @@ private static String storeProperties(Properties props, FSDataOutputStream outpu
return checksum;
}

private boolean isValidChecksum() {
return contains(TABLE_CHECKSUM) && validateChecksum(props);
private static boolean isValidChecksum(Properties props) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props);
}

/**
Expand All @@ -298,20 +298,13 @@ public HoodieTableConfig() {

private void fetchConfigs(FileSystem fs, String metaPath) throws IOException {
Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
try (FSDataInputStream is = fs.open(cfgPath)) {
props.load(is);
// validate checksum for latest table version
if (getTableVersion().versionCode() >= HoodieTableVersion.FOUR.versionCode() && !isValidChecksum()) {
LOG.warn("Checksum validation failed. Falling back to backed up configs.");
try (FSDataInputStream fsDataInputStream = fs.open(backupCfgPath)) {
props.load(fsDataInputStream);
}
}
Comment on lines -304 to -310
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this checksum validation is reverted?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codope i suppose this check is optional and removing meant for reducing chance of the access?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the code invokes, the backup file was very probably not exist, and we already load the backfile if any error happens.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danny0405 but why we'd want to omit the validation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current code is problematic, based on the fact the config file and backup config file can only exists one at a time point. We already check the checksum in the modification code path. There is no need to check again in the read path.

And falling back to backup file is not a reasonable way, more proper to throws exception here, but i would let @codope do that in following PR.

} catch (IOException ioe) {
if (!fs.exists(cfgPath)) {
LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs.");
// try the backup. this way no query ever fails if update fails midway.
Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
try (FSDataInputStream is = fs.open(backupCfgPath)) {
props.load(is);
}
Expand Down Expand Up @@ -631,7 +624,7 @@ public List<String> getMetadataPartitions() {
CONFIG_VALUES_DELIMITER
);
}

/**
* Returns the format to use for partition meta files.
*/
Expand Down