
Conversation

@EdwinGuo (Contributor) commented Jun 9, 2020

… twice in lookUpIndex


What is the purpose of the pull request

Cache the explodeRecordRDDWithFileComparisons instead of computing it twice in lookUpIndex (a rough illustrative sketch follows the checklist below).

Brief change log

  • Persist the result of explodeRecordRDDWithFileComparisons in HoodieBloomIndex.lookUpIndex instead of recomputing it for the second pass.

Verify this pull request

This pull request is already covered by existing tests, since it only changes how an intermediate RDD is reused.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
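
For context, here is a minimal, illustrative sketch of the caching idea (not the actual patch): persist the exploded (fileId, recordKey) pairs once so both downstream passes reuse them. The class name, method name, and the plain count() standing in for the lookup are made up for the sketch.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;

import java.util.Map;

public class CacheExplodedRddSketch {

  // Persist the exploded (fileId, recordKey) pairs once so that both downstream
  // passes reuse them instead of re-running the explode step. fileComparisonsRDD
  // stands in for the output of explodeRecordRDDWithFileComparisons.
  static void lookUpIndexSketch(JavaPairRDD<String, String> fileComparisonsRDD) {
    fileComparisonsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());

    // First pass: count comparisons per file to estimate parallelism.
    Map<String, Long> comparisonsPerFile = fileComparisonsRDD.countByKey();
    int parallelism = Math.max(1, comparisonsPerFile.size());

    // Second pass: the actual key lookup would run here against the same
    // persisted RDD; a plain count() stands in for it in this sketch.
    long totalComparisons = fileComparisonsRDD.count();

    fileComparisonsRDD.unpersist();
    System.out.println("parallelism=" + parallelism + ", comparisons=" + totalComparisons);
  }
}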

@codecov-commenter commented Jun 9, 2020

Codecov Report

Merging #1721 into master will decrease coverage by 0.01%.
The diff coverage is 100.00%.


@@             Coverage Diff              @@
##             master    #1721      +/-   ##
============================================
- Coverage     18.16%   18.15%   -0.01%     
  Complexity      860      860              
============================================
  Files           352      352              
  Lines         15411    15410       -1     
  Branches       1525     1524       -1     
============================================
- Hits           2799     2798       -1     
  Misses        12254    12254              
  Partials        358      358              
Impacted Files Coverage Δ Complexity Δ
.../org/apache/hudi/index/bloom/HoodieBloomIndex.java 58.16% <100.00%> (-0.43%) 16.00 <0.00> (ø)


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 37838ce...ef08b1e.

@nsivabalan (Contributor) left a comment

Good one. If you don't mind, can you run a sample job (with 1M records or so) and share a screenshot of the Spark UI stages page to show the difference with and without this optimization?

@EdwinGuo (Contributor, Author) commented

> Good one. If you don't mind, can you run a sample job (with 1M records or so) and share a screenshot of the Spark UI stages page to show the difference with and without this optimization?

Will do. Thanks for reviewing.

@vinothchandar (Member) left a comment

This needs a bit more thought. The exploded RDD can be large, and caching may incur more overhead than recomputing in some cases.

@vinothchandar (Member) commented

Can you please include the JIRA number in the PR title?

@vinothchandar (Member) left a comment

@EdwinGuo @nsivabalan let's hash this out; it's an interesting one. Although it may seem like we are computing the fully exploded RDD in both places, if you look closely, we do

fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD)
          .mapToPair(t -> t).countByKey();

countByKey() does not shuffle the actual data, only the counts per file, so we only pay the compute cost of exploding twice, and all of this is just to estimate the parallelism. Given it is only an estimate, would it be better to introduce an option to simply down-sample and estimate, rather than adding caching?

e.g.

fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD.sample(true, 0.1))
          .mapToPair(t -> t).countByKey();

would cut the cost down by roughly 90%; we would need to adjust the computations in the map accordingly, of course.
Even Spark's sort does some kind of reservoir sampling, so this could be a valid approach overall.

What do you both think? I am a bit concerned about caching this exploded RDD (that's why I chose to recompute to begin with).
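
To make the "adjust the computations" step concrete, here is a rough, self-contained sketch of scaling sampled counts back up by 1/fraction. It samples the already-exploded pairs purely to illustrate the correction (in the suggestion above the sample would be taken on partitionRecordKeyPairRDD before exploding); all names are illustrative.

import org.apache.spark.api.java.JavaPairRDD;

import java.util.HashMap;
import java.util.Map;

public class SampledComparisonEstimate {

  // Estimate comparisons per file from a sample and scale the counts back up
  // by 1/fraction. fileComparisonsRDD stands in for the exploded
  // (fileId, recordKey) pairs produced by explodeRecordRDDWithFileComparisons.
  static Map<String, Long> estimateComparisonsPerFile(
      JavaPairRDD<String, String> fileComparisonsRDD, double fraction) {
    Map<String, Long> sampledCounts =
        fileComparisonsRDD.sample(true, fraction).countByKey();

    Map<String, Long> estimated = new HashMap<>();
    sampledCounts.forEach((fileId, count) -> estimated.put(fileId, Math.round(count / fraction)));
    return estimated;
  }
}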

@vinothchandar vinothchandar changed the title Cache the explodeRecordRDDWithFileComparisons instead of commuting it… [WIP] Cache the explodeRecordRDDWithFileComparisons instead of commuting it… Jun 24, 2020
@vinothchandar vinothchandar added the status:in-progress Work in progress label Jun 24, 2020
@EdwinGuo EdwinGuo changed the title [WIP] Cache the explodeRecordRDDWithFileComparisons instead of commuting it… [WIP] [HUDI-1041] Cache the explodeRecordRDDWithFileComparisons instead of commuting it… Jun 24, 2020
@EdwinGuo (Contributor, Author) commented

> Can you please include the JIRA number in the PR title?

Done. https://issues.apache.org/jira/browse/HUDI-1041

@EdwinGuo (Contributor, Author) commented

> @EdwinGuo @nsivabalan let's hash this out; it's an interesting one. Although it may seem like we are computing the fully exploded RDD in both places, if you look closely, we do
>
> fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD)
>           .mapToPair(t -> t).countByKey();
>
> countByKey() does not shuffle the actual data, only the counts per file, so we only pay the compute cost of exploding twice, and all of this is just to estimate the parallelism. Given it is only an estimate, would it be better to introduce an option to simply down-sample and estimate, rather than adding caching?
>
> e.g.
>
> fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD.sample(true, 0.1))
>           .mapToPair(t -> t).countByKey();
>
> would cut the cost down by roughly 90%; we would need to adjust the computations in the map accordingly, of course. Even Spark's sort does some kind of reservoir sampling, so this could be a valid approach overall.
>
> What do you both think? I am a bit concerned about caching this exploded RDD (that's why I chose to recompute to begin with).

I can provide some performance comparison tomorrow. fileComparisonsRDD is computed in different patterns within findMatchingFilesForRecordKeys and computeComparisonsPerFileGroup, so yes, countByKey is light on shuffle but could be heavy on IO in some cases. I agree that StorageLevel.MEMORY_AND_DISK_SER() could be heavier than recomputing in some scenarios, so let me conduct some performance testing and get back to you.

Regarding sampling, what if some of the partitions are skewed? Will that cause more overhead than flushing the file out?
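
A crude sketch of what such a cache-vs-recompute comparison could look like (illustrative only; the names and the plain count() standing in for the actual lookup are made up, and a real test would also look at the Spark UI stages and shuffle/spill metrics rather than just wall-clock time):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;

public class CacheVsRecomputeTiming {

  // Run the two passes once against a persisted RDD and once against a
  // recomputed one, and time them for a rough comparison.
  static long timeTwoPasses(JavaPairRDD<String, String> fileComparisonsRDD, boolean cache) {
    if (cache) {
      fileComparisonsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
    }
    long start = System.nanoTime();
    fileComparisonsRDD.countByKey(); // first pass: parallelism estimate
    fileComparisonsRDD.count();      // second pass: stands in for the actual lookup
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    if (cache) {
      fileComparisonsRDD.unpersist();
    }
    return elapsedMs;
  }
}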

@vinothchandar (Member) commented

> Regarding sampling, what if some of the partitions are skewed? Will that cause more overhead than flushing the file out?

IIRC the partitionRecordKeyPairRDD would have an even distribution of keys from the precombine step, which just does a reduceByKey. We can always support a config to increase the sampling rate, right? It all depends on how much difference there is in the computed parallelism between samplingRate=0.1 and 1.0.
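
A hypothetical sketch of that comparison: derive the parallelism estimate from a 10% sample and from the full data and see how far apart they land. The comparisonsPerTask parameter is a made-up stand-in for however the real parallelism calculation weighs the counts.

import org.apache.spark.api.java.JavaPairRDD;

public class ParallelismEstimateComparison {

  // Compare the parallelism estimate derived from a 10% sample against the
  // full computation over the exploded (fileId, recordKey) pairs.
  static void compare(JavaPairRDD<String, String> fileComparisonsRDD, long comparisonsPerTask) {
    long fullTotal = fileComparisonsRDD.count();
    long sampledTotal = Math.round(fileComparisonsRDD.sample(true, 0.1).count() / 0.1);

    long fullParallelism = Math.max(1L, fullTotal / comparisonsPerTask);
    long sampledParallelism = Math.max(1L, sampledTotal / comparisonsPerTask);

    System.out.println("parallelism with samplingRate=1.0: " + fullParallelism);
    System.out.println("parallelism with samplingRate=0.1: " + sampledParallelism);
  }
}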

@EdwinGuo (Contributor, Author) commented Jul 1, 2020

> > Regarding sampling, what if some of the partitions are skewed? Will that cause more overhead than flushing the file out?
>
> IIRC the partitionRecordKeyPairRDD would have an even distribution of keys from the precombine step, which just does a reduceByKey. We can always support a config to increase the sampling rate, right? It all depends on how much difference there is in the computed parallelism between samplingRate=0.1 and 1.0.

OK, let me work on a sampling rate and get back with the performance results. Thanks.

@vinothchandar (Member) commented

Closing due to inactivity

