Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,13 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import static com.facebook.airlift.http.client.HttpStatus.NO_CONTENT;
Expand Down Expand Up @@ -183,16 +184,15 @@ public final class HttpRemoteTaskWithEventLoop
private long currentRequestLastTaskUpdate;

private final SetMultimap<PlanNodeId, ScheduledSplit> pendingSplits = HashMultimap.create();
private volatile int pendingSourceSplitCount;
private volatile long pendingSourceSplitsWeight;
private final AtomicInteger pendingSourceSplitCount = new AtomicInteger();
private final AtomicLong pendingSourceSplitsWeight = new AtomicLong();
private final SetMultimap<PlanNodeId, Lifespan> pendingNoMoreSplitsForLifespan = HashMultimap.create();
// The keys of this map represent all plan nodes that have "no more splits".
// The boolean value of each entry represents whether the "no more splits" notification is pending delivery to workers.
private final Map<PlanNodeId, Boolean> noMoreSplits = new HashMap<>();
private OutputBuffers outputBuffers;
private final FutureStateChange<?> whenSplitQueueHasSpace = new FutureStateChange<>();
private volatile boolean splitQueueHasSpace;
private OptionalLong whenSplitQueueHasSpaceThreshold = OptionalLong.empty();
private volatile long whenSplitQueueWeightThreshold = Long.MAX_VALUE;

private final boolean summarizeTaskInfo;

Expand Down Expand Up @@ -431,8 +431,8 @@ private HttpRemoteTaskWithEventLoop(Session session,
pendingSourceSplitsWeight = addExact(pendingSourceSplitsWeight, SplitWeight.rawValueSum(tableScanSplits, Split::getSplitWeight));
}
}
this.pendingSourceSplitCount = pendingSourceSplitCount;
this.pendingSourceSplitsWeight = pendingSourceSplitsWeight;
this.pendingSourceSplitCount.set(pendingSourceSplitCount);
this.pendingSourceSplitsWeight.set(pendingSourceSplitsWeight);

List<BufferInfo> bufferStates = outputBuffers.getBuffers()
.keySet().stream()
Expand Down Expand Up @@ -555,32 +555,37 @@ public void addSplits(Multimap<PlanNodeId, Split> splitsBySource)
return;
}

int count = 0;
long weight = 0;
for (Entry<PlanNodeId, Collection<Split>> entry : splitsBySource.asMap().entrySet()) {
PlanNodeId sourceId = entry.getKey();
Collection<Split> splits = entry.getValue();

if (tableScanPlanNodeIds.contains(sourceId)) {
count += splits.size();
weight += splits.stream().map(Split::getSplitWeight)
.mapToLong(SplitWeight::getRawValue)
Comment on lines +558 to +567
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Potential for double-counting splits if addSplits is called concurrently.

Because pendingSplits is not thread-safe, concurrent calls to addSplits may cause race conditions. Please either document that addSplits must be single-threaded or switch to a thread-safe collection for pendingSplits.

.sum();
}
}
if (count != 0) {
pendingSourceSplitCount.addAndGet(count);
pendingSourceSplitsWeight.addAndGet(weight);
updateTaskStats();
}

safeExecuteOnEventLoop(() -> {
boolean updateNeeded = false;
for (Entry<PlanNodeId, Collection<Split>> entry : splitsBySource.asMap().entrySet()) {
PlanNodeId sourceId = entry.getKey();
Collection<Split> splits = entry.getValue();
boolean isTableScanSource = tableScanPlanNodeIds.contains(sourceId);

checkState(!noMoreSplits.containsKey(sourceId), "noMoreSplits has already been set for %s", sourceId);
int added = 0;
long addedWeight = 0;
for (Split split : splits) {
if (pendingSplits.put(sourceId, new ScheduledSplit(nextSplitId++, sourceId, split))) {
if (isTableScanSource) {
added++;
addedWeight = addExact(addedWeight, split.getSplitWeight().getRawValue());
}
}
}
if (isTableScanSource) {
pendingSourceSplitCount += added;
pendingSourceSplitsWeight = addExact(pendingSourceSplitsWeight, addedWeight);
updateTaskStats();
pendingSplits.put(sourceId, new ScheduledSplit(nextSplitId++, sourceId, split));
}
updateNeeded = true;
}
updateSplitQueueSpace();

if (updateNeeded) {
needsUpdate = true;
Expand Down Expand Up @@ -722,9 +727,7 @@ public PartitionedSplitsInfo getPartitionedSplitsInfo()
@SuppressWarnings("FieldAccessNotGuarded")
public PartitionedSplitsInfo getUnacknowledgedPartitionedSplitsInfo()
{
int count = pendingSourceSplitCount;
long weight = pendingSourceSplitsWeight;
return PartitionedSplitsInfo.forSplitCountAndWeightSum(count, weight);
return PartitionedSplitsInfo.forSplitCountAndWeightSum(pendingSourceSplitCount.get(), pendingSourceSplitsWeight.get());
}

@Override
Expand All @@ -749,7 +752,7 @@ public int getUnacknowledgedPartitionedSplitCount()
@SuppressWarnings("FieldAccessNotGuarded")
private int getPendingSourceSplitCount()
{
return pendingSourceSplitCount;
return pendingSourceSplitCount.get();
}

private long getQueuedPartitionedSplitsWeight()
Expand All @@ -764,7 +767,7 @@ private long getQueuedPartitionedSplitsWeight()
@SuppressWarnings("FieldAccessNotGuarded")
private long getPendingSourceSplitsWeight()
{
return pendingSourceSplitsWeight;
return pendingSourceSplitsWeight.get();
}

@Override
Expand All @@ -782,35 +785,45 @@ public void addFinalTaskInfoListener(StateChangeListener<TaskInfo> stateChangeLi
@Override
public ListenableFuture<?> whenSplitQueueHasSpace(long weightThreshold)
{
if (splitQueueHasSpace) {
setSplitQueueWeightThreshold(weightThreshold);

if (splitQueueHasSpace()) {
return immediateFuture(null);
}
SettableFuture<?> future = SettableFuture.create();
safeExecuteOnEventLoop(() -> {
if (whenSplitQueueHasSpaceThreshold.isPresent()) {
checkArgument(weightThreshold == whenSplitQueueHasSpaceThreshold.getAsLong(), "Multiple split queue space notification thresholds not supported");
if (splitQueueHasSpace()) {
future.set(null);
}
else {
whenSplitQueueHasSpaceThreshold = OptionalLong.of(weightThreshold);
updateSplitQueueSpace();
whenSplitQueueHasSpace.createNewListener().addListener(() -> future.set(null), taskEventLoop);
}
if (splitQueueHasSpace) {
future.set(null);
}
whenSplitQueueHasSpace.createNewListener().addListener(() -> future.set(null), taskEventLoop);
}, "whenSplitQueueHasSpace");
return future;
}

private void setSplitQueueWeightThreshold(long weightThreshold)
{
long currentValue = whenSplitQueueWeightThreshold;
if (currentValue != Long.MAX_VALUE) {
checkArgument(weightThreshold == currentValue, "Multiple split queue space notification thresholds not supported");
}
else {
whenSplitQueueWeightThreshold = weightThreshold;
}
}

private boolean splitQueueHasSpace()
{
return getUnacknowledgedPartitionedSplitCount() < maxUnacknowledgedSplits &&
getQueuedPartitionedSplitsWeight() < whenSplitQueueWeightThreshold;
}

private void updateSplitQueueSpace()
{
verify(taskEventLoop.inEventLoop());

// Must check whether the unacknowledged split count threshold is reached even without listeners registered yet
splitQueueHasSpace = getUnacknowledgedPartitionedSplitCount() < maxUnacknowledgedSplits &&
(!whenSplitQueueHasSpaceThreshold.isPresent() || getQueuedPartitionedSplitsWeight() < whenSplitQueueHasSpaceThreshold.getAsLong());
// Only trigger notifications if a listener might be registered
if (splitQueueHasSpace && whenSplitQueueHasSpaceThreshold.isPresent()) {
if (splitQueueHasSpace()) {
whenSplitQueueHasSpace.complete(null, taskEventLoop);
}
}
Expand Down Expand Up @@ -838,12 +851,13 @@ private void processTaskUpdate(TaskInfo newValue, List<TaskSource> sources)
//Once it is converted to thrift we can use the isThrift enabled flag here.
updateTaskInfo(newValue);

int removed = 0;
long removedWeight = 0;

// remove acknowledged splits, which frees memory
for (TaskSource source : sources) {
PlanNodeId planNodeId = source.getPlanNodeId();
boolean isTableScanSource = tableScanPlanNodeIds.contains(planNodeId);
int removed = 0;
long removedWeight = 0;
for (ScheduledSplit split : source.getSplits()) {
if (pendingSplits.remove(planNodeId, split)) {
if (isTableScanSource) {
Expand All @@ -858,14 +872,14 @@ private void processTaskUpdate(TaskInfo newValue, List<TaskSource> sources)
for (Lifespan lifespan : source.getNoMoreSplitsForLifespan()) {
pendingNoMoreSplitsForLifespan.remove(planNodeId, lifespan);
}
if (isTableScanSource) {
pendingSourceSplitCount -= removed;
pendingSourceSplitsWeight -= removedWeight;
}
}
// Update stats before split queue space to ensure node stats are up to date before waking up the scheduler
updateTaskStats();
updateSplitQueueSpace();
if (removed != 0) {
pendingSourceSplitCount.addAndGet(-removed);
pendingSourceSplitsWeight.addAndGet(-removedWeight);
updateTaskStats();
updateSplitQueueSpace();
}
}

private void onSuccessTaskInfo(TaskInfo result)
Expand Down Expand Up @@ -1115,10 +1129,9 @@ private void cleanUpTask()

// clear pending splits to free memory
pendingSplits.clear();
pendingSourceSplitCount = 0;
pendingSourceSplitsWeight = 0;
pendingSourceSplitCount.set(0);
pendingSourceSplitsWeight.set(0);
updateTaskStats();
splitQueueHasSpace = true;
whenSplitQueueHasSpace.complete(null, taskEventLoop);

// cancel pending request
Expand Down
Loading