Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.operator.PageSourceOperator;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.TaskMemoryReservationSummary;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.operator.project.InputPageProjection;
import com.facebook.presto.operator.project.PageProcessor;
Expand Down Expand Up @@ -66,6 +67,7 @@
import java.util.Optional;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.airlift.json.JsonCodec.listJsonCodec;
import static com.facebook.airlift.stats.CpuTimer.CpuDuration;
import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount;
import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageSize;
Expand Down Expand Up @@ -282,9 +284,12 @@ protected Map<String, Long> runOnce()
localQueryRunner.getExecutor(),
localQueryRunner.getScheduler(),
new DataSize(256, MEGABYTE),
spillSpaceTracker)
.addTaskContext(new TaskStateMachine(new TaskId("query", 0, 0, 0), localQueryRunner.getExecutor()),
spillSpaceTracker,
listJsonCodec(TaskMemoryReservationSummary.class))
.addTaskContext(
new TaskStateMachine(new TaskId("query", 0, 0, 0), localQueryRunner.getExecutor()),
session,
Optional.empty(),
false,
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.operator.Driver;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.TaskMemoryReservationSummary;
import com.facebook.presto.plugin.memory.MemoryConnectorFactory;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.QueryId;
Expand All @@ -42,6 +43,7 @@
import java.util.Map;
import java.util.Optional;

import static com.facebook.airlift.json.JsonCodec.listJsonCodec;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -86,11 +88,14 @@ public List<Page> execute(@Language("SQL") String query)
localQueryRunner.getExecutor(),
localQueryRunner.getScheduler(),
new DataSize(4, GIGABYTE),
spillSpaceTracker);
spillSpaceTracker,
listJsonCodec(TaskMemoryReservationSummary.class));

TaskContext taskContext = queryContext
.addTaskContext(new TaskStateMachine(new TaskId("query", 0, 0, 0), localQueryRunner.getExecutor()),
.addTaskContext(
new TaskStateMachine(new TaskId("query", 0, 0, 0), localQueryRunner.getExecutor()),
localQueryRunner.getDefaultSession(),
Optional.empty(),
false,
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public final class SystemSessionProperties
public static final String PARTIAL_RESULTS_COMPLETION_RATIO_THRESHOLD = "partial_results_completion_ratio_threshold";
public static final String PARTIAL_RESULTS_MAX_EXECUTION_TIME_MULTIPLIER = "partial_results_max_execution_time_multiplier";
public static final String OFFSET_CLAUSE_ENABLED = "offset_clause_enabled";
public static final String VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED = "verbose_exceeded_memory_limit_errors_enabled";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename to EXCEEDED_MEMORY_LIMIT_ERRORS_VERBOSE_LOGGING_ENABLED

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EXCEEDED_MEMORY_LIMIT_ERRORS_VERBOSE_LOGGING_ENABLED may sound a little misleading, as it suggests that the logging verbosity is getting changed. Setting this property to true won't increase logging verbosity, but is going to provide extra details in error messages.


private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1048,6 +1049,11 @@ public SystemSessionProperties(
PARTIAL_RESULTS_MAX_EXECUTION_TIME_MULTIPLIER,
"This value is multiplied by the time taken to reach the completion ratio threshold and is set as max task end time",
featuresConfig.getPartialResultsMaxExecutionTimeMultiplier(),
false),
booleanProperty(
VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED,
"When enabled the error message for exceeded memory limit errors will contain additional operator memory allocation details",
nodeMemoryConfig.isVerboseExceededMemoryLimitErrorsEnabled(),
false));
}

Expand Down Expand Up @@ -1768,4 +1774,9 @@ public static boolean isOffsetClauseEnabled(Session session)
{
return session.getSystemProperty(OFFSET_CLAUSE_ENABLED, Boolean.class);
}

/**
 * Reads the {@link #VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED} system property from the given session.
 * <p>
 * Per the property description, when this is enabled the error message for exceeded memory limit
 * errors contains additional operator memory allocation details.
 *
 * @param session the session whose system property is looked up
 * @return the value of the property for this session
 */
public static boolean isVerboseExceededMemoryLimitErrorsEnabled(Session session)
{
return session.getSystemProperty(VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import com.facebook.presto.sql.planner.PlanFragment;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;

import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled;
import static com.facebook.presto.execution.SqlTaskExecution.createSqlTaskExecution;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -88,6 +90,8 @@ public SqlTaskExecution create(
TaskContext taskContext = queryContext.addTaskContext(
taskStateMachine,
session,
// Plan has to be retained only if verbose memory exceeded errors are requested
isVerboseExceededMemoryLimitErrorsEnabled(session) ? Optional.of(fragment.getRoot()) : Optional.empty(),
perOperatorCpuTimerEnabled,
cpuTimerEnabled,
perOperatorAllocationTrackingEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.execution;

import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.airlift.stats.CounterStat;
Expand All @@ -37,6 +38,7 @@
import com.facebook.presto.metadata.MetadataUpdates;
import com.facebook.presto.operator.ExchangeClientSupplier;
import com.facebook.presto.operator.FragmentResultCacheManager;
import com.facebook.presto.operator.TaskMemoryReservationSummary;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.connector.ConnectorMetadataUpdater;
Expand Down Expand Up @@ -77,6 +79,7 @@
import static com.facebook.presto.SystemSessionProperties.getQueryMaxBroadcastMemory;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode;
import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled;
import static com.facebook.presto.SystemSessionProperties.resourceOvercommit;
import static com.facebook.presto.execution.SqlTask.createSqlTask;
import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
Expand Down Expand Up @@ -107,6 +110,7 @@ public class SqlTaskManager
private final Duration clientTimeout;

private final LocalMemoryManager localMemoryManager;
private final JsonCodec<List<TaskMemoryReservationSummary>> memoryReservationSummaryJsonCodec;
private final LoadingCache<QueryId, QueryContext> queryContexts;
private final LoadingCache<TaskId, SqlTask> tasks;

Expand All @@ -126,6 +130,7 @@ public SqlTaskManager(
SplitMonitor splitMonitor,
NodeInfo nodeInfo,
LocalMemoryManager localMemoryManager,
JsonCodec<List<TaskMemoryReservationSummary>> memoryReservationSummaryJsonCodec,
TaskManagementExecutor taskManagementExecutor,
TaskManagerConfig config,
NodeMemoryConfig nodeMemoryConfig,
Expand Down Expand Up @@ -162,6 +167,7 @@ public SqlTaskManager(
config);

this.localMemoryManager = requireNonNull(localMemoryManager, "localMemoryManager is null");
this.memoryReservationSummaryJsonCodec = requireNonNull(memoryReservationSummaryJsonCodec, "memoryReservationSummaryJsonCodec is null");
DataSize maxQueryUserMemoryPerNode = nodeMemoryConfig.getMaxQueryMemoryPerNode();
DataSize maxQueryTotalMemoryPerNode = nodeMemoryConfig.getMaxQueryTotalMemoryPerNode();
DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode();
Expand Down Expand Up @@ -213,7 +219,8 @@ private QueryContext createQueryContext(
taskNotificationExecutor,
driverYieldExecutor,
maxQuerySpillPerNode,
localSpillManager.getSpillSpaceTracker());
localSpillManager.getSpillSpaceTracker(),
memoryReservationSummaryJsonCodec);
}

@Override
Expand Down Expand Up @@ -409,6 +416,8 @@ public TaskInfo updateTask(
}
}

queryContext.setVerboseExceededMemoryLimitErrorsEnabled(isVerboseExceededMemoryLimitErrorsEnabled(session));

Comment on lines 419 to 420
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering why this is done this way rather than passing it in when creating the QueryContext object?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, there's a design flaw in how we create QueryContext. In Presto classic, the QueryContext object is shared between all tasks of the same query and is created when the first task of a query is scheduled on the node. While in practice two tasks of the same query shouldn't have different session properties, the interface allows it. Since the hack ...

We are already using a similar hack for setting memory limits: https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java#L405

Perhaps we need to refactor it at some point. But that is probably beyond the scope of this PR.

sqlTask.recordHeartbeat();
return sqlTask.updateTask(session, fragment, sources, outputBuffers, tableWriteInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class NodeMemoryConfig
private DataSize softMaxQueryTotalMemoryPerNode;
private DataSize heapHeadroom = new DataSize(AVAILABLE_HEAP_MEMORY * 0.3, BYTE);

private boolean verboseExceededMemoryLimitErrorsEnabled;

@NotNull
public DataSize getMaxQueryBroadcastMemory()
{
Expand Down Expand Up @@ -143,4 +145,17 @@ public NodeMemoryConfig setHeapHeadroom(DataSize heapHeadroom)
this.heapHeadroom = heapHeadroom;
return this;
}

/**
 * Returns whether verbose exceeded-memory-limit error messages are enabled in this configuration.
 * <p>
 * The backing field has no initializer, so the value is {@code false} until the setter is called.
 *
 * @return the configured flag
 */
public boolean isVerboseExceededMemoryLimitErrorsEnabled()
{
return verboseExceededMemoryLimitErrorsEnabled;
}

/**
 * Sets the flag bound to the {@code memory.verbose-exceeded-memory-limit-errors-enabled} config property.
 *
 * @param verboseExceededMemoryLimitErrorsEnabled the new value of the flag
 * @return this config object, so that setters can be chained
 */
@Config("memory.verbose-exceeded-memory-limit-errors-enabled")
@ConfigDescription("When enabled the error message for exceeded memory limit errors will contain additional operator memory allocation details")
public NodeMemoryConfig setVerboseExceededMemoryLimitErrorsEnabled(boolean verboseExceededMemoryLimitErrorsEnabled)
{
this.verboseExceededMemoryLimitErrorsEnabled = verboseExceededMemoryLimitErrorsEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
*/
package com.facebook.presto.memory;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.stats.GcMonitor;
import com.facebook.presto.Session;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskStateMachine;
import com.facebook.presto.memory.context.MemoryReservationHandler;
import com.facebook.presto.memory.context.MemoryTrackingContext;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.TaskMemoryReservationSummary;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spiller.SpillSpaceTracker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
Expand All @@ -35,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -52,11 +56,13 @@
import static com.facebook.presto.operator.Operator.NOT_BLOCKED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
import static java.util.Map.Entry.comparingByValue;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
Expand All @@ -72,6 +78,7 @@ public class QueryContext
private final ScheduledExecutorService yieldExecutor;
private final long maxSpill;
private final SpillSpaceTracker spillSpaceTracker;
private final JsonCodec<List<TaskMemoryReservationSummary>> memoryReservationSummaryJsonCodec;
private final Map<TaskId, TaskContext> taskContexts = new ConcurrentHashMap<>();

@GuardedBy("this")
Expand Down Expand Up @@ -103,6 +110,9 @@ public class QueryContext
@GuardedBy("this")
private long spillUsed;

@GuardedBy("this")
private boolean verboseExceededMemoryLimitErrorsEnabled;

public QueryContext(
QueryId queryId,
DataSize maxUserMemory,
Expand All @@ -114,7 +124,8 @@ public QueryContext(
Executor notificationExecutor,
ScheduledExecutorService yieldExecutor,
DataSize maxSpill,
SpillSpaceTracker spillSpaceTracker)
SpillSpaceTracker spillSpaceTracker,
JsonCodec<List<TaskMemoryReservationSummary>> memoryReservationSummaryJsonCodec)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.maxUserMemory = requireNonNull(maxUserMemory, "maxUserMemory is null").toBytes();
Expand All @@ -127,6 +138,7 @@ public QueryContext(
this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null");
this.maxSpill = requireNonNull(maxSpill, "maxSpill is null").toBytes();
this.spillSpaceTracker = requireNonNull(spillSpaceTracker, "spillSpaceTracker is null");
this.memoryReservationSummaryJsonCodec = requireNonNull(memoryReservationSummaryJsonCodec, "memoryReservationSummaryJsonCodec is null");
this.queryMemoryContext = new MemoryTrackingContext(
newRootAggregatedMemoryContext(new QueryMemoryReservationHandler(this::updateUserMemory, this::tryUpdateUserMemory, this::updateBroadcastMemory, this::tryUpdateBroadcastMemory), GUARANTEED_MEMORY),
newRootAggregatedMemoryContext(new QueryMemoryReservationHandler(this::updateRevocableMemory, this::tryReserveMemoryNotSupported, this::updateBroadcastMemory, this::tryUpdateBroadcastMemory), 0L),
Expand Down Expand Up @@ -325,6 +337,7 @@ public long getMaxTotalMemory()
public TaskContext addTaskContext(
TaskStateMachine taskStateMachine,
Session session,
Optional<PlanNode> taskPlan,
boolean perOperatorCpuTimerEnabled,
boolean cpuTimerEnabled,
boolean perOperatorAllocationTrackingEnabled,
Expand All @@ -339,6 +352,7 @@ public TaskContext addTaskContext(
yieldExecutor,
session,
queryMemoryContext.newMemoryTrackingContext(),
taskPlan,
perOperatorCpuTimerEnabled,
cpuTimerEnabled,
perOperatorAllocationTrackingEnabled,
Expand Down Expand Up @@ -388,6 +402,11 @@ public synchronized void setMemoryLimits(DataSize queryMaxTaskMemory, DataSize q
memoryLimitsInitialized = true;
}

/**
 * Updates the flag that controls whether exceeded-memory-limit failure messages include the
 * additional "Details" section.
 * <p>
 * The method is synchronized on this instance; the field it writes is declared
 * {@code @GuardedBy("this")}.
 *
 * @param verboseExceededMemoryLimitErrorsEnabled the new value of the flag
 */
public synchronized void setVerboseExceededMemoryLimitErrorsEnabled(boolean verboseExceededMemoryLimitErrorsEnabled)
{
this.verboseExceededMemoryLimitErrorsEnabled = verboseExceededMemoryLimitErrorsEnabled;
}

public synchronized long getPeakNodeTotalMemory()
{
return peakNodeTotalMemory;
Expand Down Expand Up @@ -496,6 +515,17 @@ public String getAdditionalFailureInfo(long allocated, long delta)
.collect(toImmutableMap(Entry::getKey, e -> succinctBytes(e.getValue())))
.toString();

return format("%s, Top Consumers: %s", additionalInfo, topConsumers);
String message = format("%s, Top Consumers: %s", additionalInfo, topConsumers);

if (verboseExceededMemoryLimitErrorsEnabled) {
List<TaskMemoryReservationSummary> memoryReservationSummaries = taskContexts.values().stream()
.map(TaskContext::getMemoryReservationSummary)
.filter(summary -> summary.getReservation().toBytes() > 0)
.sorted(comparing(TaskMemoryReservationSummary::getReservation).reversed())
.limit(3)
.collect(toImmutableList());
message += ", Details: " + memoryReservationSummaryJsonCodec.toJson(memoryReservationSummaries);
}
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class OperatorContext
private final AtomicLong peakTotalMemoryReservation = new AtomicLong();
private final RuntimeStats runtimeStats = new RuntimeStats();

private final AtomicLong currentTotalMemoryReservationInBytes = new AtomicLong();

@GuardedBy("this")
private boolean memoryRevokingRequested;

Expand Down Expand Up @@ -129,6 +131,11 @@ public int getOperatorId()
return operatorId;
}

/**
 * Returns the plan node id held by this operator context.
 *
 * @return the {@code planNodeId} field
 */
public PlanNodeId getPlanNodeId()
{
return planNodeId;
}

public String getOperatorType()
{
return operatorType;
Expand Down Expand Up @@ -305,6 +312,12 @@ private void updatePeakMemoryReservations()
peakUserMemoryReservation.accumulateAndGet(userMemory, Math::max);
peakSystemMemoryReservation.accumulateAndGet(systemMemory, Math::max);
peakTotalMemoryReservation.accumulateAndGet(totalMemory, Math::max);
currentTotalMemoryReservationInBytes.set(totalMemory);
}

/**
 * Returns the total memory reservation most recently recorded for this operator context.
 * <p>
 * The value is read from an {@code AtomicLong} and is only as fresh as the last update of that counter.
 *
 * @return the last recorded total memory reservation, in bytes
 */
public long getCurrentTotalMemoryReservationInBytes()
{
return currentTotalMemoryReservationInBytes.get();
}

// listen to revocable memory allocations and call any listeners waiting on task memory allocation
Expand Down
Loading