Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public final class SystemSessionProperties
public static final String PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS = "preferred_write_partitioning_min_number_of_partitions";
public static final String SCALE_WRITERS = "scale_writers";
public static final String TASK_SCALE_WRITERS_ENABLED = "task_scale_writers_enabled";
public static final String MAX_WRITER_NODES_COUNT = "max_writer_nodes_count";
public static final String TASK_SCALE_WRITERS_MAX_WRITER_COUNT = "task_scale_writers_max_writer_count";
public static final String WRITER_MIN_SIZE = "writer_min_size";
public static final String PUSH_TABLE_WRITE_THROUGH_UNION = "push_table_write_through_union";
Expand Down Expand Up @@ -298,6 +299,11 @@ public SystemSessionProperties(
"Scale out writers based on throughput (use minimum necessary)",
featuresConfig.isScaleWriters(),
false),
integerProperty(
MAX_WRITER_NODES_COUNT,
"Set upper limit on number of nodes that take part in executing writing stages",
queryManagerConfig.getMaxWriterNodesCount(),
false),
booleanProperty(
TASK_SCALE_WRITERS_ENABLED,
"Scale the number of concurrent table writers per task based on throughput",
Expand Down Expand Up @@ -1005,6 +1011,11 @@ public static int getTaskScaleWritersMaxWriterCount(Session session)
return session.getSystemProperty(TASK_SCALE_WRITERS_MAX_WRITER_COUNT, Integer.class);
}

/**
 * Returns the session-level upper bound on the number of nodes that may take part
 * in executing writing stages, read from the {@code max_writer_nodes_count}
 * session property.
 *
 * @param session the session to read the property from
 * @return the configured maximum writer node count
 */
public static int getMaxWriterNodesCount(Session session)
{
    return session.getSystemProperty(MAX_WRITER_NODES_COUNT, Integer.class);
}

public static DataSize getWriterMinSize(Session session)
{
return session.getSystemProperty(WRITER_MIN_SIZE, DataSize.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class QueryManagerConfig

private int maxHashPartitionCount = 100;
private int minHashPartitionCount = 4;
private int maxWriterNodesCount = 100;

private Duration minQueryExpireAge = new Duration(15, TimeUnit.MINUTES);
private int maxQueryHistory = 100;
private int maxQueryLength = 1_000_000;
Expand Down Expand Up @@ -190,6 +192,20 @@ public QueryManagerConfig setMinHashPartitionCount(int minHashPartitionCount)
return this;
}

/**
 * Maximum number of nodes that will take part in writer tasks.
 * Validated to be at least 1.
 */
@Min(1)
public int getMaxWriterNodesCount()
{
    return maxWriterNodesCount;
}

/**
 * Sets the maximum number of nodes that will take part in writer tasks.
 */
@Config("query.max-writer-nodes-count")
@ConfigDescription("Maximum number of nodes that will take part in writer tasks")
public QueryManagerConfig setMaxWriterNodesCount(int maxWriterNodesCount)
{
    // Parameter name aligned with the field and config property name
    // (was "maxWritersNodesCount", which did not match "maxWriterNodesCount").
    this.maxWriterNodesCount = maxWriterNodesCount;
    return this;
}

@NotNull
public Duration getMinQueryExpireAge()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.trino.SystemSessionProperties.getMaxWriterNodesCount;
import static io.trino.SystemSessionProperties.getQueryRetryAttempts;
import static io.trino.SystemSessionProperties.getRetryDelayScaleFactor;
import static io.trino.SystemSessionProperties.getRetryInitialDelay;
Expand Down Expand Up @@ -1081,7 +1082,8 @@ public void stateChanged(QueryState newState)
writerTasksProvider,
nodeScheduler.createNodeSelector(session, Optional.empty()),
executor,
getWriterMinSize(session));
getWriterMinSize(session),
getMaxWriterNodesCount(session));

whenAllStages(childStageExecutions, StageExecution.State::isDone)
.addListener(scheduler::finish, directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ScaledWriterScheduler
private final long writerMinSizeBytes;
private final Set<InternalNode> scheduledNodes = new HashSet<>();
private final AtomicBoolean done = new AtomicBoolean();
private final int maxWriterNodesCount;
private volatile SettableFuture<Void> future = SettableFuture.create();

public ScaledWriterScheduler(
Expand All @@ -58,14 +59,16 @@ public ScaledWriterScheduler(
Supplier<Collection<TaskStatus>> writerTasksProvider,
NodeSelector nodeSelector,
ScheduledExecutorService executor,
DataSize writerMinSize)
DataSize writerMinSize,
int maxWriterNodesCount)
{
this.stage = requireNonNull(stage, "stage is null");
this.sourceTasksProvider = requireNonNull(sourceTasksProvider, "sourceTasksProvider is null");
this.writerTasksProvider = requireNonNull(writerTasksProvider, "writerTasksProvider is null");
this.nodeSelector = requireNonNull(nodeSelector, "nodeSelector is null");
this.executor = requireNonNull(executor, "executor is null");
this.writerMinSizeBytes = writerMinSize.toBytes();
this.maxWriterNodesCount = maxWriterNodesCount;
}

public void finish()
Expand Down Expand Up @@ -99,6 +102,24 @@ private int getNewTaskCount()
return 0;
}

// When there is a big data skewness, there could be a bottleneck due to the skewed workers even if most of the workers are not over-utilized.
// Check both, weighted output buffer over-utilization rate and average output buffer over-utilization rate, in case when there are many over-utilized small tasks
// due to fewer not-over-utilized big skewed tasks.
if (isSourceTasksBufferFull() && isWriteThroughputSufficient() && scheduledNodes.size() < maxWriterNodesCount) {
return 1;
}

return 0;
}

/**
 * Whether the source tasks' output buffers appear over-utilized by either measure.
 * Both the average and the weighted over-utilization rates are consulted, since
 * with data skew a few large tasks can dominate one measure but not the other.
 */
private boolean isSourceTasksBufferFull()
{
    if (isAverageBufferFull()) {
        return true;
    }
    return isWeightedBufferFull();
}

private boolean isWriteThroughputSufficient()
{
Collection<TaskStatus> writerTasks = writerTasksProvider.get();
long writtenBytes = writerTasks.stream()
.map(TaskStatus::getPhysicalWrittenDataSize)
.mapToLong(DataSize::toBytes)
Expand All @@ -109,15 +130,7 @@ private int getNewTaskCount()
.map(Optional::get)
.mapToLong(writerCount -> writerMinSizeBytes * writerCount)
.sum();

// When there is a big data skewness, there could be a bottleneck due to the skewed workers even if most of the workers are not over-utilized.
// Check both, weighted output buffer over-utilization rate and average output buffer over-utilization rate, in case when there are many over-utilized small tasks
// due to fewer not-over-utilized big skewed tasks.
if ((isWeightedBufferFull() || isAverageBufferFull()) && (writtenBytes >= minWrittenBytesToScaleUp)) {
return 1;
}

return 0;
return writtenBytes >= minWrittenBytesToScaleUp;
}

private boolean isWeightedBufferFull()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@
import io.trino.sql.planner.optimizations.DeterminePartitionCount;
import io.trino.sql.planner.optimizations.HashGenerationOptimizer;
import io.trino.sql.planner.optimizations.IndexJoinOptimizer;
import io.trino.sql.planner.optimizations.LimitMaxWriterNodesCount;
import io.trino.sql.planner.optimizations.LimitPushDown;
import io.trino.sql.planner.optimizations.MetadataQueryOptimizer;
import io.trino.sql.planner.optimizations.OptimizeMixedDistinctAggregations;
Expand Down Expand Up @@ -849,6 +850,7 @@ public PlanOptimizers(
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(plannerContext, typeAnalyzer, statsCalculator)));
// It can only run after AddExchanges since it estimates the hash partition count for all remote exchanges
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new DeterminePartitionCount(statsCalculator)));
builder.add(new LimitMaxWriterNodesCount());
}

// use cost calculator without estimated exchanges after AddExchanges
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.sql.planner.optimizations;

import com.google.common.collect.ImmutableList;
import io.trino.Session;
import io.trino.cost.TableStatsProvider;
import io.trino.execution.warnings.WarningCollector;
import io.trino.operator.RetryPolicy;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.PlanNodeIdAllocator;
import io.trino.sql.planner.SymbolAllocator;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.TypeProvider;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.SimplePlanRewriter;
import io.trino.sql.planner.plan.TableWriterNode;

import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.SystemSessionProperties.MAX_HASH_PARTITION_COUNT;
import static io.trino.SystemSessionProperties.MAX_WRITER_NODES_COUNT;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION;
import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE;
import static io.trino.sql.planner.plan.SimplePlanRewriter.rewriteWith;
import static java.util.Objects.requireNonNull;

public class LimitMaxWriterNodesCount
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having a separate rule, you can do this directly inside AddExchange#visitTableWriter

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it could. I do it here because:

  1. Most probably, in the future we can make it adaptive based on stats.
  2. There is a similar rule in the sense of setting PartitioningScheme - ApplyPreferredTableExecutePartitioning. That rule is very simple, just based on a configuration toggle, and could be part of AddExchanges as well.

This is why I decided to wrap it within another rule.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH, I don't think we need a separate rule for this since we are just applying the configuration. In ApplyPreferredTableExecutePartitioning we make a decision based on estimates about whether to use preferred partitioning or not, so a separate rule makes sense there.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in future if we ever make it adaptive based on stats we can always add a new rule and remove from AddExchanges. It shouldn't be a big change.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT? @sopel39

Copy link
Copy Markdown
Member

@sopel39 sopel39 Feb 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the easier approach is preferred. If it's just few lines of code, then AddExchange#visitTableWriter is good enough.

Be sure to have proper testing coverage. With AddExchange I think you have to have BasePlanTest kind of tests

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gaurav8297 , the rule LimitMaxWriterNodesCount is now much more complicated. In particular, we skip the rule in some cases that are not skipped in the AddExchanges rule. I do not think that we should merge them; it is more complicated than it seemed at the beginning.

implements PlanOptimizer
{
private static final List<Class<? extends PlanNode>> SUPPORTED_NODES = ImmutableList.of(TableWriterNode.class);
private static final List<PartitioningHandle> SUPPORTED_PARTITIONING_MODES = ImmutableList.of(
FIXED_HASH_DISTRIBUTION, SCALED_WRITER_HASH_DISTRIBUTION, FIXED_ARBITRARY_DISTRIBUTION);

/**
 * Caps the partition count of remote exchanges so that writing stages run on at
 * most {@code max_writer_nodes_count} nodes (further limited by
 * {@code max_hash_partition_count}).
 */
@Override
public PlanNode optimize(
        PlanNode plan,
        Session session,
        TypeProvider types,
        SymbolAllocator symbolAllocator,
        PlanNodeIdAllocator idAllocator,
        WarningCollector warningCollector,
        TableStatsProvider tableStatsProvider)
{
    requireNonNull(plan, "plan is null");
    requireNonNull(session, "session is null");

    // Skip plans that contain no writing stages. Additionally, skip FTE mode since
    // the estimated partition count is not used by the FTE scheduler.
    if (!PlanNodeSearcher.searchFrom(plan).whereIsInstanceOfAny(SUPPORTED_NODES).matches() || getRetryPolicy(session) == RetryPolicy.TASK) {
        return plan;
    }

    // Do not fire the rule if any TableWriter's source is not an exchange,
    // or if the exchange's partitioning mode is not supported
    List<TableWriterNode> allTableWriters = PlanNodeSearcher
            .searchFrom(plan)
            .where(TableWriterNode.class::isInstance)
            .findAll();

    boolean isPartitioningWritingModeSupported = allTableWriters
            .stream()
            .allMatch(it -> it.getSource() instanceof ExchangeNode exchangeNode && SUPPORTED_PARTITIONING_MODES.contains(exchangeNode.getPartitioningScheme().getPartitioning().getHandle()));

    if (!isPartitioningWritingModeSupported) {
        return plan;
    }

    // Do not fire the rule if any exchange uses non-system partitioning
    boolean isAllPartitioningSystemPartitioning = PlanNodeSearcher
            .searchFrom(plan)
            .where(ExchangeNode.class::isInstance)
            .findAll()
            .stream()
            .map(ExchangeNode.class::cast)
            .map(node -> node.getPartitioningScheme().getPartitioning().getHandle().getConnectorHandle())
            .allMatch(SystemPartitioningHandle.class::isInstance);

    if (!isAllPartitioningSystemPartitioning) {
        return plan;
    }

    int maxWriterNodesCount = Math.min(
            session.getSystemProperty(MAX_WRITER_NODES_COUNT, Integer.class),
            session.getSystemProperty(MAX_HASH_PARTITION_COUNT, Integer.class));
    return rewriteWith(new Rewriter(maxWriterNodesCount), plan);
}

private static class Rewriter
extends SimplePlanRewriter<Void>
{
private final int maxWriterNodesCount;

private Rewriter(int maxWriterNodesCount)
{
this.maxWriterNodesCount = maxWriterNodesCount;
}

@Override
public PlanNode visitExchange(ExchangeNode node, RewriteContext<Void> context)
{
if (node.getScope() != REMOTE) {
return node;
}

List<PlanNode> sources = node.getSources().stream()
.map(context::rewrite)
.collect(toImmutableList());

return new ExchangeNode(
node.getId(),
node.getType(),
node.getScope(),
node.getPartitioningScheme().withPartitionCount(maxWriterNodesCount),
sources,
node.getInputs(),
node.getOrderingScheme());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,32 @@ public void testGetNewTaskCountWhenExistingWriterTaskMaxWriterCountIsEmpty()
assertEquals(scaledWriterScheduler.schedule().getNewTasks().size(), 0);
}

@Test
public void testNewTaskCountWhenNodesUpperLimitIsNotExceeded()
{
    // A single over-utilized writer task, with the node limit set to 2.
    TaskStatus overUtilizedTask = buildTaskStatus(true, 123456L);
    AtomicReference<List<TaskStatus>> taskStatuses = new AtomicReference<>(ImmutableList.of(overUtilizedTask));
    ScaledWriterScheduler scheduler = buildScaledWriterScheduler(taskStatuses, 2);

    // The second scheduling round should still scale out by one task,
    // since the limit of 2 nodes has not been reached yet.
    scheduler.schedule();
    assertEquals(scheduler.schedule().getNewTasks().size(), 1);
}

@Test
public void testNewTaskCountWhenNodesUpperLimitIsExceeded()
{
    // A single over-utilized writer task, with the node limit set to 1.
    TaskStatus overUtilizedTask = buildTaskStatus(true, 123456L);
    AtomicReference<List<TaskStatus>> taskStatuses = new AtomicReference<>(ImmutableList.of(overUtilizedTask));
    ScaledWriterScheduler scheduler = buildScaledWriterScheduler(taskStatuses, 1);

    // No new tasks may be scheduled in the second round: the single allowed
    // writer node is already in use.
    scheduler.schedule();
    assertEquals(scheduler.schedule().getNewTasks().size(), 0);
}

private ScaledWriterScheduler buildScaleWriterSchedulerWithInitialTasks(TaskStatus taskStatus1, TaskStatus taskStatus2, TaskStatus taskStatus3)
{
AtomicReference<List<TaskStatus>> taskStatusProvider = new AtomicReference<>(ImmutableList.of());
ScaledWriterScheduler scaledWriterScheduler = buildScaledWriterScheduler(taskStatusProvider);
ScaledWriterScheduler scaledWriterScheduler = buildScaledWriterScheduler(taskStatusProvider, 100);

assertEquals(scaledWriterScheduler.schedule().getNewTasks().size(), 1);
taskStatusProvider.set(ImmutableList.of(taskStatus1));
Expand All @@ -165,7 +187,7 @@ private ScaledWriterScheduler buildScaleWriterSchedulerWithInitialTasks(TaskStat
return scaledWriterScheduler;
}

private ScaledWriterScheduler buildScaledWriterScheduler(AtomicReference<List<TaskStatus>> taskStatusProvider)
private ScaledWriterScheduler buildScaledWriterScheduler(AtomicReference<List<TaskStatus>> taskStatusProvider, int maxWritersNodesCount)
{
return new ScaledWriterScheduler(
new TestingStageExecution(createFragment()),
Expand All @@ -176,7 +198,8 @@ private ScaledWriterScheduler buildScaledWriterScheduler(AtomicReference<List<Ta
new NodeSchedulerConfig().setIncludeCoordinator(true),
new NodeTaskMap(new FinalizerService())).createNodeSelector(testSessionBuilder().build(), Optional.empty()),
newScheduledThreadPool(10, threadsNamed("task-notification-%s")),
DataSize.of(32, DataSize.Unit.MEGABYTE));
DataSize.of(32, DataSize.Unit.MEGABYTE),
maxWritersNodesCount);
}

private static TaskStatus buildTaskStatus(boolean isOutputBufferOverUtilized, long outputDataSize)
Expand Down
Loading