Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.execution.LocationFactory;
import io.trino.execution.QueryExecution;
import io.trino.execution.QueryInfo;
Expand Down Expand Up @@ -72,7 +71,6 @@
import static com.google.common.collect.Sets.difference;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.units.DataSize.succinctBytes;
import static io.airlift.units.Duration.nanosSince;
import static io.trino.ExceededMemoryLimitException.exceededGlobalTotalLimit;
import static io.trino.ExceededMemoryLimitException.exceededGlobalUserLimit;
import static io.trino.SystemSessionProperties.RESOURCE_OVERCOMMIT;
Expand Down Expand Up @@ -103,7 +101,6 @@ public class ClusterMemoryManager
private final DataSize maxQueryMemory;
private final DataSize maxQueryTotalMemory;
private final List<LowMemoryKiller> lowMemoryKillers;
private final Duration killOnOutOfMemoryDelay;
private final AtomicLong totalAvailableProcessors = new AtomicLong();
private final AtomicLong clusterUserMemoryReservation = new AtomicLong();
private final AtomicLong clusterTotalMemoryReservation = new AtomicLong();
Expand All @@ -119,9 +116,6 @@ public class ClusterMemoryManager

private final ClusterMemoryPool pool;

@GuardedBy("this")
private long lastTimeNotOutOfMemory = System.nanoTime();

@GuardedBy("this")
private Optional<KillTarget> lastKillTarget = Optional.empty();

Expand Down Expand Up @@ -151,7 +145,6 @@ public ClusterMemoryManager(
queryLowMemoryKiller);
this.maxQueryMemory = config.getMaxQueryMemory();
this.maxQueryTotalMemory = config.getMaxQueryTotalMemory();
this.killOnOutOfMemoryDelay = config.getKillOnOutOfMemoryDelay();

verify(maxQueryMemory.toBytes() <= maxQueryTotalMemory.toBytes(),
"maxQueryMemory cannot be greater than maxQueryTotalMemory");
Expand Down Expand Up @@ -182,9 +175,6 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli
memoryLeakDetector.checkForMemoryLeaks(allQueryInfoSupplier, pool.getQueryMemoryReservations());

boolean outOfMemory = isClusterOutOfMemory();
if (!outOfMemory) {
lastTimeNotOutOfMemory = System.nanoTime();
}

boolean queryKilled = false;
long totalUserMemoryBytes = 0L;
Expand Down Expand Up @@ -228,10 +218,7 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli
clusterUserMemoryReservation.set(totalUserMemoryBytes);
clusterTotalMemoryReservation.set(totalMemoryBytes);

if (!lowMemoryKillers.isEmpty() &&
outOfMemory &&
!queryKilled &&
nanosSince(lastTimeNotOutOfMemory).compareTo(killOnOutOfMemoryDelay) > 0) {
if (!lowMemoryKillers.isEmpty() && outOfMemory && !queryKilled) {
if (isLastKillTargetGone()) {
callOomKiller(runningQueries);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import jakarta.validation.constraints.NotNull;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

@DefunctConfig({
"experimental.cluster-memory-manager-enabled",
"query.low-memory-killer.enabled",
"resources.reserved-system-memory"})
"resources.reserved-system-memory",
"query.low-memory-killer.delay"})
public class MemoryManagerConfig
{
// enforced against user memory allocations
Expand All @@ -47,8 +45,6 @@ public class MemoryManagerConfig
private DataSize faultTolerantExecutionEagerSpeculativeTasksNodeMemoryOvercommit = DataSize.of(20, GIGABYTE);
private LowMemoryQueryKillerPolicy lowMemoryQueryKillerPolicy = LowMemoryQueryKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
private LowMemoryTaskKillerPolicy lowMemoryTaskKillerPolicy = LowMemoryTaskKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
// default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}}
private Duration killOnOutOfMemoryDelay = new Duration(30, SECONDS);

@NotNull
public DataSize getMaxQueryMemory()
Expand Down Expand Up @@ -199,25 +195,6 @@ public MemoryManagerConfig setLowMemoryTaskKillerPolicy(LowMemoryTaskKillerPolic
return this;
}

@NotNull
public Duration getKillOnOutOfMemoryDelay()
{
return killOnOutOfMemoryDelay;
}

@Config("query.low-memory-killer.delay")
@ConfigDescription("Delay between cluster running low on memory and invoking killer")
public MemoryManagerConfig setKillOnOutOfMemoryDelay(Duration killOnOutOfMemoryDelay)
{
this.killOnOutOfMemoryDelay = killOnOutOfMemoryDelay;
return this;
}

public void applyFaultTolerantExecutionDefaults()
{
killOnOutOfMemoryDelay = new Duration(0, MINUTES);
}

public enum LowMemoryQueryKillerPolicy
{
NONE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,6 @@ protected void setup(Binder binder)
newExporter(binder).export(PauseMeter.class).withGeneratedName();

configBinder(binder).bindConfig(MemoryManagerConfig.class);
if (retryPolicy == TASK) {
configBinder(binder).bindConfigDefaults(MemoryManagerConfig.class, MemoryManagerConfig::applyFaultTolerantExecutionDefaults);
}

configBinder(binder).bindConfig(NodeMemoryConfig.class);
binder.bind(LocalMemoryManager.class).in(Scopes.SINGLETON);
binder.bind(LocalMemoryManagerExporter.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.memory.MemoryManagerConfig.LowMemoryQueryKillerPolicy;
import io.trino.memory.MemoryManagerConfig.LowMemoryTaskKillerPolicy;
import org.junit.jupiter.api.Test;
Expand All @@ -27,7 +26,6 @@
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.SECONDS;

public class TestMemoryManagerConfig
{
Expand All @@ -45,8 +43,7 @@ public void testDefaults()
.setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(true)
.setFaultTolerantExecutionEagerSpeculativeTasksNodeMemoryOvercommit(DataSize.of(20, GIGABYTE))
.setLowMemoryQueryKillerPolicy(LowMemoryQueryKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES)
.setLowMemoryTaskKillerPolicy(LowMemoryTaskKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES)
.setKillOnOutOfMemoryDelay(new Duration(30, SECONDS)));
.setLowMemoryTaskKillerPolicy(LowMemoryTaskKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES));
}

@Test
Expand All @@ -64,7 +61,6 @@ public void testExplicitPropertyMappings()
.put("fault-tolerant-execution-eager-speculative-tasks-node_memory-overcommit", "21GB")
.put("query.low-memory-killer.policy", "none")
.put("task.low-memory-killer.policy", "none")
.put("query.low-memory-killer.delay", "20s")
.buildOrThrow();

MemoryManagerConfig expected = new MemoryManagerConfig()
Expand All @@ -78,8 +74,7 @@ public void testExplicitPropertyMappings()
.setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(false)
.setFaultTolerantExecutionEagerSpeculativeTasksNodeMemoryOvercommit(DataSize.of(21, GIGABYTE))
.setLowMemoryQueryKillerPolicy(LowMemoryQueryKillerPolicy.NONE)
.setLowMemoryTaskKillerPolicy(LowMemoryTaskKillerPolicy.NONE)
.setKillOnOutOfMemoryDelay(new Duration(20, SECONDS));
.setLowMemoryTaskKillerPolicy(LowMemoryTaskKillerPolicy.NONE);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ public void testOutOfMemoryKiller()
throws Exception
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("query.low-memory-killer.delay", "5s")
.put("query.low-memory-killer.policy", "total-reservation")
.buildOrThrow();

Expand Down