@@ -44,8 +44,6 @@
import java.util.List;
import java.util.Map;

-import scala.Tuple2;
-
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
@@ -106,14 +104,14 @@ private Map<HoodieKey, HoodieRecordLocation> lookupIndex(
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

// Step 2: Load all involved files as <Partition, filename> pairs
-List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
+List<Pair<String, BloomIndexFileInfo>> fileInfoList =
loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
+fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));

// Step 3: Obtain a List, for each incoming record, that already exists, with the file id,
// that contains it.
-List<Tuple2<String, HoodieKey>> fileComparisons =
+List<Pair<String, HoodieKey>> fileComparisons =
explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap);
return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
}
@@ -122,7 +120,7 @@ private Map<HoodieKey, HoodieRecordLocation> lookupIndex(
* Load all involved files as <Partition, filename> pair List.
*/
//TODO duplicate code with spark, we can optimize this method later
-List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
+List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
@@ -136,15 +134,15 @@ List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitio
try {
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
+return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
LOG.warn("Unable to find range metadata in file :" + pf);
-return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
}
}, Math.max(partitionPathFileIDList.size(), 1));
} else {
return partitionPathFileIDList.stream()
-.map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
+.map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
}
}

@@ -186,19 +184,19 @@ public boolean isImplicitWithStorage() {
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info.
*/
-List<Tuple2<String, HoodieKey>> explodeRecordsWithFileComparisons(
+List<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
Map<String, List<String>> partitionRecordKeyMap) {
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);

-List<Tuple2<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
+List<Pair<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
partitionRecordKeyMap.keySet().forEach(partitionPath -> {
List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
hoodieRecordKeys.forEach(hoodieRecordKey -> {
indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> {
-fileRecordPairs.add(new Tuple2<>(partitionFileIdPair.getRight(),
+fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
new HoodieKey(hoodieRecordKey, partitionPath)));
});
});
@@ -210,10 +208,10 @@ List<Tuple2<String, HoodieKey>> explodeRecordsWithFileComparisons(
* Find out <RowKey, filename> pair.
*/
Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
-List<Tuple2<String, HoodieKey>> fileComparisons,
+List<Pair<String, HoodieKey>> fileComparisons,
HoodieTable hoodieTable) {

-fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1._1.compareTo(o2._1)).collect(toList());
+fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1.getLeft().compareTo(o2.getLeft())).collect(toList());

List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();

@@ -244,17 +242,17 @@ protected List<HoodieRecord<T>> tagLocationBacktoRecords(
records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
// Here as the record might have more data than rowKey (some rowKeys' fileId is null),
// so we do left outer join.
-List<Tuple2<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
+List<Pair<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
keyRecordPairMap.keySet().forEach(k -> {
if (keyFilenamePair.containsKey(k)) {
-newList.add(new Tuple2(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
+newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
} else {
-newList.add(new Tuple2(keyRecordPairMap.get(k), null));
+newList.add(Pair.of(keyRecordPairMap.get(k), null));
}
});
List<HoodieRecord<T>> res = Lists.newArrayList();
-for (Tuple2<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
-res.add(HoodieIndexUtils.getTaggedRecord(v._1, Option.ofNullable(v._2)));
+for (Pair<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
+res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), Option.ofNullable(v.getRight())));
}
return res;
}
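The heart of the change above is mechanical: scala.Tuple2's _1/_2 accessors become Pair's getLeft/getRight, and the stream collectors follow suit. Below is a minimal, self-contained sketch of the new Step 2 grouping. It uses org.apache.commons.lang3.tuple.Pair as a stand-in for the Pair type the patch adopts (the diff does not show that import, but the surrounding code already uses a Pair with the same of/getLeft/getRight surface); the class name, partition paths, and file ids are invented for illustration.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.tuple.Pair;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;

// Sketch only: commons-lang3 Pair stands in for the Pair type in the patch.
public class PairGroupingSketch {
  public static void main(String[] args) {
    // <partitionPath, fileId> pairs, shaped like loadInvolvedFiles output
    List<Pair<String, String>> fileInfoList = Arrays.asList(
        Pair.of("2015/03/12", "f1"),
        Pair.of("2015/03/12", "f3"),
        Pair.of("2016/04/01", "f2"));

    // Replaces groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList()))
    Map<String, List<String>> partitionToFiles = fileInfoList.stream()
        .collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));

    // e.g. {2016/04/01=[f2], 2015/03/12=[f1, f3]} (map order unspecified)
    System.out.println(partitionToFiles);
  }
}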
@@ -33,14 +33,12 @@
import java.util.Iterator;
import java.util.List;

-import scala.Tuple2;
-
/**
* Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files.
*/
//TODO we can move this class into the hudi-client-common and reuse it for spark client
public class HoodieFlinkBloomIndexCheckFunction
-implements Function<Iterator<Tuple2<String, HoodieKey>>, Iterator<List<KeyLookupResult>>> {
+implements Function<Iterator<Pair<String, HoodieKey>>, Iterator<List<KeyLookupResult>>> {

private final HoodieTable hoodieTable;

@@ -52,25 +50,25 @@ public HoodieFlinkBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteCo
}

@Override
-public Iterator<List<KeyLookupResult>> apply(Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
+public Iterator<List<KeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
}

@Override
-public <V> Function<V, Iterator<List<KeyLookupResult>>> compose(Function<? super V, ? extends Iterator<Tuple2<String, HoodieKey>>> before) {
+public <V> Function<V, Iterator<List<KeyLookupResult>>> compose(Function<? super V, ? extends Iterator<Pair<String, HoodieKey>>> before) {
return null;
}

@Override
-public <V> Function<Iterator<Tuple2<String, HoodieKey>>, V> andThen(Function<? super Iterator<List<KeyLookupResult>>, ? extends V> after) {
+public <V> Function<Iterator<Pair<String, HoodieKey>>, V> andThen(Function<? super Iterator<List<KeyLookupResult>>, ? extends V> after) {
return null;
}

-class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
+class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<KeyLookupResult>> {

private HoodieKeyLookupHandle keyLookupHandle;

-LazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
+LazyKeyCheckIterator(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
super(filePartitionRecordKeyTripletItr);
}

@@ -84,10 +82,10 @@ protected List<KeyLookupResult> computeNext() {
try {
// process one file in each go.
while (inputItr.hasNext()) {
-Tuple2<String, HoodieKey> currentTuple = inputItr.next();
-String fileId = currentTuple._1;
-String partitionPath = currentTuple._2.getPartitionPath();
-String recordKey = currentTuple._2.getRecordKey();
+Pair<String, HoodieKey> currentTuple = inputItr.next();
+String fileId = currentTuple.getLeft();
+String partitionPath = currentTuple.getRight().getPartitionPath();
+String recordKey = currentTuple.getRight().getRecordKey();
Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);

// lazily init state
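As the "process one file in each go" comment suggests, LazyKeyCheckIterator walks the (fileId, HoodieKey) pairs file by file, which works because findMatchingFilesForRecordKeys sorts the comparisons by fileId first. Below is a stripped-down sketch of that batching shape, with plain strings standing in for HoodieKey, no actual bloom-filter lookup, and made-up data.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;

// Sketch only: batches a fileId-sorted stream of <fileId, recordKey> pairs
// so all keys for one file are handled together, like LazyKeyCheckIterator.
public class BatchByFileSketch {
  public static void main(String[] args) {
    Iterator<Pair<String, String>> inputItr = Arrays.asList(
        Pair.of("f1", "key-001"),
        Pair.of("f1", "key-002"),
        Pair.of("f2", "key-003")).iterator();

    List<List<String>> batches = new ArrayList<>();
    String currentFileId = null;
    List<String> currentKeys = new ArrayList<>();
    while (inputItr.hasNext()) {
      Pair<String, String> currentTuple = inputItr.next();
      // flush the previous file's keys when the fileId changes
      if (currentFileId != null && !currentFileId.equals(currentTuple.getLeft())) {
        batches.add(currentKeys);
        currentKeys = new ArrayList<>();
      }
      currentFileId = currentTuple.getLeft();
      currentKeys.add(currentTuple.getRight());
    }
    if (!currentKeys.isEmpty()) {
      batches.add(currentKeys);
    }
    System.out.println(batches); // [[key-001, key-002], [key-003]]
  }
}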
@@ -50,8 +50,6 @@
import java.util.Map;
import java.util.stream.Stream;

-import scala.Tuple2;
-
import static java.util.Arrays.asList;
import static java.util.UUID.randomUUID;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -130,7 +128,7 @@ public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, b
new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);

List<String> partitions = asList("2016/01/21", "2016/04/01", "2015/03/12");
-List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, context, hoodieTable);
+List<Pair<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, context, hoodieTable);
// Still 0, as no valid commit
assertEquals(0, filesList.size());

@@ -145,20 +143,20 @@

if (rangePruning) {
// these files will not have the key ranges
-assertNull(filesList.get(0)._2().getMaxRecordKey());
-assertNull(filesList.get(0)._2().getMinRecordKey());
-assertFalse(filesList.get(1)._2().hasKeyRanges());
-assertNotNull(filesList.get(2)._2().getMaxRecordKey());
-assertNotNull(filesList.get(2)._2().getMinRecordKey());
-assertTrue(filesList.get(3)._2().hasKeyRanges());
+assertNull(filesList.get(0).getRight().getMaxRecordKey());
+assertNull(filesList.get(0).getRight().getMinRecordKey());
+assertFalse(filesList.get(1).getRight().hasKeyRanges());
+assertNotNull(filesList.get(2).getRight().getMaxRecordKey());
+assertNotNull(filesList.get(2).getRight().getMinRecordKey());
+assertTrue(filesList.get(3).getRight().hasKeyRanges());

// no longer sorted, but should have same files.

-List<Tuple2<String, BloomIndexFileInfo>> expected =
-asList(new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")),
-new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")),
-new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000", "000")),
-new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001", "003")));
+List<Pair<String, BloomIndexFileInfo>> expected =
+asList(Pair.of("2016/04/01", new BloomIndexFileInfo("2")),
+Pair.of("2015/03/12", new BloomIndexFileInfo("1")),
+Pair.of("2015/03/12", new BloomIndexFileInfo("3", "000", "000")),
+Pair.of("2015/03/12", new BloomIndexFileInfo("4", "001", "003")));
assertEquals(expected, filesList);
}
}
@@ -176,20 +174,20 @@ public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolea
new BloomIndexFileInfo("f5", "009", "010")));

Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"),
new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", "004"))
asList(Pair.of("2017/10/22", "003"), Pair.of("2017/10/22", "002"),
Pair.of("2017/10/22", "005"), Pair.of("2017/10/22", "004"))
.forEach(t -> {
-List<String> recordKeyList = partitionRecordKeyMap.getOrDefault(t._1, new ArrayList<>());
-recordKeyList.add(t._2);
-partitionRecordKeyMap.put(t._1, recordKeyList);
+List<String> recordKeyList = partitionRecordKeyMap.getOrDefault(t.getLeft(), new ArrayList<>());
+recordKeyList.add(t.getRight());
+partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
});

-List<scala.Tuple2<String, HoodieKey>> comparisonKeyList =
+List<Pair<String, HoodieKey>> comparisonKeyList =
index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyMap);

assertEquals(10, comparisonKeyList.size());
java.util.Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
-.collect(java.util.stream.Collectors.groupingBy(t -> t._2.getRecordKey(), java.util.stream.Collectors.mapping(t -> t._1, java.util.stream.Collectors.toList())));
+.collect(java.util.stream.Collectors.groupingBy(t -> t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t -> t.getLeft(), java.util.stream.Collectors.toList())));

assertEquals(4, recordKeyToFileComps.size());
assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("002")));
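The test builds partitionRecordKeyMap with a getOrDefault-then-put idiom that the patch ports verbatim from the Tuple2 version. For what it's worth, the same grouping can be written more compactly with Map.computeIfAbsent; a small sketch under the same commons-lang3 Pair stand-in assumption:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.tuple.Pair;

// Sketch only: equivalent grouping via computeIfAbsent.
public class PartitionKeyMapSketch {
  public static void main(String[] args) {
    Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
    Arrays.asList(Pair.of("2017/10/22", "003"), Pair.of("2017/10/22", "002"),
        Pair.of("2017/10/22", "005"), Pair.of("2017/10/22", "004"))
        .forEach(t -> partitionRecordKeyMap
            .computeIfAbsent(t.getLeft(), k -> new ArrayList<>())
            .add(t.getRight()));
    System.out.println(partitionRecordKeyMap);
    // {2017/10/22=[003, 002, 005, 004]}
  }
}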