Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 0 additions & 44 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ public class FeaturesConfig
static final String SPILL_ENABLED = "spill-enabled";
public static final String SPILLER_SPILL_PATH = "spiller-spill-path";

private boolean groupedExecutionEnabled;
private boolean dynamicScheduleForGroupedExecution;
private int concurrentLifespansPerTask;
private boolean redistributeWrites = true;
private boolean scaleWriters = true;
private DataSize writerMinSize = DataSize.of(32, DataSize.Unit.MEGABYTE);
Expand Down Expand Up @@ -137,47 +134,6 @@ public FeaturesConfig setLegacyRowToJsonCast(boolean legacyRowToJsonCast)
return this;
}

public boolean isGroupedExecutionEnabled()
{
return groupedExecutionEnabled;
}

@Config("grouped-execution-enabled")
@ConfigDescription("Experimental: Use grouped execution when possible")
public FeaturesConfig setGroupedExecutionEnabled(boolean groupedExecutionEnabled)
{
this.groupedExecutionEnabled = groupedExecutionEnabled;
return this;
}

public boolean isDynamicScheduleForGroupedExecutionEnabled()
{
return dynamicScheduleForGroupedExecution;
}

@Config("dynamic-schedule-for-grouped-execution")
@ConfigDescription("Experimental: Use dynamic schedule for grouped execution when possible")
public FeaturesConfig setDynamicScheduleForGroupedExecutionEnabled(boolean dynamicScheduleForGroupedExecution)
{
this.dynamicScheduleForGroupedExecution = dynamicScheduleForGroupedExecution;
return this;
}

@Min(0)
public int getConcurrentLifespansPerTask()
{
return concurrentLifespansPerTask;
}

@Config("concurrent-lifespans-per-task")
@ConfigDescription("Experimental: Default number of lifespans that run in parallel on each task when grouped execution is enabled")
// When set to zero, a limit is not imposed on the number of lifespans that run in parallel
public FeaturesConfig setConcurrentLifespansPerTask(int concurrentLifespansPerTask)
{
this.concurrentLifespansPerTask = concurrentLifespansPerTask;
return this;
}

public boolean isRedistributeWrites()
{
return redistributeWrites;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
Expand All @@ -61,8 +60,6 @@ public final class SystemSessionProperties
public static final String JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR = "join_multi_clause_independence_factor";
public static final String DISTRIBUTED_INDEX_JOIN = "distributed_index_join";
public static final String HASH_PARTITION_COUNT = "hash_partition_count";
public static final String GROUPED_EXECUTION = "grouped_execution";
public static final String DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION = "dynamic_schedule_for_grouped_execution";
public static final String PREFER_STREAMING_OPERATORS = "prefer_streaming_operators";
public static final String TASK_WRITER_COUNT = "task_writer_count";
public static final String TASK_CONCURRENCY = "task_concurrency";
Expand All @@ -89,7 +86,6 @@ public final class SystemSessionProperties
public static final String SPATIAL_JOIN = "spatial_join";
public static final String SPATIAL_PARTITIONING_TABLE_NAME = "spatial_partitioning_table_name";
public static final String COLOCATED_JOIN = "colocated_join";
public static final String CONCURRENT_LIFESPANS_PER_NODE = "concurrent_lifespans_per_task";
public static final String JOIN_REORDERING_STRATEGY = "join_reordering_strategy";
public static final String MAX_REORDERED_JOINS = "max_reordered_joins";
public static final String INITIAL_SPLITS_PER_NODE = "initial_splits_per_node";
Expand Down Expand Up @@ -240,16 +236,6 @@ public SystemSessionProperties(
"Number of partitions for distributed joins and aggregations",
queryManagerConfig.getHashPartitionCount(),
false),
booleanProperty(
GROUPED_EXECUTION,
"Use grouped execution when possible",
featuresConfig.isGroupedExecutionEnabled(),
false),
booleanProperty(
DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION,
"Experimental: Use dynamic schedule for grouped execution when possible",
featuresConfig.isDynamicScheduleForGroupedExecutionEnabled(),
false),
booleanProperty(
PREFER_STREAMING_OPERATORS,
"Prefer source table layouts that produce streaming operators",
Expand Down Expand Up @@ -423,11 +409,6 @@ public SystemSessionProperties(
"Name of the table containing spatial partitioning scheme",
null,
false),
integerProperty(
CONCURRENT_LIFESPANS_PER_NODE,
"Experimental: Run a fixed number of groups concurrently for eligible JOINs",
featuresConfig.getConcurrentLifespansPerTask(),
false),
booleanProperty(
SPILL_ENABLED,
"Enable spilling",
Expand Down Expand Up @@ -886,16 +867,6 @@ public static int getHashPartitionCount(Session session)
return session.getSystemProperty(HASH_PARTITION_COUNT, Integer.class);
}

public static boolean isGroupedExecutionEnabled(Session session)
{
return session.getSystemProperty(GROUPED_EXECUTION, Boolean.class);
}

public static boolean isDynamicScheduleForGroupedExecution(Session session)
{
return session.getSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, Boolean.class);
}

public static boolean preferStreamingOperators(Session session)
{
return session.getSystemProperty(PREFER_STREAMING_OPERATORS, Boolean.class);
Expand Down Expand Up @@ -1026,16 +997,6 @@ public static Optional<String> getSpatialPartitioningTableName(Session session)
return Optional.ofNullable(session.getSystemProperty(SPATIAL_PARTITIONING_TABLE_NAME, String.class));
}

public static OptionalInt getConcurrentLifespansPerNode(Session session)
{
Integer result = session.getSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, Integer.class);
if (result == 0) {
return OptionalInt.empty();
}
checkArgument(result > 0, "Concurrent lifespans per node is negative: %s", result);
return OptionalInt.of(result);
}

public static int getInitialSplitsPerNode(Session session)
{
return session.getSystemProperty(INITIAL_SPLITS_PER_NODE, Integer.class);
Expand Down Expand Up @@ -1402,13 +1363,7 @@ public static DataSize getMaxPartialTopNMemory(Session session)

public static RetryPolicy getRetryPolicy(Session session)
{
RetryPolicy retryPolicy = session.getSystemProperty(RETRY_POLICY, RetryPolicy.class);
if (retryPolicy == RetryPolicy.TASK) {
if (isGroupedExecutionEnabled(session) || isDynamicScheduleForGroupedExecution(session)) {
throw new TrinoException(NOT_SUPPORTED, "Grouped execution is not supported with task level retries enabled");
}
}
return retryPolicy;
return session.getSystemProperty(RETRY_POLICY, RetryPolicy.class);
}

public static int getQueryRetryAttempts(Session session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public ConnectorSplitSource getSplits(
ConnectorTransactionHandle transaction,
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter,
Constraint constraint)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public ConnectorSplitSource getSplits(
ConnectorTransactionHandle transaction,
ConnectorSession session,
ConnectorTableHandle tableHandle,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter,
Constraint constraint)
{
Expand Down
104 changes: 0 additions & 104 deletions core/trino-main/src/main/java/io/trino/execution/Lifespan.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public interface RemoteTask

void noMoreSplits(PlanNodeId sourceId);

void noMoreSplits(PlanNodeId sourceId, Lifespan lifespan);

void setOutputBuffers(OutputBuffers outputBuffers);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,19 @@ public class SplitAssignment
{
private final PlanNodeId planNodeId;
private final Set<ScheduledSplit> splits;
private final Set<Lifespan> noMoreSplitsForLifespan;
private final boolean noMoreSplits;

@JsonCreator
public SplitAssignment(
@JsonProperty("planNodeId") PlanNodeId planNodeId,
@JsonProperty("splits") Set<ScheduledSplit> splits,
@JsonProperty("noMoreSplitsForLifespan") Set<Lifespan> noMoreSplitsForLifespan,
@JsonProperty("noMoreSplits") boolean noMoreSplits)
{
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.splits = ImmutableSet.copyOf(requireNonNull(splits, "splits is null"));
this.noMoreSplitsForLifespan = ImmutableSet.copyOf(noMoreSplitsForLifespan);
this.noMoreSplits = noMoreSplits;
}

public SplitAssignment(PlanNodeId planNodeId, Set<ScheduledSplit> splits, boolean noMoreSplits)
{
this(planNodeId, splits, ImmutableSet.of(), noMoreSplits);
}

@JsonProperty
public PlanNodeId getPlanNodeId()
{
Expand All @@ -61,12 +53,6 @@ public Set<ScheduledSplit> getSplits()
return splits;
}

@JsonProperty
public Set<Lifespan> getNoMoreSplitsForLifespan()
{
return noMoreSplitsForLifespan;
}

@JsonProperty
public boolean isNoMoreSplits()
{
Expand All @@ -86,15 +72,10 @@ public SplitAssignment update(SplitAssignment assignment)
.addAll(splits)
.addAll(assignment.getSplits())
.build();
Set<Lifespan> newNoMoreSplitsForDriverGroup = ImmutableSet.<Lifespan>builder()
.addAll(noMoreSplitsForLifespan)
.addAll(assignment.getNoMoreSplitsForLifespan())
.build();

return new SplitAssignment(
planNodeId,
newSplits,
newNoMoreSplitsForDriverGroup,
assignment.isNoMoreSplits());
}
else {
Expand All @@ -108,7 +89,6 @@ private boolean isNewer(SplitAssignment assignment)
// the specified assignment is newer if it changes the no more
// splits flag or if it contains new splits
return (!noMoreSplits && assignment.isNoMoreSplits()) ||
(!noMoreSplitsForLifespan.containsAll(assignment.getNoMoreSplitsForLifespan())) ||
(!splits.containsAll(assignment.getSplits()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ public synchronized Optional<RemoteTask> createTask(
Optional<int[]> bucketToPartition,
OutputBuffers outputBuffers,
Multimap<PlanNodeId, Split> splits,
Multimap<PlanNodeId, Lifespan> noMoreSplitsForLifespan,
Set<PlanNodeId> noMoreSplits,
Optional<DataSize> estimatedMemory)
{
Expand All @@ -237,7 +236,6 @@ public synchronized Optional<RemoteTask> createTask(
estimatedMemory,
summarizeTaskInfo);

noMoreSplitsForLifespan.forEach(task::noMoreSplits);
noMoreSplits.forEach(task::noMoreSplits);

tasks.put(taskId, task);
Expand Down
4 changes: 0 additions & 4 deletions core/trino-main/src/main/java/io/trino/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,6 @@ private TaskStatus createTaskStatus(TaskHolder taskHolder)
DataSize userMemoryReservation = DataSize.ofBytes(0);
DataSize peakUserMemoryReservation = DataSize.ofBytes(0);
DataSize revocableMemoryReservation = DataSize.ofBytes(0);
// TODO: add a mechanism to avoid sending the whole completedDriverGroups set over the wire for every task status reply
Set<Lifespan> completedDriverGroups = ImmutableSet.of();
long fullGcCount = 0;
Duration fullGcTime = new Duration(0, MILLISECONDS);
long dynamicFiltersVersion = INITIAL_DYNAMIC_FILTERS_VERSION;
Expand Down Expand Up @@ -325,7 +323,6 @@ else if (taskHolder.getTaskExecution() != null) {
physicalWrittenDataSize = succinctBytes(physicalWrittenBytes);
userMemoryReservation = taskContext.getMemoryReservation();
revocableMemoryReservation = taskContext.getRevocableMemoryReservation();
completedDriverGroups = taskContext.getCompletedDriverGroups();
fullGcCount = taskContext.getFullGcCount();
fullGcTime = taskContext.getFullGcTime();
dynamicFiltersVersion = taskContext.getDynamicFiltersVersion();
Expand All @@ -337,7 +334,6 @@ else if (taskHolder.getTaskExecution() != null) {
state,
location,
nodeId,
completedDriverGroups,
failures,
queuedPartitionedDrivers,
runningPartitionedDrivers,
Expand Down
Loading