diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 7c033f8509667..8d4cb17c56951 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -87,7 +87,6 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
@@ -1012,28 +1011,21 @@ private void initialCommit(String createInstantTime, List<MetadataPartitionType>
     LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
     engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions");
 
+    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
+
     List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
-    List<String> partitions = new ArrayList<>();
-    AtomicLong totalFiles = new AtomicLong(0);
-    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream().map(p -> {
-      final String partitionName = HoodieTableMetadataUtil.getPartition(p.getRelativePath());
-      partitions.add(partitionName);
-      totalFiles.addAndGet(p.getTotalFiles());
-      return Pair.of(partitionName, p.getFileNameToSizeMap());
-    }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-    final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
+    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
+        .map(p -> {
+          String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+          return Pair.of(partitionName, p.getFileNameToSizeMap());
+        })
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
 
     if (partitionTypes.contains(MetadataPartitionType.FILES)) {
       // Record which saves the list of all partitions
       HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
-      if (partitions.isEmpty()) {
-        // in case of initializing of a fresh table, there won't be any partitions, but we need to make a boostrap commit
-        final HoodieData<HoodieRecord> allPartitionRecordsRDD = engineContext.parallelize(
-            Collections.singletonList(allPartitionRecord), 1);
-        partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD);
-        commit(createInstantTime, partitionToRecordsMap, false);
-        return;
-      }
       HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
       ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
       partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
@@ -1051,28 +1043,31 @@ private void initialCommit(String createInstantTime, List<MetadataPartitionType>
       partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
     }
 
-    LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata");
+    LOG.info("Committing " + partitions.size() + " partitions and " + partitionToFilesMap.values().size() + " files to metadata");
+
     commit(createInstantTime, partitionToRecordsMap, false);
   }
 
   private HoodieData<HoodieRecord> getFilesPartitionRecords(String createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord allPartitionRecord) {
     HoodieData<HoodieRecord> filesPartitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
-    if (!partitionInfoList.isEmpty()) {
-      HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
-        Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
-        // filter for files that are part of the completed commits
-        Map<String, Long> validFileNameToSizeMap = fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {
-          String commitTime = FSUtils.getCommitTime(fileSizePair.getKey());
-          return HoodieTimeline.compareTimestamps(commitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, createInstantTime);
-        }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
-        // Record which saves files within a partition
-        return HoodieMetadataPayload.createPartitionFilesRecord(
-            HoodieTableMetadataUtil.getPartition(partitionInfo.getRelativePath()), Option.of(validFileNameToSizeMap), Option.empty());
-      });
-      filesPartitionRecords = filesPartitionRecords.union(fileListRecords);
+    if (partitionInfoList.isEmpty()) {
+      return filesPartitionRecords;
     }
-    return filesPartitionRecords;
+
+    HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
+      Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
+      // filter for files that are part of the completed commits
+      Map<String, Long> validFileNameToSizeMap = fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {
+        String commitTime = FSUtils.getCommitTime(fileSizePair.getKey());
+        return HoodieTimeline.compareTimestamps(commitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, createInstantTime);
+      }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+      // Record which saves files within a partition
+      return HoodieMetadataPayload.createPartitionFilesRecord(
+          HoodieTableMetadataUtil.getPartitionIdentifier(partitionInfo.getRelativePath()), Option.of(validFileNameToSizeMap), Option.empty());
+    });
+
+    return filesPartitionRecords.union(fileListRecords);
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index aebf046fe50cf..54de53581792a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1454,7 +1454,7 @@ public void testColStatsPrefixLookup() throws IOException {
           .forEach(partitionWriteStat -> {
             String partitionStatName = partitionWriteStat.getKey();
             List<HoodieWriteStat> writeStats = partitionWriteStat.getValue();
-            String partition = HoodieTableMetadataUtil.getPartition(partitionStatName);
+            String partition = HoodieTableMetadataUtil.getPartitionIdentifier(partitionStatName);
             if (!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) {
               commitToPartitionsToFiles.get(commitTime).put(partition, new ArrayList<>());
             }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 1b33de795b956..58d186f971cb8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -80,7 +80,7 @@
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartition;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifier;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
 
 /**
@@ -256,7 +256,7 @@ public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List
    */
   public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions, boolean isDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(getPartition(partition), new HoodieMetadataFileInfo(0L, isDeleted)));
+    partitions.forEach(partition -> fileInfo.put(getPartitionIdentifier(partition), new HoodieMetadataFileInfo(0L, isDeleted)));
 
     HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath());
     HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 21337ceaeb3a7..24ac935f88abf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -297,7 +297,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
     List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
 
     // Add record bearing added partitions list
-    List<String> partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
+    List<String> partitionsAdded = getPartitionsAdded(commitMetadata);
 
     // Add record bearing deleted partitions list
     List<String> partitionsDeleted = getPartitionsDeleted(commitMetadata);
@@ -312,7 +312,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
       String partitionStatName = entry.getKey();
       List<HoodieWriteStat> writeStats = entry.getValue();
 
-      String partition = getPartition(partitionStatName);
+      String partition = getPartitionIdentifier(partitionStatName);
 
       HashMap<String, HoodieMetadataFileInfo> updatedFilesToSizesMapping =
           writeStats.stream().reduce(new HashMap<>(writeStats.size()),
@@ -352,16 +352,26 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
     return records;
   }
 
-  private static ArrayList<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
+  private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetadata) {
+    return commitMetadata.getPartitionToWriteStats().keySet().stream()
+        // We need to make sure we properly handle case of non-partitioned tables
+        .map(HoodieTableMetadataUtil::getPartitionIdentifier)
+        .collect(Collectors.toList());
+  }
+
+  private static List<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
     if (commitMetadata instanceof HoodieReplaceCommitMetadata
         && WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
       Map<String, List<String>> partitionToReplaceFileIds =
          ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();
-      if (!partitionToReplaceFileIds.isEmpty()) {
-        return new ArrayList<>(partitionToReplaceFileIds.keySet());
-      }
+
+      return partitionToReplaceFileIds.keySet().stream()
+          // We need to make sure we properly handle case of non-partitioned tables
+          .map(HoodieTableMetadataUtil::getPartitionIdentifier)
+          .collect(Collectors.toList());
     }
-    return new ArrayList<>();
+
+    return Collections.emptyList();
  }
 
  /**
@@ -469,7 +479,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCl
     int[] fileDeleteCount = {0};
     List<String> deletedPartitions = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
-      final String partition = getPartition(partitionName);
+      final String partition = getPartitionIdentifier(partitionName);
       // Files deleted from a partition
       List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
       HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
@@ -776,7 +786,7 @@ private static List<HoodieRecord> convertFilesToFilesPartitionRecords(Map<Strin
     partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> {
       fileChangeCount[0] += deletedFiles.size();
-      final String partition = getPartition(partitionName);
+      final String partition = getPartitionIdentifier(partitionName);
 
       Option<Map<String, Long>> filesAdded = Option.empty();
       if (partitionToAppendedFiles.containsKey(partitionName)) {
@@ -789,7 +799,7 @@ private static List<HoodieRecord> convertFilesToFilesPartitionRecords(Map<Strin
     partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
-      final String partition = getPartition(partitionName);
+      final String partition = getPartitionIdentifier(partitionName);
       fileChangeCount[1] += appendedFileMap.size();
 
       // Validate that no appended file has been deleted
@@ -811,12 +821,9 @@ private static List<HoodieRecord> convertFilesToFilesPartitionRecords(Map<Strin
@@ ... @@ public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEn
           return Stream.empty();
         }
-        final String partition = getPartition(partitionName);
+        final String partition = getPartitionIdentifier(partitionName);
         return Stream.of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
             partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
       }).iterator();
@@ -857,7 +864,7 @@ public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEn
     HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
       final String partitionName = partitionToAppendedFilesPair.getLeft();
       final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
-      final String partition = getPartition(partitionName);
+      final String partition = getPartitionIdentifier(partitionName);
       return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
         final String appendedFile = appendedFileLengthPairEntry.getKey();
         if (!FSUtils.isBaseFile(new Path(appendedFile))) {
@@ -912,7 +919,7 @@ public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEn
     HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
       final String partitionName = partitionToDeletedFilesPair.getLeft();
-      final String partition = getPartition(partitionName);
+      final String partition = getPartitionIdentifier(partitionName);
       final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
 
       return deletedFileList.stream().flatMap(deletedFile -> {
@@ -929,7 +936,7 @@ public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEn
     HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
       final String partitionName = partitionToAppendedFilesPair.getLeft();
-      final String partition = getPartition(partitionName);
+      final String partition = getPartitionIdentifier(partitionName);
       final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
 
       return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthEntry -> {
@@ -1133,7 +1140,7 @@ private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath,
                                                             HoodieTableMetaClient datasetMetaClient,
                                                             List<String> columnsToIndex,
                                                             boolean isDeleted) {
-    String partitionName = getPartition(partitionPath);
+    String partitionName = getPartitionIdentifier(partitionPath);
     // NOTE: We have to chop leading "/" to make sure Hadoop does not treat it like
     //       absolute path
     String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
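
Note: every call site in this diff now routes through HoodieTableMetadataUtil.getPartitionIdentifier(...), which normalizes the relative partition path before it is used as a metadata-table key. The point, flagged by the repeated "properly handle case of non-partitioned tables" comments, is that a non-partitioned table lists its files under an empty relative path, which must not become an empty record key. A minimal sketch of the assumed helper, using the EMPTY_PARTITION_NAME and NON_PARTITIONED_NAME constants from HoodieTableMetadata; the exact body in the codebase may differ:

  // Sketch (assumption, not the verbatim method): map the empty relative path of a
  // non-partitioned table to the reserved placeholder so record keys are never empty.
  public static String getPartitionIdentifier(String relativePartitionPath) {
    return EMPTY_PARTITION_NAME.equals(relativePartitionPath) ? NON_PARTITIONED_NAME : relativePartitionPath;
  }

With this normalization applied in getPartitionsAdded/getPartitionsDeleted and in the file-listing bootstrap of initialCommit, a non-partitioned table consistently resolves to the same identifier everywhere instead of an empty string.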