Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class OptimizerConfig
private int maxReorderedJoins = 9;

private boolean enableStatsCalculator = true;
private boolean statisticsPrecalculationForPushdownEnabled;
private boolean statisticsPrecalculationForPushdownEnabled = true;
private boolean collectPlanStatisticsForAllQueries;
private boolean ignoreStatsCalculatorFailures = true;
private boolean defaultFilterFactorEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testDefaults()
.setUsePreferredWritePartitioning(true)
.setPreferredWritePartitioningMinNumberOfPartitions(50)
.setEnableStatsCalculator(true)
.setStatisticsPrecalculationForPushdownEnabled(false)
.setStatisticsPrecalculationForPushdownEnabled(true)
.setCollectPlanStatisticsForAllQueries(false)
.setIgnoreStatsCalculatorFailures(true)
.setDefaultFilterFactorEnabled(false)
Expand Down Expand Up @@ -96,7 +96,7 @@ public void testExplicitPropertyMappings()
.put("memory-cost-weight", "0.3")
.put("network-cost-weight", "0.2")
.put("enable-stats-calculator", "false")
.put("statistics-precalculation-for-pushdown.enabled", "true")
.put("statistics-precalculation-for-pushdown.enabled", "false")
.put("collect-plan-statistics-for-all-queries", "true")
.put("optimizer.ignore-stats-calculator-failures", "false")
.put("optimizer.default-filter-factor-enabled", "true")
Expand Down Expand Up @@ -146,7 +146,7 @@ public void testExplicitPropertyMappings()
.setMemoryCostWeight(0.3)
.setNetworkCostWeight(0.2)
.setEnableStatsCalculator(false)
.setStatisticsPrecalculationForPushdownEnabled(true)
.setStatisticsPrecalculationForPushdownEnabled(false)
.setCollectPlanStatisticsForAllQueries(true)
.setIgnoreStatsCalculatorFailures(false)
.setJoinDistributionType(BROADCAST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,14 @@ public class DefaultJdbcMetadata
private static final String SYNTHETIC_COLUMN_NAME_PREFIX = "_pfgnrtd_";

private final JdbcClient jdbcClient;
private final boolean precalculateStatisticsForPushdown;

private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();

public DefaultJdbcMetadata(JdbcClient jdbcClient)
public DefaultJdbcMetadata(JdbcClient jdbcClient, boolean precalculateStatisticsForPushdown)
{
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.precalculateStatisticsForPushdown = precalculateStatisticsForPushdown;
}

@Override
Expand Down Expand Up @@ -210,8 +212,8 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C

return Optional.of(
remainingExpression.isPresent()
? new ConstraintApplicationResult<>(handle, remainingFilter, remainingExpression.get(), false)
: new ConstraintApplicationResult<>(handle, remainingFilter, false));
? new ConstraintApplicationResult<>(handle, remainingFilter, remainingExpression.get(), precalculateStatisticsForPushdown)
: new ConstraintApplicationResult<>(handle, remainingFilter, precalculateStatisticsForPushdown));
}

private JdbcTableHandle flushAttributesAsQuery(ConnectorSession session, JdbcTableHandle handle)
Expand Down Expand Up @@ -270,7 +272,7 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjecti
assignment.getValue(),
((JdbcColumnHandle) assignment.getValue()).getColumnType()))
.collect(toImmutableList()),
false));
precalculateStatisticsForPushdown));
}

@Override
Expand Down Expand Up @@ -364,7 +366,7 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega
handle.getAllReferencedTables(),
nextSyntheticColumnId);

return Optional.of(new AggregationApplicationResult<>(handle, projections.build(), resultAssignments.build(), ImmutableMap.of(), false));
return Optional.of(new AggregationApplicationResult<>(handle, projections.build(), resultAssignments.build(), ImmutableMap.of(), precalculateStatisticsForPushdown));
}

@Override
Expand Down Expand Up @@ -449,7 +451,7 @@ public Optional<JoinApplicationResult<ConnectorTableHandle>> applyJoin(
nextSyntheticColumnId),
ImmutableMap.copyOf(newLeftColumns),
ImmutableMap.copyOf(newRightColumns),
false));
precalculateStatisticsForPushdown));
}

private static Optional<JdbcColumnHandle> getVariableColumnHandle(Map<String, ColumnHandle> assignments, ConnectorExpression expression)
Expand Down Expand Up @@ -505,7 +507,7 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
handle.getOtherReferencedTables(),
handle.getNextSyntheticColumnId());

return Optional.of(new LimitApplicationResult<>(handle, jdbcClient.isLimitGuaranteed(session), false));
return Optional.of(new LimitApplicationResult<>(handle, jdbcClient.isLimitGuaranteed(session), precalculateStatisticsForPushdown));
}

@Override
Expand Down Expand Up @@ -554,7 +556,7 @@ public Optional<TopNApplicationResult<ConnectorTableHandle>> applyTopN(
handle.getOtherReferencedTables(),
handle.getNextSyntheticColumnId());

return Optional.of(new TopNApplicationResult<>(sortedTableHandle, jdbcClient.isTopNGuaranteed(session), false));
return Optional.of(new TopNApplicationResult<>(sortedTableHandle, jdbcClient.isTopNGuaranteed(session), precalculateStatisticsForPushdown));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ public JdbcMetadata create(JdbcTransactionHandle transaction)

protected JdbcMetadata create(JdbcClient transactionCachingJdbcClient)
{
return new DefaultJdbcMetadata(transactionCachingJdbcClient);
return new DefaultJdbcMetadata(transactionCachingJdbcClient, true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;

import java.util.Optional;

public class JdbcJoinPushdownConfig
{
private JoinPushdownStrategy joinPushdownStrategy = JoinPushdownStrategy.AUTOMATIC;
private Optional<DataSize> joinPushdownAutomaticMaxTableSize = Optional.empty();
// Normally we would put 1.0 as a default value here to only allow joins which do not expand data to be pushed down.
// We use 1.25 to adjust for the fact that NDV estimations sometimes are off and joins which should be pushed down are not.
private double joinPushdownAutomaticMaxJoinToTablesRatio = 1.25;

public JoinPushdownStrategy getJoinPushdownStrategy()
{
return joinPushdownStrategy;
}

@Config("join-pushdown.strategy")
public JdbcJoinPushdownConfig setJoinPushdownStrategy(JoinPushdownStrategy joinPushdownStrategy)
{
this.joinPushdownStrategy = joinPushdownStrategy;
return this;
}

public Optional<DataSize> getJoinPushdownAutomaticMaxTableSize()
{
return joinPushdownAutomaticMaxTableSize;
}

@Config("experimental.join-pushdown.automatic.max-table-size")
@ConfigDescription("Maximum table size to be considered for join pushdown")
public JdbcJoinPushdownConfig setJoinPushdownAutomaticMaxTableSize(DataSize joinPushdownAutomaticMaxTableSize)
{
this.joinPushdownAutomaticMaxTableSize = Optional.ofNullable(joinPushdownAutomaticMaxTableSize);
return this;
}

public double getJoinPushdownAutomaticMaxJoinToTablesRatio()
{
return joinPushdownAutomaticMaxJoinToTablesRatio;
}

@Config("experimental.join-pushdown.automatic.max-join-to-tables-ratio")
@ConfigDescription("If estimated join output size is greater than or equal to ratio * sum of table sizes, then join pushdown will not be performed")
public JdbcJoinPushdownConfig setJoinPushdownAutomaticMaxJoinToTablesRatio(double joinPushdownAutomaticMaxJoinToTablesRatio)
{
this.joinPushdownAutomaticMaxJoinToTablesRatio = joinPushdownAutomaticMaxJoinToTablesRatio;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;

import javax.inject.Inject;

import java.util.List;
import java.util.Optional;

import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;

public final class JdbcJoinPushdownSessionProperties
implements SessionPropertiesProvider
{
public static final String JOIN_PUSHDOWN_STRATEGY = "join_pushdown_strategy";
public static final String JOIN_PUSHDOWN_AUTOMATIC_MAX_TABLE_SIZE = "join_pushdown_automatic_max_table_size";
public static final String JOIN_PUSHDOWN_AUTOMATIC_MAX_JOIN_TO_TABLES_RATIO = "join_pushdown_automatic_max_join_to_tables_ratio";

private final List<PropertyMetadata<?>> sessionProperties;

@Inject
public JdbcJoinPushdownSessionProperties(JdbcJoinPushdownConfig joinPushdownConfig)
{
sessionProperties = ImmutableList.<PropertyMetadata<?>>builder()
.add(enumProperty(
JOIN_PUSHDOWN_STRATEGY,
"Join pushdown strategy",
JoinPushdownStrategy.class,
joinPushdownConfig.getJoinPushdownStrategy(),
false))
.add(doubleProperty(
JOIN_PUSHDOWN_AUTOMATIC_MAX_JOIN_TO_TABLES_RATIO,
"If estimated join output size is greater than or equal to ratio * sum of table sizes, then join pushdown will not be performed",
joinPushdownConfig.getJoinPushdownAutomaticMaxJoinToTablesRatio(),
false))
.add(dataSizeProperty(
JOIN_PUSHDOWN_AUTOMATIC_MAX_TABLE_SIZE,
"Maximum table size to be considered for join pushdown",
joinPushdownConfig.getJoinPushdownAutomaticMaxTableSize().orElse(null),
false))
.build();
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
}

public static JoinPushdownStrategy getJoinPushdownStrategy(ConnectorSession session)
{
return session.getProperty(JOIN_PUSHDOWN_STRATEGY, JoinPushdownStrategy.class);
}

public static Optional<DataSize> getJoinPushdownAutomaticMaxTableSize(ConnectorSession session)
{
return Optional.ofNullable(session.getProperty(JOIN_PUSHDOWN_AUTOMATIC_MAX_TABLE_SIZE, DataSize.class));
}

public static double getJoinPushdownAutomaticJoinToTablesRatio(ConnectorSession session)
{
return session.getProperty(JOIN_PUSHDOWN_AUTOMATIC_MAX_JOIN_TO_TABLES_RATIO, Double.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import com.google.inject.Binder;
import io.airlift.configuration.AbstractConfigurationAwareModule;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.jdbc.JdbcModule.bindSessionPropertiesProvider;

/**
* A helper module for implementing cost-aware Join pushdown. It remains
* {@link io.trino.plugin.jdbc.JdbcClient}'s responsibility to provide cost-aware pushdown logic.
*/
public class JdbcJoinPushdownSupportModule
extends AbstractConfigurationAwareModule
{
@Override
protected void setup(Binder binder)
{
configBinder(binder).bindConfig(JdbcJoinPushdownConfig.class);
bindSessionPropertiesProvider(binder, JdbcJoinPushdownSessionProperties.class);

configBinder(binder).bindConfigDefaults(JdbcMetadataConfig.class, config -> config.setJoinPushdownEnabled(true));
}
}
Loading