Make HttpRemoteTaskWithEventLoop::addSplit() update pending split stats immediately#26126
Conversation
Reviewer's GuideThis PR refactors HttpRemoteTaskWithEventLoop to use atomic counters for pending split stats, updates those stats synchronously in addSplits(), and streamlines split queue space threshold management to prevent skewed split distribution. File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskWithEventLoop.java:558-567` </location>
<code_context>
return;
}
+ int count = 0;
+ long weight = 0;
+ for (Entry<PlanNodeId, Collection<Split>> entry : splitsBySource.asMap().entrySet()) {
+ PlanNodeId sourceId = entry.getKey();
+ Collection<Split> splits = entry.getValue();
+
+ if (tableScanPlanNodeIds.contains(sourceId)) {
+ count += splits.size();
+ weight += splits.stream().map(Split::getSplitWeight)
+ .mapToLong(SplitWeight::getRawValue)
+ .sum();
+ }
+ }
+ if (count != 0) {
+ pendingSourceSplitCount.addAndGet(count);
+ pendingSourceSplitsWeight.addAndGet(weight);
</code_context>
<issue_to_address>
**issue (bug_risk):** Potential for double-counting splits if addSplits is called concurrently.
Because pendingSplits is not thread-safe, concurrent calls to addSplits may cause race conditions. Please either document that addSplits must be single-threaded or switch to a thread-safe collection for pendingSplits.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| int count = 0; | ||
| long weight = 0; | ||
| for (Entry<PlanNodeId, Collection<Split>> entry : splitsBySource.asMap().entrySet()) { | ||
| PlanNodeId sourceId = entry.getKey(); | ||
| Collection<Split> splits = entry.getValue(); | ||
|
|
||
| if (tableScanPlanNodeIds.contains(sourceId)) { | ||
| count += splits.size(); | ||
| weight += splits.stream().map(Split::getSplitWeight) | ||
| .mapToLong(SplitWeight::getRawValue) |
There was a problem hiding this comment.
issue (bug_risk): Potential for double-counting splits if addSplits is called concurrently.
Because pendingSplits is not thread-safe, concurrent calls to addSplits may cause race conditions. Please either document that addSplits must be single-threaded or switch to a thread-safe collection for pendingSplits.
|
Thanks for the investigation @spershin. Does this relate in any way to the recent work on task based scheduling for Prestissimo clusters? |
Description
Introduction of HttpRemoteTaskWithEventLoop to solve lock contention had an unexpected effect: split statistic in the class which is being used by the split scheduling code would not update immediately.
This could lead in the split scheduling code adding an overwhelming number of splits to a single worker. Cases of 1200 splits sent in a single message have been detected when a usual packet would contain 1-5 splits.
This is being fixed by updating the critical statistics synchronously while leaving the rest to be run asynchronously.
Test Plan
This has been tested in an intense production shadow environment, where previously we had very bad skews of split distribution.