Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.MoreCollectors.toOptional;
import static com.google.common.collect.Sets.difference;
Expand Down Expand Up @@ -122,7 +123,7 @@ public class ClusterMemoryManager
private long lastTimeNotOutOfMemory = System.nanoTime();

@GuardedBy("this")
private KillTarget lastKillTarget;
private Optional<KillTarget> lastKillTarget = Optional.empty();

@Inject
public ClusterMemoryManager(
Expand Down Expand Up @@ -232,7 +233,7 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli
outOfMemory &&
!queryKilled &&
nanosSince(lastTimeNotOutOfMemory).compareTo(killOnOutOfMemoryDelay) > 0) {
if (isLastKillTargetGone(runningQueries)) {
if (isLastKillTargetGone()) {
callOomKiller(runningQueries);
}
else {
Expand All @@ -250,12 +251,13 @@ private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
.map(this::createQueryMemoryInfo)
.collect(toImmutableList());

List<MemoryInfo> nodeMemoryInfos = nodes.values().stream()
.map(RemoteNodeMemory::getInfo)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());
Map<String, MemoryInfo> nodeMemoryInfosByNode = nodes.entrySet().stream()
.filter(entry -> entry.getValue().getInfo().isPresent())
.collect(toImmutableMap(
Entry::getKey,
entry -> entry.getValue().getInfo().get()));

List<MemoryInfo> nodeMemoryInfos = ImmutableList.copyOf(nodeMemoryInfosByNode.values());
Optional<KillTarget> killTarget = lowMemoryKiller.chooseQueryToKill(queryMemoryInfoList, nodeMemoryInfos);

if (killTarget.isPresent()) {
Expand All @@ -267,8 +269,8 @@ private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
// See comments in isQueryGone for why chosenQuery might be absent.
chosenQuery.get().fail(new TrinoException(CLUSTER_OUT_OF_MEMORY, "Query killed because the cluster is out of memory. Please try again in a few minutes."));
queriesKilledDueToOutOfMemory.incrementAndGet();
lastKillTarget = killTarget.get();
logQueryKill(queryId, nodeMemoryInfos);
lastKillTarget = killTarget;
logQueryKill(queryId, nodeMemoryInfosByNode);
}
}
else {
Expand All @@ -286,25 +288,25 @@ private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
// only record tasks actually killed
ImmutableSet<TaskId> killedTasks = killedTasksBuilder.build();
if (!killedTasks.isEmpty()) {
lastKillTarget = KillTarget.selectedTasks(killedTasks);
logTasksKill(killedTasks, nodeMemoryInfos);
lastKillTarget = Optional.of(KillTarget.selectedTasks(killedTasks));
logTasksKill(killedTasks, nodeMemoryInfosByNode);
}
}
}
}

@GuardedBy("this")
private boolean isLastKillTargetGone(Iterable<QueryExecution> runningQueries)
private boolean isLastKillTargetGone()
{
if (lastKillTarget == null) {
if (lastKillTarget.isEmpty()) {
return true;
}

if (lastKillTarget.isWholeQuery()) {
return isQueryGone(lastKillTarget.getQuery());
if (lastKillTarget.get().isWholeQuery()) {
return isQueryGone(lastKillTarget.get().getQuery());
}

return areTasksGone(lastKillTarget.getTasks(), runningQueries);
return areTasksGone(lastKillTarget.get().getTasks());
}

private boolean isQueryGone(QueryId killedQuery)
Expand All @@ -314,7 +316,7 @@ private boolean isQueryGone(QueryId killedQuery)
// Even if the weak references to the leaked queries are GCed in the ClusterMemoryLeakDetector, it will mark the same queries
// as leaked in its next run, and eventually the ClusterMemoryManager will make progress.
if (memoryLeakDetector.wasQueryPossiblyLeaked(killedQuery)) {
lastKillTarget = null;
lastKillTarget = Optional.empty();
return true;
}

Expand All @@ -327,73 +329,68 @@ private boolean isQueryGone(QueryId killedQuery)
.containsKey(killedQuery);
}

private boolean areTasksGone(Set<TaskId> tasks, Iterable<QueryExecution> runningQueries)
private boolean areTasksGone(Set<TaskId> tasks)
{
// We build list of tasks based on MemoryPoolInfo objects, so it is consistent with memory usage reported for nodes.
// This will only contain tasks for queries with task retries enabled - but this is what we want.
Set<TaskId> runningTasks = getRunningTasks();
return tasks.stream().noneMatch(runningTasks::contains);
}

private ImmutableSet<TaskId> getRunningTasks()
{
List<QueryExecution> queryExecutions = tasks.stream()
.map(TaskId::getQueryId)
.distinct()
.map(query -> findRunningQuery(runningQueries, query))
return nodes.values().stream()
.map(RemoteNodeMemory::getInfo)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());

if (queryExecutions.isEmpty()) {
// all queries we care about are gone
return true;
}

Set<TaskId> runningTasks = queryExecutions.stream()
.flatMap(query -> getRunningTasksForQuery(query).stream())
.map(TaskStatus::getTaskId)
.flatMap(memoryInfo -> memoryInfo.getPool().getTaskMemoryReservations().keySet().stream())
.map(TaskId::valueOf)
.collect(toImmutableSet());

return tasks.stream().noneMatch(runningTasks::contains);
}

private Optional<QueryExecution> findRunningQuery(Iterable<QueryExecution> runningQueries, QueryId queryId)
{
return Streams.stream(runningQueries).filter(query -> queryId.equals(query.getQueryId())).collect(toOptional());
}

private void logQueryKill(QueryId killedQueryId, List<MemoryInfo> nodes)
private void logQueryKill(QueryId killedQueryId, Map<String, MemoryInfo> nodeMemoryInfosByNode)
{
if (!log.isInfoEnabled()) {
return;
}
StringBuilder nodeDescription = new StringBuilder();
nodeDescription.append("Query Kill Decision: Killed ").append(killedQueryId).append("\n");
nodeDescription.append(formatKillScenario(nodes));
nodeDescription.append(formatKillScenario(nodeMemoryInfosByNode));
log.info("%s", nodeDescription);
}

private void logTasksKill(Set<TaskId> tasks, List<MemoryInfo> nodes)
private void logTasksKill(Set<TaskId> tasks, Map<String, MemoryInfo> nodeMemoryInfosByNode)
{
if (!log.isInfoEnabled()) {
return;
}
StringBuilder nodeDescription = new StringBuilder();
nodeDescription.append("Query Kill Decision: Tasks Killed ")
.append(tasks)
.append("(")
.append(tasks)
.append(")")
.append("\n");
nodeDescription.append(formatKillScenario(nodes));
nodeDescription.append(formatKillScenario(nodeMemoryInfosByNode));
log.info("%s", nodeDescription);
}

private String formatKillScenario(List<MemoryInfo> nodes)
private String formatKillScenario(Map<String, MemoryInfo> nodes)
{
StringBuilder stringBuilder = new StringBuilder();
for (MemoryInfo nodeMemoryInfo : nodes) {
for (Entry<String, MemoryInfo> entry : nodes.entrySet()) {
String nodeId = entry.getKey();
MemoryInfo nodeMemoryInfo = entry.getValue();
MemoryPoolInfo memoryPoolInfo = nodeMemoryInfo.getPool();
stringBuilder.append("Query Kill Scenario: ");
stringBuilder.append("Node[").append(nodeId).append("]: ");
stringBuilder.append("MaxBytes ").append(memoryPoolInfo.getMaxBytes()).append(' ');
stringBuilder.append("FreeBytes ").append(memoryPoolInfo.getFreeBytes() + memoryPoolInfo.getReservedRevocableBytes()).append(' ');
stringBuilder.append("Queries ");
Joiner.on(",").withKeyValueSeparator("=").appendTo(stringBuilder, memoryPoolInfo.getQueryMemoryReservations());
Joiner.on(",").withKeyValueSeparator("=").appendTo(stringBuilder, memoryPoolInfo.getQueryMemoryReservations()).append((' '));
stringBuilder.append("Tasks ");
Joiner.on(",").withKeyValueSeparator("=").appendTo(stringBuilder, nodeMemoryInfo.getTasksMemoryInfo().asMap());
Joiner.on(",").withKeyValueSeparator("=").appendTo(stringBuilder, memoryPoolInfo.getTaskMemoryReservations());
stringBuilder.append('\n');
}
return stringBuilder.toString();
Expand All @@ -412,7 +409,10 @@ private boolean isClusterOutOfMemory()

private QueryMemoryInfo createQueryMemoryInfo(QueryExecution query)
{
return new QueryMemoryInfo(query.getQueryId(), query.getTotalMemoryReservation().toBytes());
return new QueryMemoryInfo(
query.getQueryId(),
query.getTotalMemoryReservation().toBytes(),
getRetryPolicy(query.getSession()));
}

private List<TaskStatus> getRunningTasksForQuery(QueryExecution query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ public synchronized MemoryPoolInfo getInfo()
reservedRevocableDistributedBytes,
ImmutableMap.copyOf(queryMemoryReservations),
ImmutableMap.copyOf(queryMemoryAllocations),
ImmutableMap.copyOf(queryMemoryRevocableReservations));
ImmutableMap.copyOf(queryMemoryRevocableReservations),
// not providing per-task memory info for cluster-wide pool
ImmutableMap.of(),
ImmutableMap.of());
}

@Managed
Expand Down
43 changes: 10 additions & 33 deletions core/trino-main/src/main/java/io/trino/memory/LowMemoryKiller.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package io.trino.memory;

import io.trino.execution.TaskId;
import io.trino.operator.RetryPolicy;
import io.trino.spi.QueryId;

import java.util.List;
Expand All @@ -31,11 +31,13 @@ class QueryMemoryInfo
{
private final QueryId queryId;
private final long memoryReservation;
private final RetryPolicy retryPolicy;

public QueryMemoryInfo(QueryId queryId, long memoryReservation)
public QueryMemoryInfo(QueryId queryId, long memoryReservation, RetryPolicy retryPolicy)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.memoryReservation = memoryReservation;
this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null");
}

public QueryId getQueryId()
Expand All @@ -48,44 +50,19 @@ public long getMemoryReservation()
return memoryReservation;
}

public RetryPolicy getRetryPolicy()
{
return retryPolicy;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("queryId", queryId)
.add("memoryReservation", memoryReservation)
.add("retryPolicy", retryPolicy)
.toString();
}

public static class TaskMemoryInfo
{
private final TaskId taskId;
private final long memoryReservation;

public TaskMemoryInfo(TaskId taskId, long memoryReservation)
{
this.taskId = requireNonNull(taskId, "taskId is null");
this.memoryReservation = memoryReservation;
}

public TaskId getTaskId()
{
return taskId;
}

public long getMemoryReservation()
{
return memoryReservation;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("taskId", taskId)
.add("memoryReservation", memoryReservation)
.toString();
}
}
}
}
28 changes: 1 addition & 27 deletions core/trino-main/src/main/java/io/trino/memory/MemoryInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import io.trino.TaskMemoryInfo;
import io.trino.spi.QueryId;
import io.trino.spi.memory.MemoryPoolInfo;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -28,24 +24,14 @@ public class MemoryInfo
{
private final int availableProcessors;
private final MemoryPoolInfo pool;
private final ListMultimap<QueryId, TaskMemoryInfo> tasksMemoryInfo;

public MemoryInfo(
int availableProcessors,
MemoryPoolInfo pool)
{
this(availableProcessors, pool, ImmutableListMultimap.of());
}

@JsonCreator
public MemoryInfo(
@JsonProperty("availableProcessors") int availableProcessors,
@JsonProperty("pool") MemoryPoolInfo pool,
@JsonProperty("tasksMemoryInfo") ListMultimap<QueryId, TaskMemoryInfo> tasksMemoryInfo)
@JsonProperty("pool") MemoryPoolInfo pool)
{
this.availableProcessors = availableProcessors;
this.pool = requireNonNull(pool, "pool is null");
this.tasksMemoryInfo = ImmutableListMultimap.copyOf(requireNonNull(tasksMemoryInfo, "tasksMemoryInfo is null"));
}

@JsonProperty
Expand All @@ -60,24 +46,12 @@ public MemoryPoolInfo getPool()
return pool;
}

@JsonProperty
public ListMultimap<QueryId, TaskMemoryInfo> getTasksMemoryInfo()
{
return tasksMemoryInfo;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("availableProcessors", availableProcessors)
.add("pool", pool)
.add("tasksMemoryInfo", tasksMemoryInfo)
.toString();
}

public MemoryInfo withTasksMemoryInfo(ListMultimap<QueryId, TaskMemoryInfo> tasksMemoryInfo)
{
return new MemoryInfo(availableProcessors, pool, tasksMemoryInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.airlift.configuration.DefunctConfig;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -60,7 +59,6 @@ public MemoryManagerConfig setLowMemoryKillerPolicy(LowMemoryKillerPolicy lowMem
}

@NotNull
@MinDuration("5s")
public Duration getKillOnOutOfMemoryDelay()
{
return killOnOutOfMemoryDelay;
Expand Down
Loading