Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public final class SystemSessionProperties
public static final String QUERY_MAX_STAGE_COUNT = "query_max_stage_count";
public static final String REDISTRIBUTE_WRITES = "redistribute_writes";
public static final String USE_PREFERRED_WRITE_PARTITIONING = "use_preferred_write_partitioning";
public static final String PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS = "preferred_write_partitioning_min_number_of_partitions";
public static final String SCALE_WRITERS = "scale_writers";
public static final String WRITER_MIN_SIZE = "writer_min_size";
public static final String PUSH_TABLE_WRITE_THROUGH_UNION = "push_table_write_through_union";
Expand Down Expand Up @@ -209,6 +210,11 @@ public SystemSessionProperties(
"Use preferred write partitioning",
featuresConfig.isUsePreferredWritePartitioning(),
false),
integerProperty(
PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS,
"Use preferred write partitioning when the number of written partitions exceeds the configured threshold",
featuresConfig.getPreferredWritePartitioningMinNumberOfPartitions(),
false),
booleanProperty(
SCALE_WRITERS,
"Scale out writers based on throughput (use minimum necessary)",
Expand Down Expand Up @@ -647,6 +653,11 @@ public static boolean isUsePreferredWritePartitioning(Session session)
return session.getSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, Boolean.class);
}

/**
 * Reads the session-level threshold (number of written partitions) at which
 * preferred write partitioning is applied.
 */
public static int getPreferredWritePartitioningMinNumberOfPartitions(Session session)
{
    Integer minPartitionCount = session.getSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, Integer.class);
    return minPartitionCount;
}

public static boolean isScaleWriters(Session session)
{
return session.getSystemProperty(SCALE_WRITERS, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,7 @@ public static PlanNodeStatsEstimate groupBy(PlanNodeStatsEstimate sourceStats, C
}));
}

double rowsCount = 1;
for (Symbol groupBySymbol : groupBySymbols) {
SymbolStatsEstimate symbolStatistics = sourceStats.getSymbolStatistics(groupBySymbol);
int nullRow = (symbolStatistics.getNullsFraction() == 0.0) ? 0 : 1;
rowsCount *= symbolStatistics.getDistinctValuesCount() + nullRow;
}
double rowsCount = getRowsCount(sourceStats, groupBySymbols);
result.setOutputRowCount(min(rowsCount, sourceStats.getOutputRowCount()));

for (Map.Entry<Symbol, Aggregation> aggregationEntry : aggregations.entrySet()) {
Expand All @@ -91,6 +86,17 @@ public static PlanNodeStatsEstimate groupBy(PlanNodeStatsEstimate sourceStats, C
return result.build();
}

/**
 * Estimates the number of distinct groups produced by grouping on {@code groupBySymbols}:
 * the product, over all grouping symbols, of each symbol's distinct-value count,
 * counting one extra value when the symbol has a non-zero nulls fraction.
 * Propagates NaN when the underlying statistics are unknown.
 */
public static double getRowsCount(PlanNodeStatsEstimate sourceStats, Collection<Symbol> groupBySymbols)
{
    double estimate = 1;
    for (Symbol symbol : groupBySymbols) {
        SymbolStatsEstimate stats = sourceStats.getSymbolStatistics(symbol);
        // Any nulls contribute exactly one additional group (the null group)
        double groupsForSymbol = stats.getDistinctValuesCount() + ((stats.getNullsFraction() == 0.0) ? 0 : 1);
        estimate *= groupsForSymbol;
    }
    return estimate;
}

private static SymbolStatsEstimate estimateAggregationStats(Aggregation aggregation, PlanNodeStatsEstimate sourceStats)
{
requireNonNull(aggregation, "aggregation is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public class FeaturesConfig
private JoinReorderingStrategy joinReorderingStrategy = JoinReorderingStrategy.AUTOMATIC;
private int maxReorderedJoins = 9;
private boolean redistributeWrites = true;
private boolean usePreferredWritePartitioning;
private boolean usePreferredWritePartitioning = true;
private int preferredWritePartitioningMinNumberOfPartitions = 50;
private boolean scaleWriters;
private DataSize writerMinSize = DataSize.of(32, DataSize.Unit.MEGABYTE);
private boolean optimizeMetadataQueries;
Expand Down Expand Up @@ -373,6 +374,20 @@ public FeaturesConfig setUsePreferredWritePartitioning(boolean usePreferredWrite
return this;
}

// Threshold for applying preferred write partitioning: the optimizer uses the
// preferred scheme only when the estimated number of written partitions reaches
// this value (default 50). NOTE(review): values <= 1 appear to force it on
// regardless of stats — confirm against the PreferWritePartitioning rule.
@Min(0)
public int getPreferredWritePartitioningMinNumberOfPartitions()
{
return preferredWritePartitioningMinNumberOfPartitions;
}

// Binds the "preferred-write-partitioning-min-number-of-partitions" config
// property (airlift @Config); fluent setter returning this for chaining.
@Config("preferred-write-partitioning-min-number-of-partitions")
@ConfigDescription("Use preferred write partitioning when the number of written partitions exceeds the configured threshold")
public FeaturesConfig setPreferredWritePartitioningMinNumberOfPartitions(int preferredWritePartitioningMinNumberOfPartitions)
{
this.preferredWritePartitioningMinNumberOfPartitions = preferredWritePartitioningMinNumberOfPartitions;
return this;
}

public boolean isScaleWriters()
{
return scaleWriters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Streams.zip;
import static io.trino.SystemSessionProperties.isCollectPlanStatisticsForAllQueries;
import static io.trino.SystemSessionProperties.isUsePreferredWritePartitioning;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.statistics.TableStatisticType.ROW_COUNT;
import static io.trino.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -490,6 +489,7 @@ private RelationPlan createTableWriterPlan(
TableStatisticsMetadata statisticsMetadata)
{
Optional<PartitioningScheme> partitioningScheme = Optional.empty();
Optional<PartitioningScheme> preferredPartitioningScheme = Optional.empty();
if (writeTableLayout.isPresent()) {
List<Symbol> partitionFunctionArguments = new ArrayList<>();
writeTableLayout.get().getPartitionColumns().stream()
Expand All @@ -505,10 +505,9 @@ private RelationPlan createTableWriterPlan(
Partitioning.create(partitioningHandle.get(), partitionFunctionArguments),
outputLayout));
}
else if (isUsePreferredWritePartitioning(session)) {
// TODO: move to iterative optimizer and use CBO
else {
// empty connector partitioning handle means evenly partitioning on partitioning columns
partitioningScheme = Optional.of(new PartitioningScheme(
preferredPartitioningScheme = Optional.of(new PartitioningScheme(
Partitioning.create(FIXED_HASH_DISTRIBUTION, partitionFunctionArguments),
outputLayout));
}
Expand Down Expand Up @@ -547,6 +546,7 @@ else if (isUsePreferredWritePartitioning(session)) {
columnNames,
notNullColumnSymbols,
partitioningScheme,
preferredPartitioningScheme,
Optional.of(partialAggregation),
Optional.of(result.getDescriptor().map(aggregations.getMappings()::get))),
target,
Expand All @@ -569,6 +569,7 @@ else if (isUsePreferredWritePartitioning(session)) {
columnNames,
notNullColumnSymbols,
partitioningScheme,
preferredPartitioningScheme,
Optional.empty(),
Optional.empty()),
target,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.trino.sql.planner.iterative.rule.MergeUnion;
import io.trino.sql.planner.iterative.rule.MultipleDistinctAggregationToMarkDistinct;
import io.trino.sql.planner.iterative.rule.OptimizeDuplicateInsensitiveJoins;
import io.trino.sql.planner.iterative.rule.PreferWritePartitioning;
import io.trino.sql.planner.iterative.rule.PruneAggregationColumns;
import io.trino.sql.planner.iterative.rule.PruneAggregationSourceColumns;
import io.trino.sql.planner.iterative.rule.PruneApplyColumns;
Expand Down Expand Up @@ -693,7 +694,14 @@ public PlanOptimizers(
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new RemoveRedundantIdentityProjections())),

// Prefer write partitioning rule requires accurate stats.
// Therefore PredicatePushDown, columnPruningOptimizer and
// RemoveRedundantIdentityProjections need to run beforehand.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could remove RemoveRedundantIdentityProjections from both comments (here and below) as it should not affect stats. This could be separate PR

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll handle it when I get some free cycles

new IterativeOptimizer(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new PreferWritePartitioning())),
// Because ReorderJoins runs only once,
// PredicatePushDown, columnPruningOptimizer and RemoveRedundantIdentityProjections
// need to run beforehand in order to produce an optimal join order
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.sql.planner.iterative.rule;

import io.trino.Session;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
import io.trino.sql.planner.iterative.Rule;
import io.trino.sql.planner.plan.TableWriterNode;

import java.util.Optional;

import static io.trino.SystemSessionProperties.getPreferredWritePartitioningMinNumberOfPartitions;
import static io.trino.SystemSessionProperties.isUsePreferredWritePartitioning;
import static io.trino.cost.AggregationStatsRule.getRowsCount;
import static io.trino.sql.planner.plan.Patterns.tableWriterNode;
import static java.lang.Double.isNaN;

public class PreferWritePartitioning
Comment thread
skrzypo987 marked this conversation as resolved.
Outdated
implements Rule<TableWriterNode>
{
@Override
public Pattern<TableWriterNode> getPattern()
{
return tableWriterNode();
}

@Override
public boolean isEnabled(Session session)
{
return isUsePreferredWritePartitioning(session);
Copy link
Copy Markdown
Member

@sopel39 sopel39 Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also enable use-preferred-write-partitioning by default (separate commit).
Maybe we should also rename that property to use-automatic-preferred-write-partitioning and mark the old one as legacy since the previous behavior is changing. wdyt @findepi ?

}

@Override
public Result apply(TableWriterNode node, Captures captures, Context context)
{
if (node.getPreferredPartitioningScheme().isEmpty()) {
return Result.empty();
}

int minimumNumberOfPartitions = getPreferredWritePartitioningMinNumberOfPartitions(context.getSession());
Comment thread
skrzypo987 marked this conversation as resolved.
Outdated
if (minimumNumberOfPartitions <= 1) {
// Force 'preferred write partitioning' even if stats are missing or broken
return enable(node);
}

double expectedNumberOfPartitions = getRowsCount(
context.getStatsProvider().getStats(node.getSource()),
node.getPreferredPartitioningScheme().get().getPartitioning().getColumns());

if (isNaN(expectedNumberOfPartitions) || expectedNumberOfPartitions < minimumNumberOfPartitions) {
return Result.empty();
}

return enable(node);
}

private Result enable(TableWriterNode node)
{
return Result.ofPlanNode(new TableWriterNode(
node.getId(),
node.getSource(),
node.getTarget(),
node.getRowCountSymbol(),
node.getFragmentSymbol(),
node.getColumns(),
node.getColumnNames(),
node.getNotNullColumnSymbols(),
node.getPreferredPartitioningScheme(),
Optional.empty(),
node.getStatisticsAggregation(),
node.getStatisticsAggregationDescriptor()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<Context> c
node.getColumnNames(),
node.getNotNullColumnSymbols(),
node.getPartitioningScheme(),
node.getPreferredPartitioningScheme(),
node.getStatisticsAggregation(),
node.getStatisticsAggregationDescriptor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<Set<Symbol
node.getColumnNames(),
node.getNotNullColumnSymbols(),
node.getPartitioningScheme(),
node.getPreferredPartitioningScheme(),
node.getStatisticsAggregation(),
node.getStatisticsAggregationDescriptor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ public TableWriterNode map(TableWriterNode node, PlanNode source, PlanNodeId new
node.getColumnNames(),
node.getNotNullColumnSymbols(),
node.getPartitioningScheme().map(partitioningScheme -> map(partitioningScheme, source.getOutputSymbols())),
node.getPreferredPartitioningScheme().map(partitioningScheme -> map(partitioningScheme, source.getOutputSymbols())),
node.getStatisticsAggregation().map(this::map),
node.getStatisticsAggregationDescriptor().map(descriptor -> descriptor.map(this::map)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class TableWriterNode
private final List<String> columnNames;
private final Set<Symbol> notNullColumnSymbols;
private final Optional<PartitioningScheme> partitioningScheme;
private final Optional<PartitioningScheme> preferredPartitioningScheme;
private final Optional<StatisticAggregations> statisticsAggregation;
private final Optional<StatisticAggregationsDescriptor<Symbol>> statisticsAggregationDescriptor;
private final List<Symbol> outputs;
Expand All @@ -67,6 +68,7 @@ public TableWriterNode(
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("notNullColumnSymbols") Set<Symbol> notNullColumnSymbols,
@JsonProperty("partitioningScheme") Optional<PartitioningScheme> partitioningScheme,
@JsonProperty("preferredPartitioningScheme") Optional<PartitioningScheme> preferredPartitioningScheme,
@JsonProperty("statisticsAggregation") Optional<StatisticAggregations> statisticsAggregation,
@JsonProperty("statisticsAggregationDescriptor") Optional<StatisticAggregationsDescriptor<Symbol>> statisticsAggregationDescriptor)
{
Expand All @@ -84,9 +86,11 @@ public TableWriterNode(
this.columnNames = ImmutableList.copyOf(columnNames);
this.notNullColumnSymbols = ImmutableSet.copyOf(requireNonNull(notNullColumnSymbols, "notNullColumns is null"));
this.partitioningScheme = requireNonNull(partitioningScheme, "partitioningScheme is null");
this.preferredPartitioningScheme = requireNonNull(preferredPartitioningScheme, "preferredPartitioningScheme is null");
Comment thread
skrzypo987 marked this conversation as resolved.
Outdated
this.statisticsAggregation = requireNonNull(statisticsAggregation, "statisticsAggregation is null");
this.statisticsAggregationDescriptor = requireNonNull(statisticsAggregationDescriptor, "statisticsAggregationDescriptor is null");
checkArgument(statisticsAggregation.isPresent() == statisticsAggregationDescriptor.isPresent(), "statisticsAggregation and statisticsAggregationDescriptor must be either present or absent");
checkArgument(partitioningScheme.isEmpty() || preferredPartitioningScheme.isEmpty(), "Both partitioningScheme and preferredPartitioningScheme cannot be present");

ImmutableList.Builder<Symbol> outputs = ImmutableList.<Symbol>builder()
.add(rowCountSymbol)
Expand Down Expand Up @@ -146,6 +150,12 @@ public Optional<PartitioningScheme> getPartitioningScheme()
return partitioningScheme;
}

// Preferred write-partitioning scheme, applied by the optimizer when beneficial;
// mutually exclusive with partitioningScheme (see constructor check).
@JsonProperty
public Optional<PartitioningScheme> getPreferredPartitioningScheme()
{
return preferredPartitioningScheme;
}

@JsonProperty
public Optional<StatisticAggregations> getStatisticsAggregation()
{
Expand Down Expand Up @@ -179,7 +189,7 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context)
@Override
public PlanNode replaceChildren(List<PlanNode> newChildren)
{
return new TableWriterNode(getId(), Iterables.getOnlyElement(newChildren), target, rowCountSymbol, fragmentSymbol, columns, columnNames, notNullColumnSymbols, partitioningScheme, statisticsAggregation, statisticsAggregationDescriptor);
return new TableWriterNode(getId(), Iterables.getOnlyElement(newChildren), target, rowCountSymbol, fragmentSymbol, columns, columnNames, notNullColumnSymbols, partitioningScheme, preferredPartitioningScheme, statisticsAggregation, statisticsAggregationDescriptor);
}

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public void testDefaults()
.setJoinReorderingStrategy(JoinReorderingStrategy.AUTOMATIC)
.setMaxReorderedJoins(9)
.setRedistributeWrites(true)
.setUsePreferredWritePartitioning(false)
.setUsePreferredWritePartitioning(true)
.setPreferredWritePartitioningMinNumberOfPartitions(50)
.setScaleWriters(false)
.setWriterMinSize(DataSize.of(32, MEGABYTE))
.setOptimizeMetadataQueries(false)
Expand Down Expand Up @@ -135,7 +136,8 @@ public void testExplicitPropertyMappings()
.put("optimizer.join-reordering-strategy", "NONE")
.put("optimizer.max-reordered-joins", "5")
.put("redistribute-writes", "false")
.put("use-preferred-write-partitioning", "true")
.put("use-preferred-write-partitioning", "false")
.put("preferred-write-partitioning-min-number-of-partitions", "10")
.put("scale-writers", "true")
.put("writer-min-size", "42GB")
.put("optimizer.optimize-metadata-queries", "true")
Expand Down Expand Up @@ -204,7 +206,8 @@ public void testExplicitPropertyMappings()
.setJoinReorderingStrategy(NONE)
.setMaxReorderedJoins(5)
.setRedistributeWrites(false)
.setUsePreferredWritePartitioning(true)
.setUsePreferredWritePartitioning(false)
.setPreferredWritePartitioningMinNumberOfPartitions(10)
.setScaleWriters(true)
.setWriterMinSize(DataSize.of(42, GIGABYTE))
.setOptimizeMetadataQueries(true)
Expand Down
Loading