Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ public final class SystemSessionProperties
public static final String MAX_PARTIAL_TOP_N_MEMORY = "max_partial_top_n_memory";
public static final String RETRY_POLICY = "retry_policy";
public static final String QUERY_RETRY_ATTEMPTS = "query_retry_attempts";
public static final String TASK_RETRY_ATTEMPTS_OVERALL = "task_retry_attempts_overall";
public static final String TASK_RETRY_ATTEMPTS_PER_TASK = "task_retry_attempts_per_task";
public static final String MAX_TASKS_WAITING_FOR_EXECUTION_PER_QUERY = "max_tasks_waiting_for_execution_per_query";
public static final String MAX_TASKS_WAITING_FOR_NODE_PER_STAGE = "max_tasks_waiting_for_node_per_stage";
Expand All @@ -161,7 +160,6 @@ public final class SystemSessionProperties
public static final String RETRY_DELAY_SCALE_FACTOR = "retry_delay_scale_factor";
public static final String HIDE_INACCESSIBLE_COLUMNS = "hide_inaccessible_columns";
public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE = "fault_tolerant_execution_target_task_input_size";
public static final String FAULT_TOLERANT_EXECUTION_MIN_TASK_SPLIT_COUNT = "fault_tolerant_execution_min_task_split_count";
public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT = "fault_tolerant_execution_target_task_split_count";
public static final String FAULT_TOLERANT_EXECUTION_MAX_TASK_SPLIT_COUNT = "fault_tolerant_execution_max_task_split_count";
public static final String FAULT_TOLERANT_EXECUTION_COORDINATOR_TASK_MEMORY = "fault_tolerant_execution_coordinator_task_memory";
Expand All @@ -175,7 +173,6 @@ public final class SystemSessionProperties
public static final String JOIN_PARTITIONED_BUILD_MIN_ROW_COUNT = "join_partitioned_build_min_row_count";
public static final String USE_EXACT_PARTITIONING = "use_exact_partitioning";
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
public static final String FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED = "fault_tolerant_execution_event_driven_scheduler_enabled";
public static final String FAULT_TOLERANT_EXECUTION_FORCE_PREFERRED_WRITE_PARTITIONING_ENABLED = "fault_tolerant_execution_force_preferred_write_partitioning_enabled";
public static final String PAGE_PARTITIONING_BUFFER_POOL_SIZE = "page_partitioning_buffer_pool_size";

Expand Down Expand Up @@ -739,11 +736,6 @@ public SystemSessionProperties(
"Maximum number of query retry attempts",
queryManagerConfig.getQueryRetryAttempts(),
false),
integerProperty(
TASK_RETRY_ATTEMPTS_OVERALL,
"Maximum number of task retry attempts overall",
queryManagerConfig.getTaskRetryAttemptsOverall(),
false),
integerProperty(
TASK_RETRY_ATTEMPTS_PER_TASK,
"Maximum number of task retry attempts per single task",
Expand Down Expand Up @@ -799,11 +791,6 @@ public SystemSessionProperties(
"Target size in bytes of all task inputs for a single fault tolerant task",
queryManagerConfig.getFaultTolerantExecutionTargetTaskInputSize(),
false),
integerProperty(
FAULT_TOLERANT_EXECUTION_MIN_TASK_SPLIT_COUNT,
"Minimal number of splits for a single fault tolerant task (count based)",
queryManagerConfig.getFaultTolerantExecutionMinTaskSplitCount(),
false),
integerProperty(
FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT,
"Target number of splits for a single fault tolerant task (split weight aware)",
Expand Down Expand Up @@ -871,11 +858,6 @@ public SystemSessionProperties(
"Force the usage of spliing join operator in favor of the non-spilling one, even if spill is not enabled",
featuresConfig.isForceSpillingJoin(),
false),
booleanProperty(
FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED,
"Enable event driven scheduler for fault tolerant execution",
queryManagerConfig.isFaultTolerantExecutionEventDrivenSchedulerEnabled(),
true),
booleanProperty(
FAULT_TOLERANT_EXECUTION_FORCE_PREFERRED_WRITE_PARTITIONING_ENABLED,
"Force preferred write partitioning for fault tolerant execution",
Expand Down Expand Up @@ -1448,11 +1430,6 @@ public static int getQueryRetryAttempts(Session session)
return session.getSystemProperty(QUERY_RETRY_ATTEMPTS, Integer.class);
}

public static int getTaskRetryAttemptsOverall(Session session)
{
return session.getSystemProperty(TASK_RETRY_ATTEMPTS_OVERALL, Integer.class);
}

public static int getTaskRetryAttemptsPerTask(Session session)
{
return session.getSystemProperty(TASK_RETRY_ATTEMPTS_PER_TASK, Integer.class);
Expand Down Expand Up @@ -1493,11 +1470,6 @@ public static DataSize getFaultTolerantExecutionTargetTaskInputSize(Session sess
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE, DataSize.class);
}

public static int getFaultTolerantExecutionMinTaskSplitCount(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_TASK_SPLIT_COUNT, Integer.class);
}

public static int getFaultTolerantExecutionTargetTaskSplitCount(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT, Integer.class);
Expand Down Expand Up @@ -1563,17 +1535,8 @@ public static boolean isForceSpillingOperator(Session session)
return session.getSystemProperty(FORCE_SPILLING_JOIN, Boolean.class);
}

public static boolean isFaultTolerantExecutionEventDriverSchedulerEnabled(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED, Boolean.class);
}

public static boolean isFaultTolerantExecutionForcePreferredWritePartitioningEnabled(Session session)
{
if (!isFaultTolerantExecutionEventDriverSchedulerEnabled(session)) {
// supported only in event driven scheduler
return false;
}
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_FORCE_PREFERRED_WRITE_PARTITIONING_ENABLED, Boolean.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public class QueryManagerConfig
private RetryPolicy retryPolicy = RetryPolicy.NONE;
private int queryRetryAttempts = 4;
private int taskRetryAttemptsPerTask = 4;
private int taskRetryAttemptsOverall = Integer.MAX_VALUE;
private Duration retryInitialDelay = new Duration(10, SECONDS);
private Duration retryMaxDelay = new Duration(1, MINUTES);
private double retryDelayScaleFactor = 2.0;
Expand All @@ -94,12 +93,10 @@ public class QueryManagerConfig

private DataSize faultTolerantExecutionTargetTaskInputSize = DataSize.of(4, GIGABYTE);

private int faultTolerantExecutionMinTaskSplitCount = 16;
private int faultTolerantExecutionTargetTaskSplitCount = 64;
private int faultTolerantExecutionMaxTaskSplitCount = 256;
private DataSize faultTolerantExecutionTaskDescriptorStorageMaxMemory = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15));
private int faultTolerantExecutionPartitionCount = 50;
private boolean faultTolerantExecutionEventDrivenSchedulerEnabled = true;
private boolean faultTolerantExecutionForcePreferredWritePartitioningEnabled = true;

@Min(1)
Expand Down Expand Up @@ -455,19 +452,6 @@ public QueryManagerConfig setQueryRetryAttempts(int queryRetryAttempts)
return this;
}

@Min(0)
public int getTaskRetryAttemptsOverall()
{
return taskRetryAttemptsOverall;
}

@Config("task-retry-attempts-overall")
public QueryManagerConfig setTaskRetryAttemptsOverall(int taskRetryAttemptsOverall)
{
this.taskRetryAttemptsOverall = taskRetryAttemptsOverall;
return this;
}

@Min(0)
@Max(MAX_TASK_RETRY_ATTEMPTS)
public int getTaskRetryAttemptsPerTask()
Expand Down Expand Up @@ -567,20 +551,6 @@ public QueryManagerConfig setFaultTolerantExecutionTargetTaskInputSize(DataSize
return this;
}

@Min(1)
public int getFaultTolerantExecutionMinTaskSplitCount()
{
return faultTolerantExecutionMinTaskSplitCount;
}

@Config("fault-tolerant-execution-min-task-split-count")
@ConfigDescription("Minimal number of splits for a single fault tolerant task (count based)")
public QueryManagerConfig setFaultTolerantExecutionMinTaskSplitCount(int faultTolerantExecutionMinTaskSplitCount)
{
this.faultTolerantExecutionMinTaskSplitCount = faultTolerantExecutionMinTaskSplitCount;
return this;
}

@Min(1)
public int getFaultTolerantExecutionTargetTaskSplitCount()
{
Expand Down Expand Up @@ -637,18 +607,6 @@ public QueryManagerConfig setFaultTolerantExecutionPartitionCount(int faultToler
return this;
}

public boolean isFaultTolerantExecutionEventDrivenSchedulerEnabled()
{
return faultTolerantExecutionEventDrivenSchedulerEnabled;
}

@Config("experimental.fault-tolerant-execution-event-driven-scheduler-enabled")
public QueryManagerConfig setFaultTolerantExecutionEventDrivenSchedulerEnabled(boolean faultTolerantExecutionEventDrivenSchedulerEnabled)
{
this.faultTolerantExecutionEventDrivenSchedulerEnabled = faultTolerantExecutionEventDrivenSchedulerEnabled;
return this;
}

public boolean isFaultTolerantExecutionForcePreferredWritePartitioningEnabled()
{
return faultTolerantExecutionForcePreferredWritePartitioningEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.trino.execution.StateMachine.StateChangeListener;
import io.trino.execution.scheduler.EventDrivenFaultTolerantQueryScheduler;
import io.trino.execution.scheduler.EventDrivenTaskSourceFactory;
import io.trino.execution.scheduler.FaultTolerantQueryScheduler;
import io.trino.execution.scheduler.NodeAllocatorService;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.PartitionMemoryEstimatorFactory;
Expand All @@ -35,7 +34,6 @@
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.execution.scheduler.TaskDescriptorStorage;
import io.trino.execution.scheduler.TaskExecutionStats;
import io.trino.execution.scheduler.TaskSourceFactory;
import io.trino.execution.scheduler.policy.ExecutionPolicy;
import io.trino.execution.warnings.WarningCollector;
import io.trino.failuredetector.FailureDetector;
Expand Down Expand Up @@ -86,12 +84,8 @@
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.SystemSessionProperties.getMaxTasksWaitingForNodePerStage;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.SystemSessionProperties.getTaskRetryAttemptsOverall;
import static io.trino.SystemSessionProperties.getTaskRetryAttemptsPerTask;
import static io.trino.SystemSessionProperties.isEnableDynamicFiltering;
import static io.trino.SystemSessionProperties.isFaultTolerantExecutionEventDriverSchedulerEnabled;
import static io.trino.execution.ParameterExtractor.bindParameters;
import static io.trino.execution.QueryState.FAILED;
import static io.trino.execution.QueryState.PLANNING;
Expand Down Expand Up @@ -135,7 +129,6 @@ public class SqlQueryExecution
private final TypeAnalyzer typeAnalyzer;
private final SqlTaskManager coordinatorTaskManager;
private final ExchangeManagerRegistry exchangeManagerRegistry;
private final TaskSourceFactory taskSourceFactory;
private final EventDrivenTaskSourceFactory eventDrivenTaskSourceFactory;
private final TaskDescriptorStorage taskDescriptorStorage;

Expand Down Expand Up @@ -169,7 +162,6 @@ private SqlQueryExecution(
TypeAnalyzer typeAnalyzer,
SqlTaskManager coordinatorTaskManager,
ExchangeManagerRegistry exchangeManagerRegistry,
TaskSourceFactory taskSourceFactory,
EventDrivenTaskSourceFactory eventDrivenTaskSourceFactory,
TaskDescriptorStorage taskDescriptorStorage)
{
Expand Down Expand Up @@ -217,7 +209,6 @@ private SqlQueryExecution(
this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
this.coordinatorTaskManager = requireNonNull(coordinatorTaskManager, "coordinatorTaskManager is null");
this.exchangeManagerRegistry = requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
this.taskSourceFactory = requireNonNull(taskSourceFactory, "taskSourceFactory is null");
this.eventDrivenTaskSourceFactory = requireNonNull(eventDrivenTaskSourceFactory, "taskSourceFactory is null");
this.taskDescriptorStorage = requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
}
Expand Down Expand Up @@ -527,51 +518,25 @@ private void planDistribution(PlanRoot plan)
coordinatorTaskManager);
break;
case TASK:
if (isFaultTolerantExecutionEventDriverSchedulerEnabled(stateMachine.getSession())) {
scheduler = new EventDrivenFaultTolerantQueryScheduler(
stateMachine,
plannerContext.getMetadata(),
remoteTaskFactory,
taskDescriptorStorage,
eventDrivenTaskSourceFactory,
plan.isSummarizeTaskInfos(),
nodeTaskMap,
queryExecutor,
schedulerExecutor,
schedulerStats,
partitionMemoryEstimatorFactory,
nodePartitioningManager,
exchangeManagerRegistry.getExchangeManager(),
nodeAllocatorService,
failureDetector,
dynamicFilterService,
taskExecutionStats,
plan.getRoot());
}
else {
scheduler = new FaultTolerantQueryScheduler(
stateMachine,
queryExecutor,
schedulerStats,
failureDetector,
taskSourceFactory,
taskDescriptorStorage,
exchangeManagerRegistry.getExchangeManager(),
nodePartitioningManager,
getTaskRetryAttemptsOverall(getSession()),
getTaskRetryAttemptsPerTask(getSession()),
getMaxTasksWaitingForNodePerStage(getSession()),
schedulerExecutor,
nodeAllocatorService,
partitionMemoryEstimatorFactory,
taskExecutionStats,
dynamicFilterService,
plannerContext.getMetadata(),
remoteTaskFactory,
nodeTaskMap,
plan.getRoot(),
plan.isSummarizeTaskInfos());
}
scheduler = new EventDrivenFaultTolerantQueryScheduler(
stateMachine,
plannerContext.getMetadata(),
remoteTaskFactory,
taskDescriptorStorage,
eventDrivenTaskSourceFactory,
plan.isSummarizeTaskInfos(),
nodeTaskMap,
queryExecutor,
schedulerExecutor,
schedulerStats,
partitionMemoryEstimatorFactory,
nodePartitioningManager,
exchangeManagerRegistry.getExchangeManager(),
nodeAllocatorService,
failureDetector,
dynamicFilterService,
taskExecutionStats,
plan.getRoot());
break;
default:
throw new IllegalArgumentException("Unexpected retry policy: " + retryPolicy);
Expand Down Expand Up @@ -777,7 +742,6 @@ public static class SqlQueryExecutionFactory
private final TypeAnalyzer typeAnalyzer;
private final SqlTaskManager coordinatorTaskManager;
private final ExchangeManagerRegistry exchangeManagerRegistry;
private final TaskSourceFactory taskSourceFactory;
private final EventDrivenTaskSourceFactory eventDrivenTaskSourceFactory;
private final TaskDescriptorStorage taskDescriptorStorage;

Expand Down Expand Up @@ -808,7 +772,6 @@ public static class SqlQueryExecutionFactory
TypeAnalyzer typeAnalyzer,
SqlTaskManager coordinatorTaskManager,
ExchangeManagerRegistry exchangeManagerRegistry,
TaskSourceFactory taskSourceFactory,
EventDrivenTaskSourceFactory eventDrivenTaskSourceFactory,
TaskDescriptorStorage taskDescriptorStorage)
{
Expand Down Expand Up @@ -837,7 +800,6 @@ public static class SqlQueryExecutionFactory
this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
this.coordinatorTaskManager = requireNonNull(coordinatorTaskManager, "coordinatorTaskManager is null");
this.exchangeManagerRegistry = requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
this.taskSourceFactory = requireNonNull(taskSourceFactory, "taskSourceFactory is null");
this.eventDrivenTaskSourceFactory = requireNonNull(eventDrivenTaskSourceFactory, "eventDrivenTaskSourceFactory is null");
this.taskDescriptorStorage = requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
}
Expand Down Expand Up @@ -883,7 +845,6 @@ public QueryExecution createQueryExecution(
typeAnalyzer,
coordinatorTaskManager,
exchangeManagerRegistry,
taskSourceFactory,
eventDrivenTaskSourceFactory,
taskDescriptorStorage);
}
Expand Down
Loading