@@ -122,13 +122,15 @@ private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(

     // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id,
     // that contains it.
+    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
Member:
Do you actually see from the Spark UI that it's not computed twice? I ask because fileComparisonsRDD is not cached, so even though it is declared only once, at runtime Spark will lazily recompute fileComparisonsRDD once for each method that uses it.

Contributor Author:
I didn't check the Spark UI yet, just did a simple analysis of the data-writing process. For each batch of records to write, SparkHoodieBloomIndex.lookupIndex is expected to be invoked once, so fileComparisonsRDD should be evaluated only once. Is there another invocation of SparkHoodieBloomIndex.lookupIndex? Maybe I missed something.

Member:
So Spark does lazy evaluation of an RDD. If the RDD is not persisted to disk/cache, Spark simply recomputes it. In this case, fileComparisonsRDD would be recomputed twice at runtime: the method explodeRecordRDDWithFileComparisons() will only be called once, but it does nothing in practice except "define" the RDD it returns.

In contrast, notice this piece of code at the start of tagLocation:

// Step 0: cache the input record RDD
    if (config.getBloomIndexUseCaching()) {
      recordRDD.persist(SparkMemoryUtils.getBloomIndexInputStorageLevel(config.getProps()));
    }

This caches the incoming recordRDD, so however many times this RDD is used in the indexing DAG, it will not go back to the previous stage. If we did not have the .persist() here, then every time an RDD derived from this recordRDD is needed for a Spark action, it would keep reading from the source and computing recordRDD again.

Apologies if you knew all this already. :) I am failing to see how this won't happen, but at least you get my concern from this explanation.
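
To make the point above concrete, here is a minimal, self-contained sketch of the lazy-evaluation behavior being described; it is illustrative only (the accumulator names and local-mode setup are assumptions for the demo, not anything from the Hudi codebase). An un-persisted RDD re-runs its upstream transformations for every action, while a persisted one is materialized once:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.LongAccumulator;

public class LazyRecomputeDemo {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setMaster("local[2]").setAppName("lazy-recompute-demo"));
    // Count how many times each map function actually executes.
    LongAccumulator unpersistedRuns = jsc.sc().longAccumulator("unpersistedRuns");
    LongAccumulator persistedRuns = jsc.sc().longAccumulator("persistedRuns");

    JavaRDD<Integer> unpersisted = jsc.parallelize(Arrays.asList(1, 2, 3, 4))
        .map(x -> { unpersistedRuns.add(1); return x * 2; });
    JavaRDD<Integer> persisted = jsc.parallelize(Arrays.asList(1, 2, 3, 4))
        .map(x -> { persistedRuns.add(1); return x * 2; })
        .persist(StorageLevel.MEMORY_AND_DISK_SER());

    // Two actions on each RDD: only the un-persisted one re-runs its map.
    unpersisted.count();
    unpersisted.collect();
    persisted.count();    // materializes and caches the RDD
    persisted.collect();  // served from the cache, the map does not run again

    System.out.println("un-persisted map invocations: " + unpersistedRuns.value()); // 8
    System.out.println("persisted map invocations:    " + persistedRuns.value());   // 4
    jsc.stop();
  }
}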

Contributor Author (@danny0405), Dec 11, 2020:
Thanks for the explanation. The recordRDD may be persisted, but the computation in explodeRecordRDDWithFileComparisons still needs to happen twice, right? Sorry, I'm not that familiar with the RDD internals.

Member:
Correct. That's what I think will happen.

+        explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD);
     Map<String, Long> comparisonsPerFileGroup =
-        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
+        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD);
     int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
     int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
     LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
         + config.getBloomIndexParallelism() + "}");
-    return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
+    return findMatchingFilesForRecordKeys(fileComparisonsRDD, joinParallelism, hoodieTable,
         comparisonsPerFileGroup);
   }
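
Following up on the review thread above: after this change, fileComparisonsRDD feeds two separate Spark actions — the countByKey() inside computeComparisonsPerFileGroup and the shuffle inside findMatchingFilesForRecordKeys — so an un-persisted fileComparisonsRDD would still be built twice at runtime. Below is a sketch of one possible variant of lookupIndex that caches it; reusing the bloom-index caching config and storage level for this RDD is an assumption of the sketch, not something this PR does:

    // Hypothetical variant (not in this PR): persist the derived RDD so that both
    // downstream actions reuse the same materialized data.
    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
        explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD);
    if (config.getBloomIndexUseCaching()) {
      // Assumption: reuse the storage level that tagLocation already uses for recordRDD.
      fileComparisonsRDD.persist(SparkMemoryUtils.getBloomIndexInputStorageLevel(config.getProps()));
    }

    // Action #1: countByKey() inside computeComparisonsPerFileGroup materializes the RDD.
    Map<String, Long> comparisonsPerFileGroup =
        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD);

    int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
    int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());

    // Action #2: the shuffle in findMatchingFilesForRecordKeys now reads the cached copy
    // instead of re-running explodeRecordRDDWithFileComparisons.
    JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairs =
        findMatchingFilesForRecordKeys(fileComparisonsRDD, joinParallelism, hoodieTable, comparisonsPerFileGroup);

    if (config.getBloomIndexUseCaching()) {
      fileComparisonsRDD.unpersist();
    }
    return keyLocationPairs;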

@@ -137,14 +139,12 @@ private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
    */
   private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long> recordsPerPartition,
       final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
-      JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
-
+      final JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD) {
     Map<String, Long> fileToComparisons;
     if (config.getBloomIndexPruneByRanges()) {
       // we will just try exploding the input and then count to determine comparisons
       // FIX(vc): Only do sampling here and extrapolate?
-      fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD)
-          .mapToPair(t -> t).countByKey();
+      fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
     } else {
       fileToComparisons = new HashMap<>();
       partitionToFileInfo.forEach((key, value) -> {
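
For context on the new computeComparisonsPerFileGroup signature: countByKey() is itself a Spark action (it runs a job), which is one of the two evaluations of fileComparisonsRDD discussed in the thread above. A small standalone sketch, with made-up file ids and plain String standing in for HoodieKey:

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CountByKeyDemo {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setMaster("local[2]").setAppName("count-by-key-demo"));

    // Stand-in for fileComparisonsRDD: (fileId, recordKey) pairs.
    JavaRDD<Tuple2<String, String>> fileComparisons = jsc.parallelize(Arrays.asList(
        new Tuple2<>("file-1", "key-a"),
        new Tuple2<>("file-1", "key-b"),
        new Tuple2<>("file-2", "key-c")));

    // mapToPair(t -> t) turns the RDD of tuples into a pair RDD; countByKey() is an
    // action that returns the number of comparisons per file id to the driver.
    Map<String, Long> comparisonsPerFile = fileComparisons.mapToPair(t -> t).countByKey();
    System.out.println(comparisonsPerFile); // e.g. {file-1=2, file-2=1}

    jsc.stop();
  }
}
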
@@ -252,11 +252,10 @@ JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
    * Make sure the parallelism is atleast the groupby parallelism for tagging location
    */
   JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
-      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
-      JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
+      JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD,
+      int shuffleParallelism,
+      HoodieTable hoodieTable,
       Map<String, Long> fileGroupToComparisons) {
-    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
-        explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);

     if (config.useBloomIndexBucketizedChecking()) {
       Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,