diff --git a/presto-benchmark/src/main/java/com/facebook/presto/benchmark/AbstractOperatorBenchmark.java b/presto-benchmark/src/main/java/com/facebook/presto/benchmark/AbstractOperatorBenchmark.java index 71e4d6a0d30c3..a7c6bd17da432 100644 --- a/presto-benchmark/src/main/java/com/facebook/presto/benchmark/AbstractOperatorBenchmark.java +++ b/presto-benchmark/src/main/java/com/facebook/presto/benchmark/AbstractOperatorBenchmark.java @@ -34,6 +34,7 @@ import com.facebook.presto.operator.OperatorFactory; import com.facebook.presto.operator.PageSourceOperator; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.operator.TaskStats; import com.facebook.presto.operator.project.InputPageProjection; import com.facebook.presto.operator.project.PageProcessor; @@ -66,6 +67,7 @@ import java.util.Optional; import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.airlift.stats.CpuTimer.CpuDuration; import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount; import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageSize; @@ -282,9 +284,12 @@ protected Map runOnce() localQueryRunner.getExecutor(), localQueryRunner.getScheduler(), new DataSize(256, MEGABYTE), - spillSpaceTracker) - .addTaskContext(new TaskStateMachine(new TaskId("query", 0, 0, 0), localQueryRunner.getExecutor()), + spillSpaceTracker, + listJsonCodec(TaskMemoryReservationSummary.class)) + .addTaskContext( + new TaskStateMachine(new TaskId("query", 0, 0, 0), localQueryRunner.getExecutor()), session, + Optional.empty(), false, false, false, diff --git a/presto-benchmark/src/test/java/com/facebook/presto/benchmark/MemoryLocalQueryRunner.java b/presto-benchmark/src/test/java/com/facebook/presto/benchmark/MemoryLocalQueryRunner.java index 88930384d0947..cdb642ed56c86 100644 --- a/presto-benchmark/src/test/java/com/facebook/presto/benchmark/MemoryLocalQueryRunner.java +++ b/presto-benchmark/src/test/java/com/facebook/presto/benchmark/MemoryLocalQueryRunner.java @@ -24,6 +24,7 @@ import com.facebook.presto.metadata.Metadata; import com.facebook.presto.operator.Driver; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.plugin.memory.MemoryConnectorFactory; import com.facebook.presto.spi.Plugin; import com.facebook.presto.spi.QueryId; @@ -42,6 +43,7 @@ import java.util.Map; import java.util.Optional; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static org.testng.Assert.assertTrue; @@ -86,11 +88,14 @@ public List execute(@Language("SQL") String query) localQueryRunner.getExecutor(), localQueryRunner.getScheduler(), new DataSize(4, GIGABYTE), - spillSpaceTracker); + spillSpaceTracker, + listJsonCodec(TaskMemoryReservationSummary.class)); TaskContext taskContext = queryContext - .addTaskContext(new TaskStateMachine(new TaskId("query", 0, 0, 0), localQueryRunner.getExecutor()), + .addTaskContext( + new TaskStateMachine(new TaskId("query", 0, 0, 0), localQueryRunner.getExecutor()), localQueryRunner.getDefaultSession(), + Optional.empty(), false, false, false, diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 7bd981a7e0275..af3e41cb369be 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -196,6 +196,7 @@ public final class SystemSessionProperties public static final String PARTIAL_RESULTS_COMPLETION_RATIO_THRESHOLD = "partial_results_completion_ratio_threshold"; public static final String PARTIAL_RESULTS_MAX_EXECUTION_TIME_MULTIPLIER = "partial_results_max_execution_time_multiplier"; public static final String OFFSET_CLAUSE_ENABLED = "offset_clause_enabled"; + public static final String VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED = "verbose_exceeded_memory_limit_errors_enabled"; private final List> sessionProperties; @@ -1048,6 +1049,11 @@ public SystemSessionProperties( PARTIAL_RESULTS_MAX_EXECUTION_TIME_MULTIPLIER, "This value is multiplied by the time taken to reach the completion ratio threshold and is set as max task end time", featuresConfig.getPartialResultsMaxExecutionTimeMultiplier(), + false), + booleanProperty( + VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, + "When enabled the error message for exceeded memory limit errors will contain additional operator memory allocation details", + nodeMemoryConfig.isVerboseExceededMemoryLimitErrorsEnabled(), false)); } @@ -1768,4 +1774,9 @@ public static boolean isOffsetClauseEnabled(Session session) { return session.getSystemProperty(OFFSET_CLAUSE_ENABLED, Boolean.class); } + + public static boolean isVerboseExceededMemoryLimitErrorsEnabled(Session session) + { + return session.getSystemProperty(VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, Boolean.class); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecutionFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecutionFactory.java index f192c7a722c56..0a5f2c4e9dcbd 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecutionFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecutionFactory.java @@ -30,8 +30,10 @@ import com.facebook.presto.sql.planner.PlanFragment; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; +import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled; import static com.facebook.presto.execution.SqlTaskExecution.createSqlTaskExecution; import static com.google.common.base.Throwables.throwIfUnchecked; import static java.util.Objects.requireNonNull; @@ -88,6 +90,8 @@ public SqlTaskExecution create( TaskContext taskContext = queryContext.addTaskContext( taskStateMachine, session, + // Plan has to be retained only if verbose memory exceeded errors are requested + isVerboseExceededMemoryLimitErrorsEnabled(session) ? Optional.of(fragment.getRoot()) : Optional.empty(), perOperatorCpuTimerEnabled, cpuTimerEnabled, perOperatorAllocationTrackingEnabled, diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java index 8e9faabf5fff0..31160162e12b9 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java @@ -14,6 +14,7 @@ package com.facebook.presto.execution; import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean; +import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.log.Logger; import com.facebook.airlift.node.NodeInfo; import com.facebook.airlift.stats.CounterStat; @@ -37,6 +38,7 @@ import com.facebook.presto.metadata.MetadataUpdates; import com.facebook.presto.operator.ExchangeClientSupplier; import com.facebook.presto.operator.FragmentResultCacheManager; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.connector.ConnectorMetadataUpdater; @@ -77,6 +79,7 @@ import static com.facebook.presto.SystemSessionProperties.getQueryMaxBroadcastMemory; import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; +import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled; import static com.facebook.presto.SystemSessionProperties.resourceOvercommit; import static com.facebook.presto.execution.SqlTask.createSqlTask; import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL; @@ -107,6 +110,7 @@ public class SqlTaskManager private final Duration clientTimeout; private final LocalMemoryManager localMemoryManager; + private final JsonCodec> memoryReservationSummaryJsonCodec; private final LoadingCache queryContexts; private final LoadingCache tasks; @@ -126,6 +130,7 @@ public SqlTaskManager( SplitMonitor splitMonitor, NodeInfo nodeInfo, LocalMemoryManager localMemoryManager, + JsonCodec> memoryReservationSummaryJsonCodec, TaskManagementExecutor taskManagementExecutor, TaskManagerConfig config, NodeMemoryConfig nodeMemoryConfig, @@ -162,6 +167,7 @@ public SqlTaskManager( config); this.localMemoryManager = requireNonNull(localMemoryManager, "localMemoryManager is null"); + this.memoryReservationSummaryJsonCodec = requireNonNull(memoryReservationSummaryJsonCodec, "memoryReservationSummaryJsonCodec is null"); DataSize maxQueryUserMemoryPerNode = nodeMemoryConfig.getMaxQueryMemoryPerNode(); DataSize maxQueryTotalMemoryPerNode = nodeMemoryConfig.getMaxQueryTotalMemoryPerNode(); DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode(); @@ -213,7 +219,8 @@ private QueryContext createQueryContext( taskNotificationExecutor, driverYieldExecutor, maxQuerySpillPerNode, - localSpillManager.getSpillSpaceTracker()); + localSpillManager.getSpillSpaceTracker(), + memoryReservationSummaryJsonCodec); } @Override @@ -409,6 +416,8 @@ public TaskInfo updateTask( } } + queryContext.setVerboseExceededMemoryLimitErrorsEnabled(isVerboseExceededMemoryLimitErrorsEnabled(session)); + sqlTask.recordHeartbeat(); return sqlTask.updateTask(session, fragment, sources, outputBuffers, tableWriteInfo); } diff --git a/presto-main/src/main/java/com/facebook/presto/memory/NodeMemoryConfig.java b/presto-main/src/main/java/com/facebook/presto/memory/NodeMemoryConfig.java index 5ebe668e64fba..22190441c2b0d 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/NodeMemoryConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/NodeMemoryConfig.java @@ -42,6 +42,8 @@ public class NodeMemoryConfig private DataSize softMaxQueryTotalMemoryPerNode; private DataSize heapHeadroom = new DataSize(AVAILABLE_HEAP_MEMORY * 0.3, BYTE); + private boolean verboseExceededMemoryLimitErrorsEnabled; + @NotNull public DataSize getMaxQueryBroadcastMemory() { @@ -143,4 +145,17 @@ public NodeMemoryConfig setHeapHeadroom(DataSize heapHeadroom) this.heapHeadroom = heapHeadroom; return this; } + + public boolean isVerboseExceededMemoryLimitErrorsEnabled() + { + return verboseExceededMemoryLimitErrorsEnabled; + } + + @Config("memory.verbose-exceeded-memory-limit-errors-enabled") + @ConfigDescription("When enabled the error message for exceeded memory limit errors will contain additional operator memory allocation details") + public NodeMemoryConfig setVerboseExceededMemoryLimitErrorsEnabled(boolean verboseExceededMemoryLimitErrorsEnabled) + { + this.verboseExceededMemoryLimitErrorsEnabled = verboseExceededMemoryLimitErrorsEnabled; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java b/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java index e01b87e92f852..74043c9fb2575 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.memory; +import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.stats.GcMonitor; import com.facebook.presto.Session; import com.facebook.presto.execution.TaskId; @@ -20,7 +21,9 @@ import com.facebook.presto.memory.context.MemoryReservationHandler; import com.facebook.presto.memory.context.MemoryTrackingContext; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.spi.QueryId; +import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spiller.SpillSpaceTracker; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -35,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -52,11 +56,13 @@ import static com.facebook.presto.operator.Operator.NOT_BLOCKED; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.airlift.units.DataSize.succinctBytes; import static java.lang.String.format; +import static java.util.Comparator.comparing; import static java.util.Map.Entry.comparingByValue; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -72,6 +78,7 @@ public class QueryContext private final ScheduledExecutorService yieldExecutor; private final long maxSpill; private final SpillSpaceTracker spillSpaceTracker; + private final JsonCodec> memoryReservationSummaryJsonCodec; private final Map taskContexts = new ConcurrentHashMap<>(); @GuardedBy("this") @@ -103,6 +110,9 @@ public class QueryContext @GuardedBy("this") private long spillUsed; + @GuardedBy("this") + private boolean verboseExceededMemoryLimitErrorsEnabled; + public QueryContext( QueryId queryId, DataSize maxUserMemory, @@ -114,7 +124,8 @@ public QueryContext( Executor notificationExecutor, ScheduledExecutorService yieldExecutor, DataSize maxSpill, - SpillSpaceTracker spillSpaceTracker) + SpillSpaceTracker spillSpaceTracker, + JsonCodec> memoryReservationSummaryJsonCodec) { this.queryId = requireNonNull(queryId, "queryId is null"); this.maxUserMemory = requireNonNull(maxUserMemory, "maxUserMemory is null").toBytes(); @@ -127,6 +138,7 @@ public QueryContext( this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null"); this.maxSpill = requireNonNull(maxSpill, "maxSpill is null").toBytes(); this.spillSpaceTracker = requireNonNull(spillSpaceTracker, "spillSpaceTracker is null"); + this.memoryReservationSummaryJsonCodec = requireNonNull(memoryReservationSummaryJsonCodec, "memoryReservationSummaryJsonCodec is null"); this.queryMemoryContext = new MemoryTrackingContext( newRootAggregatedMemoryContext(new QueryMemoryReservationHandler(this::updateUserMemory, this::tryUpdateUserMemory, this::updateBroadcastMemory, this::tryUpdateBroadcastMemory), GUARANTEED_MEMORY), newRootAggregatedMemoryContext(new QueryMemoryReservationHandler(this::updateRevocableMemory, this::tryReserveMemoryNotSupported, this::updateBroadcastMemory, this::tryUpdateBroadcastMemory), 0L), @@ -325,6 +337,7 @@ public long getMaxTotalMemory() public TaskContext addTaskContext( TaskStateMachine taskStateMachine, Session session, + Optional taskPlan, boolean perOperatorCpuTimerEnabled, boolean cpuTimerEnabled, boolean perOperatorAllocationTrackingEnabled, @@ -339,6 +352,7 @@ public TaskContext addTaskContext( yieldExecutor, session, queryMemoryContext.newMemoryTrackingContext(), + taskPlan, perOperatorCpuTimerEnabled, cpuTimerEnabled, perOperatorAllocationTrackingEnabled, @@ -388,6 +402,11 @@ public synchronized void setMemoryLimits(DataSize queryMaxTaskMemory, DataSize q memoryLimitsInitialized = true; } + public synchronized void setVerboseExceededMemoryLimitErrorsEnabled(boolean verboseExceededMemoryLimitErrorsEnabled) + { + this.verboseExceededMemoryLimitErrorsEnabled = verboseExceededMemoryLimitErrorsEnabled; + } + public synchronized long getPeakNodeTotalMemory() { return peakNodeTotalMemory; @@ -496,6 +515,17 @@ public String getAdditionalFailureInfo(long allocated, long delta) .collect(toImmutableMap(Entry::getKey, e -> succinctBytes(e.getValue()))) .toString(); - return format("%s, Top Consumers: %s", additionalInfo, topConsumers); + String message = format("%s, Top Consumers: %s", additionalInfo, topConsumers); + + if (verboseExceededMemoryLimitErrorsEnabled) { + List memoryReservationSummaries = taskContexts.values().stream() + .map(TaskContext::getMemoryReservationSummary) + .filter(summary -> summary.getReservation().toBytes() > 0) + .sorted(comparing(TaskMemoryReservationSummary::getReservation).reversed()) + .limit(3) + .collect(toImmutableList()); + message += ", Details: " + memoryReservationSummaryJsonCodec.toJson(memoryReservationSummaries); + } + return message; } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java b/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java index 45b4afffbee23..2998d00a86413 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java @@ -92,6 +92,8 @@ public class OperatorContext private final AtomicLong peakTotalMemoryReservation = new AtomicLong(); private final RuntimeStats runtimeStats = new RuntimeStats(); + private final AtomicLong currentTotalMemoryReservationInBytes = new AtomicLong(); + @GuardedBy("this") private boolean memoryRevokingRequested; @@ -129,6 +131,11 @@ public int getOperatorId() return operatorId; } + public PlanNodeId getPlanNodeId() + { + return planNodeId; + } + public String getOperatorType() { return operatorType; @@ -305,6 +312,12 @@ private void updatePeakMemoryReservations() peakUserMemoryReservation.accumulateAndGet(userMemory, Math::max); peakSystemMemoryReservation.accumulateAndGet(systemMemory, Math::max); peakTotalMemoryReservation.accumulateAndGet(totalMemory, Math::max); + currentTotalMemoryReservationInBytes.set(totalMemory); + } + + public long getCurrentTotalMemoryReservationInBytes() + { + return currentTotalMemoryReservationInBytes.get(); } // listen to revocable memory allocations and call any listeners waiting on task memory allocation diff --git a/presto-main/src/main/java/com/facebook/presto/operator/OperatorMemoryReservationSummary.java b/presto-main/src/main/java/com/facebook/presto/operator/OperatorMemoryReservationSummary.java new file mode 100644 index 0000000000000..eb8557f563074 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/OperatorMemoryReservationSummary.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.operator; + +import com.facebook.presto.spi.plan.PlanNodeId; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class OperatorMemoryReservationSummary +{ + private final String type; + private final PlanNodeId planNodeId; + private final List reservations; + private final DataSize total; + private final Optional info; + + @JsonCreator + public OperatorMemoryReservationSummary( + @JsonProperty("type") String type, + @JsonProperty("planNodeId") PlanNodeId planNodeId, + @JsonProperty("reservations") List reservations, + @JsonProperty("total") DataSize total, + @JsonProperty("info") Optional info) + { + this.type = requireNonNull(type, "type is null"); + this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); + this.reservations = ImmutableList.copyOf(requireNonNull(reservations, "reservations is null")); + this.total = requireNonNull(total, "total is null"); + this.info = requireNonNull(info, "info is null"); + } + + @JsonProperty + public String getType() + { + return type; + } + + @JsonProperty + public PlanNodeId getPlanNodeId() + { + return planNodeId; + } + + @JsonProperty + public List getReservations() + { + return reservations; + } + + @JsonProperty + public DataSize getTotal() + { + return total; + } + + @JsonProperty + public Optional getInfo() + { + return info; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java b/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java index 61c28bdce0f97..1585153f00eae 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java @@ -24,11 +24,18 @@ import com.facebook.presto.execution.buffer.LazyOutputBuffer; import com.facebook.presto.memory.QueryContext; import com.facebook.presto.memory.QueryContextVisitor; +import com.facebook.presto.memory.VoidTraversingQueryContextVisitor; import com.facebook.presto.memory.context.LocalMemoryContext; import com.facebook.presto.memory.context.MemoryTrackingContext; +import com.facebook.presto.spi.plan.AggregationNode; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeId; +import com.facebook.presto.sql.planner.plan.JoinNode; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ListMultimap; import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.units.DataSize; @@ -38,7 +45,10 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -46,13 +56,20 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.collect.Iterables.getLast; import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Sets.newConcurrentHashSet; import static io.airlift.units.DataSize.Unit.BYTE; +import static io.airlift.units.DataSize.succinctBytes; import static java.lang.Math.max; import static java.lang.Math.toIntExact; +import static java.util.Comparator.comparing; +import static java.util.Comparator.reverseOrder; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -113,6 +130,8 @@ public class TaskContext private final TaskMetadataContext taskMetadataContext; + private final Optional taskPlan; + public static TaskContext createTaskContext( QueryContext queryContext, TaskStateMachine taskStateMachine, @@ -121,6 +140,7 @@ public static TaskContext createTaskContext( ScheduledExecutorService yieldExecutor, Session session, MemoryTrackingContext taskMemoryContext, + Optional taskPlan, boolean perOperatorCpuTimerEnabled, boolean cpuTimerEnabled, boolean perOperatorAllocationTrackingEnabled, @@ -135,6 +155,7 @@ public static TaskContext createTaskContext( yieldExecutor, session, taskMemoryContext, + taskPlan, perOperatorCpuTimerEnabled, cpuTimerEnabled, perOperatorAllocationTrackingEnabled, @@ -144,13 +165,15 @@ public static TaskContext createTaskContext( return taskContext; } - private TaskContext(QueryContext queryContext, + private TaskContext( + QueryContext queryContext, TaskStateMachine taskStateMachine, GcMonitor gcMonitor, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, Session session, MemoryTrackingContext taskMemoryContext, + Optional taskPlan, boolean perOperatorCpuTimerEnabled, boolean cpuTimerEnabled, boolean perOperatorAllocationTrackingEnabled, @@ -164,6 +187,7 @@ private TaskContext(QueryContext queryContext, this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null"); this.session = session; this.taskMemoryContext = requireNonNull(taskMemoryContext, "taskMemoryContext is null"); + this.taskPlan = requireNonNull(taskPlan, "taskPlan is null"); // Initialize the local memory contexts with the LazyOutputBuffer tag as LazyOutputBuffer will do the local memory allocations taskMemoryContext.initializeLocalMemoryContexts(LazyOutputBuffer.class.getSimpleName()); this.perOperatorCpuTimerEnabled = perOperatorCpuTimerEnabled; @@ -603,4 +627,113 @@ public QueryContext getQueryContext() { return queryContext; } + + public TaskMemoryReservationSummary getMemoryReservationSummary() + { + List operatorMemoryReservations = getOperatorMemoryReservations(); + long totalTaskMemoryReservationInBytes = operatorMemoryReservations.stream() + .map(OperatorMemoryReservationSummary::getTotal) + .mapToLong(DataSize::toBytes) + .sum(); + List topConsumers = operatorMemoryReservations.stream() + .filter(summary -> summary.getTotal().toBytes() > 0) + .sorted(comparing(OperatorMemoryReservationSummary::getTotal).reversed()) + .limit(3) + .collect(toImmutableList()); + return new TaskMemoryReservationSummary( + getShortTaskId(getTaskId()), + succinctBytes(totalTaskMemoryReservationInBytes), + topConsumers); + } + + /** + * Short task id representation doesn't include the query id + */ + private static String getShortTaskId(TaskId taskId) + { + return taskId.getStageExecutionId().getStageId().getId() + "." + taskId.getStageExecutionId().getId() + "." + taskId.getId(); + } + + private List getOperatorMemoryReservations() + { + ListMultimap, OperatorContext> operatorContexts = ArrayListMultimap.create(); + accept(new VoidTraversingQueryContextVisitor() + { + @Override + public Void visitOperatorContext(OperatorContext operatorContext, Void nothing) + { + operatorContexts.put( + ImmutableList.of(operatorContext.getDriverContext().getPipelineContext().getPipelineId(), operatorContext.getOperatorId()), + operatorContext); + return null; + } + }, null); + ImmutableList.Builder result = ImmutableList.builder(); + for (Collection operators : operatorContexts.asMap().values()) { + OperatorContext lastContext = getLast(operators); + long totalOperatorMemoryReservationInBytes = 0; + List reservations = new ArrayList<>(); + for (OperatorContext context : operators) { + long reservedTotalMemoryInBytes = context.getCurrentTotalMemoryReservationInBytes(); + totalOperatorMemoryReservationInBytes += reservedTotalMemoryInBytes; + reservations.add(succinctBytes(reservedTotalMemoryInBytes)); + } + reservations.sort(reverseOrder()); + result.add(new OperatorMemoryReservationSummary( + lastContext.getOperatorType(), + lastContext.getPlanNodeId(), + ImmutableList.copyOf(reservations), + succinctBytes(totalOperatorMemoryReservationInBytes), + getAdditionalOperatorInfo(lastContext))); + } + + return result.build(); + } + + private Optional getAdditionalOperatorInfo(OperatorContext context) + { + if (!taskPlan.isPresent()) { + return Optional.empty(); + } + + if (context.getOperatorType().equals(HashBuilderOperator.class.getSimpleName())) { + Optional planNode = findPlanNode(context.getPlanNodeId(), JoinNode.class); + if (!planNode.isPresent()) { + return Optional.empty(); + } + String info = planNode.get().getType().toString() + ";"; + if (planNode.get().getDistributionType().isPresent()) { + info += planNode.get().getDistributionType().get() + ";"; + } + return Optional.of(info); + } + + if (context.getOperatorType().equals(HashAggregationOperator.class.getSimpleName()) || + context.getOperatorType().equals(AggregationOperator.class.getSimpleName())) { + Optional planNode = findPlanNode(context.getPlanNodeId(), AggregationNode.class); + if (!planNode.isPresent()) { + return Optional.empty(); + } + boolean isDistinct = planNode.get().getAggregations().values().stream().anyMatch(AggregationNode.Aggregation::isDistinct); + boolean isOrderBy = planNode.get().getAggregations().values().stream().anyMatch(aggregation -> aggregation.getOrderBy().isPresent()); + String info = planNode.get().getStep() + ";"; + if (isDistinct) { + info += "DISTINCT;"; + } + if (isOrderBy) { + info += "ORDER_BY;"; + } + return Optional.of(info); + } + + return Optional.empty(); + } + + private Optional findPlanNode(PlanNodeId planNodeId, Class nodeType) + { + checkState(taskPlan.isPresent(), "taskPlan is expected to be present"); + return searchFrom(taskPlan.get()) + .where(node -> node.getId().equals(planNodeId) && nodeType.isInstance(node)) + .findSingle(); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TaskMemoryReservationSummary.java b/presto-main/src/main/java/com/facebook/presto/operator/TaskMemoryReservationSummary.java new file mode 100644 index 0000000000000..b6ce6dcce05d7 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/TaskMemoryReservationSummary.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.operator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class TaskMemoryReservationSummary +{ + private final String taskId; + private final DataSize reservation; + private final List topConsumers; + + @JsonCreator + public TaskMemoryReservationSummary( + @JsonProperty("taskId") String taskId, + @JsonProperty("reservation") DataSize reservation, + @JsonProperty("topConsumers") List topConsumers) + { + this.taskId = requireNonNull(taskId, "taskId is null"); + this.reservation = requireNonNull(reservation, "reservation is null"); + this.topConsumers = ImmutableList.copyOf(requireNonNull(topConsumers, "topConsumers is null")); + } + + @JsonProperty + public String getTaskId() + { + return taskId; + } + + @JsonProperty + public DataSize getReservation() + { + return reservation; + } + + @JsonProperty + public List getTopConsumers() + { + return topConsumers; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 7ba84f0b4c055..bf12d2edab4da 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -109,6 +109,7 @@ import com.facebook.presto.operator.OperatorStats; import com.facebook.presto.operator.PagesIndex; import com.facebook.presto.operator.TableCommitContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.operator.index.IndexJoinLookupStats; import com.facebook.presto.resourcemanager.ClusterMemoryManagerService; import com.facebook.presto.resourcemanager.ClusterStatusSender; @@ -350,6 +351,7 @@ else if (serverConfig.isCoordinator()) { thriftCodecBinder(binder).bindCustomThriftCodec(SqlInvokedFunctionCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(SqlFunctionIdCodec.class); + jsonCodecBinder(binder).bindListJsonCodec(TaskMemoryReservationSummary.class); binder.bind(SqlTaskManager.class).in(Scopes.SINGLETON); binder.bind(TaskManager.class).to(Key.get(SqlTaskManager.class)); binder.bind(SpoolingOutputBufferFactory.class).in(Scopes.SINGLETON); diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 4780d3ee1dbea..bc26f22a6ea73 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -220,6 +220,8 @@ import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.airlift.json.JsonCodec.jsonCodec; +import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; +import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled; import static com.facebook.presto.cost.StatsCalculatorModule.createNewStatsCalculator; import static com.facebook.presto.execution.scheduler.StreamingPlanSection.extractStreamingSections; import static com.facebook.presto.execution.scheduler.TableWriteInfo.createTableWriteInfo; @@ -717,12 +719,15 @@ private MaterializedResultWithPlan executeInternal(Session session, @Language("S return builder.get()::page; }); + Plan plan = createPlan(session, sql, warningCollector); + TaskContext taskContext = TestingTaskContext.builder(notificationExecutor, yieldExecutor, session) .setMaxSpillSize(nodeSpillConfig.getMaxSpillPerNode()) .setQueryMaxSpillSize(nodeSpillConfig.getQueryMaxSpillPerNode()) + .setQueryMaxTotalMemory(getQueryMaxTotalMemoryPerNode(session)) + .setTaskPlan(plan.getRoot()) .build(); - - Plan plan = createPlan(session, sql, warningCollector); + taskContext.getQueryContext().setVerboseExceededMemoryLimitErrorsEnabled(isVerboseExceededMemoryLimitErrorsEnabled(session)); List drivers = createDrivers(session, plan, outputFactory, taskContext); drivers.forEach(closer::register); diff --git a/presto-main/src/main/java/com/facebook/presto/testing/TestingTaskContext.java b/presto-main/src/main/java/com/facebook/presto/testing/TestingTaskContext.java index ddc78e9eae08c..b6e2a6c8a1b46 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/TestingTaskContext.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/TestingTaskContext.java @@ -21,14 +21,18 @@ import com.facebook.presto.memory.MemoryPool; import com.facebook.presto.memory.QueryContext; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.memory.MemoryPoolId; +import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spiller.SpillSpaceTracker; import io.airlift.units.DataSize; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; @@ -60,14 +64,23 @@ public static TaskContext createTaskContext(Executor notificationExecutor, Sched public static TaskContext createTaskContext(QueryContext queryContext, Executor executor, Session session) { - return createTaskContext(queryContext, session, new TaskStateMachine(new TaskId("query", 0, 0, 0), executor)); + return createTaskContext( + queryContext, + session, + new TaskStateMachine(new TaskId("query", 0, 0, 0), executor), + Optional.empty()); } - private static TaskContext createTaskContext(QueryContext queryContext, Session session, TaskStateMachine taskStateMachine) + private static TaskContext createTaskContext( + QueryContext queryContext, + Session session, + TaskStateMachine taskStateMachine, + Optional taskPlan) { return queryContext.addTaskContext( taskStateMachine, session, + taskPlan, true, true, true, @@ -93,6 +106,7 @@ public static class Builder private DataSize maxSpillSize = new DataSize(1, GIGABYTE); private DataSize maxRevocableMemory = new DataSize(1, GIGABYTE); private DataSize queryMaxSpillSize = new DataSize(1, GIGABYTE); + private Optional taskPlan = Optional.empty(); private Builder(Executor notificationExecutor, ScheduledExecutorService yieldExecutor, Session session) { @@ -144,6 +158,12 @@ public Builder setQueryId(QueryId queryId) return this; } + public Builder setTaskPlan(PlanNode taskPlan) + { + this.taskPlan = Optional.of(taskPlan); + return this; + } + public TaskContext build() { MemoryPool memoryPool = new MemoryPool(new MemoryPoolId("test"), memoryPoolSize); @@ -159,9 +179,10 @@ public TaskContext build() notificationExecutor, yieldExecutor, queryMaxSpillSize, - spillSpaceTracker); + spillSpaceTracker, + listJsonCodec(TaskMemoryReservationSummary.class)); - return createTaskContext(queryContext, session, taskStateMachine); + return createTaskContext(queryContext, session, taskStateMachine, taskPlan); } } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java b/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java index 94881de7bd3d3..94c37a7bc3faf 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java @@ -30,6 +30,7 @@ import com.facebook.presto.metadata.Split; import com.facebook.presto.operator.StageExecutionDescriptor; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.operator.TaskStats; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.TableHandle; @@ -77,6 +78,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.presto.SessionTestUtils.TEST_SESSION; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.execution.StateMachine.StateChangeListener; @@ -215,10 +217,12 @@ public MockRemoteTask(TaskId taskId, executor, scheduledExecutor, new DataSize(1, MEGABYTE), - spillSpaceTracker); + spillSpaceTracker, + listJsonCodec(TaskMemoryReservationSummary.class)); this.taskContext = queryContext.addTaskContext( taskStateMachine, TEST_SESSION, + Optional.of(fragment.getRoot()), true, true, true, diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestMemoryRevokingScheduler.java b/presto-main/src/test/java/com/facebook/presto/execution/TestMemoryRevokingScheduler.java index c2d521604f42e..4cbfb57e3fb9d 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestMemoryRevokingScheduler.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestMemoryRevokingScheduler.java @@ -30,6 +30,7 @@ import com.facebook.presto.operator.OperatorContext; import com.facebook.presto.operator.PipelineContext; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.memory.MemoryPoolId; import com.facebook.presto.spi.plan.PlanNodeId; @@ -61,6 +62,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.facebook.airlift.concurrent.Threads.threadsNamed; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.presto.SessionTestUtils.TEST_SESSION; import static com.facebook.presto.execution.SqlTask.createSqlTask; import static com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking.TASK_FAIR; @@ -811,7 +813,8 @@ private QueryContext getOrCreateQueryContext(QueryId queryId, MemoryPool memoryP singleThreadedExecutor, scheduledExecutor, new DataSize(1, GIGABYTE), - spillSpaceTracker)); + spillSpaceTracker, + listJsonCodec(TaskMemoryReservationSummary.class))); } private TaskContext getOrCreateTaskContext(SqlTask sqlTask) diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java index 4e26a0d2ae0b4..633ef7507dd74 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java @@ -26,6 +26,7 @@ import com.facebook.presto.execution.scheduler.TableWriteInfo; import com.facebook.presto.memory.MemoryPool; import com.facebook.presto.memory.QueryContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.memory.MemoryPoolId; import com.facebook.presto.spiller.SpillSpaceTracker; @@ -48,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.facebook.airlift.concurrent.Threads.threadsNamed; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.presto.SessionTestUtils.TEST_SESSION; import static com.facebook.presto.execution.SqlTask.createSqlTask; import static com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking.TASK_FAIR; @@ -319,9 +321,18 @@ public SqlTask createInitialTask() taskNotificationExecutor, driverYieldExecutor, new DataSize(1, MEGABYTE), - new SpillSpaceTracker(new DataSize(1, GIGABYTE))); - - queryContext.addTaskContext(new TaskStateMachine(taskId, taskNotificationExecutor), testSessionBuilder().build(), false, false, false, false, false); + new SpillSpaceTracker(new DataSize(1, GIGABYTE)), + listJsonCodec(TaskMemoryReservationSummary.class)); + + queryContext.addTaskContext( + new TaskStateMachine(taskId, taskNotificationExecutor), + testSessionBuilder().build(), + Optional.of(PLAN_FRAGMENT.getRoot()), + false, + false, + false, + false, + false); return createSqlTask( taskId, diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskExecution.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskExecution.java index 5b2b8c0595d8e..c9c0e73e48972 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskExecution.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskExecution.java @@ -38,6 +38,7 @@ import com.facebook.presto.operator.SourceOperatorFactory; import com.facebook.presto.operator.StageExecutionDescriptor; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.operator.TaskOutputOperator.TaskOutputOperatorFactory; import com.facebook.presto.operator.ValuesOperator.ValuesOperatorFactory; import com.facebook.presto.spi.ConnectorId; @@ -84,11 +85,13 @@ import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue; import static com.facebook.airlift.concurrent.Threads.threadsNamed; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.presto.SessionTestUtils.TEST_SESSION; import static com.facebook.presto.block.BlockAssertions.createStringSequenceBlock; import static com.facebook.presto.block.BlockAssertions.createStringsBlock; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking.TASK_FAIR; +import static com.facebook.presto.execution.TaskTestUtils.PLAN_FRAGMENT; import static com.facebook.presto.execution.TaskTestUtils.TABLE_SCAN_NODE_ID; import static com.facebook.presto.execution.TaskTestUtils.createTestSplitMonitor; import static com.facebook.presto.execution.buffer.BufferState.OPEN; @@ -625,8 +628,17 @@ private TaskContext newTestingTaskContext(ScheduledExecutorService taskNotificat taskNotificationExecutor, driverYieldExecutor, new DataSize(1, MEGABYTE), - new SpillSpaceTracker(new DataSize(1, GIGABYTE))); - return queryContext.addTaskContext(taskStateMachine, TEST_SESSION, false, false, false, false, false); + new SpillSpaceTracker(new DataSize(1, GIGABYTE)), + listJsonCodec(TaskMemoryReservationSummary.class)); + return queryContext.addTaskContext( + taskStateMachine, + TEST_SESSION, + Optional.of(PLAN_FRAGMENT.getRoot()), + false, + false, + false, + false, + false); } private PartitionedOutputBuffer newTestingOutputBuffer(ScheduledExecutorService taskNotificationExecutor) diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java index 06678ffebdccc..7f4b8164c0e46 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java @@ -32,6 +32,7 @@ import com.facebook.presto.operator.ExchangeClient; import com.facebook.presto.operator.ExchangeClientSupplier; import com.facebook.presto.operator.NoOpFragmentResultCacheManager; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spiller.LocalSpillManager; import com.facebook.presto.spiller.NodeSpillConfig; @@ -51,6 +52,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.presto.SessionTestUtils.TEST_SESSION; import static com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking.TASK_FAIR; import static com.facebook.presto.execution.TaskTestUtils.PLAN_FRAGMENT; @@ -288,6 +290,7 @@ public SqlTaskManager createSqlTaskManager(TaskManagerConfig config) createTestSplitMonitor(), new NodeInfo("test"), localMemoryManager, + listJsonCodec(TaskMemoryReservationSummary.class), taskManagementExecutor, config, new NodeMemoryConfig(), @@ -318,6 +321,7 @@ private TaskInfo createTask(SqlTaskManager sqlTaskManager, TaskId taskId, Output .addTaskContext( new TaskStateMachine(taskId, directExecutor()), testSessionBuilder().build(), + Optional.of(PLAN_FRAGMENT.getRoot()), false, false, false, diff --git a/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryPools.java b/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryPools.java index e3a08a02a4d59..e4b83ca6d61b4 100644 --- a/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryPools.java +++ b/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryPools.java @@ -25,6 +25,7 @@ import com.facebook.presto.operator.OutputFactory; import com.facebook.presto.operator.TableScanOperator; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.memory.MemoryPoolId; import com.facebook.presto.spi.plan.PlanNodeId; @@ -49,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.presto.testing.LocalQueryRunner.queryRunnerWithInitialTransaction; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; import static com.facebook.presto.testing.TestingTaskContext.createTaskContext; @@ -103,7 +105,8 @@ private void setUp(Supplier> driversSupplier) localQueryRunner.getExecutor(), localQueryRunner.getScheduler(), TEN_MEGABYTES, - spillSpaceTracker); + spillSpaceTracker, + listJsonCodec(TaskMemoryReservationSummary.class)); taskContext = createTaskContext(queryContext, localQueryRunner.getExecutor(), session); drivers = driversSupplier.get(); } diff --git a/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryTracking.java b/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryTracking.java index afdcddde06d9e..cce51de79768d 100644 --- a/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryTracking.java +++ b/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryTracking.java @@ -26,6 +26,7 @@ import com.facebook.presto.operator.PipelineContext; import com.facebook.presto.operator.PipelineStats; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.operator.TaskStats; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.memory.MemoryPoolId; @@ -37,12 +38,15 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import java.util.regex.Pattern; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; +import static com.facebook.presto.execution.TaskTestUtils.PLAN_FRAGMENT; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static java.lang.String.format; @@ -108,10 +112,12 @@ public void setUpTest() notificationExecutor, yieldExecutor, queryMaxSpillSize, - spillSpaceTracker); + spillSpaceTracker, + listJsonCodec(TaskMemoryReservationSummary.class)); taskContext = queryContext.addTaskContext( new TaskStateMachine(new TaskId("query", 0, 0, 0), notificationExecutor), testSessionBuilder().build(), + Optional.of(PLAN_FRAGMENT.getRoot()), true, true, true, diff --git a/presto-main/src/test/java/com/facebook/presto/memory/TestNodeMemoryConfig.java b/presto-main/src/test/java/com/facebook/presto/memory/TestNodeMemoryConfig.java index 9269929f87ef9..6c1524a6287a8 100644 --- a/presto-main/src/test/java/com/facebook/presto/memory/TestNodeMemoryConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/memory/TestNodeMemoryConfig.java @@ -39,7 +39,8 @@ public void testDefaults() .setMaxQueryTotalMemoryPerNode(new DataSize(AVAILABLE_HEAP_MEMORY * 0.3, BYTE)) .setSoftMaxQueryTotalMemoryPerNode(new DataSize(AVAILABLE_HEAP_MEMORY * 0.3, BYTE)) .setHeapHeadroom(new DataSize(AVAILABLE_HEAP_MEMORY * 0.3, BYTE)) - .setReservedPoolEnabled(true)); + .setReservedPoolEnabled(true) + .setVerboseExceededMemoryLimitErrorsEnabled(false)); } @Test @@ -53,6 +54,7 @@ public void testExplicitPropertyMappings() .put("query.soft-max-total-memory-per-node", "2GB") .put("memory.heap-headroom-per-node", "1GB") .put("experimental.reserved-pool-enabled", "false") + .put("memory.verbose-exceeded-memory-limit-errors-enabled", "true") .build(); NodeMemoryConfig expected = new NodeMemoryConfig() @@ -62,7 +64,8 @@ public void testExplicitPropertyMappings() .setMaxQueryTotalMemoryPerNode(new DataSize(3, GIGABYTE)) .setSoftMaxQueryTotalMemoryPerNode(new DataSize(2, GIGABYTE)) .setHeapHeadroom(new DataSize(1, GIGABYTE)) - .setReservedPoolEnabled(false); + .setReservedPoolEnabled(false) + .setVerboseExceededMemoryLimitErrorsEnabled(true); assertFullMapping(properties, expected); } @@ -78,6 +81,7 @@ public void testOutOfRangeBroadcastMemoryLimit() .put("query.soft-max-total-memory-per-node", "2GB") .put("memory.heap-headroom-per-node", "1GB") .put("experimental.reserved-pool-enabled", "false") + .put("memory.verbose-exceeded-memory-limit-errors-enabled", "true") .build(); NodeMemoryConfig expected = new NodeMemoryConfig() @@ -87,7 +91,8 @@ public void testOutOfRangeBroadcastMemoryLimit() .setMaxQueryTotalMemoryPerNode(new DataSize(3, GIGABYTE)) .setSoftMaxQueryTotalMemoryPerNode(new DataSize(2, GIGABYTE)) .setHeapHeadroom(new DataSize(1, GIGABYTE)) - .setReservedPoolEnabled(false); + .setReservedPoolEnabled(false) + .setVerboseExceededMemoryLimitErrorsEnabled(true); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/memory/TestQueryContext.java b/presto-main/src/test/java/com/facebook/presto/memory/TestQueryContext.java index 4ae5a56b217b2..f373b033143aa 100644 --- a/presto-main/src/test/java/com/facebook/presto/memory/TestQueryContext.java +++ b/presto-main/src/test/java/com/facebook/presto/memory/TestQueryContext.java @@ -21,6 +21,7 @@ import com.facebook.presto.operator.DriverContext; import com.facebook.presto.operator.OperatorContext; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spiller.SpillSpaceTracker; @@ -32,10 +33,13 @@ import org.testng.annotations.Test; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import static com.facebook.airlift.concurrent.Threads.threadsNamed; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.presto.SessionTestUtils.TEST_SESSION; +import static com.facebook.presto.execution.TaskTestUtils.PLAN_FRAGMENT; import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL; import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL; import static io.airlift.units.DataSize.Unit.BYTE; @@ -86,7 +90,8 @@ public void testSetMemoryPool(boolean useReservedPool) localQueryRunner.getExecutor(), localQueryRunner.getScheduler(), new DataSize(0, BYTE), - new SpillSpaceTracker(new DataSize(0, BYTE))); + new SpillSpaceTracker(new DataSize(0, BYTE)), + listJsonCodec(TaskMemoryReservationSummary.class)); // Use memory queryContext.getQueryMemoryContext().initializeLocalMemoryContexts("test"); @@ -122,7 +127,8 @@ public void testChecksTotalMemoryOnUserMemoryAllocation() localQueryRunner.getExecutor(), localQueryRunner.getScheduler(), new DataSize(0, BYTE), - new SpillSpaceTracker(new DataSize(0, BYTE))); + new SpillSpaceTracker(new DataSize(0, BYTE)), + listJsonCodec(TaskMemoryReservationSummary.class)); queryContext.getQueryMemoryContext().initializeLocalMemoryContexts("test"); LocalMemoryContext systemMemoryContext = queryContext.getQueryMemoryContext().localSystemMemoryContext(); @@ -140,7 +146,15 @@ public void testMoveTaggedAllocations() QueryId queryId = new QueryId("query"); QueryContext queryContext = createQueryContext(queryId, generalPool); TaskStateMachine taskStateMachine = new TaskStateMachine(TaskId.valueOf("queryid.0.0.0"), TEST_EXECUTOR); - TaskContext taskContext = queryContext.addTaskContext(taskStateMachine, TEST_SESSION, false, false, false, false, false); + TaskContext taskContext = queryContext.addTaskContext( + taskStateMachine, + TEST_SESSION, + Optional.of(PLAN_FRAGMENT.getRoot()), + false, + false, + false, + false, + false); DriverContext driverContext = taskContext.addPipelineContext(0, false, false, false).addDriverContext(); OperatorContext operatorContext = driverContext.addOperatorContext(0, new PlanNodeId("test"), "test"); @@ -178,6 +192,7 @@ private static QueryContext createQueryContext(QueryId queryId, MemoryPool gener TEST_EXECUTOR, TEST_EXECUTOR, new DataSize(0, BYTE), - new SpillSpaceTracker(new DataSize(0, BYTE))); + new SpillSpaceTracker(new DataSize(0, BYTE)), + listJsonCodec(TaskMemoryReservationSummary.class)); } } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/GroupByHashYieldAssertion.java b/presto-main/src/test/java/com/facebook/presto/operator/GroupByHashYieldAssertion.java index a13e253d37083..c760863e7e9b5 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/GroupByHashYieldAssertion.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/GroupByHashYieldAssertion.java @@ -32,6 +32,7 @@ import java.util.function.Function; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.airlift.testing.Assertions.assertBetweenInclusive; import static com.facebook.airlift.testing.Assertions.assertGreaterThan; import static com.facebook.airlift.testing.Assertions.assertLessThan; @@ -91,7 +92,8 @@ public static GroupByHashYieldResult finishOperatorWithYieldingGroupByHash(List< EXECUTOR, SCHEDULED_EXECUTOR, new DataSize(512, MEGABYTE), - new SpillSpaceTracker(new DataSize(512, MEGABYTE))); + new SpillSpaceTracker(new DataSize(512, MEGABYTE)), + listJsonCodec(TaskMemoryReservationSummary.class)); DriverContext driverContext = createTaskContext(queryContext, EXECUTOR, TEST_SESSION) .addPipelineContext(0, true, true, false) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java index 907fe23045b76..0190ffafdf5ff 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java @@ -92,6 +92,7 @@ import com.facebook.presto.operator.OperatorStats; import com.facebook.presto.operator.PagesIndex; import com.facebook.presto.operator.TableCommitContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.operator.index.IndexJoinLookupStats; import com.facebook.presto.resourcemanager.NoopResourceGroupService; import com.facebook.presto.resourcemanager.ResourceGroupService; @@ -238,6 +239,7 @@ protected void setup(Binder binder) jsonCodecBinder(binder).bindJsonCodec(QueryInfo.class); jsonCodecBinder(binder).bindJsonCodec(PrestoSparkQueryStatusInfo.class); jsonCodecBinder(binder).bindJsonCodec(PrestoSparkQueryData.class); + jsonCodecBinder(binder).bindListJsonCodec(TaskMemoryReservationSummary.class); // smile codecs smileCodecBinder(binder).bindSmileCodec(TaskSource.class); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java index 2648085a9b25f..464fc32d2c653 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java @@ -44,6 +44,7 @@ import com.facebook.presto.operator.OperatorContext; import com.facebook.presto.operator.OutputFactory; import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.operator.TaskStats; import com.facebook.presto.spark.PrestoSparkAuthenticatorProvider; import com.facebook.presto.spark.PrestoSparkConfig; @@ -120,6 +121,7 @@ import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; +import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled; import static com.facebook.presto.execution.TaskState.FAILED; import static com.facebook.presto.execution.TaskStatus.STARTING_VERSION; import static com.facebook.presto.execution.buffer.BufferState.FINISHED; @@ -158,6 +160,7 @@ public class PrestoSparkTaskExecutorFactory private final JsonCodec taskDescriptorJsonCodec; private final Codec taskSourceCodec; private final Codec taskInfoCodec; + private final JsonCodec> memoryReservationSummaryJsonCodec; private final Executor notificationExecutor; private final ScheduledExecutorService yieldExecutor; @@ -194,6 +197,7 @@ public PrestoSparkTaskExecutorFactory( JsonCodec taskDescriptorJsonCodec, Codec taskSourceCodec, Codec taskInfoCodec, + JsonCodec> memoryReservationSummaryJsonCodec, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, ScheduledExecutorService memoryUpdateExecutor, @@ -216,6 +220,7 @@ public PrestoSparkTaskExecutorFactory( taskDescriptorJsonCodec, taskSourceCodec, taskInfoCodec, + memoryReservationSummaryJsonCodec, notificationExecutor, yieldExecutor, memoryUpdateExecutor, @@ -244,6 +249,7 @@ public PrestoSparkTaskExecutorFactory( JsonCodec taskDescriptorJsonCodec, Codec taskSourceCodec, Codec taskInfoCodec, + JsonCodec> memoryReservationSummaryJsonCodec, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, ScheduledExecutorService memoryUpdateExecutor, @@ -270,6 +276,7 @@ public PrestoSparkTaskExecutorFactory( this.taskDescriptorJsonCodec = requireNonNull(taskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null"); this.taskSourceCodec = requireNonNull(taskSourceCodec, "taskSourceCodec is null"); this.taskInfoCodec = requireNonNull(taskInfoCodec, "taskInfoCodec is null"); + this.memoryReservationSummaryJsonCodec = requireNonNull(memoryReservationSummaryJsonCodec, "memoryReservationSummaryJsonCodec is null"); this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null"); this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null"); this.memoryUpdateExecutor = requireNonNull(memoryUpdateExecutor, "memoryUpdateExecutor is null"); @@ -386,12 +393,16 @@ public IPrestoSparkTaskExecutor doCreate( notificationExecutor, yieldExecutor, maxSpillMemory, - spillSpaceTracker); + spillSpaceTracker, + memoryReservationSummaryJsonCodec); + queryContext.setVerboseExceededMemoryLimitErrorsEnabled(isVerboseExceededMemoryLimitErrorsEnabled(session)); TaskStateMachine taskStateMachine = new TaskStateMachine(taskId, notificationExecutor); TaskContext taskContext = queryContext.addTaskContext( taskStateMachine, session, + // Plan has to be retained only if verbose memory exceeded errors are requested + isVerboseExceededMemoryLimitErrorsEnabled(session) ? Optional.of(fragment.getRoot()) : Optional.empty(), perOperatorCpuTimerEnabled, cpuTimerEnabled, perOperatorAllocationTrackingEnabled, diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkVerboseMemoryExceededErrors.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkVerboseMemoryExceededErrors.java new file mode 100644 index 0000000000000..7b08b61b9b137 --- /dev/null +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkVerboseMemoryExceededErrors.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spark; + +import com.facebook.presto.Session; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestVerboseMemoryExceededErrors; +import com.google.common.collect.ImmutableList; + +import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_TOTAL_MEMORY_PER_NODE; +import static com.facebook.presto.spark.PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner; + +public class TestPrestoSparkVerboseMemoryExceededErrors + extends AbstractTestVerboseMemoryExceededErrors +{ + @Override + protected QueryRunner createQueryRunner() + { + return createHivePrestoSparkQueryRunner(ImmutableList.of()); + } + + @Override + protected Session getSession() + { + return Session.builder(super.getSession()) + .setCatalog("tpch") + .setSchema("sf0.5") + .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "100MB") + .build(); + } +} diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestVerboseMemoryExceededErrors.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestVerboseMemoryExceededErrors.java new file mode 100644 index 0000000000000..07e7583c164b1 --- /dev/null +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestVerboseMemoryExceededErrors.java @@ -0,0 +1,143 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tests; + +import com.facebook.presto.Session; +import com.facebook.presto.operator.HashAggregationOperator; +import com.facebook.presto.operator.HashBuilderOperator; +import com.facebook.presto.operator.TaskMemoryReservationSummary; +import com.facebook.presto.operator.WindowOperator; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; +import static com.facebook.presto.SystemSessionProperties.USE_MARK_DISTINCT; +import static com.facebook.presto.SystemSessionProperties.VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED; +import static java.util.regex.Pattern.DOTALL; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +@Test(singleThreaded = true) +public abstract class AbstractTestVerboseMemoryExceededErrors + extends AbstractTestQueryFramework +{ + private static final int INVOCATION_COUNT = 1; + + @Override + protected Session getSession() + { + return Session.builder(super.getSession()) + .setSystemProperty(VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, "true") + .setSystemProperty(USE_MARK_DISTINCT, "false") + .build(); + } + + @Test(invocationCount = INVOCATION_COUNT) + public void testAggregation() + { + assertMemoryExceededDetails("" + + "SELECT " + + " linenumber, " + + " ARRAY_AGG(comment)," + + " MAP_AGG(comment, comment) " + + "FROM lineitem " + + "GROUP BY linenumber", + HashAggregationOperator.class.getSimpleName(), + Optional.empty()); + assertMemoryExceededDetails("" + + "SELECT " + + " linenumber, " + + " ARRAY_AGG(DISTINCT comment)," + + " MAP_AGG(comment, comment) " + + "FROM lineitem " + + "GROUP BY linenumber", + HashAggregationOperator.class.getSimpleName(), + Optional.of("DISTINCT;")); + assertMemoryExceededDetails("" + + "SELECT " + + " linenumber, " + + " ARRAY_AGG(comment ORDER BY comment)," + + " MAP_AGG(comment, comment) " + + "FROM lineitem " + + "GROUP BY linenumber", + HashAggregationOperator.class.getSimpleName(), + Optional.of("ORDER_BY;")); + } + + @Test(invocationCount = INVOCATION_COUNT) + public void testJoin() + { + assertMemoryExceededDetails("" + + "SELECT " + + " * " + + "FROM lineitem l1 " + + "INNER JOIN lineitem l2 " + + "ON l1.linenumber = l2.linenumber " + + "WHERE l1.quantity = 1.0", + HashBuilderOperator.class.getSimpleName(), + Optional.of("INNER;")); + assertMemoryExceededDetails("" + + "SELECT " + + " * " + + "FROM (" + + " SELECT * " + + " FROM lineitem " + + " WHERE quantity = 1.0 " + + ") l1 " + + "RIGHT OUTER JOIN lineitem l2 " + + "ON l1.linenumber = l2.linenumber ", + HashBuilderOperator.class.getSimpleName(), + Optional.of("RIGHT;")); + } + + @Test(invocationCount = INVOCATION_COUNT) + public void testWindow() + { + assertMemoryExceededDetails("" + + "SELECT " + + " rank() OVER (ORDER BY comment DESC) AS rnk " + + "FROM lineitem", + WindowOperator.class.getSimpleName(), + Optional.empty()); + } + + private void assertMemoryExceededDetails(String sql, String expectedTopConsumerOperatorName, Optional expectedTopConsumerOperatorInfo) + { + try { + getQueryRunner().execute(getSession(), sql); + fail("query expected to fail"); + } + catch (RuntimeException e) { + Pattern p = Pattern.compile(".*Query exceeded per-node total memory limit of.*, Details: (.*)", DOTALL); + String message = e.getMessage(); + Matcher matcher = p.matcher(message); + if (!matcher.matches()) { + fail("Unexpected error message: " + message); + } + String detailsJson = matcher.group(1); + List summaries = listJsonCodec(TaskMemoryReservationSummary.class).fromJson(detailsJson); + assertEquals(summaries.get(0).getTopConsumers().get(0).getType(), expectedTopConsumerOperatorName); + if (expectedTopConsumerOperatorInfo.isPresent()) { + assertTrue(summaries.get(0).getTopConsumers().get(0).getInfo().isPresent()); + assertThat(summaries.get(0).getTopConsumers().get(0).getInfo().get()).contains(expectedTopConsumerOperatorInfo.get()); + } + } + } +} diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedVerboseMemoryExceededErrors.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedVerboseMemoryExceededErrors.java new file mode 100644 index 0000000000000..4b790bc8d77f6 --- /dev/null +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedVerboseMemoryExceededErrors.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tests; + +import com.facebook.presto.Session; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.tpch.TpchQueryRunnerBuilder; + +import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_TOTAL_MEMORY_PER_NODE; + +public class TestDistributedVerboseMemoryExceededErrors + extends AbstractTestVerboseMemoryExceededErrors +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return TpchQueryRunnerBuilder + .builder() + .setNodeCount(0) + .build(); + } + + @Override + protected Session getSession() + { + return Session.builder(super.getSession()) + .setSchema("sf10") + .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "250MB") + .build(); + } +} diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestLocalVerboseMemoryExceededErrors.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestLocalVerboseMemoryExceededErrors.java new file mode 100644 index 0000000000000..3e14a374aa303 --- /dev/null +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestLocalVerboseMemoryExceededErrors.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tests; + +import com.facebook.presto.Session; +import com.facebook.presto.testing.QueryRunner; + +import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_TOTAL_MEMORY_PER_NODE; +import static com.facebook.presto.tests.TestLocalQueries.createLocalQueryRunner; + +public class TestLocalVerboseMemoryExceededErrors + extends AbstractTestVerboseMemoryExceededErrors +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return createLocalQueryRunner(); + } + + @Override + protected Session getSession() + { + return Session.builder(super.getSession()) + .setSchema("sf1") + .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "50MB") + .build(); + } +}