Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public final class SystemSessionProperties
public static final String JOIN_PARTITIONED_BUILD_MIN_ROW_COUNT = "join_partitioned_build_min_row_count";
public static final String USE_EXACT_PARTITIONING = "use_exact_partitioning";
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
public static final String FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED = "fault_tolerant_execution_event_driven_scheduler_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -859,7 +860,12 @@ public SystemSessionProperties(
FORCE_SPILLING_JOIN,
"Force the usage of spliing join operator in favor of the non-spilling one, even if spill is not enabled",
featuresConfig.isForceSpillingJoin(),
false));
false),
booleanProperty(
FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED,
"Enable event driven scheduler for fault tolerant execution",
queryManagerConfig.isFaultTolerantExecutionEventDrivenSchedulerEnabled(),
true));
}

@Override
Expand Down Expand Up @@ -1537,4 +1543,9 @@ public static boolean isForceSpillingOperator(Session session)
{
return session.getSystemProperty(FORCE_SPILLING_JOIN, Boolean.class);
}

public static boolean isFaultTolerantExecutionEventDriverSchedulerEnabled(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class QueryManagerConfig
private DataSize faultTolerantExecutionTaskDescriptorStorageMaxMemory = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15));
private int faultTolerantExecutionPartitionCount = 50;
private boolean faultTolerantPreserveInputPartitionsInWriteStage = true;
private boolean faultTolerantExecutionEventDrivenSchedulerEnabled = true;

@Min(1)
public int getScheduleSplitBatchSize()
Expand Down Expand Up @@ -628,4 +629,16 @@ public QueryManagerConfig setFaultTolerantPreserveInputPartitionsInWriteStage(bo
this.faultTolerantPreserveInputPartitionsInWriteStage = faultTolerantPreserveInputPartitionsInWriteStage;
return this;
}

public boolean isFaultTolerantExecutionEventDrivenSchedulerEnabled()
{
return faultTolerantExecutionEventDrivenSchedulerEnabled;
}

@Config("experimental.fault-tolerant-execution-event-driven-scheduler-enabled")
public QueryManagerConfig setFaultTolerantExecutionEventDrivenSchedulerEnabled(boolean faultTolerantExecutionEventDrivenSchedulerEnabled)
{
this.faultTolerantExecutionEventDrivenSchedulerEnabled = faultTolerantExecutionEventDrivenSchedulerEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.QueryPreparer.PreparedQuery;
import io.trino.execution.StateMachine.StateChangeListener;
import io.trino.execution.scheduler.EventDrivenFaultTolerantQueryScheduler;
import io.trino.execution.scheduler.EventDrivenTaskSourceFactory;
import io.trino.execution.scheduler.FaultTolerantQueryScheduler;
import io.trino.execution.scheduler.NodeAllocatorService;
import io.trino.execution.scheduler.NodeScheduler;
Expand Down Expand Up @@ -89,6 +91,7 @@
import static io.trino.SystemSessionProperties.getTaskRetryAttemptsOverall;
import static io.trino.SystemSessionProperties.getTaskRetryAttemptsPerTask;
import static io.trino.SystemSessionProperties.isEnableDynamicFiltering;
import static io.trino.SystemSessionProperties.isFaultTolerantExecutionEventDriverSchedulerEnabled;
import static io.trino.execution.QueryState.FAILED;
import static io.trino.execution.QueryState.PLANNING;
import static io.trino.server.DynamicFilterService.DynamicFiltersStats;
Expand Down Expand Up @@ -133,6 +136,7 @@ public class SqlQueryExecution
private final SqlTaskManager coordinatorTaskManager;
private final ExchangeManagerRegistry exchangeManagerRegistry;
private final TaskSourceFactory taskSourceFactory;
private final EventDrivenTaskSourceFactory eventDrivenTaskSourceFactory;
private final TaskDescriptorStorage taskDescriptorStorage;

private SqlQueryExecution(
Expand Down Expand Up @@ -166,6 +170,7 @@ private SqlQueryExecution(
SqlTaskManager coordinatorTaskManager,
ExchangeManagerRegistry exchangeManagerRegistry,
TaskSourceFactory taskSourceFactory,
EventDrivenTaskSourceFactory eventDrivenTaskSourceFactory,
TaskDescriptorStorage taskDescriptorStorage)
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
Expand Down Expand Up @@ -213,6 +218,7 @@ private SqlQueryExecution(
this.coordinatorTaskManager = requireNonNull(coordinatorTaskManager, "coordinatorTaskManager is null");
this.exchangeManagerRegistry = requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
this.taskSourceFactory = requireNonNull(taskSourceFactory, "taskSourceFactory is null");
this.eventDrivenTaskSourceFactory = requireNonNull(eventDrivenTaskSourceFactory, "taskSourceFactory is null");
this.taskDescriptorStorage = requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
}
}
Expand Down Expand Up @@ -521,28 +527,51 @@ private void planDistribution(PlanRoot plan)
coordinatorTaskManager);
break;
case TASK:
scheduler = new FaultTolerantQueryScheduler(
stateMachine,
queryExecutor,
schedulerStats,
failureDetector,
taskSourceFactory,
taskDescriptorStorage,
exchangeManagerRegistry.getExchangeManager(),
nodePartitioningManager,
getTaskRetryAttemptsOverall(getSession()),
getTaskRetryAttemptsPerTask(getSession()),
getMaxTasksWaitingForNodePerStage(getSession()),
schedulerExecutor,
nodeAllocatorService,
partitionMemoryEstimatorFactory,
taskExecutionStats,
dynamicFilterService,
plannerContext.getMetadata(),
remoteTaskFactory,
nodeTaskMap,
plan.getRoot(),
plan.isSummarizeTaskInfos());
if (isFaultTolerantExecutionEventDriverSchedulerEnabled(stateMachine.getSession())) {
scheduler = new EventDrivenFaultTolerantQueryScheduler(
stateMachine,
plannerContext.getMetadata(),
remoteTaskFactory,
taskDescriptorStorage,
eventDrivenTaskSourceFactory,
plan.isSummarizeTaskInfos(),
nodeTaskMap,
queryExecutor,
schedulerExecutor,
schedulerStats,
partitionMemoryEstimatorFactory,
nodePartitioningManager,
exchangeManagerRegistry.getExchangeManager(),
nodeAllocatorService,
failureDetector,
dynamicFilterService,
taskExecutionStats,
plan.getRoot());
}
else {
scheduler = new FaultTolerantQueryScheduler(
stateMachine,
queryExecutor,
schedulerStats,
failureDetector,
taskSourceFactory,
taskDescriptorStorage,
exchangeManagerRegistry.getExchangeManager(),
nodePartitioningManager,
getTaskRetryAttemptsOverall(getSession()),
getTaskRetryAttemptsPerTask(getSession()),
getMaxTasksWaitingForNodePerStage(getSession()),
schedulerExecutor,
nodeAllocatorService,
partitionMemoryEstimatorFactory,
taskExecutionStats,
dynamicFilterService,
plannerContext.getMetadata(),
remoteTaskFactory,
nodeTaskMap,
plan.getRoot(),
plan.isSummarizeTaskInfos());
}
break;
default:
throw new IllegalArgumentException("Unexpected retry policy: " + retryPolicy);
Expand Down Expand Up @@ -749,6 +778,7 @@ public static class SqlQueryExecutionFactory
private final SqlTaskManager coordinatorTaskManager;
private final ExchangeManagerRegistry exchangeManagerRegistry;
private final TaskSourceFactory taskSourceFactory;
private final EventDrivenTaskSourceFactory eventDrivenTaskSourceFactory;
private final TaskDescriptorStorage taskDescriptorStorage;

@Inject
Expand Down Expand Up @@ -779,6 +809,7 @@ public static class SqlQueryExecutionFactory
SqlTaskManager coordinatorTaskManager,
ExchangeManagerRegistry exchangeManagerRegistry,
TaskSourceFactory taskSourceFactory,
EventDrivenTaskSourceFactory eventDrivenTaskSourceFactory,
TaskDescriptorStorage taskDescriptorStorage)
{
this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
Expand Down Expand Up @@ -807,6 +838,7 @@ public static class SqlQueryExecutionFactory
this.coordinatorTaskManager = requireNonNull(coordinatorTaskManager, "coordinatorTaskManager is null");
this.exchangeManagerRegistry = requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
this.taskSourceFactory = requireNonNull(taskSourceFactory, "taskSourceFactory is null");
this.eventDrivenTaskSourceFactory = requireNonNull(eventDrivenTaskSourceFactory, "eventDrivenTaskSourceFactory is null");
this.taskDescriptorStorage = requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
}

Expand Down Expand Up @@ -852,6 +884,7 @@ public QueryExecution createQueryExecution(
coordinatorTaskManager,
exchangeManagerRegistry,
taskSourceFactory,
eventDrivenTaskSourceFactory,
taskDescriptorStorage);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public StageId getStageId()
return stateMachine.getStageId();
}

public StageState getState()
{
return stateMachine.getState();
}

public synchronized void finish()
{
if (stateMachine.transitionToFinished()) {
Expand Down
7 changes: 7 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/StageId.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import io.trino.spi.QueryId;
import io.trino.sql.planner.plan.PlanFragmentId;

import java.util.List;
import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Integer.parseInt;
import static java.util.Objects.requireNonNull;

public class StageId
Expand All @@ -38,6 +40,11 @@ public static StageId valueOf(List<String> ids)
return new StageId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
}

public static StageId create(QueryId queryId, PlanFragmentId fragmentId)
{
return new StageId(queryId, parseInt(fragmentId.toString()));
}

private final QueryId queryId;
private final int id;

Expand Down
31 changes: 31 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/StageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.QueryId;
import io.trino.spi.type.Type;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.PlanNodeId;
Expand Down Expand Up @@ -152,6 +153,36 @@ public String toString()
.toString();
}

public StageInfo withSubStages(List<StageInfo> subStages)
{
return new StageInfo(
stageId,
state,
plan,
coordinatorOnly,
types,
stageStats,
tasks,
subStages,
tables,
failureCause);
}

public static StageInfo createInitial(QueryId queryId, StageState state, PlanFragment fragment)
{
return new StageInfo(
StageId.create(queryId, fragment.getId()),
state,
fragment,
fragment.getPartitioning().isCoordinatorOnly(),
fragment.getTypes(),
StageStats.createInitial(),
ImmutableList.of(),
ImmutableList.of(),
ImmutableMap.of(),
null);
}

public static List<StageInfo> getAllStages(Optional<StageInfo> stageInfo)
{
if (stageInfo.isEmpty()) {
Expand Down
74 changes: 74 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/StageStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.stats.Distribution;
import io.airlift.stats.Distribution.DistributionSnapshot;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
Expand All @@ -34,9 +35,11 @@
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.trino.execution.StageState.RUNNING;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

@Immutable
public class StageStats
Expand Down Expand Up @@ -658,4 +661,75 @@ public BasicStageStats toBasicStageStats(StageState stageState)
blockedReasons,
progressPercentage);
}

public static StageStats createInitial()
{
DataSize zeroBytes = DataSize.of(0, BYTE);
Duration zeroSeconds = new Duration(0, SECONDS);
return new StageStats(
null,
new Distribution().snapshot(),
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
zeroBytes,
zeroBytes,
zeroBytes,
zeroBytes,
zeroBytes,
zeroSeconds,
zeroSeconds,
zeroSeconds,
zeroSeconds,
zeroSeconds,
false,
ImmutableSet.of(),
zeroBytes,
zeroBytes,
0,
0,
zeroSeconds,
zeroSeconds,
zeroBytes,
zeroBytes,
0,
0,
zeroBytes,
zeroBytes,
0,
0,
zeroBytes,
zeroBytes,
0,
0,
zeroSeconds,
zeroSeconds,
zeroBytes,
Optional.empty(),
zeroBytes,
zeroBytes,
0,
0,
zeroSeconds,
zeroSeconds,
zeroBytes,
zeroBytes,
new StageGcStatistics(
0,
0,
0,
0,
0,
0,
0),
ImmutableList.of());
}
}
Loading