diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskWithEventLoop.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskWithEventLoop.java index 9ef85adcae21a..c66d4e9ec8dbc 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskWithEventLoop.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskWithEventLoop.java @@ -88,12 +88,13 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import static com.facebook.airlift.http.client.HttpStatus.NO_CONTENT; @@ -183,16 +184,15 @@ public final class HttpRemoteTaskWithEventLoop private long currentRequestLastTaskUpdate; private final SetMultimap pendingSplits = HashMultimap.create(); - private volatile int pendingSourceSplitCount; - private volatile long pendingSourceSplitsWeight; + private final AtomicInteger pendingSourceSplitCount = new AtomicInteger(); + private final AtomicLong pendingSourceSplitsWeight = new AtomicLong(); private final SetMultimap pendingNoMoreSplitsForLifespan = HashMultimap.create(); // The keys of this map represent all plan nodes that have "no more splits". // The boolean value of each entry represents whether the "no more splits" notification is pending delivery to workers. private final Map noMoreSplits = new HashMap<>(); private OutputBuffers outputBuffers; private final FutureStateChange whenSplitQueueHasSpace = new FutureStateChange<>(); - private volatile boolean splitQueueHasSpace; - private OptionalLong whenSplitQueueHasSpaceThreshold = OptionalLong.empty(); + private volatile long whenSplitQueueWeightThreshold = Long.MAX_VALUE; private final boolean summarizeTaskInfo; @@ -431,8 +431,8 @@ private HttpRemoteTaskWithEventLoop(Session session, pendingSourceSplitsWeight = addExact(pendingSourceSplitsWeight, SplitWeight.rawValueSum(tableScanSplits, Split::getSplitWeight)); } } - this.pendingSourceSplitCount = pendingSourceSplitCount; - this.pendingSourceSplitsWeight = pendingSourceSplitsWeight; + this.pendingSourceSplitCount.set(pendingSourceSplitCount); + this.pendingSourceSplitsWeight.set(pendingSourceSplitsWeight); List bufferStates = outputBuffers.getBuffers() .keySet().stream() @@ -555,32 +555,37 @@ public void addSplits(Multimap splitsBySource) return; } + int count = 0; + long weight = 0; + for (Entry> entry : splitsBySource.asMap().entrySet()) { + PlanNodeId sourceId = entry.getKey(); + Collection splits = entry.getValue(); + + if (tableScanPlanNodeIds.contains(sourceId)) { + count += splits.size(); + weight += splits.stream().map(Split::getSplitWeight) + .mapToLong(SplitWeight::getRawValue) + .sum(); + } + } + if (count != 0) { + pendingSourceSplitCount.addAndGet(count); + pendingSourceSplitsWeight.addAndGet(weight); + updateTaskStats(); + } + safeExecuteOnEventLoop(() -> { boolean updateNeeded = false; for (Entry> entry : splitsBySource.asMap().entrySet()) { PlanNodeId sourceId = entry.getKey(); Collection splits = entry.getValue(); - boolean isTableScanSource = tableScanPlanNodeIds.contains(sourceId); checkState(!noMoreSplits.containsKey(sourceId), "noMoreSplits has already been set for %s", sourceId); - int added = 0; - long addedWeight = 0; for (Split split : splits) { - if (pendingSplits.put(sourceId, new ScheduledSplit(nextSplitId++, sourceId, split))) { - if (isTableScanSource) { - added++; - addedWeight = addExact(addedWeight, split.getSplitWeight().getRawValue()); - } - } - } - if (isTableScanSource) { - pendingSourceSplitCount += added; - pendingSourceSplitsWeight = addExact(pendingSourceSplitsWeight, addedWeight); - updateTaskStats(); + pendingSplits.put(sourceId, new ScheduledSplit(nextSplitId++, sourceId, split)); } updateNeeded = true; } - updateSplitQueueSpace(); if (updateNeeded) { needsUpdate = true; @@ -722,9 +727,7 @@ public PartitionedSplitsInfo getPartitionedSplitsInfo() @SuppressWarnings("FieldAccessNotGuarded") public PartitionedSplitsInfo getUnacknowledgedPartitionedSplitsInfo() { - int count = pendingSourceSplitCount; - long weight = pendingSourceSplitsWeight; - return PartitionedSplitsInfo.forSplitCountAndWeightSum(count, weight); + return PartitionedSplitsInfo.forSplitCountAndWeightSum(pendingSourceSplitCount.get(), pendingSourceSplitsWeight.get()); } @Override @@ -749,7 +752,7 @@ public int getUnacknowledgedPartitionedSplitCount() @SuppressWarnings("FieldAccessNotGuarded") private int getPendingSourceSplitCount() { - return pendingSourceSplitCount; + return pendingSourceSplitCount.get(); } private long getQueuedPartitionedSplitsWeight() @@ -764,7 +767,7 @@ private long getQueuedPartitionedSplitsWeight() @SuppressWarnings("FieldAccessNotGuarded") private long getPendingSourceSplitsWeight() { - return pendingSourceSplitsWeight; + return pendingSourceSplitsWeight.get(); } @Override @@ -782,35 +785,45 @@ public void addFinalTaskInfoListener(StateChangeListener stateChangeLi @Override public ListenableFuture whenSplitQueueHasSpace(long weightThreshold) { - if (splitQueueHasSpace) { + setSplitQueueWeightThreshold(weightThreshold); + + if (splitQueueHasSpace()) { return immediateFuture(null); } SettableFuture future = SettableFuture.create(); safeExecuteOnEventLoop(() -> { - if (whenSplitQueueHasSpaceThreshold.isPresent()) { - checkArgument(weightThreshold == whenSplitQueueHasSpaceThreshold.getAsLong(), "Multiple split queue space notification thresholds not supported"); + if (splitQueueHasSpace()) { + future.set(null); } else { - whenSplitQueueHasSpaceThreshold = OptionalLong.of(weightThreshold); - updateSplitQueueSpace(); + whenSplitQueueHasSpace.createNewListener().addListener(() -> future.set(null), taskEventLoop); } - if (splitQueueHasSpace) { - future.set(null); - } - whenSplitQueueHasSpace.createNewListener().addListener(() -> future.set(null), taskEventLoop); }, "whenSplitQueueHasSpace"); return future; } + private void setSplitQueueWeightThreshold(long weightThreshold) + { + long currentValue = whenSplitQueueWeightThreshold; + if (currentValue != Long.MAX_VALUE) { + checkArgument(weightThreshold == currentValue, "Multiple split queue space notification thresholds not supported"); + } + else { + whenSplitQueueWeightThreshold = weightThreshold; + } + } + + private boolean splitQueueHasSpace() + { + return getUnacknowledgedPartitionedSplitCount() < maxUnacknowledgedSplits && + getQueuedPartitionedSplitsWeight() < whenSplitQueueWeightThreshold; + } + private void updateSplitQueueSpace() { verify(taskEventLoop.inEventLoop()); - - // Must check whether the unacknowledged split count threshold is reached even without listeners registered yet - splitQueueHasSpace = getUnacknowledgedPartitionedSplitCount() < maxUnacknowledgedSplits && - (!whenSplitQueueHasSpaceThreshold.isPresent() || getQueuedPartitionedSplitsWeight() < whenSplitQueueHasSpaceThreshold.getAsLong()); // Only trigger notifications if a listener might be registered - if (splitQueueHasSpace && whenSplitQueueHasSpaceThreshold.isPresent()) { + if (splitQueueHasSpace()) { whenSplitQueueHasSpace.complete(null, taskEventLoop); } } @@ -838,12 +851,13 @@ private void processTaskUpdate(TaskInfo newValue, List sources) //Once it is converted to thrift we can use the isThrift enabled flag here. updateTaskInfo(newValue); + int removed = 0; + long removedWeight = 0; + // remove acknowledged splits, which frees memory for (TaskSource source : sources) { PlanNodeId planNodeId = source.getPlanNodeId(); boolean isTableScanSource = tableScanPlanNodeIds.contains(planNodeId); - int removed = 0; - long removedWeight = 0; for (ScheduledSplit split : source.getSplits()) { if (pendingSplits.remove(planNodeId, split)) { if (isTableScanSource) { @@ -858,14 +872,14 @@ private void processTaskUpdate(TaskInfo newValue, List sources) for (Lifespan lifespan : source.getNoMoreSplitsForLifespan()) { pendingNoMoreSplitsForLifespan.remove(planNodeId, lifespan); } - if (isTableScanSource) { - pendingSourceSplitCount -= removed; - pendingSourceSplitsWeight -= removedWeight; - } } // Update stats before split queue space to ensure node stats are up to date before waking up the scheduler - updateTaskStats(); - updateSplitQueueSpace(); + if (removed != 0) { + pendingSourceSplitCount.addAndGet(-removed); + pendingSourceSplitsWeight.addAndGet(-removedWeight); + updateTaskStats(); + updateSplitQueueSpace(); + } } private void onSuccessTaskInfo(TaskInfo result) @@ -1115,10 +1129,9 @@ private void cleanUpTask() // clear pending splits to free memory pendingSplits.clear(); - pendingSourceSplitCount = 0; - pendingSourceSplitsWeight = 0; + pendingSourceSplitCount.set(0); + pendingSourceSplitsWeight.set(0); updateTaskStats(); - splitQueueHasSpace = true; whenSplitQueueHasSpace.complete(null, taskEventLoop); // cancel pending request