Merged
Original file line number Diff line number Diff line change
@@ -17,9 +17,12 @@
import com.facebook.presto.spi.StandardErrorCode;
import io.airlift.units.DataSize;

import java.util.Optional;

import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_REVOCABLE_MEMORY_LIMIT;
import static com.facebook.presto.util.HeapDumper.dumpHeap;
import static java.lang.String.format;

public class ExceededMemoryLimitException
@@ -35,8 +38,13 @@ public static ExceededMemoryLimitException exceededGlobalTotalLimit(DataSize max
return new ExceededMemoryLimitException(EXCEEDED_GLOBAL_MEMORY_LIMIT, format("Query exceeded distributed total memory limit of %s defined at the %s", maxMemory, limitSource));
}

public static ExceededMemoryLimitException exceededLocalUserMemoryLimit(DataSize maxMemory, String additionalFailureInfo)
public static ExceededMemoryLimitException exceededLocalUserMemoryLimit(
DataSize maxMemory,
String additionalFailureInfo,
boolean heapDumpOnExceededMemoryLimitEnabled,
Optional<String> heapDumpFilePath)
{
performHeapDumpIfEnabled(heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath);
return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT,
Comment on lines 47 to 48

It's kind of odd to have a heap dump inside an exception factory and to do it synchronously. Could we instead throw the error at the call site when memory is exceeded, and use an executor to do the dump asynchronously?

Contributor Author

This is basically a catch-all method for any exceeded-memory OOM. If we add this logic at the call sites, it will be spread throughout the code and might not look clean. A Presto exceeded-memory-limit failure is like an OOM, so taking a heap dump here makes the most sense for debugging purposes.

Also, this is a debugging feature and will be disabled in prod. I didn't want to over-engineer the solution, since this logic will only be triggered in a controlled environment while testing a specific query for OOM failures.

Contributor

It's important that the heap dump be triggered synchronously with the out-of-memory error, because the whole purpose is to capture the state of the heap when the error occurs. If it's asynchronous, it allows time for queries to fail and for cleanup to occur, both in this query and in other queries that are running concurrently.

WDYT?


If that is the case, shall we add comments to this method (and the others in this class) to indicate that this is needed and is only for debugging purposes?
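To make the ordering argument in this thread concrete, here is a minimal sketch. It assumes nothing about Presto internals: `SynchronousDumpSketch`, `exceededLimit`, `reserve` and the snapshot hook are hypothetical stand-ins, and the hook only records an event instead of dumping a heap. It shows that a hook run inside the factory executes on the failing thread before any `finally` cleanup further up the stack.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these names come from the Presto code base.
final class SynchronousDumpSketch
{
    // Records the order in which things happen so it can be inspected afterwards.
    static final List<String> EVENTS = new ArrayList<>();

    // Stand-in for the exception factory: the snapshot hook runs on the
    // calling thread before the exception object is created.
    static IllegalStateException exceededLimit(long maxBytes, boolean dumpEnabled, Runnable snapshotHook)
    {
        if (dumpEnabled) {
            snapshotHook.run();
        }
        return new IllegalStateException("Query exceeded per-node memory limit of " + maxBytes + " bytes");
    }

    // Stand-in for an allocation path that releases its state when it fails.
    static void reserve(long allocated, long delta, long maxBytes, boolean dumpEnabled)
    {
        try {
            if (allocated + delta > maxBytes) {
                throw exceededLimit(maxBytes, dumpEnabled, () -> EVENTS.add("snapshot"));
            }
            EVENTS.add("reserved");
        }
        finally {
            EVENTS.add("cleanup");
        }
    }

    // Drives one failing reservation and returns the observed event order.
    static List<String> run()
    {
        EVENTS.clear();
        try {
            reserve(90, 20, 100, true);
        }
        catch (IllegalStateException e) {
            EVENTS.add("failed");
        }
        return new ArrayList<>(EVENTS);
    }

    public static void main(String[] args)
    {
        System.out.println(run());
    }
}
```

With an executor-based hook, the "snapshot" event could land after "cleanup", which is the concern raised above about losing the heap state that caused the failure.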

format("Query exceeded per-node user memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}
@@ -47,19 +55,38 @@ public static ExceededMemoryLimitException exceededLocalBroadcastMemoryLimit(Dat
format("Query exceeded per-node broadcast memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

public static ExceededMemoryLimitException exceededLocalTotalMemoryLimit(DataSize maxMemory, String additionalFailureInfo)
public static ExceededMemoryLimitException exceededLocalTotalMemoryLimit(
DataSize maxMemory,
String additionalFailureInfo,
boolean heapDumpOnExceededMemoryLimitEnabled,
Optional<String> heapDumpFilePath)
{
performHeapDumpIfEnabled(heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath);
return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT,
format("Query exceeded per-node total memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

public static ExceededMemoryLimitException exceededLocalRevocableMemoryLimit(DataSize maxMemory, String additionalFailureInfo)
public static ExceededMemoryLimitException exceededLocalRevocableMemoryLimit(
Contributor

It might be borderline less useful to do this for revocable memory, but since this is a debug option that you enable explicitly, it seems fine: exceeding the revocable limit is pretty rare.

DataSize maxMemory,
String additionalFailureInfo,
boolean heapDumpOnExceededMemoryLimitEnabled,
Optional<String> heapDumpFilePath)
{
performHeapDumpIfEnabled(heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath);
return new ExceededMemoryLimitException(
EXCEEDED_REVOCABLE_MEMORY_LIMIT,
format("Query exceeded per-node revocable memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

// Heap dump is done synchronously to ensure that we capture the current state of the heap
// This is intended to be used for debugging purposes only
private static void performHeapDumpIfEnabled(boolean heapDumpOnExceededMemoryLimitEnabled, Optional<String> heapDumpFilePath)
{
if (heapDumpOnExceededMemoryLimitEnabled && heapDumpFilePath.isPresent()) {
Contributor

Does it make sense to check whether the heapDumpFilePath is a valid path?

Contributor

@aweisberg Sep 1, 2021

That is a good point!

This is a per-query session property, not a configuration set at startup. It will fail here anyway if the path is bad, but it might make sense to do something like https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/spiller/FileSingleStreamSpillerFactory.java#L101 ?

I am torn on whether the query should fail fast at startup if the file can't be created vs just letting it fail to create the heap dump. It's a bit finicky because we would need to check for each query whether the file can be created.

This is for debug so the temptation is to just do what is simple.

Contributor Author

I did think about adding some kind of path validity check, but did not pursue it since this is more of a debugging feature. If the heap dump is enabled and no heap dump path is specified, a tmp path is constructed automatically. Users can override the heap dump path if needed, and they are expected to provide a valid path.

Also, the heap dump is a best-effort feature and will not fail the query on exceptions. Any exception that is thrown is swallowed, and query execution fails with the proper EXCEEDED_MEMORY_LIMIT exception.
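For reference, the kind of up-front check discussed in this thread could look roughly like the sketch below. It is not what the PR does (the PR relies on the dump failing and being swallowed), and `HeapDumpPathCheckSketch` and `canDumpTo` are hypothetical names. The existing-file check is included because `HotSpotDiagnosticMXBean.dumpHeap` throws an `IOException` when the output file already exists.

```java
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch of a best-effort pre-check; not part of the PR.
final class HeapDumpPathCheckSketch
{
    // Returns true when the directory exists, is writable, and the target
    // file is not already present.
    static boolean canDumpTo(String directory, String fileName)
    {
        try {
            Path dir = Paths.get(directory);
            Path target = dir.resolve(fileName);
            return Files.isDirectory(dir) && Files.isWritable(dir) && !Files.exists(target);
        }
        catch (InvalidPathException e) {
            // The string is not a syntactically valid path on this platform.
            return false;
        }
    }

    public static void main(String[] args)
    {
        String tmp = System.getProperty("java.io.tmpdir");
        System.out.println(canDumpTo(tmp, "example_query_0.hprof"));
        System.out.println(canDumpTo("/no/such/directory/for/heap/dumps", "example_query_0.hprof"));
    }
}
```

Such a check is inherently racy (the directory can change between the check and the dump), so it would only improve the error message, not replace the swallow-and-log behaviour described above.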

dumpHeap(heapDumpFilePath.get());
}
}

private ExceededMemoryLimitException(StandardErrorCode errorCode, String message)
{
super(errorCode, message);
Original file line number Diff line number Diff line change
@@ -207,6 +207,8 @@ public final class SystemSessionProperties
public static final String QUERY_OPTIMIZATION_WITH_MATERIALIZED_VIEW_ENABLED = "query_optimization_with_materialized_view_enabled";
public static final String AGGREGATION_IF_TO_FILTER_REWRITE_STRATEGY = "aggregation_if_to_filter_rewrite_strategy";
public static final String RESOURCE_AWARE_SCHEDULING_STRATEGY = "resource_aware_scheduling_strategy";
public static final String HEAP_DUMP_ON_EXCEEDED_MEMORY_LIMIT_ENABLED = "heap_dump_on_exceeded_memory_limit_enabled";
public static final String EXCEEDED_MEMORY_LIMIT_HEAP_DUMP_FILE_DIRECTORY = "exceeded_memory_limit_heap_dump_file_directory";

//TODO: Prestissimo related session properties that are temporarily put here. They will be relocated in the future
public static final String PRESTISSIMO_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled";
@@ -1130,7 +1132,17 @@ public SystemSessionProperties(
nodeSchedulerConfig.getResourceAwareSchedulingStrategy(),
false,
value -> ResourceAwareSchedulingStrategy.valueOf(((String) value).toUpperCase()),
ResourceAwareSchedulingStrategy::name));
ResourceAwareSchedulingStrategy::name),
booleanProperty(
HEAP_DUMP_ON_EXCEEDED_MEMORY_LIMIT_ENABLED,
"Trigger heap dump to `exceeded_memory_limit_heap_dump_file_directory` on exceeded memory limit exceptions",
false, // This is intended to be used for debugging purposes only and thus we do not need an associated config property
true),
stringProperty(
EXCEEDED_MEMORY_LIMIT_HEAP_DUMP_FILE_DIRECTORY,
"Directory to which heap snapshot will be dumped, if heap_dump_on_exceeded_memory_limit_enabled",
System.getProperty("java.io.tmpdir"), // This is intended to be used for debugging purposes only and thus we do not need an associated config property
true));
}

public static boolean isEmptyJoinOptimization(Session session)
@@ -1895,4 +1907,14 @@ public static ResourceAwareSchedulingStrategy getResourceAwareSchedulingStrategy
{
return session.getSystemProperty(RESOURCE_AWARE_SCHEDULING_STRATEGY, ResourceAwareSchedulingStrategy.class);
}

public static Boolean isHeapDumpOnExceededMemoryLimitEnabled(Session session)
{
return session.getSystemProperty(HEAP_DUMP_ON_EXCEEDED_MEMORY_LIMIT_ENABLED, Boolean.class);
}

public static String getHeapDumpFileDirectory(Session session)
{
return session.getSystemProperty(EXCEEDED_MEMORY_LIMIT_HEAP_DUMP_FILE_DIRECTORY, String.class);
}
}
Original file line number Diff line number Diff line change
@@ -67,6 +67,7 @@
import javax.inject.Inject;

import java.io.Closeable;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -76,10 +77,12 @@
import java.util.concurrent.TimeUnit;

import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static com.facebook.presto.SystemSessionProperties.getHeapDumpFileDirectory;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxBroadcastMemory;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxRevocableMemoryPerNode;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode;
import static com.facebook.presto.SystemSessionProperties.isHeapDumpOnExceededMemoryLimitEnabled;
import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled;
import static com.facebook.presto.SystemSessionProperties.resourceOvercommit;
import static com.facebook.presto.execution.SqlTask.createSqlTask;
@@ -419,6 +422,11 @@ public TaskInfo updateTask(
}

queryContext.setVerboseExceededMemoryLimitErrorsEnabled(isVerboseExceededMemoryLimitErrorsEnabled(session));
queryContext.setHeapDumpOnExceededMemoryLimitEnabled(isHeapDumpOnExceededMemoryLimitEnabled(session));
String heapDumpFilePath = Paths.get(
getHeapDumpFileDirectory(session),
format("%s_%s.hprof", session.getQueryId().getId(), taskId.getStageExecutionId().getStageId().getId())).toString();
queryContext.setHeapDumpFilePath(heapDumpFilePath);

sqlTask.recordHeartbeat();
return sqlTask.updateTask(session, fragment, sources, outputBuffers, tableWriteInfo);
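The file naming used in this hunk can be tried in isolation with the sketch below. `HeapDumpFileNameSketch` is a hypothetical helper, and the query ID and stage number are made-up sample values; only the `"%s_%s.hprof"` pattern and the use of `Paths.get` are taken from the diff.

```java
import java.nio.file.Paths;

// Hypothetical helper mirroring the "%s_%s.hprof" naming from the diff.
final class HeapDumpFileNameSketch
{
    // Joins the configured directory with "<queryId>_<stageId>.hprof".
    static String heapDumpFilePath(String directory, String queryId, int stageId)
    {
        return Paths.get(directory, String.format("%s_%s.hprof", queryId, stageId)).toString();
    }

    public static void main(String[] args)
    {
        String tmp = System.getProperty("java.io.tmpdir");
        // Sample identifiers, chosen only for illustration.
        System.out.println(heapDumpFilePath(tmp, "20210901_000000_00001_abcde", 2));
    }
}
```

Because the name contains only the query ID and the stage ID, two tasks of the same stage on the same worker resolve to the same file. In the PR this appears to be harmless, since `HeapDumper` performs at most one dump per JVM.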
Original file line number Diff line number Diff line change
@@ -111,6 +111,12 @@ public class QueryContext
@GuardedBy("this")
private boolean verboseExceededMemoryLimitErrorsEnabled;

@GuardedBy("this")
private boolean heapDumpOnExceededMemoryLimitEnabled;

@GuardedBy("this")
private Optional<String> heapDumpFilePath;
Comment on lines 115 to 118

They could be final if we get them from the constructor.
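A minimal sketch of that suggestion, assuming the two settings are known when the object is created: `HeapDumpSettingsSketch` is a hypothetical value holder, not a class from the PR, and `QueryContext` itself would need more changes than this to adopt the pattern.

```java
import java.util.Optional;

import static java.util.Objects.requireNonNull;

// Hypothetical immutable holder for the two heap dump settings.
final class HeapDumpSettingsSketch
{
    private final boolean enabled;
    private final Optional<String> filePath;

    HeapDumpSettingsSketch(boolean enabled, Optional<String> filePath)
    {
        this.enabled = enabled;
        // Rejecting null here means readers never see an unset Optional field.
        this.filePath = requireNonNull(filePath, "filePath is null");
    }

    // Same condition as performHeapDumpIfEnabled in the diff.
    boolean shouldDump()
    {
        return enabled && filePath.isPresent();
    }

    public static void main(String[] args)
    {
        System.out.println(new HeapDumpSettingsSketch(true, Optional.of("/tmp/q_0.hprof")).shouldDump());
        System.out.println(new HeapDumpSettingsSketch(true, Optional.empty()).shouldDump());
    }
}
```

Final fields need no `synchronized` setters or `@GuardedBy` annotation, and the `Optional` can never be observed as `null` before a setter has run.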


public QueryContext(
QueryId queryId,
DataSize maxUserMemory,
@@ -332,6 +338,16 @@ public long getMaxTotalMemory()
return maxTotalMemory;
}

public synchronized void setHeapDumpOnExceededMemoryLimitEnabled(boolean heapDumpOnExceededMemoryLimitEnabled)
{
this.heapDumpOnExceededMemoryLimitEnabled = heapDumpOnExceededMemoryLimitEnabled;
}

public synchronized void setHeapDumpFilePath(String heapDumpFilePath)
{
this.heapDumpFilePath = Optional.ofNullable(heapDumpFilePath);
}

public TaskContext addTaskContext(
TaskStateMachine taskStateMachine,
Session session,
@@ -477,7 +493,7 @@ private void enforceBroadcastMemoryLimit(long allocated, long delta, long maxMem
private void enforceUserMemoryLimit(long allocated, long delta, long maxMemory)
{
if (allocated + delta > maxMemory) {
throw exceededLocalUserMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta));
throw exceededLocalUserMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta), heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath);
}
}

@@ -487,15 +503,15 @@ private void enforceTotalMemoryLimit(long allocated, long delta, long maxMemory)
long totalMemory = allocated + delta;
peakNodeTotalMemory = Math.max(totalMemory, peakNodeTotalMemory);
if (totalMemory > maxMemory) {
throw exceededLocalTotalMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta));
throw exceededLocalTotalMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta), heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath);
}
}

@GuardedBy("this")
private void enforceRevocableMemoryLimit(long allocated, long delta, long maxMemory)
{
if (allocated + delta > maxMemory) {
throw exceededLocalRevocableMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta));
throw exceededLocalRevocableMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta), heapDumpOnExceededMemoryLimitEnabled, heapDumpFilePath);
}
}

Original file line number Diff line number Diff line change
@@ -207,6 +207,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -225,7 +226,9 @@
import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.presto.SystemSessionProperties.getHeapDumpFileDirectory;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode;
import static com.facebook.presto.SystemSessionProperties.isHeapDumpOnExceededMemoryLimitEnabled;
import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled;
import static com.facebook.presto.cost.StatsCalculatorModule.createNewStatsCalculator;
import static com.facebook.presto.execution.scheduler.StreamingPlanSection.extractStreamingSections;
@@ -243,6 +246,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
@@ -738,6 +742,11 @@ private MaterializedResultWithPlan executeInternal(Session session, @Language("S
.setTaskPlan(plan.getRoot())
.build();
taskContext.getQueryContext().setVerboseExceededMemoryLimitErrorsEnabled(isVerboseExceededMemoryLimitErrorsEnabled(session));
taskContext.getQueryContext().setHeapDumpOnExceededMemoryLimitEnabled(isHeapDumpOnExceededMemoryLimitEnabled(session));
String heapDumpFilePath = Paths.get(
getHeapDumpFileDirectory(session),
format("%s_%s.hprof", session.getQueryId().getId(), taskContext.getTaskId().getStageExecutionId().getStageId().getId())).toString();
taskContext.getQueryContext().setHeapDumpFilePath(heapDumpFilePath);
List<Driver> drivers = createDrivers(session, plan, outputFactory, taskContext);
drivers.forEach(closer::register);

Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.util;

import com.facebook.airlift.log.Logger;
import com.sun.management.HotSpotDiagnosticMXBean;

import javax.management.MBeanServer;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicBoolean;

public final class HeapDumper
{
private static final Logger log = Logger.get(HeapDumper.class);
private static final String HOTSPOT_BEAN_NAME = "com.sun.management:type=HotSpotDiagnostic";
private static final AtomicBoolean IS_HEAPDUMP_TRIGGERED = new AtomicBoolean(false);

private static volatile HotSpotDiagnosticMXBean hotspotMBean;

private HeapDumper() {}

/**
* Dumps a heap snapshot to the given file. Only the first call in the
* lifetime of the JVM triggers a dump; later calls are no-ops.
* Failures are logged and swallowed.
*
* @param fileName path of the heap dump file
*/
public static void dumpHeap(String fileName)
{
if (IS_HEAPDUMP_TRIGGERED.compareAndSet(false, true)) {
log.info("Performing heapdump to file: " + fileName);
try {
if (hotspotMBean == null) {
hotspotMBean = getHotspotMBean();
}
hotspotMBean.dumpHeap(fileName, false);
}
catch (Throwable throwable) {
// Consume the error as we do not want to fail during heapdump
log.error(throwable, "Unable to perform heap dump");
}

Do we need a

finally {
    isHeapDumpTriggered.set(false);
}

Contributor Author

We don't. We want to trigger the heap dump only once. As soon as the first thread sets the atomic boolean to true, no other thread should be able to trigger a heap dump after that.
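The once-only behaviour described here can be checked with a small standalone sketch. `OnceOnlyGuardSketch` and its methods are hypothetical, and the guarded action is an empty `Runnable` rather than a heap dump; the guard itself uses the same `compareAndSet(false, true)` call as `HeapDumper`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a guard that lets exactly one caller through.
final class OnceOnlyGuardSketch
{
    private final AtomicBoolean triggered = new AtomicBoolean(false);
    private final AtomicInteger runs = new AtomicInteger();

    // Only the thread that flips the flag from false to true runs the action.
    void runOnce(Runnable action)
    {
        if (triggered.compareAndSet(false, true)) {
            runs.incrementAndGet();
            action.run();
        }
    }

    int runs()
    {
        return runs.get();
    }

    // Releases many threads at once and reports how often the action ran.
    static int raceThreads(int threads)
    {
        OnceOnlyGuardSketch guard = new OnceOnlyGuardSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                guard.runOnce(() -> {});
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return guard.runs();
    }

    public static void main(String[] args)
    {
        System.out.println(raceThreads(8));
    }
}
```

One consequence of never resetting the flag: a first attempt that fails (for example because of a bad path) still uses up the single attempt, so a later exceeded-limit error in the same JVM will not produce a dump.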

}
}

private static HotSpotDiagnosticMXBean getHotspotMBean()
throws IOException
{
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
return ManagementFactory.newPlatformMXBeanProxy(server, HOTSPOT_BEAN_NAME, HotSpotDiagnosticMXBean.class);
}
}
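As a side note, the same bean can also be looked up through `ManagementFactory.getPlatformMXBean`, which avoids the explicit bean-name string. The sketch below is an alternative under that assumption, not the PR's code; `PlatformMXBeanSketch` is a hypothetical name, and the lookup is expected to work only on HotSpot-based JVMs.

```java
import com.sun.management.HotSpotDiagnosticMXBean;

import java.io.IOException;
import java.lang.management.ManagementFactory;

// Hypothetical alternative lookup; the PR uses newPlatformMXBeanProxy instead.
final class PlatformMXBeanSketch
{
    // Looks up the diagnostic bean by its interface type.
    static HotSpotDiagnosticMXBean diagnosticBean()
    {
        return ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
    }

    // Writes a heap dump; 'false' means unreachable objects are included too.
    static void dump(String hprofFile)
            throws IOException
    {
        diagnosticBean().dumpHeap(hprofFile, false);
    }

    public static void main(String[] args)
            throws IOException
    {
        // Only dump when a target file is passed explicitly.
        if (args.length == 1) {
            dump(args[0]);
        }
        else {
            System.out.println("HotSpot diagnostic bean available: " + (diagnosticBean() != null));
        }
    }
}
```

Running it with a single argument such as a fresh `.hprof` path writes a dump of the sketch's own JVM; without arguments it only confirms that the bean can be obtained.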
Original file line number Diff line number Diff line change
@@ -101,6 +101,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
@@ -117,9 +118,11 @@

import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalTotalMemoryLimit;
import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static com.facebook.presto.SystemSessionProperties.getHeapDumpFileDirectory;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxBroadcastMemory;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode;
import static com.facebook.presto.SystemSessionProperties.isHeapDumpOnExceededMemoryLimitEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled;
import static com.facebook.presto.execution.TaskState.FAILED;
@@ -396,6 +399,11 @@ public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> doCreate(
spillSpaceTracker,
memoryReservationSummaryJsonCodec);
queryContext.setVerboseExceededMemoryLimitErrorsEnabled(isVerboseExceededMemoryLimitErrorsEnabled(session));
queryContext.setHeapDumpOnExceededMemoryLimitEnabled(isHeapDumpOnExceededMemoryLimitEnabled(session));
String heapDumpFilePath = Paths.get(
getHeapDumpFileDirectory(session),
format("%s_%s.hprof", session.getQueryId().getId(), stageId.getId())).toString();
queryContext.setHeapDumpFilePath(heapDumpFilePath);

TaskStateMachine taskStateMachine = new TaskStateMachine(taskId, notificationExecutor);
TaskContext taskContext = queryContext.addTaskContext(
@@ -421,7 +429,9 @@ public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> doCreate(
queryContext.getAdditionalFailureInfo(totalMemoryReservationBytes, 0) +
format("Total reserved memory: %s, Total revocable memory: %s",
succinctBytes(pool.getQueryMemoryReservation(queryId)),
succinctBytes(pool.getQueryRevocableMemoryReservation(queryId))));
succinctBytes(pool.getQueryRevocableMemoryReservation(queryId))),
isHeapDumpOnExceededMemoryLimitEnabled(session),
Optional.ofNullable(heapDumpFilePath));
}
if (totalMemoryReservationBytes > pool.getMaxBytes() * memoryRevokingThreshold && memoryRevokePending.compareAndSet(false, true)) {
memoryUpdateExecutor.execute(() -> {