diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java b/presto-main-base/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java index 3baae38c107f0..f03c5e364bc86 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -40,6 +41,7 @@ import static com.facebook.presto.common.RuntimeUnit.NANO; import static com.facebook.presto.common.RuntimeUnit.NONE; import static com.facebook.presto.execution.StageExecutionState.FINISHED; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.units.Duration.succinctDuration; import static java.lang.Math.max; import static java.lang.Math.min; @@ -68,185 +70,84 @@ public static StageExecutionInfo create( int finishedLifespans, int totalLifespans) { - int totalTasks = taskInfos.size(); - int runningTasks = 0; - int completedTasks = 0; - - int totalDrivers = 0; - int queuedDrivers = 0; - int runningDrivers = 0; - int blockedDrivers = 0; - int completedDrivers = 0; - - double cumulativeUserMemory = 0; - double cumulativeTotalMemory = 0; - long userMemoryReservation = 0; - long totalMemoryReservation = 0; - - long totalScheduledTime = 0; - long totalCpuTime = 0; - long retriedCpuTime = 0; - long totalBlockedTime = 0; - - long totalAllocation = 0; - - long rawInputDataSize = 0; - long rawInputPositions = 0; - - long processedInputDataSize = 0; - long processedInputPositions = 0; - - long bufferedDataSize = 0; - long outputDataSize = 0; - long outputPositions = 0; - - long physicalWrittenDataSize = 0; - - int fullGcCount = 0; - int fullGcTaskCount = 0; - int minFullGcSec = 0; - int maxFullGcSec = 0; - int totalFullGcSec = 0; - - boolean 
fullyBlocked = true; - Set blockedReasons = new HashSet<>(); - - Map operatorToStats = new HashMap<>(); - RuntimeStats mergedRuntimeStats = new RuntimeStats(); - mergedRuntimeStats.mergeWith(stageRuntimeStats); - - List allTaskStats = new ArrayList<>(); + TaskStatsAggregator taskStatsAggregator = new TaskStatsAggregator(taskInfos.size(), stageRuntimeStats); for (TaskInfo taskInfo : taskInfos) { TaskState taskState = taskInfo.getTaskStatus().getState(); if (taskState.isDone()) { - completedTasks++; + taskStatsAggregator.increaseCompleteTaskCount(1); } else { - runningTasks++; + taskStatsAggregator.increaseRunningTaskCount(1); } TaskStats taskStats = taskInfo.getStats(); - allTaskStats.add(taskStats); if (state == FINISHED && taskInfo.getTaskStatus().getState() == TaskState.FAILED) { - retriedCpuTime += taskStats.getTotalCpuTimeInNanos(); + taskStatsAggregator.increaseRetriedCpuTime(taskStats.getTotalCpuTimeInNanos()); } if (!taskState.isDone()) { - fullyBlocked &= taskStats.isFullyBlocked(); - blockedReasons.addAll(taskStats.getBlockedReasons()); + taskStatsAggregator.updateFullyBlocked(taskStats.isFullyBlocked()); + taskStatsAggregator.addNewBlockedReasons(taskStats.getBlockedReasons()); } - bufferedDataSize += taskInfo.getOutputBuffers().getTotalBufferedBytes(); - } - - for (TaskStats taskStats : allTaskStats) { - totalDrivers += taskStats.getTotalDrivers(); - queuedDrivers += taskStats.getQueuedDrivers(); - runningDrivers += taskStats.getRunningDrivers(); - blockedDrivers += taskStats.getBlockedDrivers(); - completedDrivers += taskStats.getCompletedDrivers(); - - cumulativeUserMemory += taskStats.getCumulativeUserMemory(); - cumulativeTotalMemory += taskStats.getCumulativeTotalMemory(); - - long taskUserMemory = taskStats.getUserMemoryReservationInBytes(); - long taskSystemMemory = taskStats.getSystemMemoryReservationInBytes(); - userMemoryReservation += taskUserMemory; - totalMemoryReservation += taskUserMemory + taskSystemMemory; - - totalScheduledTime += 
taskStats.getTotalScheduledTimeInNanos(); - totalCpuTime += taskStats.getTotalCpuTimeInNanos(); - totalBlockedTime += taskStats.getTotalBlockedTimeInNanos(); - - totalAllocation += taskStats.getTotalAllocationInBytes(); - - rawInputDataSize += taskStats.getRawInputDataSizeInBytes(); - rawInputPositions += taskStats.getRawInputPositions(); - - processedInputDataSize += taskStats.getProcessedInputDataSizeInBytes(); - processedInputPositions += taskStats.getProcessedInputPositions(); - - outputDataSize += taskStats.getOutputDataSizeInBytes(); - outputPositions += taskStats.getOutputPositions(); - - physicalWrittenDataSize += taskStats.getPhysicalWrittenDataSizeInBytes(); - - fullGcCount += taskStats.getFullGcCount(); - fullGcTaskCount += taskStats.getFullGcCount() > 0 ? 1 : 0; - - int gcSec = toIntExact(MILLISECONDS.toSeconds(taskStats.getFullGcTimeInMillis())); - totalFullGcSec += gcSec; - minFullGcSec = min(minFullGcSec, gcSec); - maxFullGcSec = max(maxFullGcSec, gcSec); - - for (PipelineStats pipeline : taskStats.getPipelines()) { - for (OperatorStats operatorStats : pipeline.getOperatorSummaries()) { - String id = pipeline.getPipelineId() + "." + operatorStats.getOperatorId(); - operatorToStats.compute(id, (k, v) -> v == null ? 
operatorStats : v.add(operatorStats)); - } - } - mergedRuntimeStats.mergeWith(taskStats.getRuntimeStats()); - mergedRuntimeStats.addMetricValue(DRIVER_COUNT_PER_TASK, NONE, taskStats.getTotalDrivers()); - mergedRuntimeStats.addMetricValue(TASK_ELAPSED_TIME_NANOS, NANO, taskStats.getElapsedTimeInNanos()); - mergedRuntimeStats.addMetricValueIgnoreZero(TASK_QUEUED_TIME_NANOS, NANO, taskStats.getQueuedTimeInNanos()); - mergedRuntimeStats.addMetricValue(TASK_SCHEDULED_TIME_NANOS, NANO, taskStats.getTotalScheduledTimeInNanos()); - mergedRuntimeStats.addMetricValueIgnoreZero(TASK_BLOCKED_TIME_NANOS, NANO, taskStats.getTotalBlockedTimeInNanos()); + taskStatsAggregator.increaseBufferedDataSize(taskInfo.getOutputBuffers().getTotalBufferedBytes()); + taskStatsAggregator.processTaskStats(taskStats); } StageExecutionStats stageExecutionStats = new StageExecutionStats( schedulingCompleteInMillis, getSplitDistribution, - totalTasks, - runningTasks, - completedTasks, + taskStatsAggregator.totalTaskCount, + taskStatsAggregator.runningTaskCount, + taskStatsAggregator.completedTaskCount, totalLifespans, finishedLifespans, - totalDrivers, - queuedDrivers, - runningDrivers, - blockedDrivers, - completedDrivers, + taskStatsAggregator.totalDrivers, + taskStatsAggregator.queuedDrivers, + taskStatsAggregator.runningDrivers, + taskStatsAggregator.blockedDrivers, + taskStatsAggregator.completedDrivers, - cumulativeUserMemory, - cumulativeTotalMemory, - userMemoryReservation, - totalMemoryReservation, + taskStatsAggregator.cumulativeUserMemory, + taskStatsAggregator.cumulativeTotalMemory, + taskStatsAggregator.userMemoryReservation, + taskStatsAggregator.totalMemoryReservation, peakUserMemoryReservation, peakNodeTotalMemoryReservation, - succinctDuration(totalScheduledTime, NANOSECONDS), - succinctDuration(totalCpuTime, NANOSECONDS), - succinctDuration(retriedCpuTime, NANOSECONDS), - succinctDuration(totalBlockedTime, NANOSECONDS), - fullyBlocked && runningTasks > 0, - blockedReasons, - 
totalAllocation, - - rawInputDataSize, - rawInputPositions, - processedInputDataSize, - processedInputPositions, - bufferedDataSize, - outputDataSize, - outputPositions, - physicalWrittenDataSize, + succinctDuration(taskStatsAggregator.totalScheduledTime, NANOSECONDS), + succinctDuration(taskStatsAggregator.totalCpuTime, NANOSECONDS), + succinctDuration(taskStatsAggregator.retriedCpuTime, NANOSECONDS), + succinctDuration(taskStatsAggregator.totalBlockedTime, NANOSECONDS), + taskStatsAggregator.fullyBlocked && taskStatsAggregator.runningTaskCount > 0, + taskStatsAggregator.blockedReasons, + + taskStatsAggregator.totalAllocation, + + taskStatsAggregator.rawInputDataSize, + taskStatsAggregator.rawInputPositions, + taskStatsAggregator.processedInputDataSize, + taskStatsAggregator.processedInputPositions, + taskStatsAggregator.bufferedDataSize, + taskStatsAggregator.outputDataSize, + taskStatsAggregator.outputPositions, + taskStatsAggregator.physicalWrittenDataSize, new StageGcStatistics( stageExecutionId.getStageId().getId(), stageExecutionId.getId(), - totalTasks, - fullGcTaskCount, - minFullGcSec, - maxFullGcSec, - totalFullGcSec, - (int) (1.0 * totalFullGcSec / fullGcCount)), - - ImmutableList.copyOf(operatorToStats.values()), - mergedRuntimeStats); + taskStatsAggregator.totalTaskCount, + taskStatsAggregator.fullGcTaskCount, + taskStatsAggregator.minFullGcSec, + taskStatsAggregator.maxFullGcSec, + taskStatsAggregator.totalFullGcSec, + (int) (1.0 * taskStatsAggregator.totalFullGcSec / taskStatsAggregator.fullGcCount)), + taskStatsAggregator.getOperatorSummaries(), + taskStatsAggregator.getMergedRuntimeStats()); return new StageExecutionInfo( state, @@ -297,12 +198,198 @@ public boolean isFinal() return state.isDone() && tasks.stream().allMatch(taskInfo -> taskInfo.getTaskStatus().getState().isDone()); } - public static StageExecutionInfo unscheduledExecutionInfo(int stageId, boolean isQueryDone) + private static class OperatorKey { - return new StageExecutionInfo( - 
isQueryDone ? StageExecutionState.ABORTED : StageExecutionState.PLANNED,
-                StageExecutionStats.zero(stageId),
-                ImmutableList.of(),
-                Optional.empty());
+        // Identity of an operator within a stage: the (pipelineId, operatorId) pair.
+        private final int pipelineId;
+        private final int operatorId;
+
+        public OperatorKey(int pipelineId, int operatorId)
+        {
+            this.pipelineId = pipelineId;
+            this.operatorId = operatorId;
+        }
+
+        /**
+         * Two keys are equal when both the pipeline id and the operator id match.
+         */
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            OperatorKey that = (OperatorKey) o;
+            return pipelineId == that.pipelineId && operatorId == that.operatorId;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(pipelineId, operatorId);
+        }
+    }
+
+    /**
+     * Mutable accumulator for the per-task numbers that {@code create} folds into a
+     * {@code StageExecutionStats}. The enclosing class reads the fields directly.
+     */
+    private static class TaskStatsAggregator
+    {
+        // Task-level counters maintained by the caller through the increase*/update* methods.
+        private final int totalTaskCount;
+        private int runningTaskCount;
+        private int completedTaskCount;
+        private long retriedCpuTime;
+        private long bufferedDataSize;
+
+        // Starts true and is AND-ed with each reported task, so one unblocked task clears it.
+        private boolean fullyBlocked = true;
+        private final Set<BlockedReason> blockedReasons = new HashSet<>();
+
+        // Everything below is summed from TaskStats in processTaskStats.
+        private int totalDrivers;
+        private int queuedDrivers;
+        private int runningDrivers;
+        private int blockedDrivers;
+        private int completedDrivers;
+
+        private double cumulativeUserMemory;
+        private double cumulativeTotalMemory;
+        private long userMemoryReservation;
+        private long totalMemoryReservation;
+
+        private long totalScheduledTime;
+        private long totalCpuTime;
+        private long totalBlockedTime;
+
+        private long totalAllocation;
+
+        private long rawInputDataSize;
+        private long rawInputPositions;
+
+        private long processedInputDataSize;
+        private long processedInputPositions;
+
+        private long outputDataSize;
+        private long outputPositions;
+
+        private long physicalWrittenDataSize;
+
+        private int fullGcCount;
+        private int fullGcTaskCount;
+        private int minFullGcSec;
+        private int maxFullGcSec;
+        private int totalFullGcSec;
+
+        private final RuntimeStats mergedRuntimeStats = new RuntimeStats();
+        // Un-merged operator stats grouped by (pipelineId, operatorId); merged in getOperatorSummaries().
+        private final Map<OperatorKey, List<OperatorStats>> operatorStatsByKey = new
HashMap<>();
+
+        // Number of tasks folded in by processTaskStats so far; lets the first task seed minFullGcSec.
+        private int processedTaskCount;
+
+        /**
+         * Creates an aggregator for {@code totalTaskCount} tasks and seeds the merged runtime
+         * stats with {@code stageRuntimeStats}.
+         */
+        public TaskStatsAggregator(int totalTaskCount, RuntimeStats stageRuntimeStats)
+        {
+            this.totalTaskCount = totalTaskCount;
+            this.mergedRuntimeStats.mergeWith(stageRuntimeStats);
+        }
+
+        /**
+         * Folds one task's statistics into the running totals: driver counts, memory, timings,
+         * input/output sizes, full-GC figures, operator stats and runtime metrics.
+         *
+         * @param taskStats statistics of a single task of the stage
+         */
+        public void processTaskStats(TaskStats taskStats)
+        {
+            totalDrivers += taskStats.getTotalDrivers();
+            queuedDrivers += taskStats.getQueuedDrivers();
+            runningDrivers += taskStats.getRunningDrivers();
+            blockedDrivers += taskStats.getBlockedDrivers();
+            completedDrivers += taskStats.getCompletedDrivers();
+
+            cumulativeUserMemory += taskStats.getCumulativeUserMemory();
+            cumulativeTotalMemory += taskStats.getCumulativeTotalMemory();
+
+            long taskUserMemory = taskStats.getUserMemoryReservationInBytes();
+            long taskSystemMemory = taskStats.getSystemMemoryReservationInBytes();
+            userMemoryReservation += taskUserMemory;
+            totalMemoryReservation += taskUserMemory + taskSystemMemory;
+
+            totalScheduledTime += taskStats.getTotalScheduledTimeInNanos();
+            totalCpuTime += taskStats.getTotalCpuTimeInNanos();
+            totalBlockedTime += taskStats.getTotalBlockedTimeInNanos();
+
+            totalAllocation += taskStats.getTotalAllocationInBytes();
+
+            rawInputDataSize += taskStats.getRawInputDataSizeInBytes();
+            rawInputPositions += taskStats.getRawInputPositions();
+
+            processedInputDataSize += taskStats.getProcessedInputDataSizeInBytes();
+            processedInputPositions += taskStats.getProcessedInputPositions();
+
+            outputDataSize += taskStats.getOutputDataSizeInBytes();
+            outputPositions += taskStats.getOutputPositions();
+
+            physicalWrittenDataSize += taskStats.getPhysicalWrittenDataSizeInBytes();
+
+            fullGcCount += taskStats.getFullGcCount();
+            fullGcTaskCount += taskStats.getFullGcCount() > 0 ? 1 : 0;
+
+            int gcSec = toIntExact(MILLISECONDS.toSeconds(taskStats.getFullGcTimeInMillis()));
+            totalFullGcSec += gcSec;
+            // minFullGcSec starts at 0, so min(0, gcSec) could never report anything above 0.
+            // Seed the minimum from the first task; with no tasks at all it stays 0 as before.
+            minFullGcSec = (processedTaskCount == 0) ? gcSec : min(minFullGcSec, gcSec);
+            maxFullGcSec = max(maxFullGcSec, gcSec);
+            processedTaskCount++;
+
+            updateOperatorStats(taskStats);
+            updateRuntimeStats(taskStats);
+        }
+
+        /**
+         * Files every operator summary of every pipeline of the task under its
+         * (pipelineId, operatorId) key. Merging is deferred to {@link #getOperatorSummaries()}.
+         */
+        private void updateOperatorStats(TaskStats taskStats)
+        {
+            for (PipelineStats pipeline : taskStats.getPipelines()) {
+                for (OperatorStats operatorStats : pipeline.getOperatorSummaries()) {
+                    OperatorKey key = new OperatorKey(pipeline.getPipelineId(), operatorStats.getOperatorId());
+                    operatorStatsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(operatorStats);
+                }
+            }
+        }
+
+        /**
+         * Merges the task's runtime stats and records its per-task metrics. Queued and blocked
+         * time go through {@code addMetricValueIgnoreZero}; the other metrics are always added.
+         */
+        private void updateRuntimeStats(TaskStats taskStats)
+        {
+            mergedRuntimeStats.mergeWith(taskStats.getRuntimeStats());
+            mergedRuntimeStats.addMetricValue(DRIVER_COUNT_PER_TASK, NONE, taskStats.getTotalDrivers());
+            mergedRuntimeStats.addMetricValue(TASK_ELAPSED_TIME_NANOS, NANO, taskStats.getElapsedTimeInNanos());
+            mergedRuntimeStats.addMetricValueIgnoreZero(TASK_QUEUED_TIME_NANOS, NANO, taskStats.getQueuedTimeInNanos());
+            mergedRuntimeStats.addMetricValue(TASK_SCHEDULED_TIME_NANOS, NANO, taskStats.getTotalScheduledTimeInNanos());
+            mergedRuntimeStats.addMetricValueIgnoreZero(TASK_BLOCKED_TIME_NANOS, NANO, taskStats.getTotalBlockedTimeInNanos());
+        }
+
+        /**
+         * Returns the accumulated runtime stats (the live instance, not a copy).
+         */
+        public RuntimeStats getMergedRuntimeStats()
+        {
+            return mergedRuntimeStats;
+        }
+
+        /**
+         * Merges the stats collected under each operator key with {@code OperatorStats.merge}
+         * and returns the results that are present as an immutable list. Order follows the
+         * backing hash map and is therefore unspecified.
+         */
+        public List<OperatorStats> getOperatorSummaries()
+        {
+            return operatorStatsByKey.values().stream()
+                    .map(OperatorStats::merge)
+                    .filter(Optional::isPresent)
+                    .map(Optional::get)
+                    .collect(toImmutableList());
+        }
+
+        public void increaseRunningTaskCount(int count)
+        {
+            runningTaskCount += count;
+        }
+
+        public void increaseCompleteTaskCount(int count)
+        {
+            completedTaskCount += count;
+        }
+
+        public void increaseRetriedCpuTime(long time)
+        {
+            retriedCpuTime += time;
+        }
+
+        /**
+         * ANDs {@code blocked} into the aggregate, so a single {@code false} clears it for good.
+         */
+        public void updateFullyBlocked(boolean blocked)
+        {
+            fullyBlocked &= blocked;
+        }
+
+
public void addNewBlockedReasons(Set reasons) + { + blockedReasons.addAll(reasons); + } + + public void increaseBufferedDataSize(long bytes) + { + bufferedDataSize += bytes; + } } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/OperatorStats.java b/presto-main-base/src/main/java/com/facebook/presto/operator/OperatorStats.java index 4cfe37c45ab31..0208a2b10d49d 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/operator/OperatorStats.java +++ b/presto-main-base/src/main/java/com/facebook/presto/operator/OperatorStats.java @@ -21,12 +21,13 @@ import com.facebook.presto.util.Mergeable; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; import io.airlift.units.Duration; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; +import java.util.HashSet; +import java.util.List; import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; @@ -742,67 +743,80 @@ public long getIsBlockedAllocationInBytes() return isBlockedAllocationInBytes; } - public OperatorStats add(OperatorStats operatorStats) + public static Optional merge(List operators) { - return add(ImmutableList.of(operatorStats)); - } - - public OperatorStats add(Iterable operators) - { - long totalDrivers = this.totalDrivers; - - long isBlockedCalls = this.isBlockedCalls; - long isBlockedWall = this.isBlockedWall.roundTo(NANOSECONDS); - long isBlockedCpu = this.isBlockedCpu.roundTo(NANOSECONDS); - long isBlockedAllocation = this.isBlockedAllocationInBytes; - - long addInputCalls = this.addInputCalls; - long addInputWall = this.addInputWall.roundTo(NANOSECONDS); - long addInputCpu = this.addInputCpu.roundTo(NANOSECONDS); - double addInputAllocation = this.addInputAllocationInBytes; - double rawInputDataSize = this.rawInputDataSizeInBytes; - long rawInputPositions = this.rawInputPositions; - double inputDataSize = 
this.inputDataSizeInBytes; - long inputPositions = this.inputPositions; - double sumSquaredInputPositions = this.sumSquaredInputPositions; - - long getOutputCalls = this.getOutputCalls; - long getOutputWall = this.getOutputWall.roundTo(NANOSECONDS); - long getOutputCpu = this.getOutputCpu.roundTo(NANOSECONDS); - double getOutputAllocation = this.getOutputAllocationInBytes; - double outputDataSize = this.outputDataSizeInBytes; - long outputPositions = this.outputPositions; - - double physicalWrittenDataSize = this.physicalWrittenDataSizeInBytes; - - long additionalCpu = this.additionalCpu.roundTo(NANOSECONDS); - long blockedWall = this.blockedWall.roundTo(NANOSECONDS); - - long finishCalls = this.finishCalls; - long finishWall = this.finishWall.roundTo(NANOSECONDS); - long finishCpu = this.finishCpu.roundTo(NANOSECONDS); - long finishAllocation = this.finishAllocationInBytes; - - double memoryReservation = this.userMemoryReservationInBytes; - double revocableMemoryReservation = this.revocableMemoryReservationInBytes; - double systemMemoryReservation = this.systemMemoryReservationInBytes; - double peakUserMemory = this.peakUserMemoryReservationInBytes; - double peakSystemMemory = this.peakSystemMemoryReservationInBytes; - double peakTotalMemory = this.peakTotalMemoryReservationInBytes; - - double spilledDataSize = this.spilledDataSizeInBytes; - - Optional blockedReason = this.blockedReason; + if (operators.isEmpty()) { + return Optional.empty(); + } - RuntimeStats runtimeStats = RuntimeStats.copyOf(this.runtimeStats); - DynamicFilterStats dynamicFilterStats = DynamicFilterStats.copyOf(this.dynamicFilterStats); + if (operators.size() == 1) { + return Optional.of(operators.get(0)); + } - long nullJoinBuildKeyCount = this.nullJoinBuildKeyCount; - long joinBuildKeyCount = this.joinBuildKeyCount; - long nullJoinProbeKeyCount = this.nullJoinProbeKeyCount; - long joinProbeKeyCount = this.joinProbeKeyCount; + OperatorStats first = operators.stream().findFirst().get(); + int 
stageId = first.getStageId(); + int operatorId = first.getOperatorId(); + int stageExecutionId = first.getStageExecutionId(); + int pipelineId = first.getPipelineId(); + PlanNodeId planNodeId = first.getPlanNodeId(); + String operatorType = first.getOperatorType(); + + long totalDrivers = 0; + + long isBlockedCalls = 0; + long isBlockedWall = 0; + long isBlockedCpu = 0; + long isBlockedAllocation = 0; + + long addInputCalls = 0; + long addInputWall = 0; + long addInputCpu = 0; + double addInputAllocation = 0; + double rawInputDataSize = 0; + long rawInputPositions = 0; + double inputDataSize = 0; + long inputPositions = 0; + double sumSquaredInputPositions = 0.0; + + long getOutputCalls = 0; + long getOutputWall = 0; + long getOutputCpu = 0; + double getOutputAllocation = 0; + double outputDataSize = 0; + long outputPositions = 0; + + double physicalWrittenDataSize = 0; + + long additionalCpu = 0; + long blockedWall = 0; + + long finishCalls = 0; + long finishWall = 0; + long finishCpu = 0; + long finishAllocation = 0; + + double memoryReservation = 0; + double revocableMemoryReservation = 0; + double systemMemoryReservation = 0; + double peakUserMemory = 0; + double peakSystemMemory = 0; + double peakTotalMemory = 0; + + double spilledDataSize = 0; + + long nullJoinBuildKeyCount = 0; + long joinBuildKeyCount = 0; + long nullJoinProbeKeyCount = 0; + long joinProbeKeyCount = 0; + + RuntimeStats runtimeStats = new RuntimeStats(); + DynamicFilterStats dynamicFilterStats = new DynamicFilterStats(new HashSet<>()); + + Optional blockedReason = Optional.empty(); + + boolean mergeInfo = first.getInfo() instanceof Mergeable; + Mergeable base = null; - Mergeable base = getMergeableInfoOrNull(info); for (OperatorStats operator : operators) { checkArgument(operator.getOperatorId() == operatorId, "Expected operatorId to be %s but was %s", operatorId, operator.getOperatorId()); @@ -855,8 +869,13 @@ public OperatorStats add(Iterable operators) } OperatorInfo info = 
operator.getInfo(); - if (base != null && info != null && base.getClass() == info.getClass()) { - base = mergeInfo(base, info); + if (mergeInfo) { + if (base == null) { + base = (Mergeable) info; + } + else if (info != null && info.getClass() == base.getClass()) { + base = mergeInfo(base, info); + } } runtimeStats.mergeWith(operator.getRuntimeStats()); @@ -868,7 +887,7 @@ public OperatorStats add(Iterable operators) joinProbeKeyCount += operator.getJoinProbeKeyCount(); } - return new OperatorStats( + return Optional.of(new OperatorStats( stageId, stageExecutionId, pipelineId, @@ -921,13 +940,13 @@ public OperatorStats add(Iterable operators) blockedReason, - (OperatorInfo) base, + mergeInfo ? (OperatorInfo) base : null, runtimeStats, dynamicFilterStats, nullJoinBuildKeyCount, joinBuildKeyCount, nullJoinProbeKeyCount, - joinProbeKeyCount); + joinProbeKeyCount)); } @SuppressWarnings("unchecked") diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/PipelineContext.java b/presto-main-base/src/main/java/com/facebook/presto/operator/PipelineContext.java index beecc028cc7c1..99851036dc02d 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/operator/PipelineContext.java +++ b/presto-main-base/src/main/java/com/facebook/presto/operator/PipelineContext.java @@ -23,18 +23,18 @@ import com.facebook.presto.memory.context.LocalMemoryContext; import com.facebook.presto.memory.context.MemoryTrackingContext; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ListMultimap; import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.TreeMap; import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -44,10 +44,12 @@ import java.util.concurrent.atomic.AtomicLong; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.lang.System.currentTimeMillis; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; @ThreadSafe public class PipelineContext @@ -92,7 +94,7 @@ public class PipelineContext private final AtomicLong physicalWrittenDataSize = new AtomicLong(); - private final ConcurrentMap operatorSummaries = new ConcurrentHashMap<>(); + private final ConcurrentMap operatorStatsById = new ConcurrentHashMap<>(); private final MemoryTrackingContext pipelineMemoryContext; @@ -198,10 +200,10 @@ public void driverFinished(DriverContext driverContext) totalAllocation.getAndAdd(driverStats.getTotalAllocationInBytes()); - // merge the operator stats into the operator summary List operators = driverStats.getOperatorStats(); for (OperatorStats operator : operators) { - operatorSummaries.compute(operator.getOperatorId(), (operatorId, summaryStats) -> summaryStats == null ? operator : summaryStats.add(operator)); + operatorStatsById.compute(operator.getOperatorId(), + (operatorId, summaryStats) -> summaryStats == null ? 
operator : OperatorStats.merge(ImmutableList.of(operator, summaryStats)).orElse(null)); } rawInputDataSize.update(driverStats.getRawInputDataSizeInBytes()); @@ -377,9 +379,11 @@ public PipelineStats getPipelineStats() boolean hasUnfinishedDrivers = false; boolean unfinishedDriversFullyBlocked = true; - TreeMap operatorSummaries = new TreeMap<>(this.operatorSummaries); - ListMultimap runningOperators = ArrayListMultimap.create(); ImmutableList.Builder drivers = ImmutableList.builderWithExpectedSize(driverContexts.size()); + // Make deep copy of each list + Map> operatorStatsById = this.operatorStatsById.entrySet().stream() + .collect(toMap(Map.Entry::getKey, e -> new ArrayList<>(Arrays.asList(e.getValue())))); + for (DriverContext driverContext : driverContexts) { DriverStats driverStats = driverContext.getDriverStats(); drivers.add(driverStats); @@ -401,7 +405,7 @@ public PipelineStats getPipelineStats() totalAllocation += driverStats.getTotalAllocationInBytes(); for (OperatorStats operatorStats : driverStats.getOperatorStats()) { - runningOperators.put(operatorStats.getOperatorId(), operatorStats); + operatorStatsById.computeIfAbsent(operatorStats.getOperatorId(), k -> new ArrayList<>()).add(operatorStats); } rawInputDataSize += driverStats.getRawInputDataSizeInBytes(); @@ -416,26 +420,6 @@ public PipelineStats getPipelineStats() physicalWrittenDataSize += driverStats.getPhysicalWrittenDataSizeInBytes(); } - // merge the running operator stats into the operator summary - for (Integer operatorId : runningOperators.keySet()) { - List runningStats = runningOperators.get(operatorId); - if (runningStats.isEmpty()) { - continue; - } - OperatorStats current = operatorSummaries.get(operatorId); - OperatorStats combined; - if (current != null) { - combined = current.add(runningStats); - } - else { - combined = runningStats.get(0); - if (runningStats.size() > 1) { - combined = combined.add(runningStats.subList(1, runningStats.size())); - } - } - 
operatorSummaries.put(operatorId, combined); - } - PipelineStatus pipelineStatus = pipelineStatusBuilder.build(); boolean fullyBlocked = hasUnfinishedDrivers && unfinishedDriversFullyBlocked; @@ -485,7 +469,11 @@ public PipelineStats getPipelineStats() physicalWrittenDataSize, - ImmutableList.copyOf(operatorSummaries.values()), + operatorStatsById.values().stream() + .map(OperatorStats::merge) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toImmutableList()), drivers.build()); } diff --git a/presto-main-base/src/test/java/com/facebook/presto/operator/TestDriverStats.java b/presto-main-base/src/test/java/com/facebook/presto/operator/TestDriverStats.java index 5dd790b5384d8..6750dc4d30170 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/operator/TestDriverStats.java +++ b/presto-main-base/src/test/java/com/facebook/presto/operator/TestDriverStats.java @@ -62,7 +62,7 @@ public class TestDriverStats 20L, - ImmutableList.of(TestOperatorStats.EXPECTED)); + ImmutableList.of(TestOperatorStats.NON_MERGEABLE)); @Test public void testJson() diff --git a/presto-main-base/src/test/java/com/facebook/presto/operator/TestOperatorStats.java b/presto-main-base/src/test/java/com/facebook/presto/operator/TestOperatorStats.java index 1c43c601692b2..b2a4f01dc5a55 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/operator/TestOperatorStats.java +++ b/presto-main-base/src/test/java/com/facebook/presto/operator/TestOperatorStats.java @@ -30,6 +30,8 @@ import static com.facebook.presto.common.RuntimeUnit.NONE; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; public class TestOperatorStats @@ -39,12 +41,12 @@ public class TestOperatorStats private static final String TEST_METRIC_NAME = "test_metric"; private static final RuntimeMetric TEST_RUNTIME_METRIC_1 = 
new RuntimeMetric(TEST_METRIC_NAME, NONE, 10, 2, 9, 1); private static final RuntimeMetric TEST_RUNTIME_METRIC_2 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 5, 2, 3, 2); - private static final DynamicFilterStats TEST_DYNAMIC_FILTER_STATS_1 = new DynamicFilterStats(new HashSet<>(Arrays.asList(new PlanNodeId[] {new PlanNodeId("1"), - new PlanNodeId("2")}))); - private static final DynamicFilterStats TEST_DYNAMIC_FILTER_STATS_2 = new DynamicFilterStats(new HashSet<>(Arrays.asList(new PlanNodeId[] {new PlanNodeId("2"), - new PlanNodeId("3")}))); + private static final DynamicFilterStats TEST_DYNAMIC_FILTER_STATS_1 = new DynamicFilterStats(new HashSet<>(Arrays.asList(new PlanNodeId("1"), + new PlanNodeId("2")))); + private static final DynamicFilterStats TEST_DYNAMIC_FILTER_STATS_2 = new DynamicFilterStats(new HashSet<>(Arrays.asList(new PlanNodeId("2"), + new PlanNodeId("3")))); - public static final OperatorStats EXPECTED = new OperatorStats( + public static final OperatorStats NON_MERGEABLE = new OperatorStats( 0, 10, 1, @@ -165,7 +167,7 @@ public void testJson() { JsonCodec codec = JsonCodec.jsonCodec(OperatorStats.class); - String json = codec.toJson(EXPECTED); + String json = codec.toJson(NON_MERGEABLE); OperatorStats actual = codec.fromJson(json); assertExpectedOperatorStats(actual); @@ -227,9 +229,115 @@ public static void assertExpectedOperatorStats(OperatorStats actual) } @Test - public void testAdd() + public void testAddMixedStartingWithMergeable() { - OperatorStats actual = EXPECTED.add(ImmutableList.of(EXPECTED, EXPECTED)); + OperatorStats actual = OperatorStats.merge(ImmutableList.of(MERGEABLE, NON_MERGEABLE, NON_MERGEABLE)).get(); + + assertEquals(actual.getStageId(), 0); + assertEquals(actual.getStageExecutionId(), 10); + assertEquals(actual.getOperatorId(), 41); + assertEquals(actual.getOperatorType(), "test"); + + assertEquals(actual.getTotalDrivers(), 3 * 1); + assertEquals(actual.getAddInputCalls(), 3 * 2); + assertEquals(actual.getAddInputWall(), 
new Duration(3 * 3, NANOSECONDS)); + assertEquals(actual.getAddInputCpu(), new Duration(3 * 4, NANOSECONDS)); + assertEquals(actual.getAddInputAllocationInBytes(), 3 * 123); + assertEquals(actual.getRawInputDataSizeInBytes(), 3 * 5); + assertEquals(actual.getInputDataSizeInBytes(), 3 * 6); + assertEquals(actual.getInputPositions(), 3 * 7); + assertEquals(actual.getSumSquaredInputPositions(), 3 * 8.0); + + assertEquals(actual.getGetOutputCalls(), 3 * 9); + assertEquals(actual.getGetOutputWall(), new Duration(3 * 10, NANOSECONDS)); + assertEquals(actual.getGetOutputCpu(), new Duration(3 * 11, NANOSECONDS)); + assertEquals(actual.getGetOutputAllocationInBytes(), 3 * 234); + assertEquals(actual.getOutputDataSizeInBytes(), 3 * 12); + assertEquals(actual.getOutputPositions(), 3 * 13); + + assertEquals(actual.getPhysicalWrittenDataSizeInBytes(), 3 * 14); + assertEquals(actual.getAdditionalCpu(), new Duration(3 * 100, NANOSECONDS)); + assertEquals(actual.getBlockedWall(), new Duration(3 * 15, NANOSECONDS)); + + assertEquals(actual.getFinishCalls(), 3 * 16); + assertEquals(actual.getFinishWall(), new Duration(3 * 17, NANOSECONDS)); + assertEquals(actual.getFinishCpu(), new Duration(3 * 18, NANOSECONDS)); + assertEquals(actual.getFinishAllocationInBytes(), 3 * 345); + + assertEquals(actual.getUserMemoryReservationInBytes(), Long.MAX_VALUE); + assertEquals(actual.getRevocableMemoryReservationInBytes(), 3 * 20); + assertEquals(actual.getSystemMemoryReservationInBytes(), 3 * 21); + assertEquals(actual.getPeakUserMemoryReservationInBytes(), 22); + assertEquals(actual.getPeakSystemMemoryReservationInBytes(), 23); + assertEquals(actual.getPeakTotalMemoryReservationInBytes(), 24); + assertEquals(actual.getSpilledDataSizeInBytes(), 3 * 25); + assertNotNull(actual.getInfo()); + + RuntimeMetric expectedMetric = RuntimeMetric.merge(TEST_RUNTIME_METRIC_2, TEST_RUNTIME_METRIC_1); + expectedMetric.mergeWith(TEST_RUNTIME_METRIC_1); + 
assertRuntimeMetricEquals(actual.getRuntimeStats().getMetric(TEST_METRIC_NAME), expectedMetric); + + DynamicFilterStats expectedDynamicFilterStats = DynamicFilterStats.copyOf(TEST_DYNAMIC_FILTER_STATS_1); + expectedDynamicFilterStats.mergeWith(TEST_DYNAMIC_FILTER_STATS_2); + assertEquals(actual.getDynamicFilterStats().getProducerNodeIds(), expectedDynamicFilterStats.getProducerNodeIds()); + } + + @Test + public void testSingleNonMergeable() + { + OperatorStats actual = OperatorStats.merge(ImmutableList.of(NON_MERGEABLE)).get(); + + assertEquals(actual.getStageId(), 0); + assertEquals(actual.getStageExecutionId(), 10); + assertEquals(actual.getOperatorId(), 41); + assertEquals(actual.getOperatorType(), "test"); + + assertEquals(actual.getTotalDrivers(), 1 * 1); + assertEquals(actual.getAddInputCalls(), 1 * 2); + assertEquals(actual.getAddInputWall(), new Duration(1 * 3, NANOSECONDS)); + assertEquals(actual.getAddInputCpu(), new Duration(1 * 4, NANOSECONDS)); + assertEquals(actual.getAddInputAllocationInBytes(), 1 * 123); + assertEquals(actual.getRawInputDataSizeInBytes(), 1 * 5); + assertEquals(actual.getInputDataSizeInBytes(), 1 * 6); + assertEquals(actual.getInputPositions(), 1 * 7); + assertEquals(actual.getSumSquaredInputPositions(), 1 * 8.0); + + assertEquals(actual.getGetOutputCalls(), 1 * 9); + assertEquals(actual.getGetOutputWall(), new Duration(1 * 10, NANOSECONDS)); + assertEquals(actual.getGetOutputCpu(), new Duration(1 * 11, NANOSECONDS)); + assertEquals(actual.getGetOutputAllocationInBytes(), 1 * 234); + assertEquals(actual.getOutputDataSizeInBytes(), 1 * 12); + assertEquals(actual.getOutputPositions(), 1 * 13); + + assertEquals(actual.getPhysicalWrittenDataSizeInBytes(), 1 * 14); + assertEquals(actual.getAdditionalCpu(), new Duration(1 * 100, NANOSECONDS)); + assertEquals(actual.getBlockedWall(), new Duration(1 * 15, NANOSECONDS)); + + assertEquals(actual.getFinishCalls(), 1 * 16); + assertEquals(actual.getFinishWall(), new Duration(1 * 17, 
NANOSECONDS)); + assertEquals(actual.getFinishCpu(), new Duration(1 * 18, NANOSECONDS)); + assertEquals(actual.getFinishAllocationInBytes(), 1 * 345); + + assertEquals(actual.getUserMemoryReservationInBytes(), Long.MAX_VALUE); + assertEquals(actual.getRevocableMemoryReservationInBytes(), 1 * 20); + assertEquals(actual.getSystemMemoryReservationInBytes(), 1 * 21); + assertEquals(actual.getPeakUserMemoryReservationInBytes(), 22); + assertEquals(actual.getPeakSystemMemoryReservationInBytes(), 23); + assertEquals(actual.getPeakTotalMemoryReservationInBytes(), 24); + assertEquals(actual.getSpilledDataSizeInBytes(), 1 * 25); + assertNotNull(actual.getInfo()); + + RuntimeMetric expectedMetric = TEST_RUNTIME_METRIC_1; + assertRuntimeMetricEquals(actual.getRuntimeStats().getMetric(TEST_METRIC_NAME), expectedMetric); + + DynamicFilterStats expectedDynamicFilterStats = TEST_DYNAMIC_FILTER_STATS_1; + assertEquals(actual.getDynamicFilterStats().getProducerNodeIds(), TEST_DYNAMIC_FILTER_STATS_1.getProducerNodeIds()); + } + + @Test + public void testAddMixedStartingWithNonMergeable() + { + OperatorStats actual = OperatorStats.merge(ImmutableList.of(NON_MERGEABLE, MERGEABLE, MERGEABLE)).get(); assertEquals(actual.getStageId(), 0); assertEquals(actual.getStageExecutionId(), 10); @@ -262,6 +370,60 @@ public void testAdd() assertEquals(actual.getFinishCpu(), new Duration(3 * 18, NANOSECONDS)); assertEquals(actual.getFinishAllocationInBytes(), 3 * 345); + assertEquals(actual.getUserMemoryReservationInBytes(), Long.MAX_VALUE); + assertEquals(actual.getRevocableMemoryReservationInBytes(), 3 * 20); + assertEquals(actual.getSystemMemoryReservationInBytes(), 3 * 21); + assertEquals(actual.getPeakUserMemoryReservationInBytes(), 22); + assertEquals(actual.getPeakSystemMemoryReservationInBytes(), 23); + assertEquals(actual.getPeakTotalMemoryReservationInBytes(), 24); + assertEquals(actual.getSpilledDataSizeInBytes(), 3 * 25); + assertNull(actual.getInfo()); + + RuntimeMetric expectedMetric = 
RuntimeMetric.merge(TEST_RUNTIME_METRIC_1, TEST_RUNTIME_METRIC_2); + expectedMetric.mergeWith(TEST_RUNTIME_METRIC_2); + assertRuntimeMetricEquals(actual.getRuntimeStats().getMetric(TEST_METRIC_NAME), expectedMetric); + + DynamicFilterStats expectedDynamicFilterStats = DynamicFilterStats.copyOf(TEST_DYNAMIC_FILTER_STATS_1); + expectedDynamicFilterStats.mergeWith(TEST_DYNAMIC_FILTER_STATS_2); + assertEquals(actual.getDynamicFilterStats().getProducerNodeIds(), TEST_DYNAMIC_FILTER_STATS_1.getProducerNodeIds()); + } + + @Test + public void testAddNonMergeable() + { + OperatorStats actual = OperatorStats.merge(ImmutableList.of(NON_MERGEABLE, NON_MERGEABLE, NON_MERGEABLE)).get(); + + assertEquals(actual.getStageId(), 0); + assertEquals(actual.getStageExecutionId(), 10); + assertEquals(actual.getOperatorId(), 41); + assertEquals(actual.getOperatorType(), "test"); + + assertEquals(actual.getTotalDrivers(), 3); + assertEquals(actual.getAddInputCalls(), 3 * 2); + assertEquals(actual.getAddInputWall(), new Duration(3 * 3, NANOSECONDS)); + assertEquals(actual.getAddInputCpu(), new Duration(3 * 4, NANOSECONDS)); + assertEquals(actual.getAddInputAllocationInBytes(), 3 * 123); + assertEquals(actual.getRawInputDataSizeInBytes(), 3 * 5); + assertEquals(actual.getInputDataSizeInBytes(), 3 * 6); + assertEquals(actual.getInputPositions(), 3 * 7); + assertEquals(actual.getSumSquaredInputPositions(), 3 * 8.0); + + assertEquals(actual.getGetOutputCalls(), 3 * 9); + assertEquals(actual.getGetOutputWall(), new Duration(3 * 10, NANOSECONDS)); + assertEquals(actual.getGetOutputCpu(), new Duration(3 * 11, NANOSECONDS)); + assertEquals(actual.getGetOutputAllocationInBytes(), 3 * 234); + assertEquals(actual.getOutputDataSizeInBytes(), 3 * 12); + assertEquals(actual.getOutputPositions(), 3 * 13); + + assertEquals(actual.getPhysicalWrittenDataSizeInBytes(), 3 * 14); + assertEquals(actual.getAdditionalCpu(), new Duration(3 * 100, NANOSECONDS)); + assertEquals(actual.getBlockedWall(), new Duration(3 
* 15, NANOSECONDS)); + + assertEquals(actual.getFinishCalls(), 3 * 16); + assertEquals(actual.getFinishWall(), new Duration(3 * 17, NANOSECONDS)); + assertEquals(actual.getFinishCpu(), new Duration(3 * 18, NANOSECONDS)); + assertEquals(actual.getFinishAllocationInBytes(), 3 * 345); + assertEquals(actual.getUserMemoryReservationInBytes(), Long.MAX_VALUE); assertEquals(actual.getRevocableMemoryReservationInBytes(), 3 * 20); assertEquals(actual.getSystemMemoryReservationInBytes(), 3 * 21); @@ -277,16 +439,16 @@ public void testAdd() } @Test - public void testAddMergeable() + public void testMergeWithMergeableInfo() { - OperatorStats actual = MERGEABLE.add(ImmutableList.of(MERGEABLE, MERGEABLE)); + OperatorStats actual = OperatorStats.merge(ImmutableList.of(MERGEABLE, MERGEABLE, MERGEABLE)).get(); assertEquals(actual.getStageId(), 0); assertEquals(actual.getStageExecutionId(), 10); assertEquals(actual.getOperatorId(), 41); assertEquals(actual.getOperatorType(), "test"); - assertEquals(actual.getTotalDrivers(), 3 * 1); + assertEquals(actual.getTotalDrivers(), 3); assertEquals(actual.getAddInputCalls(), 3 * 2); assertEquals(actual.getAddInputWall(), new Duration(3 * 3, NANOSECONDS)); assertEquals(actual.getAddInputCpu(), new Duration(3 * 4, NANOSECONDS)); @@ -327,4 +489,11 @@ public void testAddMergeable() assertRuntimeMetricEquals(actual.getRuntimeStats().getMetric(TEST_METRIC_NAME), expectedMetric); assertEquals(actual.getDynamicFilterStats().getProducerNodeIds(), TEST_DYNAMIC_FILTER_STATS_2.getProducerNodeIds()); } + + @Test + public void testMergeEmptyCollection() + { + Optional merged = OperatorStats.merge(ImmutableList.of()); + assertFalse(merged.isPresent()); + } } diff --git a/presto-main-base/src/test/java/com/facebook/presto/operator/TestPipelineStats.java b/presto-main-base/src/test/java/com/facebook/presto/operator/TestPipelineStats.java index 03482c9e55535..6b7ac01659d9f 100644 --- 
a/presto-main-base/src/test/java/com/facebook/presto/operator/TestPipelineStats.java +++ b/presto-main-base/src/test/java/com/facebook/presto/operator/TestPipelineStats.java @@ -74,7 +74,7 @@ public class TestPipelineStats 20, - ImmutableList.of(TestOperatorStats.EXPECTED), + ImmutableList.of(TestOperatorStats.NON_MERGEABLE), ImmutableList.of(TestDriverStats.EXPECTED)); @Test