Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.errorprone.annotations.ThreadSafe;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.execution.TaskId;
Expand All @@ -37,6 +38,7 @@
import io.trino.spi.HostAddress;
import io.trino.spi.TrinoException;
import io.trino.spi.memory.MemoryPoolInfo;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.assertj.core.util.VisibleForTesting;
Expand Down Expand Up @@ -106,6 +108,8 @@ public class BinPackingNodeAllocatorService
private final boolean optimizedLocalScheduling;

private final StatsHolder stats = new StatsHolder();
private final CounterStat processCalls = new CounterStat();
private final CounterStat processPending = new CounterStat();

@Inject
public BinPackingNodeAllocatorService(
Expand Down Expand Up @@ -219,6 +223,7 @@ void refreshNodePoolMemoryInfos()
@VisibleForTesting
synchronized void processPendingAcquires()
{
processCalls.update(1);
// Process EAGER_SPECULATIVE first; it increases the chance that tasks which have potential to end query early get scheduled to worker nodes.
// Even though EAGER_SPECULATIVE tasks depend on upstream STANDARD tasks this logic will not lead to deadlock.
// When processing STANDARD acquires below, we will ignore EAGER_SPECULATIVE (and SPECULATIVE) tasks when assessing if node has enough resources for processing task.
Expand Down Expand Up @@ -258,7 +263,9 @@ private void processPendingAcquires(TaskExecutionClass executionClass)
continue;
}

processPending.update(1);
BinPackingSimulation.ReserveResult result = simulation.tryReserve(pendingAcquire);
pendingAcquire.setLastReservationStatus(result.getStatus());
switch (result.getStatus()) {
case RESERVED:
InternalNode reservedNode = result.getNode();
Expand Down Expand Up @@ -331,20 +338,61 @@ public StatsHolder getStats()
return stats;
}

@Managed
@Nested
public CounterStat processCalls()
{
return processCalls;
}

@Managed
@Nested
public CounterStat processPending()
{
return processPending;
}

private void updateStats()
{
long pendingStandard = 0;
long pendingSpeculative = 0;
long pendingEagerSpeculative = 0;
long pendingStandardNoneMatching = 0;
long pendingStandardNotEnoughResources = 0;
long pendingStandardUnknown = 0;
long pendingSpeculativeNoneMatching = 0;
long pendingSpeculativeNotEnoughResources = 0;
long pendingSpeculativeUnknown = 0;
long pendingEagerSpeculativeNoneMatching = 0;
long pendingEagerSpeculativeNotEnoughResources = 0;
long pendingEagerSpeculativeUnknown = 0;
long fulfilledStandard = 0;
long fulfilledSpeculative = 0;
long fulfilledEagerSpeculative = 0;

for (PendingAcquire acquire : pendingAcquires) {
switch (acquire.getExecutionClass()) {
case STANDARD -> pendingStandard++;
case SPECULATIVE -> pendingSpeculative++;
case EAGER_SPECULATIVE -> pendingEagerSpeculative++;
case STANDARD -> {
switch (acquire.getLastReservationStatus()) {
case NONE_MATCHING -> pendingStandardNoneMatching++;
case NOT_ENOUGH_RESOURCES_NOW -> pendingStandardNotEnoughResources++;
case null -> pendingStandardUnknown++;
case RESERVED -> throw new IllegalArgumentException("unexpected last reservation status");
}
}
case SPECULATIVE -> {
switch (acquire.getLastReservationStatus()) {
case NONE_MATCHING -> pendingSpeculativeNoneMatching++;
case NOT_ENOUGH_RESOURCES_NOW -> pendingSpeculativeNotEnoughResources++;
case null -> pendingSpeculativeUnknown++;
case RESERVED -> throw new IllegalArgumentException("unexpected last reservation status");
}
}
case EAGER_SPECULATIVE -> {
switch (acquire.getLastReservationStatus()) {
case NONE_MATCHING -> pendingEagerSpeculativeNoneMatching++;
case NOT_ENOUGH_RESOURCES_NOW -> pendingEagerSpeculativeNotEnoughResources++;
case null -> pendingEagerSpeculativeUnknown++;
case RESERVED -> throw new IllegalArgumentException("unexpected last reservation status");
}
}
}
}

Expand All @@ -356,9 +404,15 @@ private void updateStats()
}
}
stats.updateStats(new Stats(
pendingStandard,
pendingSpeculative,
pendingEagerSpeculative,
pendingStandardNoneMatching,
pendingStandardNotEnoughResources,
pendingStandardUnknown,
pendingSpeculativeNoneMatching,
pendingSpeculativeNotEnoughResources,
pendingSpeculativeUnknown,
pendingEagerSpeculativeNoneMatching,
pendingEagerSpeculativeNotEnoughResources,
pendingEagerSpeculativeUnknown,
fulfilledStandard,
fulfilledSpeculative,
fulfilledEagerSpeculative));
Expand All @@ -369,6 +423,7 @@ private static class PendingAcquire
private final NodeRequirements nodeRequirements;
private final BinPackingNodeLease lease;
private final Stopwatch noMatchingNodeStopwatch;
@Nullable private volatile BinPackingSimulation.ReservationStatus lastReservationStatus;

private PendingAcquire(NodeRequirements nodeRequirements, BinPackingNodeLease lease, Ticker ticker)
{
Expand Down Expand Up @@ -419,6 +474,18 @@ public TaskExecutionClass getExecutionClass()
{
return lease.getExecutionClass();
}

@Nullable
public BinPackingSimulation.ReservationStatus getLastReservationStatus()
{
return lastReservationStatus;
}

public void setLastReservationStatus(BinPackingSimulation.ReservationStatus lastReservationStatus)
{
requireNonNull(lastReservationStatus, "lastReservationStatus is null");
this.lastReservationStatus = lastReservationStatus;
}
}

private class BinPackingNodeLease
Expand Down Expand Up @@ -777,21 +844,57 @@ public void updateStats(Stats stats)
}

@Managed
public long getPendingStandard()
public long getPendingStandardNoneMatching()
{
return statsReference.get().pendingStandardNoneMatching();
}

@Managed
public long getPendingStandardNotEnoughResources()
{
return statsReference.get().pendingStandardNotEnoughResources();
}

@Managed
public long getPendingStandardUnknown()
{
return statsReference.get().pendingStandardUnknown();
}

@Managed
public long getPendingSpeculativeNoneMatching()
{
return statsReference.get().pendingSpeculativeNoneMatching();
}

@Managed
public long getPendingSpeculativeNotEnoughResources()
{
return statsReference.get().pendingSpeculativeNotEnoughResources();
}

@Managed
public long getPendingSpeculativeUnknown()
{
return statsReference.get().pendingSpeculativeUnknown();
}

@Managed
public long getPendingEagerSpeculativeNoneMatching()
{
return statsReference.get().pendingStandard();
return statsReference.get().pendingEagerSpeculativeNoneMatching();
}

@Managed
public long getPendingSpeculative()
public long getPendingEagerSpeculativeNotEnoughResources()
{
return statsReference.get().pendingSpeculative();
return statsReference.get().pendingEagerSpeculativeNotEnoughResources();
}

@Managed
public long getPendingEagerSpeculative()
public long getPendingEagerSpeculativeUnknown()
{
return statsReference.get().pendingEagerSpeculative();
return statsReference.get().pendingEagerSpeculativeUnknown();
}

@Managed
Expand All @@ -814,13 +917,19 @@ public long getFulfilledEagerSpeculative()
}

private record Stats(
long pendingStandard,
long pendingSpeculative,
long pendingEagerSpeculative,
long pendingStandardNoneMatching,
long pendingStandardNotEnoughResources,
long pendingStandardUnknown,
long pendingSpeculativeNoneMatching,
long pendingSpeculativeNotEnoughResources,
long pendingSpeculativeUnknown,
long pendingEagerSpeculativeNoneMatching,
long pendingEagerSpeculativeNotEnoughResources,
long pendingEagerSpeculativeUnknown,
long fulfilledStandard,
long fulfilledSpeculative,
long fulfilledEagerSpeculative)
{
static final Stats ZERO = new Stats(0, 0, 0, 0, 0, 0);
static final Stats ZERO = new Stats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
}
}