Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.trino.memory.LocalMemoryManager;
import io.trino.memory.NodeMemoryConfig;
import io.trino.memory.QueryContext;
import io.trino.operator.RetryPolicy;
import io.trino.spi.QueryId;
import io.trino.spi.TrinoException;
import io.trino.spi.VersionEmbedder;
Expand Down Expand Up @@ -71,9 +72,11 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.trino.SystemSessionProperties.getQueryMaxMemoryPerNode;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.execution.SqlTask.createSqlTask;
import static io.trino.operator.RetryPolicy.TASK;
import static io.trino.spi.StandardErrorCode.ABANDONED_TASK;
import static io.trino.spi.StandardErrorCode.SERVER_SHUTTING_DOWN;
import static java.lang.Math.min;
Expand Down Expand Up @@ -394,12 +397,20 @@ private TaskInfo doUpdateTask(
SqlTask sqlTask = tasks.getUnchecked(taskId);
QueryContext queryContext = sqlTask.getQueryContext();
if (!queryContext.isMemoryLimitsInitialized()) {
long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();
RetryPolicy retryPolicy = getRetryPolicy(session);
if (retryPolicy == TASK) {
// Memory limit for fault tolerant queries should only be enforced by the MemoryPool.
// LowMemoryKiller is responsible for freeing up the MemoryPool if necessary.
queryContext.initializeMemoryLimits(false, /* unlimited */ Long.MAX_VALUE);
}
else {
long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();

// Session properties are only allowed to decrease memory limits, not increase them
queryContext.initializeMemoryLimits(
resourceOvercommit(session),
min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode));
// Session properties are only allowed to decrease memory limits, not increase them
queryContext.initializeMemoryLimits(
resourceOvercommit(session),
min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode));
}
}

sqlTask.recordHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.trino.memory.LowMemoryKiller.QueryMemoryInfo;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.operator.RetryPolicy;
import io.trino.server.BasicQueryInfo;
import io.trino.server.ServerConfig;
import io.trino.spi.QueryId;
Expand Down Expand Up @@ -76,6 +77,7 @@
import static io.trino.SystemSessionProperties.RESOURCE_OVERCOMMIT;
import static io.trino.SystemSessionProperties.getQueryMaxMemory;
import static io.trino.SystemSessionProperties.getQueryMaxTotalMemory;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
Expand Down Expand Up @@ -191,6 +193,14 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli
boolean resourceOvercommit = resourceOvercommit(query.getSession());
long userMemoryReservation = query.getUserMemoryReservation().toBytes();
long totalMemoryReservation = query.getTotalMemoryReservation().toBytes();
totalUserMemoryBytes += userMemoryReservation;
totalMemoryBytes += totalMemoryReservation;

if (getRetryPolicy(query.getSession()) == RetryPolicy.TASK) {
// Memory limit for fault tolerant queries should only be enforced by the MemoryPool.
// LowMemoryKiller is responsible for freeing up the MemoryPool if necessary.
continue;
}

if (resourceOvercommit && outOfMemory) {
// If a query has requested resource overcommit, only kill it if the cluster has run out of memory
Expand All @@ -213,9 +223,6 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli
queryKilled = true;
}
}

totalUserMemoryBytes += userMemoryReservation;
totalMemoryBytes += totalMemoryReservation;
}

clusterUserMemoryReservation.set(totalUserMemoryBytes);
Expand Down
12 changes: 12 additions & 0 deletions docs/src/main/sphinx/admin/properties-resource-management.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ by the hash tables built during execution, memory used during sorting, etc.
When the user memory allocation of a query on any worker hits this limit,
it is killed.

.. note::

Does not apply to queries with task-level retries enabled (``retry-policy=TASK``).

``query.max-memory``
^^^^^^^^^^^^^^^^^^^^

Expand All @@ -37,6 +41,10 @@ by the hash tables built during execution, memory used during sorting, etc.
When the user memory allocation of a query across all workers hits this limit,
it is killed.

.. note::

Does not apply to queries with task-level retries enabled (``retry-policy=TASK``).

``query.max-total-memory``
^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand All @@ -48,6 +56,10 @@ including revocable memory. When the memory allocated by a query across all
workers hits this limit, it is killed. The value of ``query.max-total-memory``
must be greater than ``query.max-memory``.

.. note::

Does not apply to queries with task-level retries enabled (``retry-policy=TASK``).

``memory.heap-headroom-per-node``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down