Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
recordRDD.unpersist(); // unpersist the input Record RDD
keyFilenamePairRDD.unpersist();
}

return taggedRecordRDD;
}

Expand Down Expand Up @@ -321,8 +320,9 @@ JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();

return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
.map(matchingFile -> new Tuple2<>(matchingFile, new HoodieKey(recordKey, partitionPath)))
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionPath)))
.collect(Collectors.toList());
}).flatMap(List::iterator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey
}

@Override
protected void start() {}
protected void start() {
}

@Override
protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
Expand Down Expand Up @@ -113,6 +114,7 @@ protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
}

@Override
protected void end() {}
protected void end() {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import scala.Tuple2;
Expand All @@ -59,7 +58,7 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config) {
@Override
@VisibleForTesting
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
final HoodieTable hoodieTable) {
final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
Expand All @@ -74,7 +73,7 @@ List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitio
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
* to be compared gets cut down a lot from range pruning.
*
* <p>
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info. the partition path of the incoming record (partitionRecordKeyPairRDD._2()) will
* be ignored since the search scope should be bigger than that
Expand All @@ -85,10 +84,6 @@ List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitio
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
Map<String, String> indexToPartitionMap = new HashMap<>();
for (Entry<String, List<BloomIndexFileInfo>> entry : partitionToFileIndexInfo.entrySet()) {
entry.getValue().forEach(indexFile -> indexToPartitionMap.put(indexFile.getFileId(), entry.getKey()));
}

IndexFileFilter indexFileFilter =
config.getBloomIndexPruneByRanges() ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
Expand All @@ -98,26 +93,37 @@ JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();

return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
.map(file -> new Tuple2<>(file, new HoodieKey(recordKey, indexToPartitionMap.get(file))))
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
.collect(Collectors.toList());
}).flatMap(List::iterator);
}


/**
* Tagging for global index should only consider the record key.
*/
@Override
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD =
JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {

JavaPairRDD<String, HoodieRecord<T>> incomingRowKeyRecordPairRDD =
recordRDD.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));

// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
// so we do left outer join.
return rowKeyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), p._2)))
.values().map(value -> getTaggedRecord(value._1, Option.ofNullable(value._2.orNull())));
JavaPairRDD<String, Tuple2<HoodieRecordLocation, HoodieKey>> existingRecordKeyToRecordLocationHoodieKeyMap =
keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1)));

// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join.
return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record -> {
final HoodieRecord<T> hoodieRecord = record._1;
final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2;
if (recordLocationHoodieKeyPair.isPresent()) {
// Record key matched to file
return getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1));
} else {
return getTaggedRecord(hoodieRecord, Option.empty());
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hudi.index.bloom;

import org.apache.hudi.common.util.collection.Pair;

import java.io.Serializable;
import java.util.Set;

Expand All @@ -27,12 +29,13 @@
public interface IndexFileFilter extends Serializable {

/**
* Fetches all matching files for a given record key and partition.
* Fetches all matching files and partition pair for a given record key and partition path.
*
* @param partitionPath the partition path of interest
* @param recordKey the record key to be looked up
* @return the {@link Set} of matching file names where the record could potentially be present.
* @param recordKey the record key to be looked up
* @return the {@link Set} of matching <Partition path, file name> pairs where the record could potentially be
* present.
*/
Set<String> getMatchingFiles(String partitionPath, String recordKey);
Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package org.apache.hudi.index.bloom;

import java.util.Collection;
import org.apache.hudi.common.util.collection.Pair;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Interval Tree based index look up for Global Index. Builds an {@link KeyRangeLookupTree} for all index files (across
Expand All @@ -34,15 +36,23 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter {

private final KeyRangeLookupTree indexLookUpTree = new KeyRangeLookupTree();
private final Set<String> filesWithNoRanges = new HashSet<>();
private final Map<String, String> fileIdToPartitionPathMap = new HashMap<>();

/**
* Instantiates {@link IntervalTreeBasedGlobalIndexFileFilter}.
*
* @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s
*/
IntervalTreeBasedGlobalIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
List<BloomIndexFileInfo> allIndexFiles =
partitionToFileIndexInfo.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
List<BloomIndexFileInfo> allIndexFiles = new ArrayList<>();

partitionToFileIndexInfo.forEach((parition, bloomIndexFileInfoList) -> {
bloomIndexFileInfoList.forEach(file -> {
fileIdToPartitionPathMap.put(file.getFileId(), parition);
allIndexFiles.add(file);
});
});

// Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
// So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed
// which could result in N search time instead of NlogN.
Expand All @@ -58,10 +68,12 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter {
}

@Override
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
Set<String> toReturn = new HashSet<>();
toReturn.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey));
toReturn.addAll(filesWithNoRanges);
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
Set<String> matchingFiles = new HashSet<>();
matchingFiles.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey));
matchingFiles.addAll(filesWithNoRanges);
Set<Pair<String, String>> toReturn = new HashSet<>();
matchingFiles.forEach(file -> toReturn.add(Pair.of(fileIdToPartitionPathMap.get(file), file)));
return toReturn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hudi.index.bloom;

import org.apache.hudi.common.util.collection.Pair;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -62,14 +64,16 @@ class IntervalTreeBasedIndexFileFilter implements IndexFileFilter {
}

@Override
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
Set<String> toReturn = new HashSet<>();
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
Set<Pair<String, String>> toReturn = new HashSet<>();
// could be null, if there are no files in a given partition yet or if all index files have no ranges
if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) {
toReturn.addAll(partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey));
partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(file ->
toReturn.add(Pair.of(partitionPath, file)));
}
if (partitionToFilesWithNoRanges.containsKey(partitionPath)) {
toReturn.addAll(partitionToFilesWithNoRanges.get(partitionPath));
partitionToFilesWithNoRanges.get(partitionPath).forEach(file ->
toReturn.add(Pair.of(partitionPath, file)));
}
return toReturn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public String toString() {
*
* @param that the {@link KeyRangeNode} to be compared with
* @return the result of comparison. 0 if both min and max are equal in both. 1 if this {@link KeyRangeNode} is
* greater than the {@code that} keyRangeNode. -1 if {@code that} keyRangeNode is greater than this
* {@link KeyRangeNode}
* greater than the {@code that} keyRangeNode. -1 if {@code that} keyRangeNode is greater than this {@link
* KeyRangeNode}
*/
@Override
public int compareTo(KeyRangeNode that) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hudi.index.bloom;

import org.apache.hudi.common.util.collection.Pair;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -35,17 +37,14 @@ class ListBasedGlobalIndexFileFilter extends ListBasedIndexFileFilter {
}

@Override
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
Set<String> toReturn = new HashSet<>();
partitionToFileIndexInfo.values().forEach(indexInfos -> {
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
// for each candidate file in partition, that needs to be compared.
for (BloomIndexFileInfo indexInfo : indexInfos) {
if (shouldCompareWithFile(indexInfo, recordKey)) {
toReturn.add(indexInfo.getFileId());
}
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
Set<Pair<String, String>> toReturn = new HashSet<>();
partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoList) -> {
bloomIndexFileInfoList.forEach(file -> {
if (shouldCompareWithFile(file, recordKey)) {
toReturn.add(Pair.of(partition, file.getFileId()));
}
}
});
});
return toReturn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hudi.index.bloom;

import org.apache.hudi.common.util.collection.Pair;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -41,14 +43,14 @@ class ListBasedIndexFileFilter implements IndexFileFilter {
}

@Override
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
List<BloomIndexFileInfo> indexInfos = partitionToFileIndexInfo.get(partitionPath);
Set<String> toReturn = new HashSet<>();
Set<Pair<String, String>> toReturn = new HashSet<>();
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
// for each candidate file in partition, that needs to be compared.
for (BloomIndexFileInfo indexInfo : indexInfos) {
if (shouldCompareWithFile(indexInfo, recordKey)) {
toReturn.add(indexInfo.getFileId());
toReturn.add(Pair.of(partitionPath, indexInfo.getFileId()));
}
}
}
Expand Down
Loading