Skip to content

Commit e03d410

Browse files
gaurav8297 authored and sopel39 committed
Remove prefer partitioning rules
1 parent 9f851be commit e03d410

28 files changed

+59
-575
lines changed

core/trino-main/src/main/java/io/trino/SystemSessionProperties.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ public final class SystemSessionProperties
8080
public static final String QUERY_MAX_STAGE_COUNT = "query_max_stage_count";
8181
public static final String REDISTRIBUTE_WRITES = "redistribute_writes";
8282
public static final String USE_PREFERRED_WRITE_PARTITIONING = "use_preferred_write_partitioning";
83-
public static final String PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS = "preferred_write_partitioning_min_number_of_partitions";
8483
public static final String SCALE_WRITERS = "scale_writers";
8584
public static final String TASK_SCALE_WRITERS_ENABLED = "task_scale_writers_enabled";
8685
public static final String MAX_WRITER_TASKS_COUNT = "max_writer_tasks_count";
@@ -306,16 +305,6 @@ public SystemSessionProperties(
306305
"Use preferred write partitioning",
307306
optimizerConfig.isUsePreferredWritePartitioning(),
308307
false),
309-
integerProperty(
310-
PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS,
311-
"Use preferred write partitioning when the number of written partitions exceeds the configured threshold",
312-
optimizerConfig.getPreferredWritePartitioningMinNumberOfPartitions(),
313-
value -> {
314-
if (value < 1) {
315-
throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be greater than or equal to 1: %s", PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, value));
316-
}
317-
},
318-
false),
319308
booleanProperty(
320309
SCALE_WRITERS,
321310
"Scale out writers based on throughput (use minimum necessary)",
@@ -1093,11 +1082,6 @@ public static boolean isUsePreferredWritePartitioning(Session session)
10931082
return session.getSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, Boolean.class);
10941083
}
10951084

1096-
public static int getPreferredWritePartitioningMinNumberOfPartitions(Session session)
1097-
{
1098-
return session.getSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, Integer.class);
1099-
}
1100-
11011085
public static boolean isScaleWriters(Session session)
11021086
{
11031087
return session.getSystemProperty(SCALE_WRITERS, Boolean.class);

core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3285,7 +3285,7 @@ public PhysicalOperation visitRefreshMaterializedView(RefreshMaterializedViewNod
32853285
public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context)
32863286
{
32873287
// Set table writer count
3288-
int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getPreferredPartitioningScheme(), node.getSource());
3288+
int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getSource());
32893289
context.setDriverInstanceCount(maxWriterCount);
32903290
context.taskContext.setMaxWriterCount(maxWriterCount);
32913291

@@ -3443,7 +3443,7 @@ public PhysicalOperation visitSimpleTableExecuteNode(SimpleTableExecuteNode node
34433443
public PhysicalOperation visitTableExecute(TableExecuteNode node, LocalExecutionPlanContext context)
34443444
{
34453445
// Set table writer count
3446-
int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getPreferredPartitioningScheme(), node.getSource());
3446+
int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getSource());
34473447
context.setDriverInstanceCount(maxWriterCount);
34483448
context.taskContext.setMaxWriterCount(maxWriterCount);
34493449

@@ -3470,7 +3470,7 @@ public PhysicalOperation visitTableExecute(TableExecuteNode node, LocalExecution
34703470
return new PhysicalOperation(operatorFactory, outputMapping.buildOrThrow(), context, source);
34713471
}
34723472

3473-
private int getWriterCount(Session session, Optional<PartitioningScheme> partitioningScheme, Optional<PartitioningScheme> preferredPartitioningScheme, PlanNode source)
3473+
private int getWriterCount(Session session, Optional<PartitioningScheme> partitioningScheme, PlanNode source)
34743474
{
34753475
// This check is required because we don't know which writer count to use when exchange is
34763476
// single distribution. It could be possible that when scaling is enabled, a single distribution is
@@ -3480,18 +3480,12 @@ private int getWriterCount(Session session, Optional<PartitioningScheme> partiti
34803480
return 1;
34813481
}
34823482

3483-
if (isLocalScaledWriterExchange(source)) {
3484-
return partitioningScheme.or(() -> preferredPartitioningScheme)
3485-
// The default value of partitioned writer count is 32 which is high enough to use it
3486-
// for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many
3487-
// small files since when scaling is disabled only single writer will handle a single partition.
3488-
.map(scheme -> getTaskPartitionedWriterCount(session))
3489-
.orElseGet(() -> getTaskScaleWritersMaxWriterCount(session));
3490-
}
3491-
3483+
// The default value of partitioned writer count is 32 which is high enough to use it
3484+
// for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many
3485+
// small files since when scaling is disabled only single writer will handle a single partition.
34923486
return partitioningScheme
34933487
.map(scheme -> getTaskPartitionedWriterCount(session))
3494-
.orElseGet(() -> getTaskWriterCount(session));
3488+
.orElseGet(() -> isLocalScaledWriterExchange(source) ? getTaskScaleWritersMaxWriterCount(session) : getTaskWriterCount(session));
34953489
}
34963490

34973491
private boolean isSingleGatheringExchange(PlanNode node)

core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
import static io.trino.SystemSessionProperties.getMaxWriterTaskCount;
131131
import static io.trino.SystemSessionProperties.getRetryPolicy;
132132
import static io.trino.SystemSessionProperties.isCollectPlanStatisticsForAllQueries;
133+
import static io.trino.SystemSessionProperties.isUsePreferredWritePartitioning;
133134
import static io.trino.metadata.MetadataUtil.createQualifiedObjectName;
134135
import static io.trino.spi.StandardErrorCode.CATALOG_NOT_FOUND;
135136
import static io.trino.spi.StandardErrorCode.CONSTRAINT_VIOLATION;
@@ -660,7 +661,6 @@ private RelationPlan createTableWriterPlan(
660661
TableStatisticsMetadata statisticsMetadata)
661662
{
662663
Optional<PartitioningScheme> partitioningScheme = Optional.empty();
663-
Optional<PartitioningScheme> preferredPartitioningScheme = Optional.empty();
664664

665665
int maxWriterTasks = target.getMaxWriterTasks(plannerContext.getMetadata(), session).orElse(getMaxWriterTaskCount(session));
666666
Optional<Integer> maxWritersNodesCount = getRetryPolicy(session) != RetryPolicy.TASK
@@ -683,9 +683,9 @@ private RelationPlan createTableWriterPlan(
683683
Partitioning.create(partitioningHandle.get(), partitionFunctionArguments),
684684
outputLayout));
685685
}
686-
else {
686+
else if (isUsePreferredWritePartitioning(session)) {
687687
// empty connector partitioning handle means evenly partitioning on partitioning columns
688-
preferredPartitioningScheme = Optional.of(new PartitioningScheme(
688+
partitioningScheme = Optional.of(new PartitioningScheme(
689689
Partitioning.create(FIXED_HASH_DISTRIBUTION, partitionFunctionArguments),
690690
outputLayout,
691691
Optional.empty(),
@@ -721,7 +721,6 @@ private RelationPlan createTableWriterPlan(
721721
symbols,
722722
columnNames,
723723
partitioningScheme,
724-
preferredPartitioningScheme,
725724
Optional.of(partialAggregation),
726725
Optional.of(result.getDescriptor().map(aggregations.getMappings()::get))),
727726
target,
@@ -743,7 +742,6 @@ private RelationPlan createTableWriterPlan(
743742
symbols,
744743
columnNames,
745744
partitioningScheme,
746-
preferredPartitioningScheme,
747745
Optional.empty(),
748746
Optional.empty()),
749747
target,
@@ -956,7 +954,6 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat
956954

957955
// todo extract common method to be used here and in createTableWriterPlan()
958956
Optional<PartitioningScheme> partitioningScheme = Optional.empty();
959-
Optional<PartitioningScheme> preferredPartitioningScheme = Optional.empty();
960957
if (layout.isPresent()) {
961958
List<Symbol> partitionFunctionArguments = new ArrayList<>();
962959
layout.get().getPartitionColumns().stream()
@@ -973,13 +970,13 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat
973970
Partitioning.create(partitioningHandle.get(), partitionFunctionArguments),
974971
outputLayout));
975972
}
976-
else {
973+
else if (isUsePreferredWritePartitioning(session)) {
977974
// empty connector partitioning handle means evenly partitioning on partitioning columns
978975
int maxWriterTasks = tableExecuteTarget.getMaxWriterTasks(plannerContext.getMetadata(), session).orElse(getMaxWriterTaskCount(session));
979976
Optional<Integer> maxWritersNodesCount = getRetryPolicy(session) != RetryPolicy.TASK
980977
? Optional.of(Math.min(maxWriterTasks, getMaxWriterTaskCount(session)))
981978
: Optional.empty();
982-
preferredPartitioningScheme = Optional.of(new PartitioningScheme(
979+
partitioningScheme = Optional.of(new PartitioningScheme(
983980
Partitioning.create(FIXED_HASH_DISTRIBUTION, partitionFunctionArguments),
984981
outputLayout,
985982
Optional.empty(),
@@ -1000,8 +997,7 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat
1000997
symbolAllocator.newSymbol("fragment", VARBINARY),
1001998
symbols,
1002999
columnNames,
1003-
partitioningScheme,
1004-
preferredPartitioningScheme),
1000+
partitioningScheme),
10051001
tableExecuteTarget,
10061002
symbolAllocator.newSymbol("rows", BIGINT),
10071003
Optional.empty(),

core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import static java.util.Objects.requireNonNull;
3131
import static java.util.concurrent.TimeUnit.MINUTES;
3232

33-
@DefunctConfig("adaptive-partial-aggregation.min-rows")
33+
@DefunctConfig({"adaptive-partial-aggregation.min-rows", "preferred-write-partitioning-min-number-of-partitions"})
3434
public class OptimizerConfig
3535
{
3636
private double cpuCostWeight = 75;
@@ -57,7 +57,6 @@ public class OptimizerConfig
5757
private boolean distributedSort = true;
5858

5959
private boolean usePreferredWritePartitioning = true;
60-
private int preferredWritePartitioningMinNumberOfPartitions = 50;
6160

6261
private Duration iterativeOptimizerTimeout = new Duration(3, MINUTES); // by default let optimizer wait a long time in case it retrieves some data from ConnectorMetadata
6362

@@ -373,20 +372,6 @@ public OptimizerConfig setUsePreferredWritePartitioning(boolean usePreferredWrit
373372
return this;
374373
}
375374

376-
@Min(1)
377-
public int getPreferredWritePartitioningMinNumberOfPartitions()
378-
{
379-
return preferredWritePartitioningMinNumberOfPartitions;
380-
}
381-
382-
@Config("preferred-write-partitioning-min-number-of-partitions")
383-
@ConfigDescription("Use preferred write partitioning when the number of written partitions exceeds the configured threshold")
384-
public OptimizerConfig setPreferredWritePartitioningMinNumberOfPartitions(int preferredWritePartitioningMinNumberOfPartitions)
385-
{
386-
this.preferredWritePartitioningMinNumberOfPartitions = preferredWritePartitioningMinNumberOfPartitions;
387-
return this;
388-
}
389-
390375
public Duration getIterativeOptimizerTimeout()
391376
{
392377
return iterativeOptimizerTimeout;

core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import io.trino.sql.planner.iterative.rule.AddDynamicFilterSource;
3535
import io.trino.sql.planner.iterative.rule.AddExchangesBelowPartialAggregationOverGroupIdRuleSet;
3636
import io.trino.sql.planner.iterative.rule.AddIntermediateAggregations;
37-
import io.trino.sql.planner.iterative.rule.ApplyPreferredTableExecutePartitioning;
38-
import io.trino.sql.planner.iterative.rule.ApplyPreferredTableWriterPartitioning;
3937
import io.trino.sql.planner.iterative.rule.ApplyTableScanRedirection;
4038
import io.trino.sql.planner.iterative.rule.ArraySortAfterArrayDistinct;
4139
import io.trino.sql.planner.iterative.rule.CanonicalizeExpressions;
@@ -757,16 +755,6 @@ public PlanOptimizers(
757755
statsCalculator,
758756
costCalculator,
759757
ImmutableSet.of(new RemoveRedundantIdentityProjections())),
760-
// Prefer write partitioning rule requires accurate stats.
761-
// Run it before reorder joins which also depends on accurate stats.
762-
new IterativeOptimizer(
763-
plannerContext,
764-
ruleStats,
765-
statsCalculator,
766-
costCalculator,
767-
ImmutableSet.of(
768-
new ApplyPreferredTableWriterPartitioning(),
769-
new ApplyPreferredTableExecutePartitioning())),
770758
// Because ReorderJoins runs only once,
771759
// PredicatePushDown, columnPruningOptimizer and RemoveRedundantIdentityProjections
772760
// need to run beforehand in order to produce an optimal join order

core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ApplyPreferredTableExecutePartitioning.java

Lines changed: 0 additions & 96 deletions
This file was deleted.

0 commit comments

Comments (0)