From 27dde93547c0afaf57005c931a587847751b4b3c Mon Sep 17 00:00:00 2001
From: shenh062326
Date: Sun, 7 Mar 2021 20:45:57 +0800
Subject: [PATCH] [HUDI-1673] Replace scala.Tuple2 with Pair in FlinkHoodieBloomIndex

---
 .../index/bloom/FlinkHoodieBloomIndex.java    | 36 ++++++++---------
 .../HoodieFlinkBloomIndexCheckFunction.java   | 22 +++++-----
 .../bloom/TestFlinkHoodieBloomIndex.java      | 40 +++++++++----------
 3 files changed, 46 insertions(+), 52 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
index 6a3edc7c38bea..255a66b492d34 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
@@ -44,8 +44,6 @@
 import java.util.List;
 import java.util.Map;
 
-import scala.Tuple2;
-
 import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
@@ -106,14 +104,14 @@ private Map<HoodieKey, HoodieRecordLocation> lookupIndex(
     List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
 
     // Step 2: Load all involved files as <Partition, filename> pairs
-    List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
+    List<Pair<String, BloomIndexFileInfo>> fileInfoList =
         loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
     final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-        fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
+        fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
 
     // Step 3: Obtain a List, for each incoming record, that already exists, with the file id,
     // that contains it.
-    List<Tuple2<String, HoodieKey>> fileComparisons =
+    List<Pair<String, HoodieKey>> fileComparisons =
         explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap);
     return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
   }
@@ -122,7 +120,7 @@ private Map<HoodieKey, HoodieRecordLocation> lookupIndex(
    * Load all involved files as <Partition, filename> pair List.
    */
   //TODO duplicate code with spark, we can optimize this method later
-  List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
+  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
                                                              final HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
@@ -136,15 +134,15 @@ List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitio
       try {
         HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
         String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-        return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
       } catch (MetadataNotFoundException me) {
         LOG.warn("Unable to find range metadata in file :" + pf);
-        return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
       }
     }, Math.max(partitionPathFileIDList.size(), 1));
   } else {
     return partitionPathFileIDList.stream()
-        .map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
+        .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
   }
 }
@@ -186,19 +184,19 @@ public boolean isImplicitWithStorage() {
    * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
    * recordKey ranges in the index info.
    */
-  List<Tuple2<String, HoodieKey>> explodeRecordsWithFileComparisons(
+  List<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       Map<String, List<String>> partitionRecordKeyMap) {
     IndexFileFilter indexFileFilter =
         config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
             : new ListBasedIndexFileFilter(partitionToFileIndexInfo);
 
-    List<Tuple2<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
+    List<Pair<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
     partitionRecordKeyMap.keySet().forEach(partitionPath -> {
       List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
       hoodieRecordKeys.forEach(hoodieRecordKey -> {
         indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> {
-          fileRecordPairs.add(new Tuple2<>(partitionFileIdPair.getRight(),
+          fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
               new HoodieKey(hoodieRecordKey, partitionPath)));
         });
       });
@@ -210,10 +208,10 @@ List<Tuple2<String, HoodieKey>> explodeRecordsWithFileComparisons(
    * Find out <RowKey, filename> pair.
    */
   Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
-      List<Tuple2<String, HoodieKey>> fileComparisons,
+      List<Pair<String, HoodieKey>> fileComparisons,
       HoodieTable hoodieTable) {
 
-    fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1._1.compareTo(o2._1)).collect(toList());
+    fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1.getLeft().compareTo(o2.getLeft())).collect(toList());
 
     List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();
@@ -244,17 +242,17 @@ protected List<HoodieRecord<T>> tagLocationBacktoRecords(
     records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
     // Here as the record might have more data than rowKey (some rowKeys' fileId is null),
     // so we do left outer join.
-    List<Tuple2<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
+    List<Pair<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
     keyRecordPairMap.keySet().forEach(k -> {
       if (keyFilenamePair.containsKey(k)) {
-        newList.add(new Tuple2(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
+        newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
       } else {
-        newList.add(new Tuple2(keyRecordPairMap.get(k), null));
+        newList.add(Pair.of(keyRecordPairMap.get(k), null));
       }
     });
     List<HoodieRecord<T>> res = Lists.newArrayList();
-    for (Tuple2<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
-      res.add(HoodieIndexUtils.getTaggedRecord(v._1, Option.ofNullable(v._2)));
+    for (Pair<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
+      res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), Option.ofNullable(v.getRight())));
     }
     return res;
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
index 33ec9e65d0b73..a147c145d4930 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
@@ -33,14 +33,12 @@
 import java.util.Iterator;
 import java.util.List;
 
-import scala.Tuple2;
-
 /**
  * Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files.
  */
 //TODO we can move this class into the hudi-client-common and reuse it for spark client
 public class HoodieFlinkBloomIndexCheckFunction
-    implements Function<Iterator<Tuple2<String, HoodieKey>>, Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>>> {
+    implements Function<Iterator<Pair<String, HoodieKey>>, Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>>> {
 
   private final HoodieTable hoodieTable;
 
@@ -52,25 +50,25 @@ public HoodieFlinkBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteCo
   }
 
   @Override
-  public Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> apply(Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
+  public Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
     return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
   }
 
   @Override
-  public <V> Function<V, Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>>> compose(Function<? super V, ? extends Iterator<Tuple2<String, HoodieKey>>> before) {
+  public <V> Function<V, Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>>> compose(Function<? super V, ? extends Iterator<Pair<String, HoodieKey>>> before) {
     return null;
   }
 
   @Override
-  public <V> Function<Iterator<Tuple2<String, HoodieKey>>, V> andThen(Function<? super Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>>, ? extends V> after) {
+  public <V> Function<Iterator<Pair<String, HoodieKey>>, V> andThen(Function<? super Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>>, ? extends V> after) {
     return null;
   }
 
-  class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<HoodieKeyLookupHandle.KeyLookupResult>> {
+  class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<HoodieKeyLookupHandle.KeyLookupResult>> {
 
     private HoodieKeyLookupHandle keyLookupHandle;
 
-    LazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
+    LazyKeyCheckIterator(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
       super(filePartitionRecordKeyTripletItr);
     }
 
@@ -84,10 +82,10 @@ protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
       try {
         // process one file in each go.
         while (inputItr.hasNext()) {
-          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
-          String fileId = currentTuple._1;
-          String partitionPath = currentTuple._2.getPartitionPath();
-          String recordKey = currentTuple._2.getRecordKey();
+          Pair<String, HoodieKey> currentTuple = inputItr.next();
+          String fileId = currentTuple.getLeft();
+          String partitionPath = currentTuple.getRight().getPartitionPath();
+          String recordKey = currentTuple.getRight().getRecordKey();
           Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
 
           // lazily init state
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index 0dc6997794764..e485c00682d17 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -50,8 +50,6 @@
 import java.util.Map;
 import java.util.stream.Stream;
 
-import scala.Tuple2;
-
 import static java.util.Arrays.asList;
 import static java.util.UUID.randomUUID;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -130,7 +128,7 @@ public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, b
         new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
 
     List<String> partitions = asList("2016/01/21", "2016/04/01", "2015/03/12");
-    List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, context, hoodieTable);
+    List<Pair<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, context, hoodieTable);
     // Still 0, as no valid commit
     assertEquals(0, filesList.size());
 
@@ -145,20 +143,20 @@ public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, b
 
     if (rangePruning) {
       // these files will not have the key ranges
-      assertNull(filesList.get(0)._2().getMaxRecordKey());
-      assertNull(filesList.get(0)._2().getMinRecordKey());
-      assertFalse(filesList.get(1)._2().hasKeyRanges());
-      assertNotNull(filesList.get(2)._2().getMaxRecordKey());
-      assertNotNull(filesList.get(2)._2().getMinRecordKey());
-      assertTrue(filesList.get(3)._2().hasKeyRanges());
+      assertNull(filesList.get(0).getRight().getMaxRecordKey());
+      assertNull(filesList.get(0).getRight().getMinRecordKey());
+      assertFalse(filesList.get(1).getRight().hasKeyRanges());
+      assertNotNull(filesList.get(2).getRight().getMaxRecordKey());
+      assertNotNull(filesList.get(2).getRight().getMinRecordKey());
+      assertTrue(filesList.get(3).getRight().hasKeyRanges());
 
       // no longer sorted, but should have same files.
-      List<Tuple2<String, BloomIndexFileInfo>> expected =
-          asList(new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")),
-              new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")),
-              new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000", "000")),
-              new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001", "003")));
+      List<Pair<String, BloomIndexFileInfo>> expected =
+          asList(Pair.of("2016/04/01", new BloomIndexFileInfo("2")),
+              Pair.of("2015/03/12", new BloomIndexFileInfo("1")),
+              Pair.of("2015/03/12", new BloomIndexFileInfo("3", "000", "000")),
+              Pair.of("2015/03/12", new BloomIndexFileInfo("4", "001", "003")));
       assertEquals(expected, filesList);
     }
   }
@@ -176,20 +174,20 @@ public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolea
         new BloomIndexFileInfo("f5", "009", "010")));
 
     Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
-    asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"),
-        new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", "004"))
+    asList(Pair.of("2017/10/22", "003"), Pair.of("2017/10/22", "002"),
+        Pair.of("2017/10/22", "005"), Pair.of("2017/10/22", "004"))
         .forEach(t -> {
-          List<String> recordKeyList = partitionRecordKeyMap.getOrDefault(t._1, new ArrayList<>());
-          recordKeyList.add(t._2);
-          partitionRecordKeyMap.put(t._1, recordKeyList);
+          List<String> recordKeyList = partitionRecordKeyMap.getOrDefault(t.getLeft(), new ArrayList<>());
+          recordKeyList.add(t.getRight());
+          partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
         });
 
-    List<Tuple2<String, HoodieKey>> comparisonKeyList =
+    List<Pair<String, HoodieKey>> comparisonKeyList =
         index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyMap);
 
     assertEquals(10, comparisonKeyList.size());
 
     java.util.Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
-        .collect(java.util.stream.Collectors.groupingBy(t -> t._2.getRecordKey(), java.util.stream.Collectors.mapping(t -> t._1, java.util.stream.Collectors.toList())));
+        .collect(java.util.stream.Collectors.groupingBy(t -> t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t -> t.getLeft(), java.util.stream.Collectors.toList())));
 
     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("002")));
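--
Note on the API mapping: the whole patch is the mechanical rewrite of new Tuple2<>(a, b)
to Pair.of(a, b) and of the _1/_2 accessors to getLeft()/getRight(). Below is a minimal
standalone sketch of that equivalence, assuming org.apache.commons.lang3.tuple.Pair on
the classpath (Hudi's own Pair exposes the same of/getLeft/getRight/getKey/getValue
surface) and scala-library for the "before" side; the class and literal values here are
illustrative, not taken from the patch:

import org.apache.commons.lang3.tuple.Pair;
import scala.Tuple2;

public class TupleToPairSketch {
  public static void main(String[] args) {
    // Before: scala.Tuple2, constructed with new and read via the _1()/_2() accessors.
    Tuple2<String, String> tuple = new Tuple2<>("f1", "key-001");
    String fileIdBefore = tuple._1();
    String keyBefore = tuple._2();

    // After: Pair, built with the static factory and read via getLeft()/getRight().
    Pair<String, String> pair = Pair.of("f1", "key-001");
    String fileIdAfter = pair.getLeft();
    String keyAfter = pair.getRight();

    // Pair also implements java.util.Map.Entry, which is why the existing
    // pf.getKey()/pf.getValue() calls in loadInvolvedFiles stay unchanged.
    System.out.println(fileIdBefore.equals(fileIdAfter)); // true
    System.out.println(keyBefore.equals(keyAfter));       // true
    System.out.println(pair.getKey() + " -> " + pair.getValue()); // f1 -> key-001
  }
}

With the scala.Tuple2 imports gone, the three touched classes no longer import anything
from scala-library.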