Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,6 @@ protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String
return;
}


// Trigger compaction with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
* The record is tagged with respective file slice's location based on its record key.
*/
private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));

return records.stream().map(r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
* The record is tagged with respective file slice's location based on its record key.
*/
private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));

return recordsRDD.map(r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,8 @@ public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Except
});
}


/**
* Tests that virtual key configs are honored in base files after compaction in metadata table.
*
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersI

// Metadata is in sync till the latest completed instant on the dataset
HoodieTimer timer = new HoodieTimer().startTimer();
List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, true);
List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
if (latestFileSlices.size() == 0) {
// empty partition
return Pair.of(null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanMetadata cl
* @return a list of metadata table records
*/
public static List<HoodieRecord> convertMetadataToRecords(HoodieActiveTimeline metadataTableTimeline,
HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
Expand Down Expand Up @@ -334,29 +334,65 @@ public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGrou
}

/**
* Loads the list of file groups for a partition of the Metadata Table with latest file slices.
* Get the latest file slices for a Metadata Table partition. If the file slice is
* because of pending compaction instant, then merge the file slice with the one
* just before the compaction instant time. The list of file slices returned is
* sorted in the correct order of file group name.
*
* The list of file slices returned is sorted in the correct order of file group name.
* @param metaClient instance of {@link HoodieTableMetaClient}.
* @param partition The name of the partition whose file groups are to be loaded.
* @param isReader true if reader code path, false otherwise.
* @param metaClient - Instance of {@link HoodieTableMetaClient}.
* @param partition - The name of the partition whose file groups are to be loaded.
* @return List of latest file slices for all file groups in a given partition.
*/
public static List<FileSlice> loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition, boolean isReader) {
LOG.info("Loading file groups for metadata table partition " + partition);
public static List<FileSlice> getPartitionLatestMergedFileSlices(HoodieTableMetaClient metaClient, String partition) {
LOG.info("Loading latest merged file slices for metadata table partition " + partition);
return getPartitionFileSlices(metaClient, partition, true);
}

/**
* Get the latest file slices for a Metadata Table partition. The list of file slices
* returned is sorted in the correct order of file group name.
*
* @param metaClient - Instance of {@link HoodieTableMetaClient}.
* @param partition - The name of the partition whose file groups are to be loaded.
* @return List of latest file slices for all file groups in a given partition.
*/
public static List<FileSlice> getPartitionLatestFileSlices(HoodieTableMetaClient metaClient, String partition) {
LOG.info("Loading latest file slices for metadata table partition " + partition);
return getPartitionFileSlices(metaClient, partition, false);
}

// If there are no commits on the metadata table then the table's default FileSystemView will not return any file
// slices even though we may have initialized them.
/**
* Get the latest file slices for a given partition.
*
* @param metaClient - Instance of {@link HoodieTableMetaClient}.
* @param partition - The name of the partition whose file groups are to be loaded.
* @param mergeFileSlices - When enabled, will merge the latest file slices with the last known
* completed instant. This is useful for readers when there are pending
* compactions. MergeFileSlices when disabled, will return the latest file
* slices without any merging, and this is needed for the writers.
* @return List of latest file slices for all file groups in a given partition.
*/
private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient metaClient, String partition,
boolean mergeFileSlices) {
// If there are no commits on the metadata table then the table's
// default FileSystemView will not return any file slices even
// though we may have initialized them.
HoodieTimeline timeline = metaClient.getActiveTimeline();
if (timeline.empty()) {
final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime());
final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION,
HoodieActiveTimeline.createNewInstantTime());
timeline = new HoodieDefaultTimeline(Arrays.asList(instant).stream(), metaClient.getActiveTimeline()::getInstantDetails);
}

HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline);
Stream<FileSlice> fileSliceStream = isReader ? fsView.getLatestMergedFileSlicesBeforeOrOn(partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp()) :
fsView.getLatestFileSlices(partition);
return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId()))
.collect(Collectors.toList());
Stream<FileSlice> fileSliceStream;
if (mergeFileSlices) {
fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp());
} else {
fileSliceStream = fsView.getLatestFileSlices(partition);
}
return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
}

}