Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public static ExceededMemoryLimitException exceededLocalTotalMemoryLimit(DataSiz
format("Query exceeded per-node total memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

/**
 * Builds the exception reported when the per-task total memory limit is exceeded.
 *
 * @param maxMemory the per-task limit, rendered into the message
 * @param additionalFailureInfo extra diagnostic text appended to the message in square brackets
 * @return a new exception carrying the {@code EXCEEDED_LOCAL_MEMORY_LIMIT} error code
 */
public static ExceededMemoryLimitException exceededTaskMemoryLimit(DataSize maxMemory, String additionalFailureInfo)
{
    String message = format("Query exceeded per-task total memory limit of %s [%s]", maxMemory, additionalFailureInfo);
    return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT, message);
}

private ExceededMemoryLimitException(StandardErrorCode errorCode, String message)
{
super(errorCode, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public final class SystemSessionProperties
public static final String ENABLE_LARGE_DYNAMIC_FILTERS = "enable_large_dynamic_filters";
public static final String QUERY_MAX_MEMORY_PER_NODE = "query_max_memory_per_node";
public static final String QUERY_MAX_TOTAL_MEMORY_PER_NODE = "query_max_total_memory_per_node";
public static final String QUERY_MAX_TOTAL_MEMORY_PER_TASK = "query_max_total_memory_per_task";
public static final String IGNORE_DOWNSTREAM_PREFERENCES = "ignore_downstream_preferences";
public static final String FILTERING_SEMI_JOIN_TO_INNER = "rewrite_filtering_semi_join_to_inner_join";
public static final String OPTIMIZE_DUPLICATE_INSENSITIVE_JOINS = "optimize_duplicate_insensitive_joins";
Expand Down Expand Up @@ -594,6 +595,11 @@ public SystemSessionProperties(
"Maximum amount of total memory a query can use per node",
nodeMemoryConfig.getMaxQueryTotalMemoryPerNode(),
true),
dataSizeProperty(
QUERY_MAX_TOTAL_MEMORY_PER_TASK,
"Maximum amount of memory a single task can use",
nodeMemoryConfig.getMaxQueryTotalMemoryPerTask().orElse(null),
true),
booleanProperty(
IGNORE_DOWNSTREAM_PREFERENCES,
"Ignore Parent's PreferredProperties in AddExchange optimizer",
Expand Down Expand Up @@ -1160,6 +1166,11 @@ public static DataSize getQueryMaxTotalMemoryPerNode(Session session)
return session.getSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, DataSize.class);
}

/**
 * Reads the {@code query_max_total_memory_per_task} system property from the session.
 *
 * @param session the session to read the property from
 * @return the configured per-task limit, or an empty {@link Optional} when the property value is null
 */
public static Optional<DataSize> getQueryMaxTotalMemoryPerTask(Session session)
{
    DataSize perTaskLimit = session.getSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_TASK, DataSize.class);
    return Optional.ofNullable(perTaskLimit);
}

public static boolean ignoreDownStreamPreferences(Session session)
{
return session.getSystemProperty(IGNORE_DOWNSTREAM_PREFERENCES, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.trino.SystemSessionProperties.getQueryMaxMemoryPerNode;
import static io.trino.SystemSessionProperties.getQueryMaxTotalMemoryPerNode;
import static io.trino.SystemSessionProperties.getQueryMaxTotalMemoryPerTask;
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.execution.SqlTask.createSqlTask;
import static io.trino.memory.LocalMemoryManager.GENERAL_POOL;
Expand Down Expand Up @@ -111,6 +112,7 @@ public class SqlTaskManager

private final long queryMaxMemoryPerNode;
private final long queryMaxTotalMemoryPerNode;
private final Optional<DataSize> queryMaxMemoryPerTask;

@GuardedBy("this")
private long currentMemoryPoolAssignmentVersion;
Expand Down Expand Up @@ -155,13 +157,14 @@ public SqlTaskManager(
this.localMemoryManager = requireNonNull(localMemoryManager, "localMemoryManager is null");
DataSize maxQueryMemoryPerNode = nodeMemoryConfig.getMaxQueryMemoryPerNode();
DataSize maxQueryTotalMemoryPerNode = nodeMemoryConfig.getMaxQueryTotalMemoryPerNode();
queryMaxMemoryPerTask = nodeMemoryConfig.getMaxQueryTotalMemoryPerTask();
DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode();

queryMaxMemoryPerNode = maxQueryMemoryPerNode.toBytes();
queryMaxTotalMemoryPerNode = maxQueryTotalMemoryPerNode.toBytes();

queryContexts = CacheBuilder.newBuilder().weakValues().build(CacheLoader.from(
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryMemoryPerNode, maxQueryTotalMemoryPerNode, maxQuerySpillPerNode)));
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryMemoryPerNode, maxQueryTotalMemoryPerNode, queryMaxMemoryPerTask, maxQuerySpillPerNode)));

tasks = CacheBuilder.newBuilder().build(CacheLoader.from(
taskId -> createSqlTask(
Expand All @@ -184,12 +187,14 @@ private QueryContext createQueryContext(
GcMonitor gcMonitor,
DataSize maxQueryUserMemoryPerNode,
DataSize maxQueryTotalMemoryPerNode,
Optional<DataSize> maxQueryMemoryPerTask,
DataSize maxQuerySpillPerNode)
{
return new QueryContext(
queryId,
maxQueryUserMemoryPerNode,
maxQueryTotalMemoryPerNode,
maxQueryMemoryPerTask,
localMemoryManager.getGeneralPool(),
gcMonitor,
taskNotificationExecutor,
Expand Down Expand Up @@ -406,11 +411,19 @@ private TaskInfo doUpdateTask(
if (!queryContext.isMemoryLimitsInitialized()) {
long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();
long sessionQueryTotalMaxMemoryPerNode = getQueryMaxTotalMemoryPerNode(session).toBytes();

Optional<DataSize> effectiveQueryMaxMemoryPerTask = getQueryMaxTotalMemoryPerTask(session);
if (queryMaxMemoryPerTask.isPresent() &&
(effectiveQueryMaxMemoryPerTask.isEmpty() || effectiveQueryMaxMemoryPerTask.get().toBytes() > queryMaxMemoryPerTask.get().toBytes())) {
effectiveQueryMaxMemoryPerTask = queryMaxMemoryPerTask;
}

// Session properties are only allowed to decrease memory limits, not increase them
queryContext.initializeMemoryLimits(
resourceOvercommit(session),
min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode),
min(sessionQueryTotalMaxMemoryPerNode, queryMaxTotalMemoryPerNode));
min(sessionQueryTotalMaxMemoryPerNode, queryMaxTotalMemoryPerNode),
effectiveQueryMaxMemoryPerTask);
}

sqlTask.recordHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@

import javax.validation.constraints.NotNull;

import java.util.Optional;

// This is separate from MemoryManagerConfig because it's difficult to test the default value of maxQueryMemoryPerNode
@DefunctConfig("deprecated.legacy-system-pool-enabled")
public class NodeMemoryConfig
{
public static final long AVAILABLE_HEAP_MEMORY = Runtime.getRuntime().maxMemory();
public static final String QUERY_MAX_MEMORY_PER_NODE_CONFIG = "query.max-memory-per-node";
public static final String QUERY_MAX_TOTAL_MEMORY_PER_NODE_CONFIG = "query.max-total-memory-per-node";
public static final String QUERY_MAX_TOTAL_MEMORY_PER_TASK_CONFIG = "query.max-total-memory-per-task";

private boolean isReservedPoolDisabled = true;

private DataSize maxQueryMemoryPerNode = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.1));

private Optional<DataSize> maxQueryTotalMemoryPerTask = Optional.empty();

// This is a per-query limit for the user plus system allocations.
private DataSize maxQueryTotalMemoryPerNode = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.3));
private DataSize heapHeadroom = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.3));
Expand All @@ -50,6 +55,20 @@ public NodeMemoryConfig setMaxQueryMemoryPerNode(DataSize maxQueryMemoryPerNode)
return this;
}

/**
 * Returns the configured per-task total memory limit.
 *
 * @return the limit, or an empty {@link Optional} when none has been set (the field's initial value)
 */
@NotNull
public Optional<DataSize> getMaxQueryTotalMemoryPerTask()
{
    return this.maxQueryTotalMemoryPerTask;
}

/**
 * Sets the per-task total memory limit.
 *
 * @param maxQueryTotalMemoryPerTask the new limit; {@code null} is stored as an empty {@link Optional}
 * @return this config instance, to allow call chaining
 */
@Config(QUERY_MAX_TOTAL_MEMORY_PER_TASK_CONFIG)
@ConfigDescription("Sets total (user + system) memory limit enforced for a single task; there is no memory limit by default")
public NodeMemoryConfig setMaxQueryTotalMemoryPerTask(DataSize maxQueryTotalMemoryPerTask)
{
    Optional<DataSize> perTaskLimit = Optional.ofNullable(maxQueryTotalMemoryPerTask);
    this.maxQueryTotalMemoryPerTask = perTaskLimit;
    return this;
}

@Deprecated
@LegacyConfig(value = "experimental.reserved-pool-enabled", replacedBy = "experimental.reserved-pool-disabled")
public void setReservedPoolEnabled(boolean reservedPoolEnabled)
Expand Down
15 changes: 12 additions & 3 deletions core/trino-main/src/main/java/io/trino/memory/QueryContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -51,6 +52,7 @@
import static io.trino.ExceededSpillLimitException.exceededPerQueryLocalLimit;
import static io.trino.memory.context.AggregatedMemoryContext.newRootAggregatedMemoryContext;
import static io.trino.operator.Operator.NOT_BLOCKED;
import static io.trino.operator.TaskContext.createTaskContext;
import static java.lang.String.format;
import static java.util.Map.Entry.comparingByValue;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -78,6 +80,8 @@ public class QueryContext
private long maxUserMemory;
@GuardedBy("this")
private long maxTotalMemory;
@GuardedBy("this")
private Optional<DataSize> maxTaskMemory;

private final MemoryTrackingContext queryMemoryContext;

Expand All @@ -91,6 +95,7 @@ public QueryContext(
QueryId queryId,
DataSize maxUserMemory,
DataSize maxTotalMemory,
Optional<DataSize> maxTaskMemory,
MemoryPool memoryPool,
GcMonitor gcMonitor,
Executor notificationExecutor,
Expand All @@ -101,6 +106,7 @@ public QueryContext(
this.queryId = requireNonNull(queryId, "queryId is null");
this.maxUserMemory = requireNonNull(maxUserMemory, "maxUserMemory is null").toBytes();
this.maxTotalMemory = requireNonNull(maxTotalMemory, "maxTotalMemory is null").toBytes();
this.maxTaskMemory = requireNonNull(maxTaskMemory, "maxTaskMemory is null");
this.memoryPool = requireNonNull(memoryPool, "memoryPool is null");
this.gcMonitor = requireNonNull(gcMonitor, "gcMonitor is null");
this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
Expand All @@ -119,7 +125,7 @@ public boolean isMemoryLimitsInitialized()
}

// TODO: This method should be removed, and the correct limit set in the constructor. However, due to the way QueryContext is constructed the memory limit is not known in advance
public synchronized void initializeMemoryLimits(boolean resourceOverCommit, long maxUserMemory, long maxTotalMemory)
public synchronized void initializeMemoryLimits(boolean resourceOverCommit, long maxUserMemory, long maxTotalMemory, Optional<DataSize> maxTaskMemory)
{
checkArgument(maxUserMemory >= 0, "maxUserMemory must be >= 0, found: %s", maxUserMemory);
checkArgument(maxTotalMemory >= 0, "maxTotalMemory must be >= 0, found: %s", maxTotalMemory);
Expand All @@ -129,10 +135,12 @@ public synchronized void initializeMemoryLimits(boolean resourceOverCommit, long
// The coordinator will kill the query if the cluster runs out of memory.
this.maxUserMemory = memoryPool.getMaxBytes();
this.maxTotalMemory = memoryPool.getMaxBytes();
this.maxTaskMemory = Optional.empty(); // disabled
}
else {
this.maxUserMemory = maxUserMemory;
this.maxTotalMemory = maxTotalMemory;
this.maxTaskMemory = maxTaskMemory;
}
memoryLimitsInitialized = true;
}
Expand Down Expand Up @@ -294,7 +302,7 @@ public TaskContext addTaskContext(
boolean perOperatorCpuTimerEnabled,
boolean cpuTimerEnabled)
{
TaskContext taskContext = TaskContext.createTaskContext(
TaskContext taskContext = createTaskContext(
this,
taskStateMachine,
gcMonitor,
Expand All @@ -304,7 +312,8 @@ public TaskContext addTaskContext(
queryMemoryContext.newMemoryTrackingContext(),
notifyStatusChanged,
perOperatorCpuTimerEnabled,
cpuTimerEnabled);
cpuTimerEnabled,
maxTaskMemory);
taskContexts.put(taskStateMachine.getTaskId(), taskContext);
return taskContext;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator;

import io.airlift.units.DataSize;
import io.trino.memory.context.MemoryAllocationValidator;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.ExceededMemoryLimitException.exceededTaskMemoryLimit;
import static java.lang.String.format;
import static java.util.Map.Entry.comparingByValue;
import static java.util.Objects.requireNonNull;

/**
 * Keeps track of per-node memory usage of a given task and rejects reservations that would
 * push the usage above a fixed limit.
 * <p>
 * A single instance is shared by multiple ValidatingLocalMemoryContext instances originating
 * from a single ValidatingAggregateContext. All mutable state is guarded by the instance monitor.
 */
@ThreadSafe
public class TaskAllocationValidator
        implements MemoryAllocationValidator
{
    private final long limitBytes;
    @GuardedBy("this")
    private long usedBytes;
    // Net bytes currently reserved per allocation tag; tags with a zero balance are not kept.
    @GuardedBy("this")
    private final Map<String, Long> taggedAllocations = new HashMap<>();

    /**
     * @param memoryLimit maximum number of bytes the task may have reserved at any time
     * @throws NullPointerException if {@code memoryLimit} is null
     */
    public TaskAllocationValidator(DataSize memoryLimit)
    {
        this.limitBytes = requireNonNull(memoryLimit, "memoryLimit is null").toBytes();
    }

    /**
     * Reserves (positive {@code delta}) or releases (negative {@code delta}) memory for the given tag.
     * Fails with the exception built by {@code exceededTaskMemoryLimit} when the reservation would
     * exceed the limit; in that case no state is changed.
     *
     * @param allocationTag tag the bytes are accounted under
     * @param delta change in reserved bytes
     */
    @Override
    public synchronized void reserveMemory(String allocationTag, long delta)
    {
        if (usedBytes + delta > limitBytes) {
            verify(delta > 0, "exceeded limit with negative delta (%s); usedBytes=%s, limitBytes=%s", delta, usedBytes, limitBytes);
            raiseLimitExceededFailure(allocationTag, delta);
        }
        recordAllocation(allocationTag, delta);
    }

    /**
     * Builds the diagnostic message (current usage, requested delta, up to three largest tags)
     * and throws the limit-exceeded exception. Never returns normally.
     */
    private synchronized void raiseLimitExceededFailure(String currentAllocationTag, long currentAllocationDelta)
    {
        Map<String, Long> tmpTaggedAllocations = new HashMap<>(taggedAllocations);
        // include current allocation in the output of top-consumers
        tmpTaggedAllocations.merge(currentAllocationTag, currentAllocationDelta, Long::sum);
        String topConsumers = tmpTaggedAllocations.entrySet().stream()
                // negative balances cannot be rendered as a data size; drop them defensively
                .filter(e -> e.getValue() >= 0)
                .sorted(comparingByValue(Comparator.reverseOrder()))
                .limit(3)
                .collect(toImmutableMap(Map.Entry::getKey, e -> succinctBytes(e.getValue())))
                .toString();

        String additionalInfo = format("Allocated: %s, Delta: %s, Top Consumers: %s", succinctBytes(usedBytes), succinctBytes(currentAllocationDelta), topConsumers);
        throw exceededTaskMemoryLimit(succinctBytes(limitBytes), additionalInfo);
    }

    /**
     * Same as {@link #reserveMemory(String, long)} but reports an over-limit reservation by
     * returning {@code false} instead of throwing.
     *
     * @param allocationTag tag the bytes are accounted under
     * @param delta change in reserved bytes
     * @return {@code true} if the reservation was recorded, {@code false} if it would exceed the limit
     */
    @Override
    public synchronized boolean tryReserveMemory(String allocationTag, long delta)
    {
        if (usedBytes + delta > limitBytes) {
            verify(delta > 0, "exceeded limit with negative delta (%s); usedBytes=%s, limitBytes=%s", delta, usedBytes, limitBytes);
            return false;
        }
        // Record the tag here as well: otherwise memory obtained through this path is missing from
        // the top-consumers report and a later release via reserveMemory drives the tag negative.
        recordAllocation(allocationTag, delta);
        return true;
    }

    /**
     * Applies an accepted delta to the total and to the per-tag balance.
     */
    private synchronized void recordAllocation(String allocationTag, long delta)
    {
        if (delta == 0) {
            return;
        }
        usedBytes += delta;
        // Returning null from the remapping function removes the entry, so fully released tags
        // do not accumulate in the map for the lifetime of the task.
        taggedAllocations.merge(allocationTag, delta, (current, change) -> {
            long updated = current + change;
            return updated == 0 ? null : updated;
        });
    }
}
Loading