[SPARK-41246][core] Solve the problem of RddId negative #38781
|
Can one of the admins verify this patch? |
|
What problem does it cause? There is no detail here or in the JIRA. |
| return id | ||
| } | ||
| this.synchronized { | ||
| id = nextRddId.getAndIncrement() |
Can we avoid the duplicate call of 'nextRddId.getAndIncrement()'?
| } | ||
| this.synchronized { | ||
| id = nextRddId.getAndIncrement() | ||
| if (id < 0) { |
Instead of this if (id < 0) condition, the else branch of the previous if (id >= 0) can be used.
| id = nextRddId.getAndIncrement() | ||
| if (id < 0) { | ||
| nextRddId = new AtomicInteger(0) | ||
| id = nextRddId.getAndIncrement() |
Can you try this?
nextRddId = new AtomicInteger(1)
id = nextRddId
Thank you for your review. I updated the code.
| private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement() | ||
|
|
||
| private val nextRddId = new AtomicInteger(0) | ||
| private var nextRddId = new AtomicInteger(0) |
What happens when the RDD ID overflows within a SparkContext? Are there tests that cover these cases?
Curious if it would be better to switch to an AtomicLong and just modulo max int?
| private var nextRddId = new AtomicInteger(0) | |
| private[spark] def newRddId(): Int = (nextRddId.getAndIncrement() % Integer.MAX_VALUE).toInt |
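A minimal sketch of the AtomicLong idea suggested above, assuming a standalone counter class (`LongBackedIdGenerator` and its `start` parameter are hypothetical names, not Spark code). The counter is a `Long`, and each id is reduced modulo `Int.MaxValue`, so the result stays in `[0, Int.MaxValue)` instead of going negative.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the counter in SparkContext; not Spark code.
// `start` exists only so the wrap-around can be exercised quickly.
class LongBackedIdGenerator(start: Long = 0L) {
  private val next = new AtomicLong(start)

  // A non-negative Long modulo Int.MaxValue is always in [0, Int.MaxValue),
  // so the narrowing conversion to Int is safe.
  def newId(): Int = (next.getAndIncrement() % Int.MaxValue).toInt
}
```

Ids still repeat once the underlying counter passes `Int.MaxValue`, so this removes the negative values but not the possibility of two RDDs sharing an id.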
What happens when the RDD ID overflows within a SparkContext?
When BlockManager generates a BlockId, BlockId only supports non-negative RDD ids, so BlockId generation fails once the id has overflowed to a negative value.
Switch to an AtomicLong?
That change would have a very broad scope of impact and may require extensive discussion.
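To make the failure mode concrete, here is a small sketch. The wrap-around of `AtomicInteger` is standard JVM behavior; the digits-only pattern is only an assumed stand-in for how a block name might be parsed, and `OverflowDemo`, `idsAcrossOverflow`, and `parseRddId` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicInteger

object OverflowDemo {
  // Assumed, illustrative pattern: the id part accepts digits only, no sign.
  private val RddBlock = "rdd_([0-9]+)_([0-9]+)".r

  // Returns the two ids handed out on either side of the Int overflow.
  def idsAcrossOverflow(): (Int, Int) = {
    val counter = new AtomicInteger(Int.MaxValue)
    val last = counter.getAndIncrement()    // Int.MaxValue
    val wrapped = counter.getAndIncrement() // wraps to Int.MinValue
    (last, wrapped)
  }

  // Some(rddId) when the name matches the pattern, None otherwise.
  def parseRddId(name: String): Option[Int] = name match {
    case RddBlock(rddId, _) => Some(rddId.toInt)
    case _ => None
  }
}
```

Under this assumption, a name built from a negative id contains a minus sign and cannot match, which is consistent with the failure described in the reply above.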
You don't need to make this a var - see below.
|
Agree with @srowen, what is the issue here?
Additionally, if RDD id is wrapping around, it can cause rdd id conflicts - we have to think through the impact of that as well. |
|
Updated the details in the JIRA. When BlockManager generates a BlockId, BlockId only supports non-negative RDD ids, so BlockId generation fails: object BlockId { |
mridulm
left a comment
Thanks for the details, I can see how this can happen for very long running and high frequency streaming applications.
Unfortunately, this also means we can end up with multiple RDDs with the same id - which will require some thinking.
I think moving to long might be a better idea - though that would be fairly disruptive.
+CC @Ngone51, @tgravescs, @cloud-fan
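The id-conflict concern can be sketched as follows, assuming some bookkeeping structure keyed by RDD id (the `Registry` class and its methods are hypothetical and do not correspond to any Spark class). Once a wrapped counter hands out an id that is still in use, the later entry silently replaces the earlier one.

```scala
import scala.collection.mutable

// Hypothetical registry keyed by id; stands in for any id-indexed bookkeeping.
class Registry {
  private val byId = mutable.Map.empty[Int, String]

  // Returns the entry that was displaced, if the id was already taken.
  def register(id: Int, name: String): Option[String] = byId.put(id, name)

  def lookup(id: Int): Option[String] = byId.get(id)
}
```

Registering two entries under the same id leaves only the second one reachable, which is the kind of impact that would need to be thought through before allowing ids to wrap.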
| private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement() | ||
|
|
||
| private val nextRddId = new AtomicInteger(0) | ||
| private var nextRddId = new AtomicInteger(0) |
You don't need to make this a var - see below.
| private[spark] def newRddId(): Int = { | ||
| var id = nextRddId.getAndIncrement() | ||
| if (id >= 0) { | ||
| return id | ||
| } | ||
| this.synchronized { | ||
| nextRddId = new AtomicInteger(0) | ||
| id = nextRddId.getAndIncrement() | ||
| } | ||
| id | ||
| } |
Something like this would remove the need for synchronization, etc.
| private[spark] def newRddId(): Int = { | |
| var id = nextRddId.getAndIncrement() | |
| if (id >= 0) { | |
| return id | |
| } | |
| this.synchronized { | |
| nextRddId = new AtomicInteger(0) | |
| id = nextRddId.getAndIncrement() | |
| } | |
| id | |
| } | |
| private[spark] def newRddId(): Int = { | |
| nextRddId.getAndUpdate { i => | |
| var nextValue = i + 1 | |
| if (nextValue < 0) { | |
| nextValue = 0 | |
| } | |
| nextValue | |
| } | |
| } |
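A self-contained sketch of how the `getAndUpdate` variant behaves at the boundary, assuming a standalone class (`WrappingIdGenerator` and its `start` parameter are hypothetical; `AtomicInteger.getAndUpdate` is the standard JDK method). The update function is applied atomically by the JDK, so neither a lock nor a `var` is needed.

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.IntUnaryOperator

// Hypothetical stand-in for the counter in SparkContext; not Spark code.
class WrappingIdGenerator(start: Int = 0) {
  private val next = new AtomicInteger(start)

  // Explicit IntUnaryOperator so this compiles without relying on SAM conversion.
  private val advance = new IntUnaryOperator {
    override def applyAsInt(i: Int): Int = {
      val candidate = i + 1 // overflows to Int.MinValue when i == Int.MaxValue
      if (candidate < 0) 0 else candidate
    }
  }

  // getAndUpdate returns the value held before the update, so with a
  // non-negative start the returned ids are never negative.
  def newId(): Int = next.getAndUpdate(advance)
}
```

Starting at `Int.MaxValue - 1`, successive calls yield `Int.MaxValue - 1`, `Int.MaxValue`, `0`, `1`. Ids still repeat after the wrap, so the conflict concern raised earlier in the thread remains.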
|
Please update the PR details with description @yorksity |
|
Is the problem caused by integer overflow? |
@cloud-fan According to the JIRA description, it is. |
|
@yorksity Did you see this issue in the real world? If so, do you have any details about the job? Like batch or streaming? How long it takes to hit this issue? etc. |
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
Why are the changes needed?
To solve the problem of negative RDD ids, which occurs in long-running jobs such as streaming jobs.
Does this PR introduce any user-facing change?
How was this patch tested?