Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ public void setupServer()
throws Exception
{
Logging.initialize();
server = TestingTrinoServer.create();
server = TestingTrinoServer.builder()
.addProperty("optimizer.materialize-table.enabled", "false")
.build();

server.installPlugin(new TpchPlugin());
server.createCatalog(TEST_CATALOG, "tpch");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ public final class SystemSessionProperties
public static final String USE_EXACT_PARTITIONING = "use_exact_partitioning";
public static final String USE_COST_BASED_PARTITIONING = "use_cost_based_partitioning";
public static final String PUSH_FILTER_INTO_VALUES_MAX_ROW_COUNT = "push_filter_into_values_max_row_count";
public static final String MATERIALIZE_TABLE_ENABLED = "materialize_table_enabled";
public static final String MATERIALIZE_TABLE_MAX_ESTIMATED_ROW_COUNT = "materialize_table_max_estimated_row_count";
public static final String MATERIALIZE_TABLE_MAX_ACTUAL_ROW_COUNT = "materialize_table_max_actual_row_count";
public static final String MATERIALIZE_TABLE_TIMEOUT = "materialize_table_timeout";
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
public static final String PAGE_PARTITIONING_BUFFER_POOL_SIZE = "page_partitioning_buffer_pool_size";
public static final String IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD = "idle_writer_min_data_size_threshold";
Expand Down Expand Up @@ -1102,6 +1106,26 @@ public SystemSessionProperties(
"Maximum number of rows in values for which filter is pushed down into values",
optimizerConfig.getPushFilterIntoValuesMaxRowCount(),
false),
booleanProperty(
MATERIALIZE_TABLE_ENABLED,
"Materialize tables during planning",
optimizerConfig.isMaterializeTable(),
false),
integerProperty(
MATERIALIZE_TABLE_MAX_ESTIMATED_ROW_COUNT,
"Maximum estimated row count for a table to be materialized",
optimizerConfig.getMaterializeTableMaxEstimatedRowCount(),
false),
integerProperty(
MATERIALIZE_TABLE_MAX_ACTUAL_ROW_COUNT,
"Maximum actual row count for a materialized table",
optimizerConfig.getMaterializeTableMaxActualRowCount(),
false),
durationProperty(
MATERIALIZE_TABLE_TIMEOUT,
"Maximum time to wait for materializing a table",
optimizerConfig.getMaterializeTableTimeout(),
false),
booleanProperty(
FORCE_SPILLING_JOIN,
"Force the usage of spilling join operator in favor of the non-spilling one, even if spill is not enabled",
Expand Down Expand Up @@ -1993,6 +2017,26 @@ public static int getPushFilterIntoValuesMaxRowCount(Session session)
return session.getSystemProperty(PUSH_FILTER_INTO_VALUES_MAX_ROW_COUNT, Integer.class);
}

/**
 * Returns the {@code materialize_table_enabled} session property, which controls
 * whether tables are materialized during planning. Defaults to the
 * {@code optimizer.materialize-table.enabled} config value.
 */
public static boolean isMaterializeTableEnabled(Session session)
{
return session.getSystemProperty(MATERIALIZE_TABLE_ENABLED, Boolean.class);
}

/**
 * Returns the {@code materialize_table_max_estimated_row_count} session property:
 * the maximum estimated row count for a table to be materialized.
 */
public static int getMaterializeTableMaxEstimatedRowCount(Session session)
{
return session.getSystemProperty(MATERIALIZE_TABLE_MAX_ESTIMATED_ROW_COUNT, Integer.class);
}

/**
 * Returns the {@code materialize_table_max_actual_row_count} session property:
 * the maximum actual row count for a materialized table.
 */
public static int getMaterializeTableMaxActualRowCount(Session session)
{
return session.getSystemProperty(MATERIALIZE_TABLE_MAX_ACTUAL_ROW_COUNT, Integer.class);
}

/**
 * Returns the {@code materialize_table_timeout} session property: the maximum
 * time to wait for materializing a table.
 */
public static Duration getMaterializeTableTimeout(Session session)
{
return session.getSystemProperty(MATERIALIZE_TABLE_TIMEOUT, Duration.class);
}

public static boolean isForceSpillingOperator(Session session)
{
return session.getSystemProperty(FORCE_SPILLING_JOIN, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import io.trino.sql.PlannerContext;
import io.trino.sql.analyzer.AnalyzerFactory;
import io.trino.sql.analyzer.QueryExplainerFactory;
import io.trino.sql.planner.ForPlanner;
import io.trino.sql.planner.OptimizerStatsMBeanExporter;
import io.trino.sql.planner.PlanFragmenter;
import io.trino.sql.planner.PlanOptimizers;
Expand Down Expand Up @@ -307,6 +308,8 @@ List<OutputStatsEstimatorFactory> getCompositeOutputDataSizeEstimatorDelegateFac
// planner
binder.bind(PlanFragmenter.class).in(Scopes.SINGLETON);
binder.bind(PlanOptimizersFactory.class).to(PlanOptimizers.class).in(Scopes.SINGLETON);
binder.bind(ExecutorService.class).annotatedWith(ForPlanner.class)
.toInstance(newCachedThreadPool(threadsNamed("planner-%s")));

// Optimizer/Rule Stats exporter
binder.bind(RuleStatsRecorder.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -372,6 +375,7 @@ List<OutputStatsEstimatorFactory> getCompositeOutputDataSizeEstimatorDelegateFac
closingBinder(binder).registerExecutor(Key.get(ScheduledExecutorService.class, ForStatementResource.class));
closingBinder(binder).registerExecutor(Key.get(ExecutorService.class, ForQueryExecution.class));
closingBinder(binder).registerExecutor(Key.get(ScheduledExecutorService.class, ForScheduler.class));
closingBinder(binder).registerExecutor(Key.get(ExecutorService.class, ForPlanner.class));
}

// working around circular dependency Metadata <-> PlannerContext
Expand Down
10 changes: 3 additions & 7 deletions core/trino-main/src/main/java/io/trino/sql/DynamicFilters.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,16 @@ public static Expression replaceDynamicFilterId(Call dynamicFilterFunctionCall,

public static boolean isDynamicFilter(Expression expression)
{
return getDescriptor(expression).isPresent();
return (expression instanceof Call call) && isDynamicFilterFunction(call);
}

public static Optional<Descriptor> getDescriptor(Expression expression)
{
if (!(expression instanceof Call call)) {
if (!isDynamicFilter(expression)) {
return Optional.empty();
}

if (!isDynamicFilterFunction(call)) {
return Optional.empty();
}

List<Expression> arguments = call.arguments();
List<Expression> arguments = ((Call) expression).arguments();
checkArgument(arguments.size() == 4, "invalid arguments count: %s", arguments.size());

Expression probeSymbol = arguments.get(0);
Expand Down
29 changes: 29 additions & 0 deletions core/trino-main/src/main/java/io/trino/sql/planner/ForPlanner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.sql.planner;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * Guice binding annotation qualifying the {@code ExecutorService} dedicated to
 * planner work (bound in the server module to a cached thread pool whose threads
 * are named {@code planner-%s}, and registered for shutdown on close).
 */
@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForPlanner {}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

@DefunctConfig({
"adaptive-partial-aggregation.min-rows",
Expand Down Expand Up @@ -92,7 +93,11 @@ public class OptimizerConfig
private boolean forceSingleNodeOutput;
private boolean useExactPartitioning;
private boolean useCostBasedPartitioning = true;
private int pushFilterIntoValuesMaxRowCount = 100;
private int pushFilterIntoValuesMaxRowCount = 100_000;
private boolean materializeTable = true;
private int materializeTableMaxEstimatedRowCount = 50_000;
private int materializeTableMaxActualRowCount = 100_000;
private Duration materializeTableTimeout = new Duration(5, SECONDS);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 seconds is quite long as a default, 1-2 seconds seems more reasonable

// adaptive partial aggregation
private boolean adaptivePartialAggregationEnabled = true;
private double adaptivePartialAggregationUniqueRowsRatioThreshold = 0.8;
Expand Down Expand Up @@ -823,6 +828,60 @@ public OptimizerConfig setPushFilterIntoValuesMaxRowCount(int pushFilterIntoValu
return this;
}

/**
 * Whether tables are materialized during planning. Defaults to {@code true}.
 */
public boolean isMaterializeTable()
{
return materializeTable;
}

/**
 * Sets the {@code optimizer.materialize-table.enabled} config property.
 */
@Config("optimizer.materialize-table.enabled")
@ConfigDescription("Materialize tables during planning")
public OptimizerConfig setMaterializeTable(boolean materializeTable)
{
this.materializeTable = materializeTable;
return this;
}

/**
 * Maximum estimated row count for a table to be materialized.
 * Must be non-negative; defaults to 50,000.
 */
@Min(0)
public int getMaterializeTableMaxEstimatedRowCount()
{
return materializeTableMaxEstimatedRowCount;
}

/**
 * Sets the {@code optimizer.materialize-table.max-estimated-row-count} config property.
 */
@Config("optimizer.materialize-table.max-estimated-row-count")
@ConfigDescription("Maximum estimated row count for a table to be materialized")
public OptimizerConfig setMaterializeTableMaxEstimatedRowCount(int materializeTableMaxEstimatedRowCount)
{
this.materializeTableMaxEstimatedRowCount = materializeTableMaxEstimatedRowCount;
return this;
}

/**
 * Maximum actual row count for a materialized table.
 * Must be non-negative; defaults to 100,000.
 */
@Min(0)
public int getMaterializeTableMaxActualRowCount()
{
return materializeTableMaxActualRowCount;
}

/**
 * Sets the {@code optimizer.materialize-table.max-actual-row-count} config property.
 */
@Config("optimizer.materialize-table.max-actual-row-count")
@ConfigDescription("Maximum actual row count for a materialized table")
public OptimizerConfig setMaterializeTableMaxActualRowCount(int materializeTableMaxActualRowCount)
{
this.materializeTableMaxActualRowCount = materializeTableMaxActualRowCount;
return this;
}

/**
 * Maximum time to wait for materializing a table. Defaults to 5 seconds.
 */
public Duration getMaterializeTableTimeout()
{
return materializeTableTimeout;
}

/**
 * Sets the {@code optimizer.materialize-table.timeout} config property.
 */
@Config("optimizer.materialize-table.timeout")
@ConfigDescription("Maximum time to wait for materializing a table")
public OptimizerConfig setMaterializeTableTimeout(Duration materializeTableTimeout)
{
this.materializeTableTimeout = materializeTableTimeout;
return this;
}

public boolean isUnsafePushdownAllowed()
{
return allowUnsafePushdown;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.cost.StatsCalculator;
import io.trino.cost.TaskCountEstimator;
import io.trino.execution.TaskManagerConfig;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Metadata;
import io.trino.split.PageSourceManager;
import io.trino.split.SplitManager;
Expand Down Expand Up @@ -255,6 +256,8 @@
import io.trino.sql.planner.optimizations.HashGenerationOptimizer;
import io.trino.sql.planner.optimizations.IndexJoinOptimizer;
import io.trino.sql.planner.optimizations.LimitPushDown;
import io.trino.sql.planner.optimizations.MaterializeFilteredTableScan;
import io.trino.sql.planner.optimizations.MaterializeTableScan;
import io.trino.sql.planner.optimizations.MetadataQueryOptimizer;
import io.trino.sql.planner.optimizations.OptimizerStats;
import io.trino.sql.planner.optimizations.PlanOptimizer;
Expand All @@ -267,6 +270,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;

import static java.util.Objects.requireNonNull;

Expand All @@ -281,9 +285,11 @@ public class PlanOptimizers
@Inject
public PlanOptimizers(
PlannerContext plannerContext,
@ForPlanner ExecutorService executor,
TaskManagerConfig taskManagerConfig,
SplitManager splitManager,
PageSourceManager pageSourceManager,
InternalNodeManager nodeManager,
StatsCalculator statsCalculator,
ScalarStatsCalculator scalarStatsCalculator,
CostCalculator costCalculatorWithoutEstimatedExchanges,
Expand All @@ -294,10 +300,12 @@ public PlanOptimizers(
RuleStatsRecorder ruleStats)
{
this(plannerContext,
executor,
taskManagerConfig,
false,
splitManager,
pageSourceManager,
nodeManager,
statsCalculator,
scalarStatsCalculator,
costCalculatorWithoutEstimatedExchanges,
Expand All @@ -310,10 +318,12 @@ public PlanOptimizers(

public PlanOptimizers(
PlannerContext plannerContext,
ExecutorService executor,
TaskManagerConfig taskManagerConfig,
boolean forceSingleNode,
SplitManager splitManager,
PageSourceManager pageSourceManager,
InternalNodeManager nodeManager,
StatsCalculator statsCalculator,
ScalarStatsCalculator scalarStatsCalculator,
CostCalculator costCalculatorWithoutEstimatedExchanges,
Expand Down Expand Up @@ -644,6 +654,10 @@ public PlanOptimizers(
.add(new PushDistinctLimitIntoTableScan(plannerContext))
.add(new PushTopNIntoTableScan(metadata))
.add(new RewriteTableFunctionToTableScan(plannerContext)) // must run after ImplementTableFunctionSource
.add(new MaterializeFilteredTableScan(plannerContext, splitManager, pageSourceManager, nodeManager, executor))
.add(new MaterializeTableScan(plannerContext, splitManager, pageSourceManager, nodeManager, executor))
Comment on lines +657 to +658
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be too early in the planning process for this optimization. Ideally, we want to delay this until after as much predicate and projection pushdown as possible has happened.
I think we should at least do it after or near io.trino.sql.planner.optimizations.MetadataQueryOptimizer runs, as that is a similar optimization to this one and we probably want that one to run first.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have NO IDEA where to put this optimization. I just looked around and guessed. If you can tell me exactly where to put it, that would be great. Or if you want to take over this PR, I'm happy for that too..

.add(new PushFilterIntoValues(plannerContext))
.add(new ReplaceJoinOverConstantWithProject())
.build();
IterativeOptimizer pushIntoTableScanOptimizer = new IterativeOptimizer(
plannerContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.sql.planner.iterative.rule;

import com.google.common.collect.ImmutableList;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
import io.trino.sql.planner.iterative.Rule;
Expand Down Expand Up @@ -43,7 +42,7 @@ public Result apply(IntersectNode node, Captures captures, Context context)
boolean hasEmptyBranches = node.getSources().stream().anyMatch(source -> isEmpty(source, context.getLookup()));

if (hasEmptyBranches) {
return Result.ofPlanNode(new ValuesNode(node.getId(), node.getOutputSymbols(), ImmutableList.of()));
return Result.ofPlanNode(new ValuesNode(node.getId(), node.getOutputSymbols()));
}

return Result.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.sql.planner.iterative.rule;

import com.google.common.collect.ImmutableList;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
import io.trino.sql.planner.iterative.Rule;
Expand Down Expand Up @@ -41,6 +40,6 @@ public Pattern<SampleNode> getPattern()
@Override
public Result apply(SampleNode sample, Captures captures, Context context)
{
return Result.ofPlanNode(new ValuesNode(sample.getId(), sample.getOutputSymbols(), ImmutableList.of()));
return Result.ofPlanNode(new ValuesNode(sample.getId(), sample.getOutputSymbols()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private static Result pushFilter(FilterNode filterNode, AggregationNode aggregat

if (tupleDomain.isNone()) {
// Filter predicate is never satisfied. Replace filter with empty values.
return Result.ofPlanNode(new ValuesNode(filterNode.getId(), filterNode.getOutputSymbols(), ImmutableList.of()));
return Result.ofPlanNode(new ValuesNode(filterNode.getId(), filterNode.getOutputSymbols()));
}

// boolOrSymbol (in remaining expressions) should only be used in Coalesce(boolOrSymbol, FALSE) expression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private static Result pushFilter(FilterNode filterNode, AggregationNode aggregat

if (tupleDomain.isNone()) {
// Filter predicate is never satisfied. Replace filter with empty values.
return Result.ofPlanNode(new ValuesNode(filterNode.getId(), filterNode.getOutputSymbols(), ImmutableList.of()));
return Result.ofPlanNode(new ValuesNode(filterNode.getId(), filterNode.getOutputSymbols()));
}
Domain countDomain = tupleDomain.getDomains().get().get(countSymbol);
if (countDomain == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.sql.planner.iterative.rule;

import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.cost.StatsProvider;
Expand Down Expand Up @@ -217,7 +216,7 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
// TODO: DomainTranslator.fromPredicate can infer that the expression is "false" in some cases (TupleDomain.none()).
// This should move to another rule that simplifies the filter using that logic and then rely on RemoveTrivialFilters
// to turn the subtree into a Values node
return Optional.of(new ValuesNode(node.getId(), node.getOutputSymbols(), ImmutableList.of()));
return Optional.of(new ValuesNode(node.getId(), node.getOutputSymbols()));
}

Optional<ConstraintApplicationResult<TableHandle>> result = plannerContext.getMetadata().applyFilter(session, node.getTable(), constraint);
Expand All @@ -231,7 +230,7 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
TableProperties newTableProperties = plannerContext.getMetadata().getTableProperties(session, newTable);
Optional<TablePartitioning> newTablePartitioning = newTableProperties.getTablePartitioning();
if (newTableProperties.getPredicate().isNone()) {
return Optional.of(new ValuesNode(node.getId(), node.getOutputSymbols(), ImmutableList.of()));
return Optional.of(new ValuesNode(node.getId(), node.getOutputSymbols()));
}

TupleDomain<ColumnHandle> remainingFilter = result.get().getRemainingFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public Result apply(FilterNode filter, Captures captures, Context context)
return Result.empty();
}
if (upperBound.getAsInt() <= 0) {
return Result.ofPlanNode(new ValuesNode(filter.getId(), filter.getOutputSymbols(), ImmutableList.of()));
return Result.ofPlanNode(new ValuesNode(filter.getId(), filter.getOutputSymbols()));
}
boolean updatedMaxRowCountPerPartition = false;
if (rowNumber.getMaxRowCountPerPartition().isEmpty() || rowNumber.getMaxRowCountPerPartition().get() > upperBound.getAsInt()) {
Expand Down
Loading