diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java index 894b41b51c6bf..7316043ebd965 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java @@ -122,13 +122,15 @@ private JavaPairRDD lookupIndex( // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id, // that contains it. + JavaRDD> fileComparisonsRDD = + explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD); Map comparisonsPerFileGroup = - computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD); + computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD); int inputParallelism = partitionRecordKeyPairRDD.partitions().size(); int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism()); LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${" + config.getBloomIndexParallelism() + "}"); - return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable, + return findMatchingFilesForRecordKeys(fileComparisonsRDD, joinParallelism, hoodieTable, comparisonsPerFileGroup); } @@ -137,14 +139,12 @@ private JavaPairRDD lookupIndex( */ private Map computeComparisonsPerFileGroup(final Map recordsPerPartition, final Map> partitionToFileInfo, - JavaPairRDD partitionRecordKeyPairRDD) { - + final JavaRDD> fileComparisonsRDD) { Map fileToComparisons; if (config.getBloomIndexPruneByRanges()) { // we will just try exploding the input and then count to determine comparisons // FIX(vc): Only do sampling here and extrapolate? - fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD) - .mapToPair(t -> t).countByKey(); + fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey(); } else { fileToComparisons = new HashMap<>(); partitionToFileInfo.forEach((key, value) -> { @@ -252,11 +252,10 @@ JavaRDD> explodeRecordRDDWithFileComparisons( * Make sure the parallelism is atleast the groupby parallelism for tagging location */ JavaPairRDD findMatchingFilesForRecordKeys( - final Map> partitionToFileIndexInfo, - JavaPairRDD partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable, + JavaRDD> fileComparisonsRDD, + int shuffleParallelism, + HoodieTable hoodieTable, Map fileGroupToComparisons) { - JavaRDD> fileComparisonsRDD = - explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD); if (config.useBloomIndexBucketizedChecking()) { Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,