diff --git a/presto-main/src/main/java/com/facebook/presto/ExceededMemoryLimitException.java b/presto-main/src/main/java/com/facebook/presto/ExceededMemoryLimitException.java index 9e234bddfa82e..721beca65a7dc 100644 --- a/presto-main/src/main/java/com/facebook/presto/ExceededMemoryLimitException.java +++ b/presto-main/src/main/java/com/facebook/presto/ExceededMemoryLimitException.java @@ -17,9 +17,12 @@ import com.facebook.presto.spi.StandardErrorCode; import io.airlift.units.DataSize; +import java.util.Optional; + import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_REVOCABLE_MEMORY_LIMIT; +import static com.facebook.presto.util.HeapDumper.dumpHeap; import static java.lang.String.format; public class ExceededMemoryLimitException @@ -35,8 +38,13 @@ public static ExceededMemoryLimitException exceededGlobalTotalLimit(DataSize max return new ExceededMemoryLimitException(EXCEEDED_GLOBAL_MEMORY_LIMIT, format("Query exceeded distributed total memory limit of %s defined at the %s", maxMemory, limitSource)); } - public static ExceededMemoryLimitException exceededLocalUserMemoryLimit(DataSize maxMemory, String additionalFailureInfo) + public static ExceededMemoryLimitException exceededLocalUserMemoryLimit( + DataSize maxMemory, + String additionalFailureInfo, + boolean heapDumpOnExceededMemoryLimitEnabled, + Optional heapDumpFilePath) { + performHeapDumpIfEnabled(heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath); return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT, format("Query exceeded per-node user memory limit of %s [%s]", maxMemory, additionalFailureInfo)); } @@ -47,19 +55,38 @@ public static ExceededMemoryLimitException exceededLocalBroadcastMemoryLimit(Dat format("Query exceeded per-node broadcast memory limit of %s [%s]", maxMemory, additionalFailureInfo)); } - public static ExceededMemoryLimitException exceededLocalTotalMemoryLimit(DataSize maxMemory, String additionalFailureInfo) + public static ExceededMemoryLimitException exceededLocalTotalMemoryLimit( + DataSize maxMemory, + String additionalFailureInfo, + boolean heapDumpOnExceededMemoryLimitEnabled, + Optional heapDumpFilePath) { + performHeapDumpIfEnabled(heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath); return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT, format("Query exceeded per-node total memory limit of %s [%s]", maxMemory, additionalFailureInfo)); } - public static ExceededMemoryLimitException exceededLocalRevocableMemoryLimit(DataSize maxMemory, String additionalFailureInfo) + public static ExceededMemoryLimitException exceededLocalRevocableMemoryLimit( + DataSize maxMemory, + String additionalFailureInfo, + boolean heapDumpOnExceededMemoryLimitEnabled, + Optional heapDumpFilePath) { + performHeapDumpIfEnabled(heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath); return new ExceededMemoryLimitException( EXCEEDED_REVOCABLE_MEMORY_LIMIT, format("Query exceeded per-node revocable memory limit of %s [%s]", maxMemory, additionalFailureInfo)); } + // Heap dump is done synchronously to ensure that we capture the current state of the heap + // This is intended to be used for debugging purposes only + private static void performHeapDumpIfEnabled(boolean heapDumpOnExceededMemoryLimitEnabled, Optional heapDumpFilePath) + { + if (heapDumpOnExceededMemoryLimitEnabled && heapDumpFilePath.isPresent()) { + dumpHeap(heapDumpFilePath.get()); + } + } + private ExceededMemoryLimitException(StandardErrorCode errorCode, String message) { super(errorCode, message); diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index cc7b56947e9d2..5c2f285a4b520 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -207,6 +207,8 @@ public final class SystemSessionProperties public static final String QUERY_OPTIMIZATION_WITH_MATERIALIZED_VIEW_ENABLED = "query_optimization_with_materialized_view_enabled"; public static final String AGGREGATION_IF_TO_FILTER_REWRITE_STRATEGY = "aggregation_if_to_filter_rewrite_strategy"; public static final String RESOURCE_AWARE_SCHEDULING_STRATEGY = "resource_aware_scheduling_strategy"; + public static final String HEAP_DUMP_ON_EXCEEDED_MEMORY_LIMIT_ENABLED = "heap_dump_on_exceeded_memory_limit_enabled"; + public static final String EXCEEDED_MEMORY_LIMIT_HEAP_DUMP_FILE_DIRECTORY = "exceeded_memory_limit_heap_dump_file_directory"; //TODO: Prestissimo related session properties that are temporarily put here. They will be relocated in the future public static final String PRESTISSIMO_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled"; @@ -1130,7 +1132,17 @@ public SystemSessionProperties( nodeSchedulerConfig.getResourceAwareSchedulingStrategy(), false, value -> ResourceAwareSchedulingStrategy.valueOf(((String) value).toUpperCase()), - ResourceAwareSchedulingStrategy::name)); + ResourceAwareSchedulingStrategy::name), + booleanProperty( + HEAP_DUMP_ON_EXCEEDED_MEMORY_LIMIT_ENABLED, + "Trigger heap dump to `EXCEEDED_MEMORY_LIMIT_HEAP_DUMP_FILE_PATH` on exceeded memory limit exceptions", + false, // This is intended to be used for debugging purposes only and thus we does not need an associated config property + true), + stringProperty( + EXCEEDED_MEMORY_LIMIT_HEAP_DUMP_FILE_DIRECTORY, + "Directory to which heap snapshot will be dumped, if heap_dump_on_exceeded_memory_limit_enabled", + System.getProperty("java.io.tmpdir"), // This is intended to be used for debugging purposes only and thus we does not need an associated config property + true)); } public static boolean isEmptyJoinOptimization(Session session) @@ -1895,4 +1907,14 @@ public static ResourceAwareSchedulingStrategy getResourceAwareSchedulingStrategy { return session.getSystemProperty(RESOURCE_AWARE_SCHEDULING_STRATEGY, ResourceAwareSchedulingStrategy.class); } + + public static Boolean isHeapDumpOnExceededMemoryLimitEnabled(Session session) + { + return session.getSystemProperty(HEAP_DUMP_ON_EXCEEDED_MEMORY_LIMIT_ENABLED, Boolean.class); + } + + public static String getHeapDumpFileDirectory(Session session) + { + return session.getSystemProperty(EXCEEDED_MEMORY_LIMIT_HEAP_DUMP_FILE_DIRECTORY, String.class); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java index 100e30364c47a..37dd80aad328e 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java @@ -67,6 +67,7 @@ import javax.inject.Inject; import java.io.Closeable; +import java.nio.file.Paths; import java.util.List; import java.util.Map; import java.util.Optional; @@ -76,10 +77,12 @@ import java.util.concurrent.TimeUnit; import static com.facebook.airlift.concurrent.Threads.threadsNamed; +import static com.facebook.presto.SystemSessionProperties.getHeapDumpFileDirectory; import static com.facebook.presto.SystemSessionProperties.getQueryMaxBroadcastMemory; import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.getQueryMaxRevocableMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; +import static com.facebook.presto.SystemSessionProperties.isHeapDumpOnExceededMemoryLimitEnabled; import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled; import static com.facebook.presto.SystemSessionProperties.resourceOvercommit; import static com.facebook.presto.execution.SqlTask.createSqlTask; @@ -419,6 +422,11 @@ public TaskInfo updateTask( } queryContext.setVerboseExceededMemoryLimitErrorsEnabled(isVerboseExceededMemoryLimitErrorsEnabled(session)); + queryContext.setHeapDumpOnExceededMemoryLimitEnabled(isHeapDumpOnExceededMemoryLimitEnabled(session)); + String heapDumpFilePath = Paths.get( + getHeapDumpFileDirectory(session), + format("%s_%s.hprof", session.getQueryId().getId(), taskId.getStageExecutionId().getStageId().getId())).toString(); + queryContext.setHeapDumpFilePath(heapDumpFilePath); sqlTask.recordHeartbeat(); return sqlTask.updateTask(session, fragment, sources, outputBuffers, tableWriteInfo); diff --git a/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java b/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java index d858ead7edb4b..48e104e2daad2 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java @@ -111,6 +111,12 @@ public class QueryContext @GuardedBy("this") private boolean verboseExceededMemoryLimitErrorsEnabled; + @GuardedBy("this") + private boolean heapDumpOnExceededMemoryLimitEnabled; + + @GuardedBy("this") + private Optional heapDumpFilePath; + public QueryContext( QueryId queryId, DataSize maxUserMemory, @@ -332,6 +338,16 @@ public long getMaxTotalMemory() return maxTotalMemory; } + public synchronized void setHeapDumpOnExceededMemoryLimitEnabled(boolean heapDumpOnExceededMemoryLimitEnabled) + { + this.heapDumpOnExceededMemoryLimitEnabled = heapDumpOnExceededMemoryLimitEnabled; + } + + public synchronized void setHeapDumpFilePath(String heapDumpFilePath) + { + this.heapDumpFilePath = Optional.ofNullable(heapDumpFilePath); + } + public TaskContext addTaskContext( TaskStateMachine taskStateMachine, Session session, @@ -477,7 +493,7 @@ private void enforceBroadcastMemoryLimit(long allocated, long delta, long maxMem private void enforceUserMemoryLimit(long allocated, long delta, long maxMemory) { if (allocated + delta > maxMemory) { - throw exceededLocalUserMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta)); + throw exceededLocalUserMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta), heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath); } } @@ -487,7 +503,7 @@ private void enforceTotalMemoryLimit(long allocated, long delta, long maxMemory) long totalMemory = allocated + delta; peakNodeTotalMemory = Math.max(totalMemory, peakNodeTotalMemory); if (totalMemory > maxMemory) { - throw exceededLocalTotalMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta)); + throw exceededLocalTotalMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta), heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath); } } @@ -495,7 +511,7 @@ private void enforceTotalMemoryLimit(long allocated, long delta, long maxMemory) private void enforceRevocableMemoryLimit(long allocated, long delta, long maxMemory) { if (allocated + delta > maxMemory) { - throw exceededLocalRevocableMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta)); + throw exceededLocalRevocableMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta), heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath); } } diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 0a24e297e89cf..fd40aafec3574 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -207,6 +207,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -225,7 +226,9 @@ import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.airlift.json.JsonCodec.jsonCodec; +import static com.facebook.presto.SystemSessionProperties.getHeapDumpFileDirectory; import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; +import static com.facebook.presto.SystemSessionProperties.isHeapDumpOnExceededMemoryLimitEnabled; import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled; import static com.facebook.presto.cost.StatsCalculatorModule.createNewStatsCalculator; import static com.facebook.presto.execution.scheduler.StreamingPlanSection.extractStreamingSections; @@ -243,6 +246,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; @@ -738,6 +742,11 @@ private MaterializedResultWithPlan executeInternal(Session session, @Language("S .setTaskPlan(plan.getRoot()) .build(); taskContext.getQueryContext().setVerboseExceededMemoryLimitErrorsEnabled(isVerboseExceededMemoryLimitErrorsEnabled(session)); + taskContext.getQueryContext().setHeapDumpOnExceededMemoryLimitEnabled(isHeapDumpOnExceededMemoryLimitEnabled(session)); + String heapDumpFilePath = Paths.get( + getHeapDumpFileDirectory(session), + format("%s_%s.hprof", session.getQueryId().getId(), taskContext.getTaskId().getStageExecutionId().getStageId().getId())).toString(); + taskContext.getQueryContext().setHeapDumpFilePath(heapDumpFilePath); List drivers = createDrivers(session, plan, outputFactory, taskContext); drivers.forEach(closer::register); diff --git a/presto-main/src/main/java/com/facebook/presto/util/HeapDumper.java b/presto-main/src/main/java/com/facebook/presto/util/HeapDumper.java new file mode 100644 index 0000000000000..b6a4dd0f3444c --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/util/HeapDumper.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.util; + +import com.facebook.airlift.log.Logger; +import com.sun.management.HotSpotDiagnosticMXBean; + +import javax.management.MBeanServer; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.concurrent.atomic.AtomicBoolean; + +public final class HeapDumper +{ + private static final Logger log = Logger.get(HeapDumper.class); + private static final String HOTSPOT_BEAN_NAME = "com.sun.management:type=HotSpotDiagnostic"; + private static final AtomicBoolean IS_HEAPDUMP_TRIGGERED = new AtomicBoolean(false); + + private static volatile HotSpotDiagnosticMXBean hotspotMBean; + + private HeapDumper() {} + + /** + * Call this method from your application whenever you + * want to dump the heap snapshot into a file. + * + * @param fileName name of the heap dump file + */ + public static void dumpHeap(String fileName) + { + if (IS_HEAPDUMP_TRIGGERED.compareAndSet(false, true)) { + log.info("Performing heapdump to file: " + fileName); + try { + if (hotspotMBean == null) { + hotspotMBean = getHotspotMBean(); + } + hotspotMBean.dumpHeap(fileName, false); + } + catch (Throwable throwable) { + // Consume the error as we do not want to fail during heapdump + log.error(throwable, "Unable to perform heap dump"); + } + } + } + + private static HotSpotDiagnosticMXBean getHotspotMBean() + throws IOException + { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + return ManagementFactory.newPlatformMXBeanProxy(server, HOTSPOT_BEAN_NAME, HotSpotDiagnosticMXBean.class); + } +} diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java index 464fc32d2c653..fcecb4cd53e80 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java @@ -101,6 +101,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; @@ -117,9 +118,11 @@ import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalTotalMemoryLimit; import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount; +import static com.facebook.presto.SystemSessionProperties.getHeapDumpFileDirectory; import static com.facebook.presto.SystemSessionProperties.getQueryMaxBroadcastMemory; import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; +import static com.facebook.presto.SystemSessionProperties.isHeapDumpOnExceededMemoryLimitEnabled; import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled; import static com.facebook.presto.execution.TaskState.FAILED; @@ -396,6 +399,11 @@ public IPrestoSparkTaskExecutor doCreate( spillSpaceTracker, memoryReservationSummaryJsonCodec); queryContext.setVerboseExceededMemoryLimitErrorsEnabled(isVerboseExceededMemoryLimitErrorsEnabled(session)); + queryContext.setHeapDumpOnExceededMemoryLimitEnabled(isHeapDumpOnExceededMemoryLimitEnabled(session)); + String heapDumpFilePath = Paths.get( + getHeapDumpFileDirectory(session), + format("%s_%s.hprof", session.getQueryId().getId(), stageId.getId())).toString(); + queryContext.setHeapDumpFilePath(heapDumpFilePath); TaskStateMachine taskStateMachine = new TaskStateMachine(taskId, notificationExecutor); TaskContext taskContext = queryContext.addTaskContext( @@ -421,7 +429,9 @@ public IPrestoSparkTaskExecutor doCreate( queryContext.getAdditionalFailureInfo(totalMemoryReservationBytes, 0) + format("Total reserved memory: %s, Total revocable memory: %s", succinctBytes(pool.getQueryMemoryReservation(queryId)), - succinctBytes(pool.getQueryRevocableMemoryReservation(queryId)))); + succinctBytes(pool.getQueryRevocableMemoryReservation(queryId))), + isHeapDumpOnExceededMemoryLimitEnabled(session), + Optional.ofNullable(heapDumpFilePath)); } if (totalMemoryReservationBytes > pool.getMaxBytes() * memoryRevokingThreshold && memoryRevokePending.compareAndSet(false, true)) { memoryUpdateExecutor.execute(() -> {