diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index 651edd79b26e..6659d7b4886b 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -81,6 +81,7 @@ public final class SystemSessionProperties public static final String PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS = "preferred_write_partitioning_min_number_of_partitions"; public static final String SCALE_WRITERS = "scale_writers"; public static final String TASK_SCALE_WRITERS_ENABLED = "task_scale_writers_enabled"; + public static final String MAX_WRITER_NODES_COUNT = "max_writer_nodes_count"; public static final String TASK_SCALE_WRITERS_MAX_WRITER_COUNT = "task_scale_writers_max_writer_count"; public static final String WRITER_MIN_SIZE = "writer_min_size"; public static final String PUSH_TABLE_WRITE_THROUGH_UNION = "push_table_write_through_union"; @@ -298,6 +299,11 @@ public SystemSessionProperties( "Scale out writers based on throughput (use minimum necessary)", featuresConfig.isScaleWriters(), false), + integerProperty( + MAX_WRITER_NODES_COUNT, + "Set upper limit on number of nodes that take part in executing writing stages", + queryManagerConfig.getMaxWriterNodesCount(), + false), booleanProperty( TASK_SCALE_WRITERS_ENABLED, "Scale the number of concurrent table writers per task based on throughput", @@ -1005,6 +1011,11 @@ public static int getTaskScaleWritersMaxWriterCount(Session session) return session.getSystemProperty(TASK_SCALE_WRITERS_MAX_WRITER_COUNT, Integer.class); } + public static int getMaxWriterNodesCount(Session session) + { + return session.getSystemProperty(MAX_WRITER_NODES_COUNT, Integer.class); + } + public static DataSize getWriterMinSize(Session session) { return session.getSystemProperty(WRITER_MIN_SIZE, DataSize.class); diff --git 
a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index 124e27465f57..5c6ad19eca38 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -55,6 +55,8 @@ public class QueryManagerConfig private int maxHashPartitionCount = 100; private int minHashPartitionCount = 4; + private int maxWriterNodesCount = 100; + private Duration minQueryExpireAge = new Duration(15, TimeUnit.MINUTES); private int maxQueryHistory = 100; private int maxQueryLength = 1_000_000; @@ -190,6 +192,20 @@ public QueryManagerConfig setMinHashPartitionCount(int minHashPartitionCount) return this; } + @Min(1) + public int getMaxWriterNodesCount() + { + return maxWriterNodesCount; + } + + @Config("query.max-writer-nodes-count") + @ConfigDescription("Maximum number of nodes that will take part in writer tasks") + public QueryManagerConfig setMaxWriterNodesCount(int maxWritersNodesCount) + { + this.maxWriterNodesCount = maxWritersNodesCount; + return this; + } + @NotNull public Duration getMinQueryExpireAge() { diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedQueryScheduler.java index 1f74a93315d9..218df7d56d18 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedQueryScheduler.java @@ -107,6 +107,7 @@ import static io.airlift.concurrent.MoreFutures.tryGetFutureValue; import static io.airlift.concurrent.MoreFutures.whenAnyComplete; import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; +import static io.trino.SystemSessionProperties.getMaxWriterNodesCount; import static io.trino.SystemSessionProperties.getQueryRetryAttempts; import static 
io.trino.SystemSessionProperties.getRetryDelayScaleFactor; import static io.trino.SystemSessionProperties.getRetryInitialDelay; @@ -1081,7 +1082,8 @@ public void stateChanged(QueryState newState) writerTasksProvider, nodeScheduler.createNodeSelector(session, Optional.empty()), executor, - getWriterMinSize(session)); + getWriterMinSize(session), + getMaxWriterNodesCount(session)); whenAllStages(childStageExecutions, StageExecution.State::isDone) .addListener(scheduler::finish, directExecutor()); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ScaledWriterScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ScaledWriterScheduler.java index 1c24a1ddd5d5..61aeca1daa81 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ScaledWriterScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ScaledWriterScheduler.java @@ -50,6 +50,7 @@ public class ScaledWriterScheduler private final long writerMinSizeBytes; private final Set scheduledNodes = new HashSet<>(); private final AtomicBoolean done = new AtomicBoolean(); + private final int maxWriterNodesCount; private volatile SettableFuture future = SettableFuture.create(); public ScaledWriterScheduler( @@ -58,7 +59,8 @@ public ScaledWriterScheduler( Supplier> writerTasksProvider, NodeSelector nodeSelector, ScheduledExecutorService executor, - DataSize writerMinSize) + DataSize writerMinSize, + int maxWriterNodesCount) { this.stage = requireNonNull(stage, "stage is null"); this.sourceTasksProvider = requireNonNull(sourceTasksProvider, "sourceTasksProvider is null"); @@ -66,6 +68,7 @@ public ScaledWriterScheduler( this.nodeSelector = requireNonNull(nodeSelector, "nodeSelector is null"); this.executor = requireNonNull(executor, "executor is null"); this.writerMinSizeBytes = writerMinSize.toBytes(); + this.maxWriterNodesCount = maxWriterNodesCount; } public void finish() @@ -99,6 +102,24 @@ private int getNewTaskCount() return 0; } + // When 
there is a big data skewness, there could be a bottleneck due to the skewed workers even if most of the workers are not over-utilized. + // Check both, weighted output buffer over-utilization rate and average output buffer over-utilization rate, in case when there are many over-utilized small tasks + // due to fewer not-over-utilized big skewed tasks. + if (isSourceTasksBufferFull() && isWriteThroughputSufficient() && scheduledNodes.size() < maxWriterNodesCount) { + return 1; + } + + return 0; + } + + private boolean isSourceTasksBufferFull() + { + return isAverageBufferFull() || isWeightedBufferFull(); + } + + private boolean isWriteThroughputSufficient() + { + Collection writerTasks = writerTasksProvider.get(); long writtenBytes = writerTasks.stream() .map(TaskStatus::getPhysicalWrittenDataSize) .mapToLong(DataSize::toBytes) @@ -109,15 +130,7 @@ private int getNewTaskCount() .map(Optional::get) .mapToLong(writerCount -> writerMinSizeBytes * writerCount) .sum(); - - // When there is a big data skewness, there could be a bottleneck due to the skewed workers even if most of the workers are not over-utilized. - // Check both, weighted output buffer over-utilization rate and average output buffer over-utilization rate, in case when there are many over-utilized small tasks - // due to fewer not-over-utilized big skewed tasks. 
- if ((isWeightedBufferFull() || isAverageBufferFull()) && (writtenBytes >= minWrittenBytesToScaleUp)) { - return 1; - } - - return 0; + return writtenBytes >= minWrittenBytesToScaleUp; } private boolean isWeightedBufferFull() diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java b/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java index 393f461a693b..4ac3e0cdd26d 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java @@ -240,6 +240,7 @@ import io.trino.sql.planner.optimizations.DeterminePartitionCount; import io.trino.sql.planner.optimizations.HashGenerationOptimizer; import io.trino.sql.planner.optimizations.IndexJoinOptimizer; +import io.trino.sql.planner.optimizations.LimitMaxWriterNodesCount; import io.trino.sql.planner.optimizations.LimitPushDown; import io.trino.sql.planner.optimizations.MetadataQueryOptimizer; import io.trino.sql.planner.optimizations.OptimizeMixedDistinctAggregations; @@ -849,6 +850,7 @@ public PlanOptimizers( builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(plannerContext, typeAnalyzer, statsCalculator))); // It can only run after AddExchanges since it estimates the hash partition count for all remote exchanges builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new DeterminePartitionCount(statsCalculator))); + builder.add(new LimitMaxWriterNodesCount()); } // use cost calculator without estimated exchanges after AddExchanges diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/LimitMaxWriterNodesCount.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/LimitMaxWriterNodesCount.java new file mode 100644 index 000000000000..4a8847136034 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/LimitMaxWriterNodesCount.java @@ -0,0 +1,136 @@ +/* + * Licensed under the Apache 
License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.sql.planner.optimizations; + +import com.google.common.collect.ImmutableList; +import io.trino.Session; +import io.trino.cost.TableStatsProvider; +import io.trino.execution.warnings.WarningCollector; +import io.trino.operator.RetryPolicy; +import io.trino.sql.planner.PartitioningHandle; +import io.trino.sql.planner.PlanNodeIdAllocator; +import io.trino.sql.planner.SymbolAllocator; +import io.trino.sql.planner.SystemPartitioningHandle; +import io.trino.sql.planner.TypeProvider; +import io.trino.sql.planner.plan.ExchangeNode; +import io.trino.sql.planner.plan.PlanNode; +import io.trino.sql.planner.plan.SimplePlanRewriter; +import io.trino.sql.planner.plan.TableWriterNode; + +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.SystemSessionProperties.MAX_HASH_PARTITION_COUNT; +import static io.trino.SystemSessionProperties.MAX_WRITER_NODES_COUNT; +import static io.trino.SystemSessionProperties.getRetryPolicy; +import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; +import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION; +import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION; +import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE; +import static io.trino.sql.planner.plan.SimplePlanRewriter.rewriteWith; +import static 
java.util.Objects.requireNonNull; + +public class LimitMaxWriterNodesCount + implements PlanOptimizer +{ + private static final List> SUPPORTED_NODES = ImmutableList.of(TableWriterNode.class); + private static final List SUPPORTED_PARTITIONING_MODES = ImmutableList.of( + FIXED_HASH_DISTRIBUTION, SCALED_WRITER_HASH_DISTRIBUTION, FIXED_ARBITRARY_DISTRIBUTION); + + @Override + public PlanNode optimize( + PlanNode plan, + Session session, + TypeProvider types, + SymbolAllocator symbolAllocator, + PlanNodeIdAllocator idAllocator, + WarningCollector warningCollector, + TableStatsProvider tableStatsProvider) + { + requireNonNull(plan, "plan is null"); + requireNonNull(session, "session is null"); + + // Skip for plans where there are no writing stages. Additionally, skip for FTE mode since we + // are not using estimated partitionCount in FTE scheduler. + + if (!PlanNodeSearcher.searchFrom(plan).whereIsInstanceOfAny(SUPPORTED_NODES).matches() || getRetryPolicy(session) == RetryPolicy.TASK) { + return plan; + } + + // if a TableWriter's source is not an exchange, or its partitioning mode is not supported, do not fire this rule + List allTableWriters = PlanNodeSearcher + .searchFrom(plan) + .where(TableWriterNode.class::isInstance) + .findAll(); + + boolean isPartitioningWritingModeSupported = allTableWriters + .stream() + .allMatch(it -> it.getSource() instanceof ExchangeNode exchangeNode && SUPPORTED_PARTITIONING_MODES.contains(exchangeNode.getPartitioningScheme().getPartitioning().getHandle())); + + if (!isPartitioningWritingModeSupported) { + return plan; + } + + // if there is non-system partitioning, do not fire this rule + boolean isAllPartitioningSystemPartitioning = PlanNodeSearcher + .searchFrom(plan) + .where(ExchangeNode.class::isInstance) + .findAll() + .stream() + .map(ExchangeNode.class::cast) + .map(node -> node.getPartitioningScheme().getPartitioning().getHandle().getConnectorHandle()) + .allMatch(SystemPartitioningHandle.class::isInstance); + + if
(!isAllPartitioningSystemPartitioning) { + return plan; + } + + int maxWritersNodesCount = Math.min( + session.getSystemProperty(MAX_WRITER_NODES_COUNT, Integer.class), + session.getSystemProperty(MAX_HASH_PARTITION_COUNT, Integer.class)); + return rewriteWith(new Rewriter(maxWritersNodesCount), plan); + } + + private static class Rewriter + extends SimplePlanRewriter + { + private final int maxWriterNodesCount; + + private Rewriter(int maxWriterNodesCount) + { + this.maxWriterNodesCount = maxWriterNodesCount; + } + + @Override + public PlanNode visitExchange(ExchangeNode node, RewriteContext context) + { + if (node.getScope() != REMOTE) { + return node; + } + + List sources = node.getSources().stream() + .map(context::rewrite) + .collect(toImmutableList()); + + return new ExchangeNode( + node.getId(), + node.getType(), + node.getScope(), + node.getPartitioningScheme().withPartitionCount(maxWriterNodesCount), + sources, + node.getInputs(), + node.getOrderingScheme()); + } + } +} diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestScaledWriterScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestScaledWriterScheduler.java index 85d50277f5de..eb8901ed0a2d 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestScaledWriterScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestScaledWriterScheduler.java @@ -148,10 +148,32 @@ public void testGetNewTaskCountWhenExistingWriterTaskMaxWriterCountIsEmpty() assertEquals(scaledWriterScheduler.schedule().getNewTasks().size(), 0); } + @Test + public void testNewTaskCountWhenNodesUpperLimitIsNotExceeded() + { + TaskStatus taskStatus = buildTaskStatus(true, 123456L); + AtomicReference> taskStatusProvider = new AtomicReference<>(ImmutableList.of(taskStatus)); + ScaledWriterScheduler scaledWriterScheduler = buildScaledWriterScheduler(taskStatusProvider, 2); + + scaledWriterScheduler.schedule(); + 
assertEquals(scaledWriterScheduler.schedule().getNewTasks().size(), 1); + } + + @Test + public void testNewTaskCountWhenNodesUpperLimitIsExceeded() + { + TaskStatus taskStatus = buildTaskStatus(true, 123456L); + AtomicReference> taskStatusProvider = new AtomicReference<>(ImmutableList.of(taskStatus)); + ScaledWriterScheduler scaledWriterScheduler = buildScaledWriterScheduler(taskStatusProvider, 1); + + scaledWriterScheduler.schedule(); + assertEquals(scaledWriterScheduler.schedule().getNewTasks().size(), 0); + } + private ScaledWriterScheduler buildScaleWriterSchedulerWithInitialTasks(TaskStatus taskStatus1, TaskStatus taskStatus2, TaskStatus taskStatus3) { AtomicReference> taskStatusProvider = new AtomicReference<>(ImmutableList.of()); - ScaledWriterScheduler scaledWriterScheduler = buildScaledWriterScheduler(taskStatusProvider); + ScaledWriterScheduler scaledWriterScheduler = buildScaledWriterScheduler(taskStatusProvider, 100); assertEquals(scaledWriterScheduler.schedule().getNewTasks().size(), 1); taskStatusProvider.set(ImmutableList.of(taskStatus1)); @@ -165,7 +187,7 @@ private ScaledWriterScheduler buildScaleWriterSchedulerWithInitialTasks(TaskStat return scaledWriterScheduler; } - private ScaledWriterScheduler buildScaledWriterScheduler(AtomicReference> taskStatusProvider) + private ScaledWriterScheduler buildScaledWriterScheduler(AtomicReference> taskStatusProvider, int maxWritersNodesCount) { return new ScaledWriterScheduler( new TestingStageExecution(createFragment()), @@ -176,7 +198,8 @@ private ScaledWriterScheduler buildScaledWriterScheduler(AtomicReference tables = ImmutableList.of(partitionedTable, unPartitionedTable, sourceTable); + MockConnectorFactory connectorFactory = MockConnectorFactory.builder() + .withGetTableHandle(((session, tableName) -> { + if (tables.contains(tableName.getTableName())) { + return new MockConnectorTableHandle(tableName); + } + return null; + })) + .withGetInsertLayout((session, tableMetadata) -> { + if 
(tableMetadata.getTableName().equals(partitionedTable)) { + return Optional.of(new ConnectorTableLayout(ImmutableList.of("column_a"))); + } + return Optional.empty(); + }) + .withGetNewTableLayout((session, tableMetadata) -> { + if (tableMetadata.getTable().getTableName().equals(partitionedTable)) { + return Optional.of(new ConnectorTableLayout(ImmutableList.of("column_a"))); + } + return Optional.empty(); + }) + .withSupportsReportingWrittenBytes(true) + .withGetColumns(schemaTableName -> ImmutableList.of( + new ColumnMetadata("column_a", VARCHAR), + new ColumnMetadata("column_b", VARCHAR))) + .withName(catalogName) + .build(); + + Session session = testSessionBuilder() + .setCatalog(catalogName) + .setSchema("default") + .build(); + LocalQueryRunner queryRunner = LocalQueryRunner.create(session); + queryRunner.createCatalog( + catalogName, + connectorFactory, + ImmutableMap.of()); + return queryRunner; + } + + @Test + public void testPlanWhenInsertToPartitionedTable() + { + @Language("SQL") String query = "INSERT INTO partitioned_target_table VALUES ('one', 'two'), ('three', 'four')"; + + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(MAX_WRITER_NODES_COUNT, "2") + .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .build(); + + // TestLimitMaxWriterNodesCount optimizer rule should fire and set the partitionCount to 2 for remote exchanges + assertDistributedPlan( + query, + session, + anyTree( + node(TableWriterNode.class, + project( + exchange(LOCAL, + exchange(REMOTE, Optional.of(2), + project( + values("column_a", "column_b")))))))); + } + + @Test + public void testPlanWhenInsertToUnpartitionedTableScaleWritersDisabled() + { + @Language("SQL") String query = "INSERT INTO unpartitioned_target_table VALUES ('one', 'two')"; + + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(MAX_WRITER_NODES_COUNT, "2") + .setSystemProperty(SCALE_WRITERS, 
"false") + .build(); + + // TestLimitMaxWriterNodesCount optimizer rule should fire and set the partitionCount to 2 for remote exchanges + assertDistributedPlan( + query, + session, + anyTree( + node(TableWriterNode.class, + exchange(LOCAL, Optional.empty(), + exchange(REMOTE, Optional.of(2), + values("column_a", "column_b")))))); + } + + @Test + public void testPlanWhenInsertToUnpartitionedTableScaleWritersEnabled() + { + @Language("SQL") String query = "INSERT INTO unpartitioned_target_table VALUES ('one', 'two')"; + + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(MAX_WRITER_NODES_COUNT, "2") + .setSystemProperty(SCALE_WRITERS, "true") + .build(); + + // TestLimitMaxWriterNodesCount optimizer rule should not fire because scale writers is enabled - no partitioning + assertDistributedPlan( + query, + session, + anyTree( + node(TableWriterNode.class, + exchange(LOCAL, Optional.empty(), + exchange(REMOTE, Optional.empty(), + values("column_a", "column_b")))))); + } + + @Test + public void testPlanWhenThereIsNoTableWriter() + { + @Language("SQL") String query = "SELECT count(*) FROM source_table"; + + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(MAX_WRITER_NODES_COUNT, "2") + .build(); + + // TestLimitMaxWriterNodesCount optimizer rule should be skipped when there is no TableWriterNode + assertDistributedPlan( + query, + session, + output( + node(AggregationNode.class, + exchange(LOCAL, Optional.empty(), + exchange(REMOTE, Optional.empty(), + node(AggregationNode.class, + node(TableScanNode.class))))))); + } +}