Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ public static ExceededMemoryLimitException exceededLocalUserMemoryLimit(DataSize
format("Query exceeded per-node memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

/**
 * Creates the exception raised when a single task exceeds its per-task memory limit.
 *
 * @param maxMemory the configured per-task memory limit that was exceeded
 * @param additionalFailureInfo extra context appended to the failure message
 */
public static ExceededMemoryLimitException exceededTaskMemoryLimit(DataSize maxMemory, String additionalFailureInfo)
{
    String message = format("Query exceeded per-task memory limit of %s [%s]", maxMemory, additionalFailureInfo);
    return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT, message);
}

private ExceededMemoryLimitException(StandardErrorCode errorCode, String message)
{
super(errorCode, message);
Expand Down
56 changes: 39 additions & 17 deletions core/trino-main/src/main/java/io/trino/SystemSessionProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ public final class SystemSessionProperties
public static final String ENABLE_COORDINATOR_DYNAMIC_FILTERS_DISTRIBUTION = "enable_coordinator_dynamic_filters_distribution";
public static final String ENABLE_LARGE_DYNAMIC_FILTERS = "enable_large_dynamic_filters";
public static final String QUERY_MAX_MEMORY_PER_NODE = "query_max_memory_per_node";
public static final String QUERY_MAX_MEMORY_PER_TASK = "query_max_memory_per_task";
public static final String IGNORE_DOWNSTREAM_PREFERENCES = "ignore_downstream_preferences";
public static final String FILTERING_SEMI_JOIN_TO_INNER = "rewrite_filtering_semi_join_to_inner_join";
public static final String OPTIMIZE_DUPLICATE_INSENSITIVE_JOINS = "optimize_duplicate_insensitive_joins";
Expand All @@ -146,14 +145,17 @@ public final class SystemSessionProperties
public static final String INCREMENTAL_HASH_ARRAY_LOAD_FACTOR_ENABLED = "incremental_hash_array_load_factor_enabled";
public static final String MAX_PARTIAL_TOP_N_MEMORY = "max_partial_top_n_memory";
public static final String RETRY_POLICY = "retry_policy";
public static final String RETRY_ATTEMPTS = "retry_attempts";
public static final String QUERY_RETRY_ATTEMPTS = "query_retry_attempts";
public static final String TASK_RETRY_ATTEMPTS_OVERALL = "task_retry_attempts_overall";
public static final String TASK_RETRY_ATTEMPTS_PER_TASK = "task_retry_attempts_per_task";
public static final String RETRY_INITIAL_DELAY = "retry_initial_delay";
public static final String RETRY_MAX_DELAY = "retry_max_delay";
public static final String HIDE_INACCESSIBLE_COLUMNS = "hide_inaccessible_columns";
public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE = "fault_tolerant_execution_target_task_input_size";
public static final String FAULT_TOLERANT_EXECUTION_MIN_TASK_SPLIT_COUNT = "fault_tolerant_execution_min_task_split_count";
public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT = "fault_tolerant_execution_target_task_split_count";
public static final String FAULT_TOLERANT_EXECUTION_MAX_TASK_SPLIT_COUNT = "fault_tolerant_execution_max_task_split_count";
public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY = "fault_tolerant_execution_task_memory";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -597,11 +599,6 @@ public SystemSessionProperties(
"Maximum amount of memory a query can use per node",
nodeMemoryConfig.getMaxQueryMemoryPerNode(),
true),
dataSizeProperty(
QUERY_MAX_MEMORY_PER_TASK,
"Maximum amount of memory a single task can use",
nodeMemoryConfig.getMaxQueryMemoryPerTask().orElse(null),
true),
booleanProperty(
IGNORE_DOWNSTREAM_PREFERENCES,
"Ignore Parent's PreferredProperties in AddExchange optimizer",
Expand Down Expand Up @@ -688,9 +685,19 @@ public SystemSessionProperties(
queryManagerConfig.getRetryPolicy(),
false),
integerProperty(
RETRY_ATTEMPTS,
"Maximum number of retry attempts",
queryManagerConfig.getRetryAttempts(),
QUERY_RETRY_ATTEMPTS,
"Maximum number of query retry attempts",
queryManagerConfig.getQueryRetryAttempts(),
false),
integerProperty(
TASK_RETRY_ATTEMPTS_OVERALL,
"Maximum number of task retry attempts overall",
queryManagerConfig.getTaskRetryAttemptsOverall(),
false),
integerProperty(
TASK_RETRY_ATTEMPTS_PER_TASK,
"Maximum number of task retry attempts per single task",
queryManagerConfig.getTaskRetryAttemptsPerTask(),
false),
durationProperty(
RETRY_INITIAL_DELAY,
Expand Down Expand Up @@ -727,6 +734,11 @@ public SystemSessionProperties(
FAULT_TOLERANT_EXECUTION_MAX_TASK_SPLIT_COUNT,
"Maximal number of splits for a single fault tolerant task (count based)",
queryManagerConfig.getFaultTolerantExecutionMaxTaskSplitCount(),
false),
dataSizeProperty(
FAULT_TOLERANT_EXECUTION_TASK_MEMORY,
"Estimated amount of memory a single task will use when task level retries are used; value is used allocating nodes for tasks execution",
memoryManagerConfig.getFaultTolerantTaskMemory(),
false));
}

Expand Down Expand Up @@ -1178,11 +1190,6 @@ public static DataSize getQueryMaxMemoryPerNode(Session session)
return session.getSystemProperty(QUERY_MAX_MEMORY_PER_NODE, DataSize.class);
}

public static Optional<DataSize> getQueryMaxTotalMemoryPerTask(Session session)
{
return Optional.ofNullable(session.getSystemProperty(QUERY_MAX_MEMORY_PER_TASK, DataSize.class));
}

public static boolean ignoreDownStreamPreferences(Session session)
{
return session.getSystemProperty(IGNORE_DOWNSTREAM_PREFERENCES, Boolean.class);
Expand Down Expand Up @@ -1269,9 +1276,19 @@ public static RetryPolicy getRetryPolicy(Session session)
return retryPolicy;
}

public static int getRetryAttempts(Session session)
public static int getQueryRetryAttempts(Session session)
{
return session.getSystemProperty(QUERY_RETRY_ATTEMPTS, Integer.class);
}

/**
 * Returns the overall cap on task retry attempts across the whole query,
 * as carried by the {@code task_retry_attempts_overall} session property.
 */
public static int getTaskRetryAttemptsOverall(Session session)
{
return session.getSystemProperty(TASK_RETRY_ATTEMPTS_OVERALL, Integer.class);
}

/**
 * Returns the maximum number of retry attempts allowed for a single task,
 * as carried by the {@code task_retry_attempts_per_task} session property.
 * (The merged diff left the stale {@code RETRY_ATTEMPTS} return line in place
 * ahead of the correct one, making the real statement unreachable; removed.)
 */
public static int getTaskRetryAttemptsPerTask(Session session)
{
    return session.getSystemProperty(TASK_RETRY_ATTEMPTS_PER_TASK, Integer.class);
}

public static Duration getRetryInitialDelay(Session session)
Expand Down Expand Up @@ -1308,4 +1325,9 @@ public static int getFaultTolerantExecutionMaxTaskSplitCount(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_TASK_SPLIT_COUNT, Integer.class);
}

/**
 * Returns the estimated memory for a single fault-tolerant task, as carried by
 * the {@code fault_tolerant_execution_task_memory} session property.
 * NOTE(review): the method name says "Default" while the property constant does not —
 * presumably this is the starting estimate before retries adjust it; confirm with callers.
 */
public static DataSize getFaultTolerantExecutionDefaultTaskMemory(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY, DataSize.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,17 @@ private PartitionedSplitsInfo getPartitionedSplitsInfo()
/**
 * Tracks a remote task until it completes, removing it from {@code remoteTasks}
 * once its state becomes done.
 *
 * The merged diff accidentally retained BOTH the old pre-listener early-return
 * check and the new post-listener re-check (with a duplicated, now-misleading
 * comment); only the post-listener form is kept. Registering the listener first
 * and then re-checking the state closes the race where the task finishes after
 * the {@code add} but before the listener is attached — in that window the
 * listener may never fire, so the explicit re-check prevents leaking the entry.
 */
private void addTask(RemoteTask task)
{
    if (remoteTasks.add(task)) {
        task.addStateChangeListener(taskStatus -> {
            if (taskStatus.getState().isDone()) {
                remoteTasks.remove(task);
            }
        });

        // The task may already be done (or have finished before the listener was
        // registered); re-check so the entry is not leaked.
        if (task.getTaskStatus().getState().isDone()) {
            remoteTasks.remove(task);
        }
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ public class QueryManagerConfig
private Duration requiredWorkersMaxWait = new Duration(5, TimeUnit.MINUTES);

private RetryPolicy retryPolicy = RetryPolicy.NONE;
private int retryAttempts = 4;
private int queryRetryAttempts = 4;
private int taskRetryAttemptsPerTask = 2;
private int taskRetryAttemptsOverall = Integer.MAX_VALUE;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this config is not needed --- Spark only has taskRetryAttemptsPerTask. Given that the number of tasks can differ between stages, this config may not make much sense.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not super attached to it. And the default value is Integer.MAX_VALUE, so by default it is ineffective. I think we can get rid of it.
Then we should rename taskRetryAttemptsPerTask to taskRetryAttempts. @arhimondr, does that make sense?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property can be used as a safeguard against a retry avalanche, when all the tasks suddenly start failing? Though this is more of a hypothetical concern, and I am not sure how useful it is going to be in practice. I don't really have a strong opinion here. Since it's already there and implemented, I'm fine with keeping it. But if you feel like it won't be particularly useful, you should also feel free to remove it.

private Duration retryInitialDelay = new Duration(10, SECONDS);
private Duration retryMaxDelay = new Duration(1, MINUTES);

Expand Down Expand Up @@ -414,15 +416,42 @@ public QueryManagerConfig setRetryPolicy(RetryPolicy retryPolicy)
}

@Min(0)
public int getRetryAttempts()
public int getQueryRetryAttempts()
{
return retryAttempts;
return queryRetryAttempts;
}

@Config("retry-attempts")
public QueryManagerConfig setRetryAttempts(int retryAttempts)
@Config("query-retry-attempts")
@LegacyConfig("retry-attempts")
public QueryManagerConfig setQueryRetryAttempts(int queryRetryAttempts)
{
this.retryAttempts = retryAttempts;
this.queryRetryAttempts = queryRetryAttempts;
return this;
}

@Min(0)
// Overall cap on task retry attempts across the whole query; defaults to
// Integer.MAX_VALUE (see field initializer), so it is ineffective unless configured.
public int getTaskRetryAttemptsOverall()
{
return taskRetryAttemptsOverall;
}

@Config("task-retry-attempts-overall")
// Setter for the overall task-retry cap; returns this for fluent configuration.
public QueryManagerConfig setTaskRetryAttemptsOverall(int taskRetryAttemptsOverall)
{
this.taskRetryAttemptsOverall = taskRetryAttemptsOverall;
return this;
}

@Min(0)
// Maximum number of retry attempts for any single task; defaults to 2
// (see field initializer).
public int getTaskRetryAttemptsPerTask()
{
return taskRetryAttemptsPerTask;
}

@Config("task-retry-attempts-per-task")
// Setter for the per-task retry cap; returns this for fluent configuration.
public QueryManagerConfig setTaskRetryAttemptsPerTask(int taskRetryAttemptsPerTask)
{
this.taskRetryAttemptsPerTask = taskRetryAttemptsPerTask;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import io.trino.exchange.ExchangeManagerRegistry;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this commit does more. For example you introduce FixedCountNodeAllocatorService and remove FixedCountNodeAllocator. Do these have same functionality?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which commit you have in mind?

import io.trino.execution.QueryPreparer.PreparedQuery;
import io.trino.execution.StateMachine.StateChangeListener;
import io.trino.execution.scheduler.NodeAllocatorService;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.PartitionMemoryEstimator;
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.execution.scheduler.SqlQueryScheduler;
import io.trino.execution.scheduler.TaskDescriptorStorage;
Expand Down Expand Up @@ -99,6 +101,8 @@ public class SqlQueryExecution
private final SplitSourceFactory splitSourceFactory;
private final NodePartitioningManager nodePartitioningManager;
private final NodeScheduler nodeScheduler;
private final NodeAllocatorService nodeAllocatorService;
private final PartitionMemoryEstimator partitionMemoryEstimator;
private final List<PlanOptimizer> planOptimizers;
private final PlanFragmenter planFragmenter;
private final RemoteTaskFactory remoteTaskFactory;
Expand Down Expand Up @@ -132,6 +136,8 @@ private SqlQueryExecution(
SplitSourceFactory splitSourceFactory,
NodePartitioningManager nodePartitioningManager,
NodeScheduler nodeScheduler,
NodeAllocatorService nodeAllocatorService,
PartitionMemoryEstimator partitionMemoryEstimator,
List<PlanOptimizer> planOptimizers,
PlanFragmenter planFragmenter,
RemoteTaskFactory remoteTaskFactory,
Expand Down Expand Up @@ -159,6 +165,8 @@ private SqlQueryExecution(
this.splitSourceFactory = requireNonNull(splitSourceFactory, "splitSourceFactory is null");
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null");
this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null");
this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null");
this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null");
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
Expand Down Expand Up @@ -497,6 +505,8 @@ private void planDistribution(PlanRoot plan)
plan.getRoot(),
nodePartitioningManager,
nodeScheduler,
nodeAllocatorService,
partitionMemoryEstimator,
remoteTaskFactory,
plan.isSummarizeTaskInfos(),
scheduleSplitBatchSize,
Expand Down Expand Up @@ -698,6 +708,8 @@ public static class SqlQueryExecutionFactory
private final SplitSourceFactory splitSourceFactory;
private final NodePartitioningManager nodePartitioningManager;
private final NodeScheduler nodeScheduler;
private final NodeAllocatorService nodeAllocatorService;
private final PartitionMemoryEstimator partitionMemoryEstimator;
private final List<PlanOptimizer> planOptimizers;
private final PlanFragmenter planFragmenter;
private final RemoteTaskFactory remoteTaskFactory;
Expand All @@ -724,6 +736,8 @@ public static class SqlQueryExecutionFactory
SplitSourceFactory splitSourceFactory,
NodePartitioningManager nodePartitioningManager,
NodeScheduler nodeScheduler,
NodeAllocatorService nodeAllocatorService,
PartitionMemoryEstimator partitionMemoryEstimator,
PlanOptimizersFactory planOptimizersFactory,
PlanFragmenter planFragmenter,
RemoteTaskFactory remoteTaskFactory,
Expand Down Expand Up @@ -751,6 +765,8 @@ public static class SqlQueryExecutionFactory
this.splitSourceFactory = requireNonNull(splitSourceFactory, "splitSourceFactory is null");
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null");
this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null");
this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null");
this.remoteTaskFactory = requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
Expand Down Expand Up @@ -790,6 +806,8 @@ public QueryExecution createQueryExecution(
splitSourceFactory,
nodePartitioningManager,
nodeScheduler,
nodeAllocatorService,
partitionMemoryEstimator,
planOptimizers,
planFragmenter,
remoteTaskFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.trino.SystemSessionProperties.getQueryMaxMemoryPerNode;
import static io.trino.SystemSessionProperties.getQueryMaxTotalMemoryPerTask;
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.execution.SqlTask.createSqlTask;
Expand Down Expand Up @@ -105,7 +104,6 @@ public class SqlTaskManager
private final SqlTaskIoStats finishedTaskStats = new SqlTaskIoStats();

private final long queryMaxMemoryPerNode;
private final Optional<DataSize> queryMaxMemoryPerTask;

private final CounterStat failedTasks = new CounterStat();

Expand Down Expand Up @@ -144,13 +142,12 @@ public SqlTaskManager(
SqlTaskExecutionFactory sqlTaskExecutionFactory = new SqlTaskExecutionFactory(taskNotificationExecutor, taskExecutor, planner, splitMonitor, config);

DataSize maxQueryMemoryPerNode = nodeMemoryConfig.getMaxQueryMemoryPerNode();
queryMaxMemoryPerTask = nodeMemoryConfig.getMaxQueryMemoryPerTask();
DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode();

queryMaxMemoryPerNode = maxQueryMemoryPerNode.toBytes();

queryContexts = buildNonEvictableCache(CacheBuilder.newBuilder().weakValues(), CacheLoader.from(
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryMemoryPerNode, queryMaxMemoryPerTask, maxQuerySpillPerNode)));
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryMemoryPerNode, maxQuerySpillPerNode)));

tasks = buildNonEvictableCache(CacheBuilder.newBuilder(), CacheLoader.from(
taskId -> createSqlTask(
Expand All @@ -173,13 +170,11 @@ private QueryContext createQueryContext(
LocalSpillManager localSpillManager,
GcMonitor gcMonitor,
DataSize maxQueryUserMemoryPerNode,
Optional<DataSize> maxQueryMemoryPerTask,
DataSize maxQuerySpillPerNode)
{
return new QueryContext(
queryId,
maxQueryUserMemoryPerNode,
maxQueryMemoryPerTask,
localMemoryManager.getMemoryPool(),
gcMonitor,
taskNotificationExecutor,
Expand Down Expand Up @@ -401,17 +396,10 @@ private TaskInfo doUpdateTask(
if (!queryContext.isMemoryLimitsInitialized()) {
long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();

Optional<DataSize> effectiveQueryMaxMemoryPerTask = getQueryMaxTotalMemoryPerTask(session);
if (queryMaxMemoryPerTask.isPresent() &&
(effectiveQueryMaxMemoryPerTask.isEmpty() || effectiveQueryMaxMemoryPerTask.get().toBytes() > queryMaxMemoryPerTask.get().toBytes())) {
effectiveQueryMaxMemoryPerTask = queryMaxMemoryPerTask;
}

// Session properties are only allowed to decrease memory limits, not increase them
queryContext.initializeMemoryLimits(
resourceOvercommit(session),
min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode),
effectiveQueryMaxMemoryPerTask);
min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode));
}

sqlTask.recordHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution.scheduler;

import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.spi.ErrorCode;

/**
 * Trivial {@link PartitionMemoryEstimator} that always reports the configured
 * default memory limit and never grows the estimate on retry.
 */
public class ConstantPartitionMemoryEstimator
        implements PartitionMemoryEstimator
{
    /**
     * Returns the default limit unchanged.
     * NOTE(review): the second MemoryRequirements argument is the literal {@code true};
     * its meaning is not visible from this file — presumably a "growth allowed" or
     * similar flag; confirm against the MemoryRequirements definition.
     */
    @Override
    public MemoryRequirements getInitialMemoryRequirements(Session session, DataSize defaultMemoryLimit)
    {
        return new MemoryRequirements(
                defaultMemoryLimit,
                true);
    }

    // Constant estimator: retries get exactly the same requirements as before,
    // regardless of the error code that caused the retry.
    @Override
    public MemoryRequirements getNextRetryMemoryRequirements(Session session, MemoryRequirements previousMemoryRequirements, ErrorCode errorCode)
    {
        return previousMemoryRequirements;
    }
}
Loading