diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index 0cef5550af8b7..1eee18b9d480f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -133,6 +133,23 @@ public class HoodieIndexConfig extends HoodieConfig { + "When true, the index lookup uses bloom filters and column stats from metadata " + "table when available to speed up the process."); + public static final ConfigProperty BLOOM_INDEX_METADATA_READ_PARALLELISM = ConfigProperty + .key("hoodie.bloom.index.metadata.read.parallelism") + .defaultValue(10) + .sinceVersion("0.13.0") + .withDocumentation("Only applies if index type is BLOOM and metadata table is enabled " + + "for index lookup (hoodie.bloom.index.use.metadata=true). " + + "Determines the parallelism for reading the index from metadata table."); + + public static final ConfigProperty BLOOM_INDEX_METADATA_BLOOM_FILTER_READ_BATCH_SIZE = ConfigProperty + .key("hoodie.bloom.index.metadata.bloom.filter.read.batch.size") + .defaultValue(128) + .sinceVersion("0.13.0") + .withDocumentation("Only applies if index type is BLOOM and metadata table is enabled " + + "for index lookup (hoodie.bloom.index.use.metadata=true). " + + "Determines the batch size for reading bloom filters from metadata table. " + + "Smaller value puts less pressure on the executor memory."); + public static final ConfigProperty BLOOM_INDEX_TREE_BASED_FILTER = ConfigProperty .key("hoodie.bloom.index.use.treebased.filter") .defaultValue("true") @@ -540,6 +557,16 @@ public Builder bloomIndexUseMetadata(boolean useMetadata) { return this; } + public Builder bloomIndexMetadataReadParallelism(int parallelism) { + hoodieIndexConfig.setValue(BLOOM_INDEX_METADATA_READ_PARALLELISM, String.valueOf(parallelism)); + return this; + } + + public Builder bloomIndexMetadataBloomFilterReadBatchSize(int batchSize) { + hoodieIndexConfig.setValue(BLOOM_INDEX_METADATA_BLOOM_FILTER_READ_BATCH_SIZE, String.valueOf(batchSize)); + return this; + } + public Builder bloomIndexTreebasedFilter(boolean useTreeFilter) { hoodieIndexConfig.setValue(BLOOM_INDEX_TREE_BASED_FILTER, String.valueOf(useTreeFilter)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 6178e63e3606c..d48777d41bbb4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1580,6 +1580,14 @@ public boolean getBloomIndexUseMetadata() { return getBooleanOrDefault(HoodieIndexConfig.BLOOM_INDEX_USE_METADATA); } + public int getBloomIndexMetadataReadParallelism() { + return getIntOrDefault(HoodieIndexConfig.BLOOM_INDEX_METADATA_READ_PARALLELISM); + } + + public int getBloomIndexMetadataBloomFilterReadBatchSize() { + return getIntOrDefault(HoodieIndexConfig.BLOOM_INDEX_METADATA_BLOOM_FILTER_READ_BATCH_SIZE); + } + public boolean useBloomIndexTreebasedFilter() { return getBoolean(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java index 9430d9bb5e50b..05c1b27a54cb7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java @@ -29,7 +29,6 @@ import org.apache.hudi.table.HoodieTable; import java.io.Serializable; -import java.util.List; import java.util.Map; /** @@ -43,7 +42,7 @@ public abstract class BaseHoodieBloomIndexHelper implements Serializable { * @param context {@link HoodieEngineContext} instance to use. * @param hoodieTable {@link HoodieTable} instance to use. * @param partitionRecordKeyPairs Pairs of partition path and record key. - * @param fileComparisonPairs Pairs of filename and record key based on file comparisons. + * @param fileComparisonPairs Pairs of (file ID, filename) pair and record key based on file comparisons. * @param partitionToFileInfo Partition path to {@link BloomIndexFileInfo} map. * @param recordsPerPartition Number of records per partition in a map. * @return {@link HoodiePairData} of {@link HoodieKey} and {@link HoodieRecordLocation} pairs. @@ -51,7 +50,7 @@ public abstract class BaseHoodieBloomIndexHelper implements Serializable { public abstract HoodiePairData findMatchingFilesForRecordKeys( HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable, HoodiePairData partitionRecordKeyPairs, - HoodieData> fileComparisonPairs, - Map> partitionToFileInfo, + HoodieData, HoodieKey>> fileComparisonPairs, + Map> partitionToFileInfo, Map recordsPerPartition); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java index 11ffb785f014e..f1692ebcf9226 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java @@ -27,19 +27,20 @@ public class BloomIndexFileInfo implements Serializable { private final String fileId; - + private final String filename; private final String minRecordKey; - private final String maxRecordKey; - public BloomIndexFileInfo(String fileId, String minRecordKey, String maxRecordKey) { + public BloomIndexFileInfo(String fileId, String filename, String minRecordKey, String maxRecordKey) { this.fileId = fileId; + this.filename = filename; this.minRecordKey = minRecordKey; this.maxRecordKey = maxRecordKey; } - public BloomIndexFileInfo(String fileId) { + public BloomIndexFileInfo(String fileId, String filename) { this.fileId = fileId; + this.filename = filename; this.minRecordKey = null; this.maxRecordKey = null; } @@ -48,6 +49,10 @@ public String getFileId() { return fileId; } + public String getFilename() { + return filename; + } + public String getMinRecordKey() { return minRecordKey; } @@ -78,20 +83,23 @@ public boolean equals(Object o) { } BloomIndexFileInfo that = (BloomIndexFileInfo) o; - return Objects.equals(that.fileId, fileId) && Objects.equals(that.minRecordKey, minRecordKey) + return Objects.equals(that.fileId, fileId) + && Objects.equals(that.filename, filename) + && Objects.equals(that.minRecordKey, minRecordKey) && Objects.equals(that.maxRecordKey, maxRecordKey); } @Override public int hashCode() { - return Objects.hash(fileId, minRecordKey, maxRecordKey); + return Objects.hash(fileId, filename, minRecordKey, maxRecordKey); } @Override public String toString() { final StringBuilder sb = new StringBuilder("BloomIndexFileInfo {"); sb.append(" fileId=").append(fileId); + sb.append(" filename=").append(filename); sb.append(" minRecordKey=").append(minRecordKey); sb.append(" maxRecordKey=").append(maxRecordKey); sb.append('}'); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index 1417e40a9f587..387837c3f3af8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -40,6 +40,7 @@ import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.io.HoodieRangeInfoHandle; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -47,11 +48,10 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper; @@ -121,12 +121,14 @@ private HoodiePairData lookupIndex( // Step 2: Load all involved files as pairs List> fileInfoList = getBloomIndexFileInfoForPartitions(context, hoodieTable, affectedPartitionPathList); - final Map> partitionToFileInfo = - fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList()))); + // partition -> {file ID -> BloomIndexFileInfo instance} + final Map> partitionToFileInfo = fileInfoList.stream() + .collect(groupingBy(Pair::getLeft, toMap(entry -> entry.getRight().getFileId(), Pair::getRight))); // Step 3: Obtain a HoodieData, for each incoming record, that already exists, with the file id, // that contains it. - HoodieData> fileComparisonPairs = + // Each entry: ((File ID, Filename), HoodieKey instance) + HoodieData, HoodieKey>> fileComparisonPairs = explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs); return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable, @@ -142,7 +144,8 @@ private List> getBloomIndexFileInfoForPartition // load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available if (config.getBloomIndexUseMetadata() && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) { - fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable); + fileInfoList = loadColumnRangesFromMetaIndex( + affectedPartitionPathList, context, hoodieTable, config.getBloomIndexMetadataReadParallelism()); } // fallback to loading column ranges from files if (isNullOrEmpty(fileInfoList)) { @@ -161,21 +164,30 @@ private List> getBloomIndexFileInfoForPartition List> loadColumnRangesFromFiles( List partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) { // Obtain the latest data files from all the partitions. - List> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream() - .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId())) - .collect(toList()); + // Each entry: (relative partition path, (File ID, Filename)) + List>> partitionPathFileIdNameList = + getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream() + .map(pair -> Pair.of( + pair.getKey(), + Pair.of(pair.getValue().getFileId(), pair.getValue().getFileName()))) + .collect(toList()); context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on): " + config.getTableName()); - return context.map(partitionPathFileIDList, pf -> { + return context.map(partitionPathFileIdNameList, pf -> { try { - HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf); + HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle( + config, hoodieTable, Pair.of(pf.getLeft(), pf.getRight().getLeft())); String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys(); - return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1])); + return Pair.of( + pf.getKey(), + new BloomIndexFileInfo(pf.getValue().getKey(), pf.getValue().getValue(), minMaxKeys[0], minMaxKeys[1])); } catch (MetadataNotFoundException me) { LOG.warn("Unable to find range metadata in file :" + pf); - return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue())); + return Pair.of( + pf.getKey(), + new BloomIndexFileInfo(pf.getValue().getKey(), pf.getValue().getValue())); } - }, Math.max(partitionPathFileIDList.size(), 1)); + }, Math.max(partitionPathFileIdNameList.size(), 1)); } /** @@ -188,12 +200,17 @@ List> loadColumnRangesFromFiles( */ private List> getFileInfoForLatestBaseFiles( List partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) { - List> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, - hoodieTable).stream() - .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId())) - .collect(toList()); + List>> partitionPathFileIDList = + getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream() + .map(pair -> Pair.of( + pair.getKey(), + Pair.of(pair.getValue().getFileId(), pair.getValue().getFileName()))) + .collect(toList()); return partitionPathFileIDList.stream() - .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList()); + .map(pf -> Pair.of( + pf.getKey(), + new BloomIndexFileInfo(pf.getValue().getKey(), pf.getValue().getValue()))) + .collect(toList()); } /** @@ -202,41 +219,57 @@ private List> getFileInfoForLatestBaseFiles( * @param partitions - List of partitions for which column stats need to be loaded * @param context - Engine context * @param hoodieTable - Hoodie table + * @param parallelism - Parallelism for reading column stats from metadata table * @return List of partition and file column range info pairs */ protected List> loadColumnRangesFromMetaIndex( - List partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) { + List partitions, final HoodieEngineContext context, + final HoodieTable hoodieTable, int parallelism) { // also obtain file ranges, if range pruning is enabled context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName()); final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); - return context.flatMap(partitions, partitionName -> { - // Partition and file name pairs - List> partitionFileNameList = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName, - hoodieTable).stream().map(baseFile -> Pair.of(partitionName, baseFile.getFileName())) - .sorted() - .collect(toList()); - if (partitionFileNameList.isEmpty()) { - return Stream.empty(); - } - try { - Map, HoodieMetadataColumnStats> fileToColumnStatsMap = - hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField); - List> result = new ArrayList<>(); - for (Map.Entry, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) { - result.add(Pair.of(entry.getKey().getLeft(), - new BloomIndexFileInfo( - FSUtils.getFileId(entry.getKey().getRight()), - // NOTE: Here we assume that the type of the primary key field is string - (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()), - (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue()) - ))); - } - return result.stream(); - } catch (MetadataNotFoundException me) { - throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me); - } - }, Math.max(partitions.size(), 1)); + return context.parallelize(partitions, Math.max(Math.min(partitions.size(), parallelism), 1)) + .mapPartitions(partitionIterator -> { + List partitionNameList = new ArrayList<>(); + List> partitionFileNameList = new ArrayList<>(); + List> result = new ArrayList<>(); + + // For the list of partitions, get all file names to fetch column stats + while (partitionIterator.hasNext()) { + String partitionName = partitionIterator.next(); + partitionNameList.add(partitionName); + partitionFileNameList.addAll( + HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName, hoodieTable).stream() + .map(baseFile -> Pair.of(partitionName, baseFile.getFileName())) + .collect(Collectors.toList())); + } + if (partitionFileNameList.isEmpty()) { + return result.iterator(); + } + + // Sort the file name list and do lookup of column stats from metadata table + List> sortedPartitionFileNameList = + partitionFileNameList.stream().sorted().collect(toList()); + try { + Map, HoodieMetadataColumnStats> fileToColumnStatsMap = + hoodieTable.getMetadataTable().getColumnStats(sortedPartitionFileNameList, keyField); + for (Map.Entry, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) { + result.add(Pair.of(entry.getKey().getLeft(), + new BloomIndexFileInfo( + FSUtils.getFileId(entry.getKey().getRight()), + entry.getKey().getRight(), + // NOTE: Here we assume that the type of the primary key field is string + (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()), + (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue()) + ))); + } + return result.iterator(); + } catch (MetadataNotFoundException me) { + throw new HoodieMetadataException("Unable to find column range metadata for partitions:" + partitionNameList, me); + } + }, true) + .collectAsList(); } @Override @@ -277,8 +310,8 @@ public boolean isImplicitWithStorage() { * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on * recordKey ranges in the index info. */ - HoodieData> explodeRecordsWithFileComparisons( - final Map> partitionToFileIndexInfo, + HoodieData, HoodieKey>> explodeRecordsWithFileComparisons( + final Map> partitionToFileIndexInfo, HoodiePairData partitionRecordKeyPairs) { IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo) @@ -289,8 +322,9 @@ HoodieData> explodeRecordsWithFileComparisons( String partitionPath = partitionRecordKeyPair.getLeft(); return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream() - .map(partitionFileIdPair -> (Pair) new ImmutablePair<>(partitionFileIdPair.getRight(), - new HoodieKey(recordKey, partitionPath))) + .map(partitionFileIdPair -> (Pair, HoodieKey>) + new ImmutablePair<>(partitionFileIdPair.getRight(), + new HoodieKey(recordKey, partitionPath))) .collect(Collectors.toList()); }).flatMap(List::iterator); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java index 5f2007ea53668..040dd6b3972bf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java @@ -74,8 +74,8 @@ List> loadColumnRangesFromFiles(List pa */ @Override - HoodieData> explodeRecordsWithFileComparisons( - final Map> partitionToFileIndexInfo, + HoodieData, HoodieKey>> explodeRecordsWithFileComparisons( + final Map> partitionToFileIndexInfo, HoodiePairData partitionRecordKeyPairs) { IndexFileFilter indexFileFilter = @@ -87,7 +87,7 @@ HoodieData> explodeRecordsWithFileComparisons( String partitionPath = partitionRecordKeyPair.getLeft(); return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream() - .map(partitionFileIdPair -> (Pair) new ImmutablePair<>(partitionFileIdPair.getRight(), + .map(partitionFileIdPair -> (Pair, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(), new HoodieKey(recordKey, partitionFileIdPair.getLeft()))) .collect(Collectors.toList()); }).flatMap(List::iterator); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IndexFileFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IndexFileFilter.java index 017a5692252a3..5d26cd01a37b3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IndexFileFilter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IndexFileFilter.java @@ -33,9 +33,9 @@ public interface IndexFileFilter extends Serializable { * * @param partitionPath the partition path of interest * @param recordKey the record key to be looked up - * @return the {@link Set} of matching pairs where the record could potentially be + * @return the {@link Set} of matching > pairs where the record could potentially be * present. */ - Set> getMatchingFilesAndPartition(String partitionPath, String recordKey); + Set>> getMatchingFilesAndPartition(String partitionPath, String recordKey); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java index 18b094890081b..c48a2f786898c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java @@ -36,20 +36,21 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter { private final KeyRangeLookupTree indexLookUpTree = new KeyRangeLookupTree(); private final Set filesWithNoRanges = new HashSet<>(); - private final Map fileIdToPartitionPathMap = new HashMap<>(); + private final Map> fileIdToPartitionPathAndFilenameMap = new HashMap<>(); /** * Instantiates {@link IntervalTreeBasedGlobalIndexFileFilter}. * * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s */ - IntervalTreeBasedGlobalIndexFileFilter(final Map> partitionToFileIndexInfo) { + IntervalTreeBasedGlobalIndexFileFilter(final Map> partitionToFileIndexInfo) { List allIndexFiles = new ArrayList<>(); - partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoList) -> bloomIndexFileInfoList.forEach(file -> { - fileIdToPartitionPathMap.put(file.getFileId(), partition); - allIndexFiles.add(file); - })); + partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoMap) -> bloomIndexFileInfoMap.values().forEach( + file -> { + fileIdToPartitionPathAndFilenameMap.put(file.getFileId(), Pair.of(partition, file.getFilename())); + allIndexFiles.add(file); + })); // Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time. // So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed @@ -66,12 +67,17 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter { } @Override - public Set> getMatchingFilesAndPartition(String partitionPath, String recordKey) { + public Set>> getMatchingFilesAndPartition(String partitionPath, String recordKey) { Set matchingFiles = new HashSet<>(); matchingFiles.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey)); matchingFiles.addAll(filesWithNoRanges); - Set> toReturn = new HashSet<>(); - matchingFiles.forEach(file -> toReturn.add(Pair.of(fileIdToPartitionPathMap.get(file), file))); + Set>> toReturn = new HashSet<>(); + matchingFiles.forEach(fileId -> { + Pair partitionPathAndFilenamePair = fileIdToPartitionPathAndFilenameMap.get(fileId); + toReturn.add(Pair.of( + partitionPathAndFilenamePair.getLeft(), + Pair.of(fileId, partitionPathAndFilenamePair.getRight()))); + }); return toReturn; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedIndexFileFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedIndexFileFilter.java index 53ef653eb107d..7f6296463b378 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedIndexFileFilter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedIndexFileFilter.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.util.collection.Pair; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -35,20 +36,23 @@ class IntervalTreeBasedIndexFileFilter implements IndexFileFilter { private final Map partitionToFileIndexLookUpTree = new HashMap<>(); private final Map> partitionToFilesWithNoRanges = new HashMap<>(); + private final Map fileIdToFilenameMap = new HashMap<>(); /** * Instantiates {@link IntervalTreeBasedIndexFileFilter}. * * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s */ - IntervalTreeBasedIndexFileFilter(final Map> partitionToFileIndexInfo) { - partitionToFileIndexInfo.forEach((partition, bloomIndexFiles) -> { + IntervalTreeBasedIndexFileFilter(final Map> partitionToFileIndexInfo) { + partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoMap) -> { // Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time. // So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be // skewed which could result in N search time instead of logN. + List bloomIndexFiles = new ArrayList<>(bloomIndexFileInfoMap.values()); Collections.shuffle(bloomIndexFiles); KeyRangeLookupTree lookUpTree = new KeyRangeLookupTree(); bloomIndexFiles.forEach(indexFileInfo -> { + fileIdToFilenameMap.put(indexFileInfo.getFileId(), indexFileInfo.getFilename()); if (indexFileInfo.hasKeyRanges()) { lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileId())); @@ -64,16 +68,16 @@ class IntervalTreeBasedIndexFileFilter implements IndexFileFilter { } @Override - public Set> getMatchingFilesAndPartition(String partitionPath, String recordKey) { - Set> toReturn = new HashSet<>(); + public Set>> getMatchingFilesAndPartition(String partitionPath, String recordKey) { + Set>> toReturn = new HashSet<>(); // could be null, if there are no files in a given partition yet or if all index files have no ranges if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) { - partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(file -> - toReturn.add(Pair.of(partitionPath, file))); + partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(fileId -> + toReturn.add(Pair.of(partitionPath, Pair.of(fileId, fileIdToFilenameMap.get(fileId))))); } if (partitionToFilesWithNoRanges.containsKey(partitionPath)) { - partitionToFilesWithNoRanges.get(partitionPath).forEach(file -> - toReturn.add(Pair.of(partitionPath, file))); + partitionToFilesWithNoRanges.get(partitionPath).forEach(fileId -> + toReturn.add(Pair.of(partitionPath, Pair.of(fileId, fileIdToFilenameMap.get(fileId))))); } return toReturn; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java index dce763a175337..52c8d278609ad 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.util.collection.Pair; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; @@ -32,18 +31,19 @@ class ListBasedGlobalIndexFileFilter extends ListBasedIndexFileFilter { * * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo} */ - ListBasedGlobalIndexFileFilter(Map> partitionToFileIndexInfo) { + ListBasedGlobalIndexFileFilter(Map> partitionToFileIndexInfo) { super(partitionToFileIndexInfo); } @Override - public Set> getMatchingFilesAndPartition(String partitionPath, String recordKey) { - Set> toReturn = new HashSet<>(); - partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoList) -> bloomIndexFileInfoList.forEach(file -> { - if (shouldCompareWithFile(file, recordKey)) { - toReturn.add(Pair.of(partition, file.getFileId())); - } - })); + public Set>> getMatchingFilesAndPartition(String partitionPath, String recordKey) { + Set>> toReturn = new HashSet<>(); + partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoListMap) -> + bloomIndexFileInfoListMap.values().forEach(fileInfo -> { + if (shouldCompareWithFile(fileInfo, recordKey)) { + toReturn.add(Pair.of(partition, Pair.of(fileInfo.getFileId(), fileInfo.getFilename()))); + } + })); return toReturn; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java index cffee5ee74081..d8ea67c6e6d12 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java @@ -56,10 +56,11 @@ public static ListBasedHoodieBloomIndexHelper getInstance() { public HoodiePairData findMatchingFilesForRecordKeys( HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable, HoodiePairData partitionRecordKeyPairs, - HoodieData> fileComparisonPairs, - Map> partitionToFileInfo, Map recordsPerPartition) { + HoodieData, HoodieKey>> fileComparisonPairs, + Map> partitionToFileInfo, Map recordsPerPartition) { List> fileComparisonPairList = fileComparisonPairs.collectAsList().stream() + .map(e -> Pair.of(e.getLeft().getLeft(), e.getRight())) .sorted(Comparator.comparing(Pair::getLeft)).collect(toList()); List keyLookupResults = new ArrayList<>(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedIndexFileFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedIndexFileFilter.java index c5e39146fbdf9..9b56f1481f4ce 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedIndexFileFilter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedIndexFileFilter.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.util.collection.Pair; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; @@ -31,26 +30,26 @@ */ class ListBasedIndexFileFilter implements IndexFileFilter { - final Map> partitionToFileIndexInfo; + final Map> partitionToFileIndexInfo; /** * Instantiates {@link ListBasedIndexFileFilter}. * * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo} */ - ListBasedIndexFileFilter(final Map> partitionToFileIndexInfo) { + ListBasedIndexFileFilter(final Map> partitionToFileIndexInfo) { this.partitionToFileIndexInfo = partitionToFileIndexInfo; } @Override - public Set> getMatchingFilesAndPartition(String partitionPath, String recordKey) { - List indexInfos = partitionToFileIndexInfo.get(partitionPath); - Set> toReturn = new HashSet<>(); + public Set>> getMatchingFilesAndPartition(String partitionPath, String recordKey) { + Map indexInfos = partitionToFileIndexInfo.get(partitionPath); + Set>> toReturn = new HashSet<>(); if (indexInfos != null) { // could be null, if there are no files in a given partition yet. // for each candidate file in partition, that needs to be compared. - for (BloomIndexFileInfo indexInfo : indexInfos) { + for (BloomIndexFileInfo indexInfo : indexInfos.values()) { if (shouldCompareWithFile(indexInfo, recordKey)) { - toReturn.add(Pair.of(partitionPath, indexInfo.getFileId())); + toReturn.add(Pair.of(partitionPath, Pair.of(indexInfo.getFileId(), indexInfo.getFilename()))); } } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/BloomIndexTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/BloomIndexTestUtils.java new file mode 100644 index 0000000000000..9204785f9870e --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/BloomIndexTestUtils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.testutils; + +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.index.bloom.BloomIndexFileInfo; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BloomIndexTestUtils { + public static Map convertBloomIndexFileInfoListToMap( + List infoList) { + return infoList.stream().collect(Collectors.toMap(BloomIndexFileInfo::getFileId, Function.identity())); + } + + public static Map convertBloomIndexFileInfoPairListToMap( + List> filesList) { + Map filesMap = new HashMap<>(); + for (Pair t : filesList) { + filesMap.put(t.getKey() + "/" + t.getValue().getFileId(), t.getValue()); + } + return filesMap; + } + + public static void assertBloomInfoEquals( + List> expected, List> actual) { + assertEquals(expected.size(), actual.size()); + Map expectedMap = convertBloomIndexFileInfoPairListToMap(expected); + Map actualMap = convertBloomIndexFileInfoPairListToMap(actual); + + for (String key : expectedMap.keySet()) { + assertTrue(actualMap.containsKey(key)); + BloomIndexFileInfo expectedInfo = expectedMap.get(key); + BloomIndexFileInfo actualInfo = actualMap.get(key); + assertEquals(expectedInfo.getFileId(), actualInfo.getFileId()); + assertEquals(expectedInfo.getMinRecordKey(), actualInfo.getMinRecordKey()); + assertEquals(expectedInfo.getMaxRecordKey(), actualInfo.getMaxRecordKey()); + } + } +} diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java index b62e09eb275f6..50ca2b77bddb1 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java @@ -18,8 +18,6 @@ package org.apache.hudi.index.bloom; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; @@ -38,6 +36,9 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieFlinkClientTestHarness; import org.apache.hudi.testutils.HoodieFlinkWriteableTestTable; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -54,6 +55,8 @@ import static java.util.Arrays.asList; import static java.util.UUID.randomUUID; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.apache.hudi.testutils.BloomIndexTestUtils.assertBloomInfoEquals; +import static org.apache.hudi.testutils.BloomIndexTestUtils.convertBloomIndexFileInfoListToMap; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -154,11 +157,11 @@ public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, b // no longer sorted, but should have same files. List> expected = - asList(Pair.of("2016/04/01", new BloomIndexFileInfo("2")), - Pair.of("2015/03/12", new BloomIndexFileInfo("1")), - Pair.of("2015/03/12", new BloomIndexFileInfo("3", "000", "000")), - Pair.of("2015/03/12", new BloomIndexFileInfo("4", "001", "003"))); - assertEquals(expected, filesList); + asList(Pair.of("2016/04/01", new BloomIndexFileInfo("2", "")), + Pair.of("2015/03/12", new BloomIndexFileInfo("1", "")), + Pair.of("2015/03/12", new BloomIndexFileInfo("3", "", "000", "000")), + Pair.of("2015/03/12", new BloomIndexFileInfo("4", "", "001", "003"))); + assertBloomInfoEquals(expected, filesList); } } @@ -168,11 +171,12 @@ public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolea HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); HoodieBloomIndex index = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance()); - final Map> partitionToFileIndexInfo = new HashMap<>(); + final Map> partitionToFileIndexInfo = new HashMap<>(); partitionToFileIndexInfo.put("2017/10/22", - asList(new BloomIndexFileInfo("f1"), new BloomIndexFileInfo("f2", "000", "000"), - new BloomIndexFileInfo("f3", "001", "003"), new BloomIndexFileInfo("f4", "002", "007"), - new BloomIndexFileInfo("f5", "009", "010"))); + convertBloomIndexFileInfoListToMap(asList(new BloomIndexFileInfo("f1", ""), new BloomIndexFileInfo("f2", "", "000", "000"), + new BloomIndexFileInfo("f3", "", "001", "003"), + new BloomIndexFileInfo("f4", "", "002", "007"), + new BloomIndexFileInfo("f5", "", "009", "010")))); Map> partitionRecordKeyMap = new HashMap<>(); asList(Pair.of("2017/10/22", "003"), Pair.of("2017/10/22", "002"), @@ -183,11 +187,14 @@ public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolea partitionRecordKeyMap.put(t.getLeft(), recordKeyList); }); - List> comparisonKeyList = index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList(); + List, HoodieKey>> comparisonKeyList = index.explodeRecordsWithFileComparisons( + partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList(); assertEquals(10, comparisonKeyList.size()); java.util.Map> recordKeyToFileComps = comparisonKeyList.stream() - .collect(java.util.stream.Collectors.groupingBy(t -> t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t -> t.getLeft(), java.util.stream.Collectors.toList()))); + .collect(java.util.stream.Collectors.groupingBy( + t -> t.getRight().getRecordKey(), + java.util.stream.Collectors.mapping(t -> t.getLeft().getLeft(), java.util.stream.Collectors.toList()))); assertEquals(4, recordKeyToFileComps.size()); assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("002"))); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java index 8a2958eab9da8..e759fb76710e8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java @@ -18,31 +18,33 @@ package org.apache.hudi.index.bloom; -import org.apache.hadoop.fs.Path; import org.apache.hudi.client.utils.LazyIterableIterator; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.io.HoodieKeyLookupResult; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.function.Function2; -import scala.Tuple2; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Set; + +import scala.Tuple2; /** * Spark Function2 implementation for checking bloom filters for the @@ -53,26 +55,30 @@ * keys are checked against them. */ public class HoodieMetadataBloomIndexCheckFunction implements - Function2>, Iterator>> { + Function2, HoodieKey>>, Iterator>> { private static final Logger LOG = LogManager.getLogger(HoodieMetadataBloomIndexCheckFunction.class); - // Assuming each file bloom filter takes up 512K, sizing the max file count - // per batch so that the total fetched bloom filters would not cross 128 MB. - private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256; private final HoodieTable hoodieTable; + private final String basePath; + private final int batchSize; - public HoodieMetadataBloomIndexCheckFunction(HoodieTable hoodieTable) { + public HoodieMetadataBloomIndexCheckFunction(HoodieTable hoodieTable, String basePath, int batchSize) { this.hoodieTable = hoodieTable; + this.basePath = basePath; + this.batchSize = batchSize; } @Override - public Iterator> call(Integer integer, Iterator> tuple2Iterator) throws Exception { + public Iterator> call(Integer integer, Iterator, HoodieKey>> tuple2Iterator) throws Exception { return new BloomIndexLazyKeyCheckIterator(tuple2Iterator); } - private class BloomIndexLazyKeyCheckIterator extends LazyIterableIterator, List> { - public BloomIndexLazyKeyCheckIterator(Iterator> tuple2Iterator) { + private class BloomIndexLazyKeyCheckIterator extends LazyIterableIterator, HoodieKey>, List> { + + private final Set processedFileIdSet = new HashSet<>(); + + public BloomIndexLazyKeyCheckIterator(Iterator, HoodieKey>> tuple2Iterator) { super(tuple2Iterator); } @@ -83,37 +89,64 @@ protected void start() { @Override protected List computeNext() { // Partition path and file name pair to list of keys - final Map, List> fileToKeysMap = new HashMap<>(); - final Map fileIDBaseFileMap = new HashMap<>(); + final Map, List> batchFileToKeysMap = new HashMap<>(); final List resultList = new ArrayList<>(); + String lastFileId = null; + + try { + // Here we batch process the lookup of bloom filters in metadata table + // assuming the partition path and file name pairs are already sorted by the corresponding key + while (inputItr.hasNext()) { + Tuple2, HoodieKey> entry = inputItr.next(); + final String partitionPath = entry._2.getPartitionPath(); + final String fileId = entry._1._1(); + final String filename = entry._1._2(); + + if (lastFileId == null || !lastFileId.equals(fileId)) { + if (processedFileIdSet.contains(fileId)) { + LOG.warn(String.format("Fetching the bloom filter for file ID %s again. " + + " The input pairs of file ID and record key are not sorted.", fileId)); + } + lastFileId = fileId; + processedFileIdSet.add(fileId); + } + + batchFileToKeysMap.computeIfAbsent(Pair.of(partitionPath, filename), k -> new ArrayList<>()).add(entry._2); - while (inputItr.hasNext()) { - Tuple2 entry = inputItr.next(); - final String partitionPath = entry._2.getPartitionPath(); - final String fileId = entry._1; - if (!fileIDBaseFileMap.containsKey(fileId)) { - Option baseFile = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId); - if (!baseFile.isPresent()) { - throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath - + ", fileId: " + fileId); + if (batchFileToKeysMap.size() == batchSize) { + resultList.addAll(lookupKeysInBloomFilters(batchFileToKeysMap)); + batchFileToKeysMap.clear(); } - fileIDBaseFileMap.put(fileId, baseFile.get()); } - fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()), - k -> new ArrayList<>()).add(entry._2); - if (fileToKeysMap.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) { - break; + + if (batchFileToKeysMap.size() > 0) { + resultList.addAll(lookupKeysInBloomFilters(batchFileToKeysMap)); + batchFileToKeysMap.clear(); } + + return resultList; + } catch (Throwable e) { + if (e instanceof HoodieException) { + throw e; + } + throw new HoodieIndexException("Error checking bloom filter using metadata table.", e); } - if (fileToKeysMap.isEmpty()) { - return Collections.emptyList(); - } + } + + @Override + protected void end() { + } - List> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet()); + private List lookupKeysInBloomFilters( + Map, List> fileToKeysMap) { + List resultList = new ArrayList<>(); + List> partitionPathFileNameList = new ArrayList<>(fileToKeysMap.keySet()); + HoodieTimer timer = HoodieTimer.start(); Map, BloomFilter> fileToBloomFilterMap = - hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList); + hoodieTable.getMetadataTable().getBloomFilters(partitionPathFileNameList); + LOG.error(String.format("Took %d ms to look up %s bloom filters", + timer.endTimer(), partitionPathFileNameList.size())); - final AtomicInteger totalKeys = new AtomicInteger(0); fileToKeysMap.forEach((partitionPathFileNamePair, hoodieKeyList) -> { final String partitionPath = partitionPathFileNamePair.getLeft(); final String fileName = partitionPathFileNamePair.getRight(); @@ -127,28 +160,24 @@ protected List computeNext() { List candidateRecordKeys = new ArrayList<>(); hoodieKeyList.forEach(hoodieKey -> { - totalKeys.incrementAndGet(); if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) { candidateRecordKeys.add(hoodieKey.getRecordKey()); } }); - final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId); List matchingKeys = - HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()), candidateRecordKeys, + HoodieIndexUtils.filterKeysFromFile( + new CachingPath(FSUtils.getPartitionPath(basePath, partitionPath), fileName), + candidateRecordKeys, hoodieTable.getHadoopConf()); LOG.debug( String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", hoodieKeyList.size(), candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size())); - resultList.add(new HoodieKeyLookupResult(fileId, partitionPath, dataFile.getCommitTime(), matchingKeys)); + resultList.add(new HoodieKeyLookupResult(fileId, partitionPath, FSUtils.getCommitTime(fileName), matchingKeys)); }); return resultList; } - - @Override - protected void end() { - } } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java index 5736024dc2455..e9cc783ed0b8f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java @@ -25,10 +25,13 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.hash.FileIndexID; +import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaPairRDD; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.io.HoodieKeyLookupResult; +import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; @@ -66,12 +69,14 @@ public static SparkHoodieBloomIndexHelper getInstance() { public HoodiePairData findMatchingFilesForRecordKeys( HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable, HoodiePairData partitionRecordKeyPairs, - HoodieData> fileComparisonPairs, - Map> partitionToFileInfo, + HoodieData, HoodieKey>> fileComparisonPairs, + Map> partitionToFileInfo, Map recordsPerPartition) { - JavaRDD> fileComparisonsRDD = + JavaRDD, HoodieKey>> fileComparisonsRDD = HoodieJavaRDD.getJavaRDD(fileComparisonPairs) - .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight())); + .map(pair -> new Tuple2<>( + new Tuple2<>(pair.getLeft().getLeft(), pair.getLeft().getRight()), + pair.getRight())); int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size(); int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism()); @@ -82,25 +87,33 @@ public HoodiePairData findMatchingFilesForRecor if (config.getBloomIndexUseMetadata() && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions() .contains(BLOOM_FILTERS.getPartitionPath())) { - // Step 1: Sort by file id - JavaRDD> sortedFileIdAndKeyPairs = - fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism); + // Step 1: Sort by bloom filter key (Hash ID of partition and file name) in metadata table + JavaRDD, HoodieKey>> sortedFileIdAndKeyPairs = + fileComparisonsRDD + .sortBy(t -> HoodieMetadataPayload.getBloomFilterIndexKey( + new PartitionIndexID(t._1._1), new FileIndexID(t._1._2)), true, joinParallelism); // Step 2: Use bloom filter to filter and the actual log file to get the record location - keyLookupResultRDD = sortedFileIdAndKeyPairs.mapPartitionsWithIndex( - new HoodieMetadataBloomIndexCheckFunction(hoodieTable), true); + keyLookupResultRDD = sortedFileIdAndKeyPairs.coalesce(config.getBloomIndexMetadataReadParallelism()) + .mapPartitionsWithIndex(new HoodieMetadataBloomIndexCheckFunction( + hoodieTable, + config.getBasePath(), + config.getBloomIndexMetadataBloomFilterReadBatchSize()), true); } else if (config.useBloomIndexBucketizedChecking()) { Map comparisonsPerFileGroup = computeComparisonsPerFileGroup( config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context); Partitioner partitioner = new BucketizedBloomCheckPartitioner(joinParallelism, comparisonsPerFileGroup, config.getBloomIndexKeysPerBucket()); - keyLookupResultRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t)) + keyLookupResultRDD = fileComparisonsRDD + .mapToPair(t -> new Tuple2<>(Pair.of(t._1._1, t._2.getRecordKey()), new Tuple2<>(t._1._1, t._2))) .repartitionAndSortWithinPartitions(partitioner) .map(Tuple2::_2) .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true); } else { - keyLookupResultRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism) + keyLookupResultRDD = fileComparisonsRDD + .map(t -> new Tuple2<>(t._1._1, t._2)) + .sortBy(t -> t._1, true, joinParallelism) .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true); } @@ -118,19 +131,19 @@ public HoodiePairData findMatchingFilesForRecor private Map computeComparisonsPerFileGroup( final HoodieWriteConfig config, final Map recordsPerPartition, - final Map> partitionToFileInfo, - final JavaRDD> fileComparisonsRDD, + final Map> partitionToFileInfo, + final JavaRDD, HoodieKey>> fileComparisonsRDD, final HoodieEngineContext context) { Map fileToComparisons; if (config.getBloomIndexPruneByRanges()) { // we will just try exploding the input and then count to determine comparisons // FIX(vc): Only do sampling here and extrapolate? context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files: " + config.getTableName()); - fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey(); + fileToComparisons = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(t._1._1, t._2)).countByKey(); } else { fileToComparisons = new HashMap<>(); - partitionToFileInfo.forEach((key, value) -> { - for (BloomIndexFileInfo fileInfo : value) { + partitionToFileInfo.forEach((key, infoMap) -> { + for (BloomIndexFileInfo fileInfo : infoMap.values()) { // each file needs to be compared against all the records coming into the partition fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(key)); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index 5be4e4ce624a3..6481452d2edae 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -71,6 +71,8 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.genPseudoRandomUUID; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.apache.hudi.testutils.BloomIndexTestUtils.assertBloomInfoEquals; +import static org.apache.hudi.testutils.BloomIndexTestUtils.convertBloomIndexFileInfoListToMap; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -231,12 +233,12 @@ public void testLoadInvolvedFiles( // no longer sorted, but should have same files. - List> expected = - Arrays.asList(new ImmutablePair<>("2016/04/01", new BloomIndexFileInfo("2")), - new ImmutablePair<>("2015/03/12", new BloomIndexFileInfo("1")), - new ImmutablePair<>("2015/03/12", new BloomIndexFileInfo("3", "000", "000")), - new ImmutablePair<>("2015/03/12", new BloomIndexFileInfo("4", "001", "003"))); - assertEquals(expected, filesList); + List> expected = + Arrays.asList(new ImmutablePair<>("2016/04/01", new BloomIndexFileInfo("2", "")), + new ImmutablePair<>("2015/03/12", new BloomIndexFileInfo("1", "")), + new ImmutablePair<>("2015/03/12", new BloomIndexFileInfo("3", "", "000", "000")), + new ImmutablePair<>("2015/03/12", new BloomIndexFileInfo("4", "", "001", "003"))); + assertBloomInfoEquals(expected, filesList); } } @@ -249,22 +251,25 @@ public void testRangePruning( makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable); HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); - final Map> partitionToFileIndexInfo = new HashMap<>(); + final Map> partitionToFileIndexInfo = new HashMap<>(); partitionToFileIndexInfo.put("2017/10/22", - Arrays.asList(new BloomIndexFileInfo("f1"), new BloomIndexFileInfo("f2", "000", "000"), - new BloomIndexFileInfo("f3", "001", "003"), new BloomIndexFileInfo("f4", "002", "007"), - new BloomIndexFileInfo("f5", "009", "010"))); + convertBloomIndexFileInfoListToMap(Arrays.asList(new BloomIndexFileInfo("f1", ""), new BloomIndexFileInfo("f2", "", "000", "000"), + new BloomIndexFileInfo("f3", "", "001", "003"), + new BloomIndexFileInfo("f4", "", "002", "007"), + new BloomIndexFileInfo("f5", "", "009", "010")))); JavaPairRDD partitionRecordKeyPairRDD = jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t); - List> comparisonKeyList = HoodieJavaRDD.getJavaRDD( + List, HoodieKey>> comparisonKeyList = HoodieJavaRDD.getJavaRDD( index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieJavaPairRDD.of(partitionRecordKeyPairRDD))).collect(); assertEquals(10, comparisonKeyList.size()); Map> recordKeyToFileComps = comparisonKeyList.stream() - .collect(Collectors.groupingBy(t -> t.getRight().getRecordKey(), Collectors.mapping(Pair::getLeft, Collectors.toList()))); + .collect(Collectors.groupingBy( + t -> t.getRight().getRecordKey(), + Collectors.mapping(t -> t.getLeft().getLeft(), Collectors.toList()))); assertEquals(4, recordKeyToFileComps.size()); assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002"))); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index 3ad8952feea84..b8f2455813e85 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -57,6 +58,9 @@ import scala.Tuple2; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.apache.hudi.testutils.BloomIndexTestUtils.assertBloomInfoEquals; +import static org.apache.hudi.testutils.BloomIndexTestUtils.convertBloomIndexFileInfoListToMap; +import static org.apache.hudi.testutils.BloomIndexTestUtils.convertBloomIndexFileInfoPairListToMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -164,7 +168,7 @@ public void testLoadInvolvedFiles() throws Exception { filesList = index.loadColumnRangesFromFiles(partitions, context, hoodieTable); assertEquals(4, filesList.size()); - Map filesMap = toFileMap(filesList); + Map filesMap = convertBloomIndexFileInfoPairListToMap(filesList); // key ranges checks assertNull(filesMap.get("2016/04/01/2").getMaxRecordKey()); assertNull(filesMap.get("2016/04/01/2").getMinRecordKey()); @@ -173,13 +177,13 @@ public void testLoadInvolvedFiles() throws Exception { assertNotNull(filesMap.get("2015/03/12/3").getMinRecordKey()); assertTrue(filesMap.get("2015/03/12/3").hasKeyRanges()); - Map expected = new HashMap<>(); - expected.put("2016/04/01/2", new BloomIndexFileInfo("2")); - expected.put("2015/03/12/1", new BloomIndexFileInfo("1")); - expected.put("2015/03/12/3", new BloomIndexFileInfo("3", "000", "000")); - expected.put("2015/03/12/4", new BloomIndexFileInfo("4", "001", "003")); + List> expected = + Arrays.asList(new ImmutablePair<>("2016/04/01", new BloomIndexFileInfo("2", "")), + new ImmutablePair<>("2015/03/12", new BloomIndexFileInfo("1", "")), + new ImmutablePair<>("2015/03/12", new BloomIndexFileInfo("3", "", "000", "000")), + new ImmutablePair<>("2015/03/12", new BloomIndexFileInfo("4", "", "001", "003"))); - assertEquals(expected, filesMap); + assertBloomInfoEquals(expected, filesList); } @Test @@ -189,19 +193,23 @@ public void testExplodeRecordRDDWithFileComparisons() { HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); - final Map> partitionToFileIndexInfo = new HashMap<>(); - partitionToFileIndexInfo.put("2017/10/22", Arrays.asList(new BloomIndexFileInfo("f1"), - new BloomIndexFileInfo("f2", "000", "000"), new BloomIndexFileInfo("f3", "001", "003"))); + final Map> partitionToFileIndexInfo = new HashMap<>(); + partitionToFileIndexInfo.put("2017/10/22", convertBloomIndexFileInfoListToMap( + Arrays.asList(new BloomIndexFileInfo("f1", ""), + new BloomIndexFileInfo("f2", "", "000", "000"), + new BloomIndexFileInfo("f3", "", "001", "003")))); partitionToFileIndexInfo.put("2017/10/23", - Arrays.asList(new BloomIndexFileInfo("f4", "002", "007"), new BloomIndexFileInfo("f5", "009", "010"))); + convertBloomIndexFileInfoListToMap( + Arrays.asList(new BloomIndexFileInfo("f4", "", "002", "007"), + new BloomIndexFileInfo("f5", "", "009", "010")))); // the partition of the key of the incoming records will be ignored JavaPairRDD partitionRecordKeyPairRDD = jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t); - List> comparisonKeyList = HoodieJavaRDD.getJavaRDD( + List, HoodieKey>> comparisonKeyList = HoodieJavaRDD.getJavaRDD( index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieJavaPairRDD.of(partitionRecordKeyPairRDD))).collect(); @@ -216,7 +224,7 @@ public void testExplodeRecordRDDWithFileComparisons() { assertEquals(10, comparisonKeyList.size()); Map> recordKeyToFileComps = comparisonKeyList.stream() - .collect(Collectors.groupingBy(t -> t.getRight().getRecordKey(), Collectors.mapping(Pair::getKey, Collectors.toList()))); + .collect(Collectors.groupingBy(t -> t.getRight().getRecordKey(), Collectors.mapping(t -> t.getLeft().getLeft(), Collectors.toList()))); assertEquals(4, recordKeyToFileComps.size()); assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new HashSet<>(recordKeyToFileComps.get("002"))); @@ -437,14 +445,4 @@ public void testTagLocationWhenShouldUpdatePartitionPath() throws Exception { assertEquals(p1, record.getPartitionPath()); assertEquals(incomingPayloadSamePartition.getJsonData(), ((RawTripTestPayload) record.getData()).getJsonData()); } - - // convert list to map to avoid sorting order dependencies - private static Map toFileMap(List> filesList) { - Map filesMap = new HashMap<>(); - for (Pair t : filesList) { - filesMap.put(t.getKey() + "/" + t.getValue().getFileId(), t.getValue()); - } - return filesMap; - } - } diff --git a/pom.xml b/pom.xml index 6489e632b45fd..38479568e99bb 100644 --- a/pom.xml +++ b/pom.xml @@ -1853,6 +1853,7 @@ flink1.14 + !disabled