Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
Expand Down Expand Up @@ -1012,28 +1011,21 @@ private void initialCommit(String createInstantTime, List<MetadataPartitionType>
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions");

Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();

List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
List<String> partitions = new ArrayList<>();
AtomicLong totalFiles = new AtomicLong(0);
Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream().map(p -> {
final String partitionName = HoodieTableMetadataUtil.getPartition(p.getRelativePath());
partitions.add(partitionName);
totalFiles.addAndGet(p.getTotalFiles());
return Pair.of(partitionName, p.getFileNameToSizeMap());
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
.map(p -> {
String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
return Pair.of(partitionName, p.getFileNameToSizeMap());
})
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());

if (partitionTypes.contains(MetadataPartitionType.FILES)) {
// Record which saves the list of all partitions
HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
if (partitions.isEmpty()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just pure duplication

// in case of initializing a fresh table, there won't be any partitions, but we need to make a bootstrap commit
final HoodieData<HoodieRecord> allPartitionRecordsRDD = engineContext.parallelize(
Collections.singletonList(allPartitionRecord), 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD);
commit(createInstantTime, partitionToRecordsMap, false);
return;
}
HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
Expand All @@ -1051,28 +1043,31 @@ private void initialCommit(String createInstantTime, List<MetadataPartitionType>
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
}

LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata");
LOG.info("Committing " + partitions.size() + " partitions and " + partitionToFilesMap.values().size() + " files to metadata");

commit(createInstantTime, partitionToRecordsMap, false);
}

private HoodieData<HoodieRecord> getFilesPartitionRecords(String createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord allPartitionRecord) {
HoodieData<HoodieRecord> filesPartitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
if (!partitionInfoList.isEmpty()) {
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
// filter for files that are part of the completed commits
Map<String, Long> validFileNameToSizeMap = fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {
String commitTime = FSUtils.getCommitTime(fileSizePair.getKey());
return HoodieTimeline.compareTimestamps(commitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, createInstantTime);
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

// Record which saves files within a partition
return HoodieMetadataPayload.createPartitionFilesRecord(
HoodieTableMetadataUtil.getPartition(partitionInfo.getRelativePath()), Option.of(validFileNameToSizeMap), Option.empty());
});
filesPartitionRecords = filesPartitionRecords.union(fileListRecords);
if (partitionInfoList.isEmpty()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just inverted conditional to simplify control flow

return filesPartitionRecords;
}
return filesPartitionRecords;

HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
// filter for files that are part of the completed commits
Map<String, Long> validFileNameToSizeMap = fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {
String commitTime = FSUtils.getCommitTime(fileSizePair.getKey());
return HoodieTimeline.compareTimestamps(commitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, createInstantTime);
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

// Record which saves files within a partition
return HoodieMetadataPayload.createPartitionFilesRecord(
HoodieTableMetadataUtil.getPartitionIdentifier(partitionInfo.getRelativePath()), Option.of(validFileNameToSizeMap), Option.empty());
});

return filesPartitionRecords.union(fileListRecords);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1454,7 +1454,7 @@ public void testColStatsPrefixLookup() throws IOException {
.forEach(partitionWriteStat -> {
String partitionStatName = partitionWriteStat.getKey();
List<HoodieWriteStat> writeStats = partitionWriteStat.getValue();
String partition = HoodieTableMetadataUtil.getPartition(partitionStatName);
String partition = HoodieTableMetadataUtil.getPartitionIdentifier(partitionStatName);
if (!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) {
commitToPartitionsToFiles.get(commitTime).put(partition, new ArrayList<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifier;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;

/**
Expand Down Expand Up @@ -256,7 +256,7 @@ public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List
*/
public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions, boolean isDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
partitions.forEach(partition -> fileInfo.put(getPartition(partition), new HoodieMetadataFileInfo(0L, isDeleted)));
partitions.forEach(partition -> fileInfo.put(getPartitionIdentifier(partition), new HoodieMetadataFileInfo(0L, isDeleted)));

HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath());
HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());

// Add record bearing added partitions list
List<String> partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
List<String> partitionsAdded = getPartitionsAdded(commitMetadata);

// Add record bearing deleted partitions list
List<String> partitionsDeleted = getPartitionsDeleted(commitMetadata);
Expand All @@ -312,7 +312,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
String partitionStatName = entry.getKey();
List<HoodieWriteStat> writeStats = entry.getValue();

String partition = getPartition(partitionStatName);
String partition = getPartitionIdentifier(partitionStatName);

HashMap<String, Long> updatedFilesToSizesMapping =
writeStats.stream().reduce(new HashMap<>(writeStats.size()),
Expand Down Expand Up @@ -352,16 +352,26 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
return records;
}

private static ArrayList<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetadata) {
return commitMetadata.getPartitionToWriteStats().keySet().stream()
// We need to make sure we properly handle case of non-partitioned tables
.map(HoodieTableMetadataUtil::getPartitionIdentifier)
.collect(Collectors.toList());
}

private static List<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
if (commitMetadata instanceof HoodieReplaceCommitMetadata
&& WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
Map<String, List<String>> partitionToReplaceFileIds =
((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();
if (!partitionToReplaceFileIds.isEmpty()) {
return new ArrayList<>(partitionToReplaceFileIds.keySet());
}

return partitionToReplaceFileIds.keySet().stream()
// We need to make sure we properly handle case of non-partitioned tables
.map(HoodieTableMetadataUtil::getPartitionIdentifier)
.collect(Collectors.toList());
}
return new ArrayList<>();

return Collections.emptyList();
}

/**
Expand Down Expand Up @@ -469,7 +479,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCl
int[] fileDeleteCount = {0};
List<String> deletedPartitions = new ArrayList<>();
cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
final String partition = getPartition(partitionName);
final String partition = getPartitionIdentifier(partitionName);
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
Expand Down Expand Up @@ -776,7 +786,7 @@ private static List<HoodieRecord> convertFilesToFilesPartitionRecords(Map<String

partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> {
fileChangeCount[0] += deletedFiles.size();
final String partition = getPartition(partitionName);
final String partition = getPartitionIdentifier(partitionName);

Option<Map<String, Long>> filesAdded = Option.empty();
if (partitionToAppendedFiles.containsKey(partitionName)) {
Expand All @@ -789,7 +799,7 @@ private static List<HoodieRecord> convertFilesToFilesPartitionRecords(Map<String
});

partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
final String partition = getPartition(partitionName);
final String partition = getPartitionIdentifier(partitionName);
fileChangeCount[1] += appendedFileMap.size();

// Validate that no appended file has been deleted
Expand All @@ -811,12 +821,9 @@ private static List<HoodieRecord> convertFilesToFilesPartitionRecords(Map<String

/**
* Returns partition name for the given path.
*
* @param path
* @return
*/
public static String getPartition(@Nonnull String path) {
return EMPTY_PARTITION_NAME.equals(path) ? NON_PARTITIONED_NAME : path;
public static String getPartitionIdentifier(@Nonnull String relativePartitionPath) {
return EMPTY_PARTITION_NAME.equals(relativePartitionPath) ? NON_PARTITIONED_NAME : relativePartitionPath;
}

/**
Expand All @@ -842,7 +849,7 @@ public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEn
return Stream.empty();
}

final String partition = getPartition(partitionName);
final String partition = getPartitionIdentifier(partitionName);
return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
}).iterator();
Expand All @@ -857,7 +864,7 @@ public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEn
HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
final String partitionName = partitionToAppendedFilesPair.getLeft();
final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
final String partition = getPartition(partitionName);
final String partition = getPartitionIdentifier(partitionName);
return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
final String appendedFile = appendedFileLengthPairEntry.getKey();
if (!FSUtils.isBaseFile(new Path(appendedFile))) {
Expand Down Expand Up @@ -912,7 +919,7 @@ public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEn

HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
final String partitionName = partitionToDeletedFilesPair.getLeft();
final String partition = getPartition(partitionName);
final String partition = getPartitionIdentifier(partitionName);
final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();

return deletedFileList.stream().flatMap(deletedFile -> {
Expand All @@ -929,7 +936,7 @@ public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEn

HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
final String partitionName = partitionToAppendedFilesPair.getLeft();
final String partition = getPartition(partitionName);
final String partition = getPartitionIdentifier(partitionName);
final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();

return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthEntry -> {
Expand Down Expand Up @@ -1133,7 +1140,7 @@ private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex,
boolean isDeleted) {
String partitionName = getPartition(partitionPath);
String partitionName = getPartitionIdentifier(partitionPath);
// NOTE: We have to chop leading "/" to make sure Hadoop does not treat it like
// absolute path
String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
Expand Down