diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
index abfacfcfdb5a7..ebbecbda24110 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
@@ -40,6 +40,7 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.table.HoodieTable;
 
@@ -56,11 +57,16 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeSet;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX;
+import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
+import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;
+
 /**
  * Consistent hashing bucket index implementation, with auto-adjust bucket number.
  * NOTE: bucket resizing is triggered by clustering.
@@ -189,29 +195,64 @@ public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, S
    */
   public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable table, String partition) {
     Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition);
-
+    Path partitionPath = FSUtils.getPartitionPath(table.getMetaClient().getBasePathV2(), partition);
     try {
-      FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath);
-      final HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
-      Predicate<FileStatus> metaFilePredicate = fileStatus -> {
+      Predicate<FileStatus> hashingMetaCommitFilePredicate = fileStatus -> {
         String filename = fileStatus.getPath().getName();
-        if (!filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX)) {
-          return false;
-        }
-        String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(filename);
-        return completedCommits.containsInstant(timestamp) || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
+        return filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX);
       };
-
-      // Get a valid hashing metadata with the largest (latest) timestamp
-      FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate)
-          .max(Comparator.comparing(a -> a.getPath().getName())).orElse(null);
-
-      if (metaFile == null) {
-        return Option.empty();
+      Predicate<FileStatus> hashingMetadataFilePredicate = fileStatus -> {
+        String filename = fileStatus.getPath().getName();
+        return filename.contains(HASHING_METADATA_FILE_SUFFIX);
+      };
+      final FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath);
+      final TreeSet<String> commitMetaTss = Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)
+          .map(commitFile -> HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
+          .sorted()
+          .collect(Collectors.toCollection(TreeSet::new));
+      final FileStatus[] hashingMetaFiles = Arrays.stream(metaFiles).filter(hashingMetadataFilePredicate)
+          .sorted(Comparator.comparing(f -> f.getPath().getName()))
+          .toArray(FileStatus[]::new);
+      // timestamp of the latest committed metadata file
+      final String maxCommitMetaFileTs = commitMetaTss.isEmpty() ? null : commitMetaTss.last();
+      // latest updated metadata file
+      FileStatus maxMetadataFile = hashingMetaFiles.length > 0 ? hashingMetaFiles[hashingMetaFiles.length - 1] : null;
+      // If the only metadata file present is the default one, return it
+      if (maxMetadataFile != null && HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()).equals(HoodieTimeline.INIT_INSTANT_TS)) {
+        return loadMetadataFromGivenFile(table, maxMetadataFile);
       }
+      // If the latest updated metadata file and the latest committed metadata file are the same, return it
+      if (maxCommitMetaFileTs != null && maxMetadataFile != null
+          && maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()))) {
+        return loadMetadataFromGivenFile(table, maxMetadataFile);
+      }
+      HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
 
-      byte[] content = FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
-      return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
+      // Fix the inconsistency between un-committed and committed hashing metadata files.
+      List<FileStatus> fixed = new ArrayList<>();
+      Arrays.stream(hashingMetaFiles).forEach(hashingMetaFile -> {
+        Path path = hashingMetaFile.getPath();
+        String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(path.getName());
+        if (maxCommitMetaFileTs != null && timestamp.compareTo(maxCommitMetaFileTs) <= 0) {
+          // only fix metadata with a timestamp greater than the max committed timestamp
+          return;
+        }
+        boolean isRehashingCommitted = completedCommits.containsInstant(timestamp) || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
+        if (isRehashingCommitted) {
+          if (!commitMetaTss.contains(timestamp)) {
+            try {
+              createCommitMarker(table, path, partitionPath);
+            } catch (IOException e) {
+              throw new HoodieIOException("Exception while creating marker file " + path.getName() + " for partition " + partition, e);
+            }
+          }
+          fixed.add(hashingMetaFile);
+        } else if (recommitMetadataFile(table, hashingMetaFile, partition)) {
+          fixed.add(hashingMetaFile);
+        }
+      });
+
+      return fixed.isEmpty() ? Option.empty() : loadMetadataFromGivenFile(table, fixed.get(fixed.size() - 1));
     } catch (FileNotFoundException e) {
       return Option.empty();
     } catch (IOException e) {
@@ -271,8 +312,83 @@ public Option getRecordLocation(HoodieKey key) {
       }
 
       LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: "
-          + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString());
+              + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString());
       throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
     }
   }
+
+  /**
+   * Creates a commit marker corresponding to the hashing metadata file after the post-commit clustering operation.
+   * @param table hoodie table
+   * @param fileStatus file for which the commit marker should be created
+   * @param partitionPath partition path the file belongs to
+   * @throws IOException
+   */
+  private static void createCommitMarker(HoodieTable table, Path fileStatus, Path partitionPath) throws IOException {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path fullPath = new Path(partitionPath, getTimestampFromFile(fileStatus.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
+    if (fs.exists(fullPath)) {
+      return;
+    }
+    FileIOUtils.createFileInPath(fs, fullPath, Option.of(StringUtils.EMPTY_STRING.getBytes()));
+  }
+
+  /**
+   * Loads the consistent hashing metadata of the table from the given metadata file.
+   * @param table hoodie table
+   * @param metaFile hashing metadata file
+   * @return HoodieConsistentHashingMetadata object
+   */
+  private static Option<HoodieConsistentHashingMetadata> loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
+    try {
+      if (metaFile == null) {
+        return Option.empty();
+      }
+      byte[] content = FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
+      return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
+    } catch (FileNotFoundException e) {
+      return Option.empty();
+    } catch (IOException e) {
+      LOG.error("Error when loading hashing metadata, for path: " + metaFile.getPath().getName(), e);
+      throw new HoodieIndexException("Error while loading hashing metadata", e);
+    }
+  }
+
+  /**
+   * Commit marker recovery job.
+   * If a hashing metadata file has no commit marker, there could be a case where clustering completed but the post-commit
+   * marker creation failed. In that case, this method checks the file group ids from the consistent hashing metadata
+   * against the base file group ids on storage; if one of the file groups matches, this is the latest metadata file.
+   * Note: we only end up calling this method when there is no marker file and no replace commit on the active timeline.
+   * A missing replace commit means the old file group ids from before the clustering operation were already cleaned,
+   * and only the new file group ids of the current clustering operation are present on disk.
+   * @param table hoodie table
+   * @param metaFile metadata file on which the sync check needs to be performed
+   * @param partition partition the metadata file belongs to
+   * @return true if the hashing metadata file is the latest, else false
+   */
+  private static boolean recommitMetadataFile(HoodieTable table, FileStatus metaFile, String partition) {
+    Path partitionPath = FSUtils.getPartitionPath(table.getMetaClient().getBasePathV2(), partition);
+    String timestamp = getTimestampFromFile(metaFile.getPath().getName());
+    if (table.getPendingCommitTimeline().containsInstant(timestamp)) {
+      return false;
+    }
+    Option<HoodieConsistentHashingMetadata> hoodieConsistentHashingMetadataOption = loadMetadataFromGivenFile(table, metaFile);
+    if (!hoodieConsistentHashingMetadataOption.isPresent()) {
+      return false;
+    }
+    HoodieConsistentHashingMetadata hoodieConsistentHashingMetadata = hoodieConsistentHashingMetadataOption.get();
+
+    Predicate<String> hoodieFileGroupIdPredicate = hoodieBaseFile -> hoodieConsistentHashingMetadata.getNodes().stream().anyMatch(node -> node.getFileIdPrefix().equals(hoodieBaseFile));
+    if (table.getBaseFileOnlyView().getLatestBaseFiles(partition)
+        .map(fileIdPrefix -> FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate)) {
+      try {
+        createCommitMarker(table, metaFile.getPath(), partitionPath);
+        return true;
+      } catch (IOException e) {
+        throw new HoodieIOException("Exception while creating marker file " + metaFile.getPath().getName() + " for partition " + partition, e);
+      }
+    }
+    return false;
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java
index 9dcd1d876a0e1..bbd9eaa4e02a5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java
@@ -18,11 +18,14 @@
 
 package org.apache.hudi.client.functional;
 
+import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
 import org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy;
 import org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -33,10 +36,11 @@
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
@@ -52,6 +56,7 @@
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -149,6 +154,59 @@ public void testResizing(boolean isSplit) throws IOException {
     });
   }
 
+  /**
+   * Test loading hashing metadata after clustering, cleaning and archival, with and without the commit marker files present.
+   * @throws IOException
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testLoadMetadata(boolean isCommitFilePresent) throws IOException {
+    final int maxFileSize = 5120;
+    final int targetBucketNum = 14;
+    setup(maxFileSize);
+    writeClient.getConfig().setValue(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1");
+    writeClient.getConfig().setValue(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4");
+    writeClient.getConfig().setValue(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5");
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 2000, true);
+    String clusteringTime = (String) writeClient.scheduleClustering(Option.empty()).get();
+    writeClient.cluster(clusteringTime, true);
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    final HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+    writeClient.clean();
+    HoodieTimelineArchiver hoodieTimelineArchiver = new HoodieTimelineArchiver(writeClient.getConfig(), table);
+    hoodieTimelineArchiver.archiveIfRequired(context);
+    Arrays.stream(dataGen.getPartitionPaths()).forEach(p -> {
+      if (!isCommitFilePresent) {
+        Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), p);
+        try {
+          Arrays.stream(table.getMetaClient().getFs().listStatus(metadataPath)).forEach(fl -> {
+            if (fl.getPath().getName().contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX)) {
+              try {
+                // delete the commit marker to exercise the recovery job
+                table.getMetaClient().getFs().delete(fl.getPath());
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      HoodieConsistentHashingMetadata metadata = HoodieSparkConsistentBucketIndex.loadMetadata(table, p).get();
+      Assertions.assertEquals(targetBucketNum, metadata.getNodes().size());
+    });
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
+    Assertions.assertEquals(2080, readRecords(dataGen.getPartitionPaths()).size());
+  }
+
   /**
    * 1. Test PARTITION_SORT mode, i.e., sort by the record key
    * 2. Test custom column sort
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
index d91ee2487ba00..120ba487fcfd3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
@@ -49,6 +49,7 @@ public class HoodieConsistentHashingMetadata implements Serializable {
    */
   public static final int HASH_VALUE_MASK = Integer.MAX_VALUE;
   public static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta";
+  public static final String HASHING_METADATA_COMMIT_FILE_SUFFIX = ".commit";
 
   private final short version;
   private final String partitionPath;