@@ -40,6 +40,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.table.HoodieTable;

@@ -56,11 +57,16 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import scala.Tuple2;

import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX;
import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;

/**
* Consistent hashing bucket index implementation, with auto-adjust bucket number.
* NOTE: bucket resizing is triggered by clustering.
@@ -189,29 +195,64 @@ public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, S
*/
public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable table, String partition) {
Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition);

Path partitionPath = FSUtils.getPartitionPath(table.getMetaClient().getBasePathV2(), partition);
Contributor:
Is this a typo? partitionPath is now the data partition path, e.g. hdfs://.../hudi_table/date=2023-12-01/, which does not match the logic in hashingMetaCommitFilePredicate. Correct me if I'm wrong, did I miss anything?

According to the code below, partitionPath will be used to create a marker file in the method createCommitMarker. If partitionPath is the path of the data files, the marker file will be created in the partition folder, e.g. hdfs://.../hudi_table/date=2023-12-01/00000000000000.commit.

However, the code below is looking for the committed metadata files in the folder metadataPath.

      final FileStatus[] metaFiles = metaClient.getFs().listStatus(metadataPath);
      final TreeSet<String> commitMetaTss = Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)
          .map(commitFile -> HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
          .sorted()
          .collect(Collectors.toCollection(TreeSet::new));

And in the test case, TestSparkConsistentBucketClustering.testLoadMetadata, it also looks for the committed metadata files in the folder metadataPath:

        Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), p);
        try {
          Arrays.stream(table.getMetaClient().getFs().listStatus(metadataPath)).forEach(fl -> {
            if (fl.getPath().getName().contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX)) {
              try {
                // delete commit marker to test recovery job
                table.getMetaClient().getFs().delete(fl.getPath());
              } catch (IOException e) {
                throw new RuntimeException(e);
              }
            }
          });

BTW, the code has since been refactored into a new class: https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java#L106

Contributor:
From the code, it creates a marker file under the data partition and a normal meta file under the metadata path; the marker file is used for recovery consistency.

Contributor:

Yeah, correct, the marker file is under the data partition. So, as I understand it, commitMetaTss will always be an empty set, because metaFiles is not listed from the data partition; it is listed from FSUtils.getPartitionPath(metaClient.getHashingMetadataPath(), partition).

final TreeSet<String> commitMetaTss = Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)

May I ask how the recovery consistency works? Which part of the code should I refer to? Really appreciate it.

Contributor Author:

@TengHuo Yeah, it looks like a typo. It should be the metadata path. The idea is to create a commit marker file for every hashing_metadata file, so that we will be able to identify the latest metadata file.

@danny0405 thoughts?

Contributor:

Yeah, can you fire a fix, @TengHuo @rohan-uptycs?

Contributor Author:

@danny0405 @TengHuo, I will raise a PR soon.
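For context, a minimal sketch of the fix being agreed on above (hypothetical; the actual follow-up PR may differ): the commit marker should be created under the hashing metadata directory rather than the data partition, so that the listStatus(metadataPath) call in loadMetadata can actually find it.

    // Hypothetical helper illustrating the agreed fix: derive the marker's
    // parent directory from the hashing metadata path, not the data partition.
    private static void markAsCommitted(HoodieTable table, Path hashingMetaFile, String partition) throws IOException {
      Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition);
      // was: createCommitMarker(table, path, partitionPath)
      createCommitMarker(table, hashingMetaFile, metadataPath);
    }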

try {
FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath);
final HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
Predicate<FileStatus> metaFilePredicate = fileStatus -> {
Predicate<FileStatus> hashingMetaCommitFilePredicate = fileStatus -> {
String filename = fileStatus.getPath().getName();
if (!filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX)) {
return false;
}
String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(filename);
return completedCommits.containsInstant(timestamp) || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
return filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX);
};

// Get a valid hashing metadata with the largest (latest) timestamp
FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate)
.max(Comparator.comparing(a -> a.getPath().getName())).orElse(null);

if (metaFile == null) {
return Option.empty();
Predicate<FileStatus> hashingMetadataFilePredicate = fileStatus -> {
String filename = fileStatus.getPath().getName();
return filename.contains(HASHING_METADATA_FILE_SUFFIX);
};
final FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath);
final TreeSet<String> commitMetaTss = Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)
.map(commitFile -> HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
.sorted()
.collect(Collectors.toCollection(TreeSet::new));
final FileStatus[] hashingMetaFiles = Arrays.stream(metaFiles).filter(hashingMetadataFilePredicate)
.sorted(Comparator.comparing(f -> f.getPath().getName()))
.toArray(FileStatus[]::new);
// max committed metadata file
final String maxCommitMetaFileTs = commitMetaTss.isEmpty() ? null : commitMetaTss.last();
// max updated metadata file
FileStatus maxMetadataFile = hashingMetaFiles.length > 0 ? hashingMetaFiles[hashingMetaFiles.length - 1] : null;
// If a single file is present in metadata and it is the default file, return it
if (maxMetadataFile != null && HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()).equals(HoodieTimeline.INIT_INSTANT_TS)) {
return loadMetadataFromGivenFile(table, maxMetadataFile);
}
// if max updated metadata file and committed metadata file are same then return
if (maxCommitMetaFileTs != null && maxMetadataFile != null
&& maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()))) {
return loadMetadataFromGivenFile(table, maxMetadataFile);
}
HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();

byte[] content = FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
// fix the inconsistency between uncommitted and committed hashing metadata files.
List<FileStatus> fixed = new ArrayList<>();
Arrays.stream(hashingMetaFiles).forEach(hashingMetaFile -> {
Path path = hashingMetaFile.getPath();
String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(path.getName());
if (maxCommitMetaFileTs != null && timestamp.compareTo(maxCommitMetaFileTs) <= 0) {
// skip: only fix metadata with a timestamp greater than the max committed timestamp
return;
}
boolean isRehashingCommitted = completedCommits.containsInstant(timestamp) || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
if (isRehashingCommitted) {
if (!commitMetaTss.contains(timestamp)) {
try {
createCommitMarker(table, path, partitionPath);
} catch (IOException e) {
throw new HoodieIOException("Exception while creating marker file " + path.getName() + " for partition " + partition, e);
}
}
fixed.add(hashingMetaFile);
} else if (recommitMetadataFile(table, hashingMetaFile, partition)) {
fixed.add(hashingMetaFile);
}
});

return fixed.isEmpty() ? Option.empty() : loadMetadataFromGivenFile(table, fixed.get(fixed.size() - 1));
} catch (FileNotFoundException e) {
return Option.empty();
} catch (IOException e) {
@@ -271,8 +312,83 @@ public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
}

LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: "
+ partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString());
throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
}
}

/***
* Creates a commit marker corresponding to a hashing metadata file, after the post-commit clustering operation.
* @param table hoodie table
* @param fileStatus file for which commit marker should be created
* @param partitionPath partition path the file belongs to
* @throws IOException
*/
private static void createCommitMarker(HoodieTable table, Path fileStatus, Path partitionPath) throws IOException {
HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
Path fullPath = new Path(partitionPath, getTimestampFromFile(fileStatus.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
if (fs.exists(fullPath)) {
return;
}
FileIOUtils.createFileInPath(fs, fullPath, Option.of(StringUtils.EMPTY_STRING.getBytes()));
}

/***
* Loads consistent hashing metadata of table from the given meta file
* @param table hoodie table
* @param metaFile hashing metadata file
* @return HoodieConsistentHashingMetadata object
*/
private static Option<HoodieConsistentHashingMetadata> loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
try {
if (metaFile == null) {
return Option.empty();
}
byte[] content = FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
} catch (FileNotFoundException e) {
return Option.empty();
} catch (IOException e) {
LOG.error("Error when loading hashing metadata, for path: " + metaFile.getPath().getName(), e);
throw new HoodieIndexException("Error while loading hashing metadata", e);
}
}

/***
* COMMIT MARKER RECOVERY JOB.
* If a particular hashing metadata file doesn't have a commit marker, there could be a case where clustering is done but the post-commit marker
* creation failed. In this case, this method checks the file group ids from the consistent hashing metadata against the base file group ids on storage.
* If one of the file groups matches, we can conclude that this is the latest metadata file.
* Note: we will end up calling this method only if there is no marker file and no replace commit on the active timeline. If the replace commit is not present
* on the active timeline, that means the old file group ids from before the clustering operation got cleaned, and only the new file group ids of the current
* clustering operation are present on disk.
* @param table hoodie table
* @param metaFile metadata file on which sync check needs to be performed
* @param partition partition metadata file belongs to
* @return true if hashing metadata file is latest else false
*/
private static boolean recommitMetadataFile(HoodieTable table, FileStatus metaFile, String partition) {
Path partitionPath = FSUtils.getPartitionPath(table.getMetaClient().getBasePathV2(), partition);
String timestamp = getTimestampFromFile(metaFile.getPath().getName());
if (table.getPendingCommitTimeline().containsInstant(timestamp)) {
return false;
}
Option<HoodieConsistentHashingMetadata> hoodieConsistentHashingMetadataOption = loadMetadataFromGivenFile(table, metaFile);
if (!hoodieConsistentHashingMetadataOption.isPresent()) {
return false;
}
HoodieConsistentHashingMetadata hoodieConsistentHashingMetadata = hoodieConsistentHashingMetadataOption.get();

Predicate<String> hoodieFileGroupIdPredicate = hoodieBaseFile -> hoodieConsistentHashingMetadata.getNodes().stream().anyMatch(node -> node.getFileIdPrefix().equals(hoodieBaseFile));
if (table.getBaseFileOnlyView().getLatestBaseFiles(partition)
.map(fileIdPrefix -> FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate)) {
try {
createCommitMarker(table, metaFile.getPath(), partitionPath);
return true;
} catch (IOException e) {
throw new HoodieIOException("Exception while creating marker file " + metaFile.getPath().getName() + " for partition " + partition, e);
}
}
return false;
}
}
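To make the control flow of loadMetadata above easier to follow, here is a self-contained, simplified sketch of its selection logic (plain filenames only; the class and method names here are illustrative, not Hudi API, and the recommit-by-file-group fallback is reduced to a timeline check):

    import java.util.List;
    import java.util.Optional;
    import java.util.Set;
    import java.util.TreeSet;
    import java.util.stream.Collectors;

    class LatestHashingMetadataPicker {
      static final String META_SUFFIX = ".hashing_meta";
      static final String COMMIT_SUFFIX = ".commit";
      static final String INIT_TS = "00000000000000";

      static Optional<String> pick(List<String> files, Set<String> completedInstants) {
        // Timestamps that already have a commit marker, ordered ascending.
        TreeSet<String> committed = files.stream()
            .filter(f -> f.endsWith(COMMIT_SUFFIX))
            .map(f -> f.substring(0, f.indexOf('.')))
            .collect(Collectors.toCollection(TreeSet::new));
        // Hashing metadata files, ordered ascending by name (i.e. by timestamp).
        List<String> metas = files.stream()
            .filter(f -> f.endsWith(META_SUFFIX))
            .sorted()
            .collect(Collectors.toList());
        if (metas.isEmpty()) {
          return Optional.empty();
        }
        String latest = metas.get(metas.size() - 1);
        String latestTs = latest.substring(0, latest.indexOf('.'));
        // Bootstrap metadata, or latest metadata already covered by a marker: load it.
        if (latestTs.equals(INIT_TS) || (!committed.isEmpty() && committed.last().equals(latestTs))) {
          return Optional.of(latest);
        }
        // Otherwise fall back to the newest metadata whose instant completed on the
        // timeline (the real code also recreates the missing marker at this point).
        return metas.stream()
            .filter(f -> {
              String ts = f.substring(0, f.indexOf('.'));
              return completedInstants.contains(ts) || ts.equals(INIT_TS);
            })
            .reduce((a, b) -> b);
      }
    }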
@@ -18,11 +18,14 @@

package org.apache.hudi.client.functional;

import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy;
import org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -33,10 +36,11 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
@@ -52,6 +56,7 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -149,6 +154,59 @@ public void testResizing(boolean isSplit) throws IOException {
});
}

/**
* Test loading hashing metadata after clustering, cleaning and archival, with and without commit marker files
* @throws IOException
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLoadMetadata(boolean isCommitFilePresent) throws IOException {
final int maxFileSize = 5120;
final int targetBucketNum = 14;
setup(maxFileSize);
writeClient.getConfig().setValue(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1");
writeClient.getConfig().setValue(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4");
writeClient.getConfig().setValue(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5");
writeData(HoodieActiveTimeline.createNewInstantTime(), 2000, true);
String clusteringTime = (String) writeClient.scheduleClustering(Option.empty()).get();
writeClient.cluster(clusteringTime, true);
writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
writeClient.clean();
HoodieTimelineArchiver hoodieTimelineArchiver = new HoodieTimelineArchiver(writeClient.getConfig(), table);
hoodieTimelineArchiver.archiveIfRequired(context);
Arrays.stream(dataGen.getPartitionPaths()).forEach(p -> {
if (!isCommitFilePresent) {
Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), p);
try {
Arrays.stream(table.getMetaClient().getFs().listStatus(metadataPath)).forEach(fl -> {
if (fl.getPath().getName().contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX)) {
try {
// delete commit marker to test recovery job
table.getMetaClient().getFs().delete(fl.getPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
HoodieConsistentHashingMetadata metadata = HoodieSparkConsistentBucketIndex.loadMetadata(table, p).get();
Assertions.assertEquals(targetBucketNum, metadata.getNodes().size());
});
writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
Assertions.assertEquals(2080, readRecords(dataGen.getPartitionPaths()).size());
}

/**
* 1. Test PARTITION_SORT mode, i.e., sort by the record key
* 2. Test custom column sort
@@ -49,6 +49,7 @@ public class HoodieConsistentHashingMetadata implements Serializable {
*/
public static final int HASH_VALUE_MASK = Integer.MAX_VALUE;
public static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta";
public static final String HASHING_METADATA_COMMIT_FILE_SUFFIX = ".commit";

private final short version;
private final String partitionPath;
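For reference, the two suffixes pair up per instant timestamp, which is what the marker/metadata matching above relies on. A quick illustration (the timestamp value is made up):

    // One hashing metadata file and its commit marker share the same instant timestamp.
    String instantTs = "20231201120000000"; // made-up example instant
    String metadataFile = instantTs + HASHING_METADATA_FILE_SUFFIX;        // 20231201120000000.hashing_meta
    String commitMarker = instantTs + HASHING_METADATA_COMMIT_FILE_SUFFIX; // 20231201120000000.commit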