diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index ca960ef8825fb..feb2f024450d6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -151,13 +151,15 @@ private JavaPairRDD lookupIndex( // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id, // that contains it. + JavaRDD> fileComparisonsRDD = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD); + fileComparisonsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER()); Map comparisonsPerFileGroup = - computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD); + computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD); int inputParallelism = partitionRecordKeyPairRDD.partitions().size(); int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism()); LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${" + config.getBloomIndexParallelism() + "}"); - return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable, + return findMatchingFilesForRecordKeys(fileComparisonsRDD, joinParallelism, hoodieTable, comparisonsPerFileGroup); } @@ -166,14 +168,13 @@ private JavaPairRDD lookupIndex( */ private Map computeComparisonsPerFileGroup(final Map recordsPerPartition, final Map> partitionToFileInfo, - JavaPairRDD partitionRecordKeyPairRDD) { + JavaRDD> fileComparisonsRDD) { Map fileToComparisons; if (config.getBloomIndexPruneByRanges()) { // we will just try exploding the input and then count to determine comparisons // FIX(vc): Only do sampling here and extrapolate? - fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD) - .mapToPair(t -> t).countByKey(); + fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey(); } else { fileToComparisons = new HashMap<>(); partitionToFileInfo.forEach((key, value) -> { @@ -280,12 +281,8 @@ JavaRDD> explodeRecordRDDWithFileComparisons( * Make sure the parallelism is atleast the groupby parallelism for tagging location */ JavaPairRDD findMatchingFilesForRecordKeys( - final Map> partitionToFileIndexInfo, - JavaPairRDD partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable, + JavaRDD> fileComparisonsRDD, int shuffleParallelism, HoodieTable hoodieTable, Map fileGroupToComparisons) { - JavaRDD> fileComparisonsRDD = - explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD); - if (config.useBloomIndexBucketizedChecking()) { Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons, config.getBloomIndexKeysPerBucket());