Improve the scalability of the join between the LHS and GroupBys by breaking up the join #621
base: main
Conversation
@@ -67,6 +67,7 @@ class Join(joinConf: api.Join,
     extends JoinBase(joinConf, endPartition, tableUtils, skipFirstHole, mutationScan, showDf) {

   private val bootstrapTable = joinConf.metaData.bootstrapTable
+  private val joinsAtATime = 8
Can we make this consume a Spark conf param, via tableUtils?
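To illustrate the reviewer's suggestion, here is a minimal, Spark-free sketch of reading the batch size from a conf with a fallback default. The conf key and object name are hypothetical, and a plain `Map` stands in for `sparkSession.conf` so the snippet is self-contained; the real change would route the lookup through `TableUtils`.

```scala
// Hypothetical sketch: resolve joinsAtATime from a conf, defaulting to 8.
// "spark.chronon.join.joins_at_a_time" is an assumed key, not the real one.
object JoinConfSketch {
  val JoinsAtATimeKey = "spark.chronon.join.joins_at_a_time"

  // A Map[String, String] stands in for the Spark conf here.
  def joinsAtATime(conf: Map[String, String], default: Int = 8): Int =
    conf.get(JoinsAtATimeKey).map(_.toInt).getOrElse(default)
}
```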
@@ -324,6 +324,9 @@ case class TableUtils(sparkSession: SparkSession) {
     df
   }

+  def addJoinBreak(dataFrame: DataFrame): DataFrame =
+    dataFrame.cache()
TableUtils has a cache_level param and a wrap-with-cache method that does exception handling to release the resources claimed by the cache. I think we should use that here.
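As a sketch of the pattern the reviewer is pointing at (not the actual TableUtils API), a wrap-with-cache helper takes caching and eviction actions and guarantees the eviction runs even if the wrapped work throws. The function and parameter names below are hypothetical stand-ins for `persist()`/`unpersist()`.

```scala
// Illustrative exception-safe "wrap with cache" pattern: the resources
// claimed by the cache are always released, success or failure.
object CacheSketch {
  def wrapWithCache[T](cache: () => Unit, uncache: () => Unit)(body: => T): T = {
    cache()
    try body
    finally uncache() // runs even when body throws
  }
}
```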
-      case (partialDf, (rightPart, rightDf)) => joinWithLeft(partialDf, rightDf, rightPart)
+      case (partialDf, ((rightPart, rightDf), i)) =>
+        val next = joinWithLeft(partialDf, rightDf, rightPart)
+        if (((i + 1) % joinsAtATime) == 0) {
If we have 24 parts, there will be 3 cache points: at 8, 16, and 24.
16 should evict the 8 cache, and 24 shouldn't cache since it is the last one.
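The cache-point rule the reviewer describes can be captured as a small pure function, shown here as a Spark-free sketch with a hypothetical name: cache after every `joinsAtATime` parts, but skip the final part since nothing downstream reuses it.

```scala
// Sketch of the suggested cache-point policy (pure logic, no Spark):
// cache at multiples of joinsAtATime, except when that point is the
// final part; evicting the previous cache would happen at each point.
object CachePointSketch {
  /** 1-based part counts at which an intermediate result should be cached. */
  def cachePoints(numParts: Int, joinsAtATime: Int): Seq[Int] =
    (joinsAtATime to numParts by joinsAtATime).filter(_ != numParts)
}
```

For 24 parts with `joinsAtATime = 8`, this yields cache points at 8 and 16, with nothing cached at 24.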
Does the PR mean we will break up the batch request into mini-batch requests and fetch them in parallel? @nikhilsimha
This basically only applies to Spark offline jobs, Yang.
I added some details to the PR description. And sorry, the PR is still a WIP; I'm working on getting the CI setup to work.
Summary
Improve the scalability of the join between the LHS and GroupBys by breaking up the join. Previously, when joining together a large number of GroupBys, the Spark job could get stuck.
Why / Goal
Prevent the Spark job from getting stuck when joining the LHS with a large number of GroupBys.
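The core idea of the PR can be sketched without Spark: instead of chaining every join into one ever-growing lazy plan, insert a materialization "break" every `joinsAtATime` joins so the plan stays bounded. In this sketch, `join` and `break` are hypothetical stand-ins for the PR's `joinWithLeft` and `addJoinBreak`.

```scala
// Minimal sketch of breaking up a long chain of joins: fold over the
// parts, and after every joinsAtATime-th join apply a break (which in
// the real job caches/materializes the intermediate DataFrame).
object JoinBreakSketch {
  def joinAll[T](left: T, parts: Seq[T], joinsAtATime: Int)(
      join: (T, T) => T, break: T => T): T =
    parts.zipWithIndex.foldLeft(left) { case (acc, (part, i)) =>
      val next = join(acc, part)
      if ((i + 1) % joinsAtATime == 0) break(next) else next
    }
}
```

Here strings model DataFrames: joining appends a name, and a break appends a marker, making it easy to see where materialization boundaries fall.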
Test Plan
Checklist
N.A.
Reviewers