Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,15 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName
*/
OptionalInt getMaxWriterTasks(Session session, String catalogName);

/**
 * Workaround for the lack of statistics about IO and CPU operations performed by the connector.
 * In the long term, this should be replaced by improvements in the cost model.
 *
 * @param session the current session
 * @param tableHandle the table whose read may be split into multiple column-subset reads
 * @return true if the cumulative cost of splitting a read of the specified tableHandle into multiple reads,
 * each of which projects a subset of the required columns, is not significantly more than the cost of reading the specified tableHandle
 */
boolean allowSplittingReadIntoMultipleSubQueries(Session session, TableHandle tableHandle);

/**
* Returns writer scaling options for the specified table. This method is called when table handle is not available during CTAS.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2792,6 +2792,15 @@ public OptionalInt getMaxWriterTasks(Session session, String catalogName)
return catalogMetadata.getMetadata(session).getMaxWriterTasks(session.toConnectorSession(catalogHandle));
}

@Override
public boolean allowSplittingReadIntoMultipleSubQueries(Session session, TableHandle tableHandle)
{
    // Route the question to the connector that owns the table's catalog.
    CatalogHandle catalog = tableHandle.catalogHandle();
    ConnectorSession connectorSession = session.toConnectorSession(catalog);
    return getCatalogMetadata(session, catalog)
            .getMetadata(session)
            .allowSplittingReadIntoMultipleSubQueries(connectorSession, tableHandle.connectorHandle());
}

@Override
public WriterScalingOptions getNewTableWriterScalingOptions(Session session, QualifiedObjectName tableName, Map<String, Object> tableProperties)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public enum DistinctAggregationsStrategy
SINGLE_STEP,
MARK_DISTINCT,
PRE_AGGREGATE,
SPLIT_TO_SUBQUERIES,
AUTOMATIC,
}

Expand Down
19 changes: 17 additions & 2 deletions core/trino-main/src/main/java/io/trino/sql/planner/PlanCopier.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.sql.planner;

import io.trino.sql.planner.iterative.GroupReference;
import io.trino.sql.planner.iterative.Lookup;
import io.trino.sql.planner.optimizations.UnaliasSymbolReferences;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.ApplyNode;
Expand Down Expand Up @@ -56,18 +58,25 @@ private PlanCopier() {}

/**
 * Copies the plan, assigning fresh node ids and reallocating symbols.
 * Does not resolve memo group references (delegates with {@link Lookup#noLookup()}),
 * so the plan must consist of concrete nodes only.
 */
public static NodeAndMappings copyPlan(PlanNode plan, List<Symbol> fields, SymbolAllocator symbolAllocator, PlanNodeIdAllocator idAllocator)
{
return copyPlan(plan, fields, symbolAllocator, idAllocator, Lookup.noLookup());
}

/**
 * Copies the plan, assigning fresh node ids and reallocating symbols.
 * Memo group references encountered while copying are resolved through the given lookup.
 */
public static NodeAndMappings copyPlan(PlanNode plan, List<Symbol> fields, SymbolAllocator symbolAllocator, PlanNodeIdAllocator idAllocator, Lookup lookup)
{
    Copier copier = new Copier(idAllocator, lookup);
    PlanNode copiedPlan = SimplePlanRewriter.rewriteWith(copier, plan, null);
    return new UnaliasSymbolReferences().reallocateSymbols(copiedPlan, fields, symbolAllocator);
}

private static class Copier
extends SimplePlanRewriter<Void>
{
private final PlanNodeIdAllocator idAllocator;
private final Lookup lookup;

private Copier(PlanNodeIdAllocator idAllocator)
// idAllocator supplies fresh ids for the copied nodes; lookup resolves any
// GroupReference nodes encountered during copying (see visitGroupReference).
private Copier(PlanNodeIdAllocator idAllocator, Lookup lookup)
{
this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
this.lookup = requireNonNull(lookup, "lookup is null");
}

@Override
Expand All @@ -76,6 +85,12 @@ protected PlanNode visitPlan(PlanNode node, RewriteContext<Void> context)
throw new UnsupportedOperationException("plan copying not implemented for " + node.getClass().getSimpleName());
}

@Override
public PlanNode visitGroupReference(GroupReference node, RewriteContext<Void> context)
{
    // Resolve the memo group reference to its underlying node, then copy that node.
    PlanNode resolved = lookup.resolve(node);
    return context.rewrite(resolved);
}

@Override
public PlanNode visitAggregation(AggregationNode node, RewriteContext<Void> context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import io.trino.sql.planner.iterative.rule.MergeProjectWithValues;
import io.trino.sql.planner.iterative.rule.MergeUnion;
import io.trino.sql.planner.iterative.rule.MultipleDistinctAggregationToMarkDistinct;
import io.trino.sql.planner.iterative.rule.MultipleDistinctAggregationsToSubqueries;
import io.trino.sql.planner.iterative.rule.OptimizeDuplicateInsensitiveJoins;
import io.trino.sql.planner.iterative.rule.OptimizeMixedDistinctAggregations;
import io.trino.sql.planner.iterative.rule.OptimizeRowPattern;
Expand Down Expand Up @@ -682,10 +683,15 @@ public PlanOptimizers(
new RemoveRedundantIdentityProjections(),
new PushAggregationThroughOuterJoin(),
new ReplaceRedundantJoinWithSource(), // Run this after PredicatePushDown optimizer as it inlines filter constants
// Run this after PredicatePushDown and PushProjectionIntoTableScan as it uses stats, and those two rules may reduce the number of partitions
// and columns we need stats for thus reducing the overhead of reading statistics from the metastore.
new MultipleDistinctAggregationsToSubqueries(taskCountEstimator, metadata),
// Run SingleDistinctAggregationToGroupBy after MultipleDistinctAggregationsToSubqueries to ensure the single column distinct is optimized
new SingleDistinctAggregationToGroupBy(),
new OptimizeMixedDistinctAggregations(plannerContext, taskCountEstimator), // Run this after aggregation pushdown so that multiple distinct aggregations can be pushed into a connector
// It also is run before MultipleDistinctAggregationToMarkDistinct to take precedence if enabled
// It also is run before MultipleDistinctAggregationToMarkDistinct to take precedence if enabled
new ImplementFilteredAggregations(), // DistinctAggregationToGroupBy will add filters if fired
new MultipleDistinctAggregationToMarkDistinct(taskCountEstimator))), // Run this after aggregation pushdown so that multiple distinct aggregations can be pushed into a connector
new MultipleDistinctAggregationToMarkDistinct(taskCountEstimator, metadata))), // Run this after aggregation pushdown so that multiple distinct aggregations can be pushed into a connector
inlineProjections,
simplifyOptimizer, // Re-run the SimplifyExpressions to simplify any recomposed expressions from other optimizations
pushProjectionIntoTableScanOptimizer,
Expand Down
Loading