diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index eb4b24a34d8e6..875507dff3038 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -696,7 +696,6 @@ protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String
       return;
     }
 
-
     // Trigger compaction with suffixes based on the same instant time. This ensures that any future
     // delta commits synced over will not have an instant time lesser than the last completed instant on the
     // metadata table.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index d11f570a65d77..204a8fc310d11 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -154,7 +154,7 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
    * The record is tagged with respective file slice's location based on its record key.
    */
   private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
         String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
     return records.stream().map(r -> {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index b7b5961e40d6e..5aa6917d695a6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -169,7 +169,7 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
    * The record is tagged with respective file slice's location based on its record key.
   */
  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
         String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
     return recordsRDD.map(r -> {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 0339c4737e311..9b44b9fa5ec34 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -414,10 +414,8 @@ public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Except
     });
   }
 
-
   /**
    * Tests that virtual key configs are honored in base files after compaction in metadata table.
-   *
    */
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 58c25a17e0b70..05d9d7349dbb8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -245,7 +245,7 @@ private Pair openReadersI
 
     // Metadata is in sync till the latest completed instant on the dataset
     HoodieTimer timer = new HoodieTimer().startTimer();
-    List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, true);
+    List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
     if (latestFileSlices.size() == 0) {
       // empty partition
       return Pair.of(null, null);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index b4dfbbd631f9f..10d362e9c97f7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -157,7 +157,7 @@ public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanMetadata cl
    * @return a list of metadata table records
    */
   public static List<HoodieRecord> convertMetadataToRecords(HoodieActiveTimeline metadataTableTimeline,
-                                                            HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
+                                                             HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
     Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
     Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
     restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
@@ -334,29 +334,65 @@ public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGrou
   }
 
   /**
-   * Loads the list of file groups for a partition of the Metadata Table with latest file slices.
+   * Get the latest file slices for a Metadata Table partition. If a file slice is
+   * due to a pending compaction instant, it is merged with the file slice just
+   * before the compaction instant time. The list of file slices returned is
+   * sorted in the correct order of file group name.
    *
-   * The list of file slices returned is sorted in the correct order of file group name.
-   * @param metaClient instance of {@link HoodieTableMetaClient}.
-   * @param partition The name of the partition whose file groups are to be loaded.
-   * @param isReader true if reader code path, false otherwise.
+   * @param metaClient - Instance of {@link HoodieTableMetaClient}.
+   * @param partition - The name of the partition whose file groups are to be loaded.
    * @return List of latest file slices for all file groups in a given partition.
    */
-  public static List<FileSlice> loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition, boolean isReader) {
-    LOG.info("Loading file groups for metadata table partition " + partition);
+  public static List<FileSlice> getPartitionLatestMergedFileSlices(HoodieTableMetaClient metaClient, String partition) {
+    LOG.info("Loading latest merged file slices for metadata table partition " + partition);
+    return getPartitionFileSlices(metaClient, partition, true);
+  }
+
+  /**
+   * Get the latest file slices for a Metadata Table partition. The list of file slices
+   * returned is sorted in the correct order of file group name.
+   *
+   * @param metaClient - Instance of {@link HoodieTableMetaClient}.
+   * @param partition - The name of the partition whose file groups are to be loaded.
+   * @return List of latest file slices for all file groups in a given partition.
+   */
+  public static List<FileSlice> getPartitionLatestFileSlices(HoodieTableMetaClient metaClient, String partition) {
+    LOG.info("Loading latest file slices for metadata table partition " + partition);
+    return getPartitionFileSlices(metaClient, partition, false);
+  }
 
-    // If there are no commits on the metadata table then the table's default FileSystemView will not return any file
-    // slices even though we may have initialized them.
+  /**
+   * Get the latest file slices for a given partition.
+   *
+   * @param metaClient - Instance of {@link HoodieTableMetaClient}.
+   * @param partition - The name of the partition whose file groups are to be loaded.
+   * @param mergeFileSlices - When enabled, merges the latest file slices with the last known
+   *                          completed instant. This is useful for readers when there are pending
+   *                          compactions. When disabled, returns the latest file slices without
+   *                          any merging; this is needed for the writers.
+   * @return List of latest file slices for all file groups in a given partition.
+   */
+  private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient metaClient, String partition,
+                                                        boolean mergeFileSlices) {
+    // If there are no commits on the metadata table then the table's
+    // default FileSystemView will not return any file slices even
+    // though we may have initialized them.
     HoodieTimeline timeline = metaClient.getActiveTimeline();
     if (timeline.empty()) {
-      final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime());
+      final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION,
+          HoodieActiveTimeline.createNewInstantTime());
       timeline = new HoodieDefaultTimeline(Arrays.asList(instant).stream(), metaClient.getActiveTimeline()::getInstantDetails);
     }
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline);
-    Stream<FileSlice> fileSliceStream = isReader ? fsView.getLatestMergedFileSlicesBeforeOrOn(partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp()) :
-        fsView.getLatestFileSlices(partition);
-    return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId()))
-        .collect(Collectors.toList());
+    Stream<FileSlice> fileSliceStream;
+    if (mergeFileSlices) {
+      fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
+          partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp());
+    } else {
+      fileSliceStream = fsView.getLatestFileSlices(partition);
+    }
+    return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
   }
+
 }
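
Usage note (a minimal sketch, not part of the patch): the change above replaces the single loadPartitionFileGroupsWithLatestFileSlices(metaClient, partition, isReader) entry point with two explicitly named helpers. The Java sketch below shows how a caller that previously passed the boolean isReader flag would map onto the new methods. The wrapper class and method name in the sketch are hypothetical; only getPartitionLatestMergedFileSlices and getPartitionLatestFileSlices, with the signatures shown in the diff, are introduced by this change.

import java.util.List;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

// Hypothetical caller illustrating the reader/writer split introduced by this diff.
public class MetadataFileSliceLookup {

  // Mirrors the old loadPartitionFileGroupsWithLatestFileSlices(metaClient, partition, isReader) call.
  static List<FileSlice> latestFileSlices(HoodieTableMetaClient metadataMetaClient,
                                          String partitionName,
                                          boolean isReader) {
    if (isReader) {
      // Reader path (previously isReader = true): merge file slices across a pending
      // compaction so delta commits written after the compaction was scheduled stay visible.
      return HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
    }
    // Writer path (previously isReader = false): latest file slices without any merging.
    return HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
  }
}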