Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ protected Map<String, Long> runOnce()
session,
false,
false,
false,
false,
false);

CpuTimer cpuTimer = new CpuTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public List<Page> execute(@Language("SQL") String query)
localQueryRunner.getDefaultSession(),
false,
false,
false,
false,
false);

// Use NullOutputFactory to avoid coping out results to avoid affecting benchmark results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class BasicStageExecutionStats
false,
ImmutableSet.of(),

new DataSize(0, BYTE),

OptionalDouble.empty());

private final boolean isScheduled;
Expand All @@ -68,6 +70,7 @@ public class BasicStageExecutionStats
private final Duration totalScheduledTime;
private final boolean fullyBlocked;
private final Set<BlockedReason> blockedReasons;
private final DataSize totalAllocation;
private final OptionalDouble progressPercentage;

public BasicStageExecutionStats(
Expand All @@ -91,6 +94,8 @@ public BasicStageExecutionStats(
boolean fullyBlocked,
Set<BlockedReason> blockedReasons,

DataSize totalAllocation,

OptionalDouble progressPercentage)
{
this.isScheduled = isScheduled;
Expand All @@ -107,6 +112,7 @@ public BasicStageExecutionStats(
this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null");
this.fullyBlocked = fullyBlocked;
this.blockedReasons = ImmutableSet.copyOf(requireNonNull(blockedReasons, "blockedReasons is null"));
this.totalAllocation = requireNonNull(totalAllocation, "totalAllocation is null");
this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null");
}

Expand Down Expand Up @@ -180,6 +186,11 @@ public Set<BlockedReason> getBlockedReasons()
return blockedReasons;
}

public DataSize getTotalAllocation()
{
return totalAllocation;
}

public OptionalDouble getProgressPercentage()
{
return progressPercentage;
Expand Down Expand Up @@ -207,6 +218,8 @@ public static BasicStageExecutionStats aggregateBasicStageStats(Iterable<BasicSt
boolean fullyBlocked = true;
Set<BlockedReason> blockedReasons = new HashSet<>();

long totalAllocation = 0;

for (BasicStageExecutionStats stageStats : stages) {
totalDrivers += stageStats.getTotalDrivers();
queuedDrivers += stageStats.getQueuedDrivers();
Expand All @@ -225,6 +238,8 @@ public static BasicStageExecutionStats aggregateBasicStageStats(Iterable<BasicSt
fullyBlocked &= stageStats.isFullyBlocked();
blockedReasons.addAll(stageStats.getBlockedReasons());

totalAllocation += stageStats.getTotalAllocation().toBytes();

rawInputDataSize += stageStats.getRawInputDataSize().toBytes();
rawInputPositions += stageStats.getRawInputPositions();
}
Expand Down Expand Up @@ -255,6 +270,8 @@ public static BasicStageExecutionStats aggregateBasicStageStats(Iterable<BasicSt
fullyBlocked,
blockedReasons,

succinctBytes(totalAllocation),

progressPercentage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ public BasicQueryInfo getBasicQueryInfo(Optional<BasicStageExecutionStats> rootS

stageStats.isFullyBlocked(),
stageStats.getBlockedReasons(),

stageStats.getTotalAllocation(),

stageStats.getProgressPercentage());

return new BasicQueryInfo(
Expand Down Expand Up @@ -468,6 +471,8 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
long retriedCpuTime = 0;
long totalBlockedTime = 0;

long totalAllocation = 0;

long rawInputDataSize = 0;
long rawInputPositions = 0;

Expand Down Expand Up @@ -514,6 +519,8 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
blockedReasons.addAll(stageExecutionStats.getBlockedReasons());
}

totalAllocation += stageExecutionStats.getTotalAllocation().toBytes();

if (stageInfo.getPlan().isPresent()) {
PlanFragment plan = stageInfo.getPlan().get();
if (!plan.getTableScanSchedulingOrder().isEmpty()) {
Expand Down Expand Up @@ -596,6 +603,8 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
fullyBlocked,
blockedReasons,

succinctBytes(totalAllocation),

succinctBytes(rawInputDataSize),
rawInputPositions,
succinctBytes(processedInputDataSize),
Expand Down Expand Up @@ -1090,6 +1099,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getTotalBlockedTime(),
queryStats.isFullyBlocked(),
queryStats.getBlockedReasons(),
queryStats.getTotalAllocation(),
queryStats.getRawInputDataSize(),
queryStats.getRawInputPositions(),
queryStats.getProcessedInputDataSize(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class QueryStats
private final boolean fullyBlocked;
private final Set<BlockedReason> blockedReasons;

private final DataSize totalAllocation;

private final DataSize rawInputDataSize;
private final long rawInputPositions;

Expand Down Expand Up @@ -141,6 +143,8 @@ public QueryStats(
@JsonProperty("fullyBlocked") boolean fullyBlocked,
@JsonProperty("blockedReasons") Set<BlockedReason> blockedReasons,

@JsonProperty("totalAllocation") DataSize totalAllocation,

@JsonProperty("rawInputDataSize") DataSize rawInputDataSize,
@JsonProperty("rawInputPositions") long rawInputPositions,

Expand Down Expand Up @@ -208,6 +212,8 @@ public QueryStats(
this.fullyBlocked = fullyBlocked;
this.blockedReasons = ImmutableSet.copyOf(requireNonNull(blockedReasons, "blockedReasons is null"));

this.totalAllocation = requireNonNull(totalAllocation, "totalAllocation is null");

this.rawInputDataSize = requireNonNull(rawInputDataSize, "rawInputDataSize is null");
checkArgument(rawInputPositions >= 0, "rawInputPositions is negative");
this.rawInputPositions = rawInputPositions;
Expand Down Expand Up @@ -270,6 +276,7 @@ public static QueryStats immediateFailureQueryStats()
false,
ImmutableSet.of(),
new DataSize(0, BYTE),
new DataSize(0, BYTE),
0,
new DataSize(0, BYTE),
0,
Expand Down Expand Up @@ -488,6 +495,12 @@ public Set<BlockedReason> getBlockedReasons()
return blockedReasons;
}

@JsonProperty
public DataSize getTotalAllocation()
{
return totalAllocation;
}

@JsonProperty
public DataSize getRawInputDataSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class SqlTaskExecutionFactory
private final SplitMonitor splitMonitor;
private final boolean perOperatorCpuTimerEnabled;
private final boolean cpuTimerEnabled;
private final boolean perOperatorAllocationTrackingEnabled;
private final boolean allocationTrackingEnabled;
private final boolean legacyLifespanCompletionCondition;

public SqlTaskExecutionFactory(
Expand All @@ -68,6 +70,8 @@ public SqlTaskExecutionFactory(
requireNonNull(config, "config is null");
this.perOperatorCpuTimerEnabled = config.isPerOperatorCpuTimerEnabled();
this.cpuTimerEnabled = config.isTaskCpuTimerEnabled();
this.perOperatorAllocationTrackingEnabled = config.isPerOperatorAllocationTrackingEnabled();
this.allocationTrackingEnabled = config.isTaskAllocationTrackingEnabled();
this.legacyLifespanCompletionCondition = config.isLegacyLifespanCompletionCondition();
}

Expand All @@ -86,6 +90,8 @@ public SqlTaskExecution create(
session,
perOperatorCpuTimerEnabled,
cpuTimerEnabled,
perOperatorAllocationTrackingEnabled,
allocationTrackingEnabled,
legacyLifespanCompletionCondition);

LocalExecutionPlan localExecutionPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ public BasicStageExecutionStats getBasicStageStats(Supplier<Iterable<TaskInfo>>
boolean fullyBlocked = true;
Set<BlockedReason> blockedReasons = new HashSet<>();

long totalAllocation = 0;

for (TaskInfo taskInfo : taskInfos) {
TaskState taskState = taskInfo.getTaskStatus().getState();
TaskStats taskStats = taskInfo.getStats();
Expand All @@ -279,6 +281,8 @@ public BasicStageExecutionStats getBasicStageStats(Supplier<Iterable<TaskInfo>>
blockedReasons.addAll(taskStats.getBlockedReasons());
}

totalAllocation += taskStats.getTotalAllocation().toBytes();

if (containsTableScans) {
rawInputDataSize += taskStats.getRawInputDataSize().toBytes();
rawInputPositions += taskStats.getRawInputPositions();
Expand Down Expand Up @@ -311,6 +315,8 @@ public BasicStageExecutionStats getBasicStageStats(Supplier<Iterable<TaskInfo>>
fullyBlocked,
blockedReasons,

succinctBytes(totalAllocation),

progressPercentage);
}

Expand Down Expand Up @@ -349,6 +355,8 @@ public StageExecutionInfo getStageExecutionInfo(Supplier<Iterable<TaskInfo>> tas
long retriedCpuTime = 0;
long totalBlockedTime = 0;

long totalAllocation = 0;

long rawInputDataSize = 0;
long rawInputPositions = 0;

Expand Down Expand Up @@ -406,6 +414,8 @@ public StageExecutionInfo getStageExecutionInfo(Supplier<Iterable<TaskInfo>> tas
blockedReasons.addAll(taskStats.getBlockedReasons());
}

totalAllocation += taskStats.getTotalAllocation().toBytes();

rawInputDataSize += taskStats.getRawInputDataSize().toBytes();
rawInputPositions += taskStats.getRawInputPositions();

Expand Down Expand Up @@ -462,6 +472,8 @@ public StageExecutionInfo getStageExecutionInfo(Supplier<Iterable<TaskInfo>> tas
fullyBlocked && runningTasks > 0,
blockedReasons,

succinctBytes(totalAllocation),

succinctBytes(rawInputDataSize),
rawInputPositions,
succinctBytes(processedInputDataSize),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class StageExecutionStats
private final boolean fullyBlocked;
private final Set<BlockedReason> blockedReasons;

private final DataSize totalAllocation;

private final DataSize rawInputDataSize;
private final long rawInputPositions;

Expand Down Expand Up @@ -118,6 +120,8 @@ public StageExecutionStats(
@JsonProperty("fullyBlocked") boolean fullyBlocked,
@JsonProperty("blockedReasons") Set<BlockedReason> blockedReasons,

@JsonProperty("totalAllocation") DataSize totalAllocation,

@JsonProperty("rawInputDataSize") DataSize rawInputDataSize,
@JsonProperty("rawInputPositions") long rawInputPositions,

Expand Down Expand Up @@ -172,6 +176,8 @@ public StageExecutionStats(
this.fullyBlocked = fullyBlocked;
this.blockedReasons = ImmutableSet.copyOf(requireNonNull(blockedReasons, "blockedReasons is null"));

this.totalAllocation = requireNonNull(totalAllocation, "totalAllocation is null");

this.rawInputDataSize = requireNonNull(rawInputDataSize, "rawInputDataSize is null");
checkArgument(rawInputPositions >= 0, "rawInputPositions is negative");
this.rawInputPositions = rawInputPositions;
Expand Down Expand Up @@ -324,6 +330,12 @@ public Set<BlockedReason> getBlockedReasons()
return blockedReasons;
}

@JsonProperty
public DataSize getTotalAllocation()
{
return totalAllocation;
}

@JsonProperty
public DataSize getRawInputDataSize()
{
Expand Down Expand Up @@ -408,6 +420,7 @@ public BasicStageExecutionStats toBasicStageStats(StageExecutionState stageExecu
totalScheduledTime,
fullyBlocked,
blockedReasons,
totalAllocation,
progressPercentage);
}

Expand Down Expand Up @@ -437,6 +450,7 @@ public static StageExecutionStats zero(int stageId)
false,
ImmutableSet.of(),
new DataSize(0, BYTE),
new DataSize(0, BYTE),
0,
new DataSize(0, BYTE),
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class TaskManagerConfig
private boolean perOperatorCpuTimerEnabled = true;
private boolean taskCpuTimerEnabled = true;
private boolean statisticsCpuTimerEnabled = true;
private boolean perOperatorAllocationTrackingEnabled = true;
private boolean taskAllocationTrackingEnabled = true;
private DataSize maxPartialAggregationMemoryUsage = new DataSize(16, Unit.MEGABYTE);
private DataSize maxLocalExchangeBufferSize = new DataSize(32, Unit.MEGABYTE);
private DataSize maxIndexMemoryUsage = new DataSize(64, Unit.MEGABYTE);
Expand Down Expand Up @@ -165,6 +167,30 @@ public TaskManagerConfig setStatisticsCpuTimerEnabled(boolean statisticsCpuTimer
return this;
}

public boolean isPerOperatorAllocationTrackingEnabled()
{
return perOperatorAllocationTrackingEnabled;
}

@Config("task.per-operator-allocation-tracking-enabled")
public TaskManagerConfig setPerOperatorAllocationTrackingEnabled(boolean perOperatorAllocationTrackingEnabled)
{
this.perOperatorAllocationTrackingEnabled = perOperatorAllocationTrackingEnabled;
return this;
}

public boolean isTaskAllocationTrackingEnabled()
{
return taskAllocationTrackingEnabled;
}

@Config("task.allocation-tracking-enabled")
public TaskManagerConfig setTaskAllocationTrackingEnabled(boolean taskAllocationTrackingEnabled)
{
this.taskAllocationTrackingEnabled = taskAllocationTrackingEnabled;
return this;
}

@NotNull
public DataSize getMaxPartialAggregationMemoryUsage()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ public TaskContext addTaskContext(
Session session,
boolean perOperatorCpuTimerEnabled,
boolean cpuTimerEnabled,
boolean perOperatorAllocationTrackingEnabled,
boolean allocationTrackingEnabled,
boolean legacyLifespanCompletionCondition)
{
TaskContext taskContext = TaskContext.createTaskContext(
Expand All @@ -272,6 +274,8 @@ public TaskContext addTaskContext(
queryMemoryContext.newMemoryTrackingContext(),
perOperatorCpuTimerEnabled,
cpuTimerEnabled,
perOperatorAllocationTrackingEnabled,
allocationTrackingEnabled,
legacyLifespanCompletionCondition);
taskContexts.put(taskStateMachine.getTaskId(), taskContext);
return taskContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ private OperationTimer createTimer()
{
return new OperationTimer(
driverContext.isCpuTimerEnabled(),
driverContext.isCpuTimerEnabled() && driverContext.isPerOperatorCpuTimerEnabled());
driverContext.isCpuTimerEnabled() && driverContext.isPerOperatorCpuTimerEnabled(),
driverContext.isAllocationTrackingEnabled(),
driverContext.isAllocationTrackingEnabled() && driverContext.isPerOperatorAllocationTrackingEnabled());
}

private ListenableFuture<?> updateDriverBlockedFuture(ListenableFuture<?> sourceBlockedFuture)
Expand Down
Loading