diff --git a/docs/src/main/sphinx/connector/jdbc-common-configurations.fragment b/docs/src/main/sphinx/connector/jdbc-common-configurations.fragment index 412b96a50c94..cce75a4b79fb 100644 --- a/docs/src/main/sphinx/connector/jdbc-common-configurations.fragment +++ b/docs/src/main/sphinx/connector/jdbc-common-configurations.fragment @@ -42,3 +42,19 @@ connector: Do not change this setting from the default. Non-default values may negatively impact performance. - ``1000`` + * - ``dynamic-filtering.enabled`` + - Push down dynamic filters into JDBC queries + - ``true`` + * - ``dynamic-filtering.wait-timeout`` + - Maximum duration for which Trino will wait for dynamic filters to be + collected from the build side of joins before starting a JDBC query. + Using a large timeout can potentially result in more detailed dynamic filters. + However, it can also increase latency for some queries. + - ``20s`` + * - ``domain-compaction-threshold`` + - Minimum size of query predicates above which Trino compacts the predicates. + Pushing down a large list of predicates to the data source can compromise + performance. For optimization in that situation, Trino can compact the large + predicates into a simpler range predicate. If necessary, adjust the threshold + to ensure a balance between performance and predicate pushdown. + - ``32`` diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcSplitSource.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcSplitSource.java new file mode 100644 index 000000000000..812979ed2bcc --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcSplitSource.java @@ -0,0 +1,92 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.predicate.TupleDomain; + +import java.util.concurrent.CompletableFuture; + +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +/** + * Attaches dynamic filter to {@link JdbcSplit} after {@link JdbcDynamicFilteringSplitManager} + * has waited for the collection of dynamic filters. + * This allows JDBC based connectors to avoid waiting for dynamic filters again on the worker node + * in {@link JdbcRecordSetProvider}. The number of splits generated by JDBC based connectors are + * typically small, therefore attaching dynamic filter here does not add significant overhead. + * Waiting for dynamic filters in {@link JdbcDynamicFilteringSplitManager} is preferred over waiting + * for them on the worker node in {@link JdbcRecordSetProvider} to allow connectors to take advantage of + * dynamic filters during the splits generation phase. + */ +public class DynamicFilteringJdbcSplitSource + implements ConnectorSplitSource +{ + private final ConnectorSplitSource delegateSplitSource; + private final DynamicFilter dynamicFilter; + private final JdbcTableHandle tableHandle; + + DynamicFilteringJdbcSplitSource(ConnectorSplitSource delegateSplitSource, DynamicFilter dynamicFilter, JdbcTableHandle tableHandle) + { + this.delegateSplitSource = requireNonNull(delegateSplitSource, "delegateSplitSource is null"); + this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null"); + this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); + } + + @Override + public CompletableFuture getNextBatch(int maxSize) + { + if (!isEligibleForDynamicFilter(tableHandle)) { + return delegateSplitSource.getNextBatch(maxSize); + } + return delegateSplitSource.getNextBatch(maxSize) + .thenApply(batch -> { + TupleDomain dynamicFilterPredicate = dynamicFilter.getCurrentPredicate() + .transformKeys(JdbcColumnHandle.class::cast); + return new ConnectorSplitBatch( + batch.getSplits().stream() + // attach dynamic filter constraint to JdbcSplit + .map(split -> { + JdbcSplit jdbcSplit = (JdbcSplit) split; + // If split was a subclass of JdbcSplit, there would be additional information + // that we would need to pass further on. + verify(jdbcSplit.getClass() == JdbcSplit.class, "Unexpected split type %s", jdbcSplit); + return jdbcSplit.withDynamicFilter(dynamicFilterPredicate); + }) + .collect(toImmutableList()), + batch.isNoMoreSplits()); + }); + } + + @Override + public void close() + { + delegateSplitSource.close(); + } + + @Override + public boolean isFinished() + { + return delegateSplitSource.isFinished(); + } + + public static boolean isEligibleForDynamicFilter(JdbcTableHandle tableHandle) + { + // don't pushdown predicate through limit as it could reduce performance + return tableHandle.getLimit().isEmpty(); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringStats.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringStats.java new file mode 100644 index 000000000000..6cbb387cd872 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringStats.java @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import io.airlift.stats.CounterStat; +import io.airlift.stats.TimeStat; +import io.airlift.units.Duration; +import io.trino.spi.connector.DynamicFilter; +import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class DynamicFilteringStats +{ + private final CounterStat totalDynamicFilters = new CounterStat(); + private final CounterStat completedDynamicFilters = new CounterStat(); + private final CounterStat awaitableDynamicFilters = new CounterStat(); + private final TimeStat dynamicFilterWaitingTime = new TimeStat(MILLISECONDS); + + @Managed + @Nested + public CounterStat getTotalDynamicFilters() + { + return totalDynamicFilters; + } + + @Managed + @Nested + public CounterStat getCompletedDynamicFilters() + { + return completedDynamicFilters; + } + + @Managed + @Nested + public CounterStat getAwaitableDynamicFilters() + { + return awaitableDynamicFilters; + } + + @Managed + @Nested + public TimeStat getDynamicFilterWaitingTime() + { + return dynamicFilterWaitingTime; + } + + public void processDynamicFilter(DynamicFilter dynamicFilter, Duration waitingTime) + { + totalDynamicFilters.update(1); + if (dynamicFilter.isComplete()) { + completedDynamicFilters.update(1); + } + if (dynamicFilter.isAwaitable()) { + awaitableDynamicFilters.update(1); + } + dynamicFilterWaitingTime.add(waitingTime); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForJdbcDynamicFiltering.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForJdbcDynamicFiltering.java new file mode 100644 index 000000000000..db0b2ceb424d --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForJdbcDynamicFiltering.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForJdbcDynamicFiltering +{ +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcColumnHandle.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcColumnHandle.java index 6de88fbf4d75..ddcc3a010e65 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcColumnHandle.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcColumnHandle.java @@ -16,19 +16,25 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Joiner; +import io.airlift.slice.SizeOf; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ColumnSchema; import io.trino.spi.type.Type; +import org.openjdk.jol.info.ClassLayout; import java.util.Objects; import java.util.Optional; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public final class JdbcColumnHandle implements ColumnHandle { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(JdbcColumnHandle.class).instanceSize(); + private final String columnName; private final JdbcTypeHandle jdbcTypeHandle; private final Type columnType; @@ -136,6 +142,16 @@ public String toString() jdbcTypeHandle.getJdbcTypeName().orElse(null)); } + public long getRetainedSizeInBytes() + { + // columnType is not accounted for as the instances are cached (by TypeRegistry) and shared + return INSTANCE_SIZE + + sizeOf(nullable) + + estimatedSizeOf(columnName) + + sizeOf(comment, SizeOf::estimatedSizeOf) + + jdbcTypeHandle.getRetainedSizeInBytes(); + } + public static Builder builder() { return new Builder(); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcDynamicFilteringConfig.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcDynamicFilteringConfig.java new file mode 100644 index 000000000000..7d1189437bb4 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcDynamicFilteringConfig.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.units.Duration; + +import javax.validation.constraints.NotNull; + +import static java.util.concurrent.TimeUnit.SECONDS; + +public class JdbcDynamicFilteringConfig +{ + private boolean dynamicFilteringEnabled = true; + // 20s allows DF collection from dimensional tables as well as + // some larger build side subqueries + private Duration dynamicFilteringWaitTimeout = new Duration(20, SECONDS); + + public boolean isDynamicFilteringEnabled() + { + return dynamicFilteringEnabled; + } + + @Config("dynamic-filtering.enabled") + @ConfigDescription("Wait for dynamic filters before starting JDBC query") + public JdbcDynamicFilteringConfig setDynamicFilteringEnabled(boolean dynamicFilteringEnabled) + { + this.dynamicFilteringEnabled = dynamicFilteringEnabled; + return this; + } + + @NotNull + public Duration getDynamicFilteringWaitTimeout() + { + return dynamicFilteringWaitTimeout; + } + + @Config("dynamic-filtering.wait-timeout") + @ConfigDescription("Duration to wait for completion of dynamic filters") + public JdbcDynamicFilteringConfig setDynamicFilteringWaitTimeout(Duration dynamicFilteringWaitTimeout) + { + this.dynamicFilteringWaitTimeout = dynamicFilteringWaitTimeout; + return this; + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcDynamicFilteringSessionProperties.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcDynamicFilteringSessionProperties.java new file mode 100644 index 000000000000..649c1be5c876 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcDynamicFilteringSessionProperties.java @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.common.collect.ImmutableList; +import io.airlift.units.Duration; +import io.trino.plugin.base.session.SessionPropertiesProvider; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.session.PropertyMetadata; + +import javax.inject.Inject; + +import java.util.List; + +import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty; +import static io.trino.spi.session.PropertyMetadata.booleanProperty; + +public class JdbcDynamicFilteringSessionProperties + implements SessionPropertiesProvider +{ + public static final String DYNAMIC_FILTERING_ENABLED = "dynamic_filtering_enabled"; + public static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout"; + + private final List> sessionProperties; + + @Inject + public JdbcDynamicFilteringSessionProperties(JdbcDynamicFilteringConfig config) + { + sessionProperties = ImmutableList.of( + booleanProperty( + DYNAMIC_FILTERING_ENABLED, + "Wait for dynamic filters before starting JDBC query", + config.isDynamicFilteringEnabled(), + false), + durationProperty( + DYNAMIC_FILTERING_WAIT_TIMEOUT, + "Duration to wait for completion of dynamic filters", + config.getDynamicFilteringWaitTimeout(), + false)); + } + + @Override + public List> getSessionProperties() + { + return sessionProperties; + } + + public static boolean dynamicFilteringEnabled(ConnectorSession session) + { + return session.getProperty(DYNAMIC_FILTERING_ENABLED, Boolean.class); + } + + public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session) + { + return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcDynamicFilteringSplitManager.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcDynamicFilteringSplitManager.java new file mode 100644 index 000000000000..39f1d0a69208 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcDynamicFilteringSplitManager.java @@ -0,0 +1,180 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import io.airlift.log.Logger; +import io.airlift.units.Duration; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.Constraint; +import io.trino.spi.connector.DynamicFilter; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static io.airlift.units.Duration.succinctNanos; +import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.dynamicFilteringEnabled; +import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.getDynamicFilteringWaitTimeout; +import static io.trino.spi.connector.ConnectorSplitSource.ConnectorSplitBatch; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Implements waiting for collection of dynamic filters before generating splits from {@link ConnectorSplitManager}. + * This allows JDBC based connectors to take advantage of dynamic filters during splits generation phase. + * Implementing this as a wrapper over {@link ConnectorSplitManager} allows this class to be used by JDBC connectors + * which don't rely on {@link JdbcSplitManager} for splits generation. + */ +public class JdbcDynamicFilteringSplitManager + implements ConnectorSplitManager +{ + private static final Logger log = Logger.get(JdbcDynamicFilteringSplitManager.class); + private static final ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitBatch(ImmutableList.of(), false); + + private final ConnectorSplitManager delegateSplitManager; + private final DynamicFilteringStats stats; + + @Inject + public JdbcDynamicFilteringSplitManager( + @ForJdbcDynamicFiltering ConnectorSplitManager delegateSplitManager, + DynamicFilteringStats stats) + { + this.delegateSplitManager = requireNonNull(delegateSplitManager, "delegateSplitManager is null"); + this.stats = requireNonNull(stats, "stats is null"); + } + + @Override + public ConnectorSplitSource getSplits( + ConnectorTransactionHandle transaction, + ConnectorSession session, + ConnectorTableHandle table, + DynamicFilter dynamicFilter, + Constraint constraint) + { + JdbcTableHandle tableHandle = (JdbcTableHandle) table; + // pushing DF through limit could reduce query performance + boolean hasLimit = tableHandle.getLimit().isPresent(); + if (dynamicFilter == DynamicFilter.EMPTY || hasLimit || !dynamicFilteringEnabled(session)) { + return delegateSplitManager.getSplits(transaction, session, table, dynamicFilter, constraint); + } + + return new DynamicFilteringSplitSource(transaction, session, (JdbcTableHandle) table, dynamicFilter, constraint); + } + + private class DynamicFilteringSplitSource + implements ConnectorSplitSource + { + private final ConnectorTransactionHandle transaction; + private final ConnectorSession session; + private final JdbcTableHandle table; + private final DynamicFilter dynamicFilter; + private final Constraint constraint; + private final long dynamicFilteringTimeoutNanos; + private final long startNanos; + + @GuardedBy("this") + private Optional delegateSplitSource = Optional.empty(); + + DynamicFilteringSplitSource( + ConnectorTransactionHandle transaction, + ConnectorSession session, + JdbcTableHandle table, + DynamicFilter dynamicFilter, + Constraint constraint) + { + this.transaction = requireNonNull(transaction, "transaction is null"); + this.session = requireNonNull(session, "session is null"); + this.table = requireNonNull(table, "table is null"); + this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null"); + this.constraint = requireNonNull(constraint, "constraint is null"); + this.dynamicFilteringTimeoutNanos = (long) getDynamicFilteringWaitTimeout(session).getValue(NANOSECONDS); + this.startNanos = System.nanoTime(); + } + + @Override + public CompletableFuture getNextBatch(int maxSize) + { + long remainingTimeoutNanos = getRemainingTimeoutNanos(); + if (remainingTimeoutNanos > 0 && dynamicFilter.isAwaitable()) { + log.debug( + "Waiting for dynamic filter (query: %s, table: %s, remaining timeout: %s)", + session.getQueryId(), + table, + succinctNanos(remainingTimeoutNanos)); + // wait for dynamic filter and yield + return dynamicFilter.isBlocked() + .thenApply(ignored -> EMPTY_BATCH) + .completeOnTimeout(EMPTY_BATCH, remainingTimeoutNanos, NANOSECONDS); + } + + Duration waitingTime = succinctNanos(System.nanoTime() - startNanos); + log.debug("Enumerating splits (query %s, table: %s, waiting time: %s, awaitable: %s, completed: %s)", + session.getQueryId(), + table, + waitingTime, + dynamicFilter.isAwaitable(), + dynamicFilter.isComplete()); + stats.processDynamicFilter(dynamicFilter, waitingTime); + return getDelegateSplitSource().getNextBatch(maxSize); + } + + @Override + public void close() + { + getOptionalDelegateSplitSource().ifPresent(ConnectorSplitSource::close); + } + + @Override + public boolean isFinished() + { + if (getRemainingTimeoutNanos() > 0 && dynamicFilter.isAwaitable()) { + return false; + } + + return getDelegateSplitSource().isFinished(); + } + + private long getRemainingTimeoutNanos() + { + return dynamicFilteringTimeoutNanos - (System.nanoTime() - startNanos); + } + + private synchronized ConnectorSplitSource getDelegateSplitSource() + { + if (delegateSplitSource.isPresent()) { + return delegateSplitSource.get(); + } + + delegateSplitSource = Optional.of(delegateSplitManager.getSplits( + transaction, + session, + table.intersectedWithConstraint(dynamicFilter.getCurrentPredicate()), + dynamicFilter, + constraint)); + return delegateSplitSource.get(); + } + + private synchronized Optional getOptionalDelegateSplitSource() + { + return delegateSplitSource; + } + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index 55af51537629..2da63f2ef1dd 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -39,6 +39,7 @@ import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.configuration.ConfigBinder.configBinder; import static java.util.Objects.requireNonNull; +import static org.weakref.jmx.guice.ExportBinder.newExporter; public class JdbcModule extends AbstractConfigurationAwareModule @@ -64,7 +65,8 @@ public void setup(Binder binder) tablePropertiesProviderBinder(binder); newOptionalBinder(binder, JdbcMetadataFactory.class).setDefault().to(DefaultJdbcMetadataFactory.class).in(Scopes.SINGLETON); - newOptionalBinder(binder, ConnectorSplitManager.class).setDefault().to(JdbcSplitManager.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, Key.get(ConnectorSplitManager.class, ForJdbcDynamicFiltering.class)).setDefault().to(JdbcSplitManager.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, ConnectorSplitManager.class).setDefault().to(JdbcDynamicFilteringSplitManager.class).in(Scopes.SINGLETON); newOptionalBinder(binder, ConnectorRecordSetProvider.class).setDefault().to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON); newOptionalBinder(binder, ConnectorPageSinkProvider.class).setDefault().to(JdbcPageSinkProvider.class).in(Scopes.SINGLETON); @@ -73,11 +75,17 @@ public void setup(Binder binder) configBinder(binder).bindConfig(JdbcMetadataConfig.class); configBinder(binder).bindConfig(JdbcWriteConfig.class); configBinder(binder).bindConfig(BaseJdbcConfig.class); + configBinder(binder).bindConfig(JdbcDynamicFilteringConfig.class); configBinder(binder).bindConfig(TypeHandlingJdbcConfig.class); bindSessionPropertiesProvider(binder, TypeHandlingJdbcSessionProperties.class); bindSessionPropertiesProvider(binder, JdbcMetadataSessionProperties.class); bindSessionPropertiesProvider(binder, JdbcWriteSessionProperties.class); + bindSessionPropertiesProvider(binder, JdbcDynamicFilteringSessionProperties.class); + + binder.bind(DynamicFilteringStats.class).in(Scopes.SINGLETON); + newExporter(binder).export(DynamicFilteringStats.class) + .as(generator -> generator.generatedNameOf(DynamicFilteringStats.class, catalogName)); binder.bind(CachingJdbcClient.class).in(Scopes.SINGLETON); binder.bind(JdbcClient.class).to(Key.get(CachingJdbcClient.class)).in(Scopes.SINGLETON); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java index 45002e05c4cc..6d87a2bd2272 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java @@ -62,6 +62,12 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS handles.add((JdbcColumnHandle) handle); } - return new JdbcRecordSet(jdbcClient, executor, session, jdbcSplit, jdbcTable, handles.build()); + return new JdbcRecordSet( + jdbcClient, + executor, + session, + jdbcSplit, + jdbcTable.intersectedWithConstraint(jdbcSplit.getDynamicFilter().transformKeys(ColumnHandle.class::cast)), + handles.build()); } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplit.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplit.java index ac4ca91f49b6..28f25535c50b 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplit.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplit.java @@ -19,6 +19,7 @@ import io.airlift.slice.SizeOf; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.predicate.TupleDomain; import org.openjdk.jol.info.ClassLayout; import java.util.List; @@ -33,12 +34,25 @@ public class JdbcSplit private static final int INSTANCE_SIZE = ClassLayout.parseClass(JdbcSplit.class).instanceSize(); private final Optional additionalPredicate; + private final TupleDomain dynamicFilter; + + public JdbcSplit(Optional additionalPredicate) + { + this(additionalPredicate, TupleDomain.all()); + } @JsonCreator public JdbcSplit( - @JsonProperty("additionalPredicate") Optional additionalPredicate) + @JsonProperty("additionalPredicate") Optional additionalPredicate, + @JsonProperty("dynamicFilter") TupleDomain dynamicFilter) { this.additionalPredicate = requireNonNull(additionalPredicate, "additionalPredicate is null"); + this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null"); + } + + public JdbcSplit withDynamicFilter(TupleDomain dynamicFilter) + { + return new JdbcSplit(additionalPredicate, dynamicFilter); } @JsonProperty @@ -47,6 +61,12 @@ public Optional getAdditionalPredicate() return additionalPredicate; } + @JsonProperty + public TupleDomain getDynamicFilter() + { + return dynamicFilter; + } + @Override public boolean isRemotelyAccessible() { @@ -69,6 +89,7 @@ public Object getInfo() public long getRetainedSizeInBytes() { return INSTANCE_SIZE - + sizeOf(additionalPredicate, SizeOf::estimatedSizeOf); + + sizeOf(additionalPredicate, SizeOf::estimatedSizeOf) + + dynamicFilter.getRetainedSizeInBytes(JdbcColumnHandle::getRetainedSizeInBytes); } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplitManager.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplitManager.java index 0b6864a5d420..2eea16588d8d 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplitManager.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplitManager.java @@ -23,6 +23,7 @@ import javax.inject.Inject; +import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.dynamicFilteringEnabled; import static java.util.Objects.requireNonNull; public class JdbcSplitManager @@ -44,6 +45,11 @@ public ConnectorSplitSource getSplits( DynamicFilter dynamicFilter, Constraint constraint) { - return jdbcClient.getSplits(session, (JdbcTableHandle) table); + JdbcTableHandle tableHandle = (JdbcTableHandle) table; + ConnectorSplitSource jdbcSplitSource = jdbcClient.getSplits(session, tableHandle); + if (dynamicFilteringEnabled(session)) { + return new DynamicFilteringJdbcSplitSource(jdbcSplitSource, dynamicFilter, tableHandle); + } + return jdbcSplitSource; } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java index 48e87520ca6e..17b82f410fc6 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java @@ -104,6 +104,19 @@ public JdbcTableHandle( this.nextSyntheticColumnId = nextSyntheticColumnId; } + public JdbcTableHandle intersectedWithConstraint(TupleDomain newConstraint) + { + return new JdbcTableHandle( + relationHandle, + constraint.intersect(newConstraint), + constraintExpressions, + sortOrder, + limit, + columns, + otherReferencedTables, + nextSyntheticColumnId); + } + /** * @deprecated Use {@code asPlainTable().getSchemaTableName()} instead, but see those methods for more information, as this is not a drop-in replacement. */ diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTypeHandle.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTypeHandle.java index bea3391ab1ed..8bd6bb283a07 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTypeHandle.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTypeHandle.java @@ -16,15 +16,21 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.slice.SizeOf; +import org.openjdk.jol.info.ClassLayout; import java.util.Objects; import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.SIZE_OF_INT; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public final class JdbcTypeHandle { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(JdbcTypeHandle.class).instanceSize(); + private final int jdbcType; private final Optional jdbcTypeName; private final Optional columnSize; @@ -150,4 +156,15 @@ public String toString() .add("arrayDimensions", arrayDimensions.orElse(null)) .toString(); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + sizeOf(jdbcType) + + sizeOf(jdbcTypeName, SizeOf::estimatedSizeOf) + + sizeOf(columnSize, SizeOf::sizeOf) + + sizeOf(decimalDigits, SizeOf::sizeOf) + + sizeOf(arrayDimensions, SizeOf::sizeOf) + + sizeOf(caseSensitivity, ignored -> SIZE_OF_INT); + } } diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java index e0ae3144af99..fb50609f33c2 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java @@ -34,6 +34,7 @@ import io.trino.sql.query.QueryAssertions.QueryAssert; import io.trino.testing.BaseConnectorTest; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.SqlExecutor; import io.trino.testing.sql.TestTable; @@ -58,12 +59,19 @@ import static com.google.common.collect.MoreCollectors.toOptional; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.SystemSessionProperties.USE_MARK_DISTINCT; +import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.DYNAMIC_FILTERING_ENABLED; +import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.DYNAMIC_FILTERING_WAIT_TIMEOUT; +import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.DOMAIN_COMPACTION_THRESHOLD; import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.JOIN_PUSHDOWN_ENABLED; import static io.trino.plugin.jdbc.RemoteDatabaseEvent.Status.CANCELLED; import static io.trino.plugin.jdbc.RemoteDatabaseEvent.Status.RUNNING; +import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType; +import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST; +import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.PARTITIONED; import static io.trino.sql.planner.assertions.PlanMatchPattern.anyTree; import static io.trino.sql.planner.assertions.PlanMatchPattern.exchange; import static io.trino.sql.planner.assertions.PlanMatchPattern.node; +import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN_CORRELATION; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN_COUNT_DISTINCT; @@ -73,6 +81,8 @@ import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN_VARIANCE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CANCELLATION; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_TABLE; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_TABLE_WITH_DATA; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_DYNAMIC_FILTER_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_INSERT; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN_WITH_DISTINCT_FROM; @@ -81,12 +91,14 @@ import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN_WITH_VARCHAR_INEQUALITY; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_LIMIT_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN_WITH_LIKE; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ROW_LEVEL_DELETE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TOPN_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TOPN_PUSHDOWN_WITH_VARCHAR; import static io.trino.testing.assertions.Assert.assertEventually; +import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.concurrent.Executors.newCachedThreadPool; @@ -114,7 +126,11 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) switch (connectorBehavior) { case SUPPORTS_DELETE: case SUPPORTS_TRUNCATE: - return true; + + case SUPPORTS_DYNAMIC_FILTER_PUSHDOWN: + // Dynamic filters can be pushed down only if predicate push down is supported. + // It is possible for a connector to have predicate push down support but not push down dynamic filters. + return super.hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN); case SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN: // TODO support pushdown of complex expressions in predicates @@ -1679,4 +1695,202 @@ protected TestTable simpleTable() { return new TestTable(onRemoteDatabase(), format("%s.simple_table", getSession().getSchema().orElseThrow()), "(col BIGINT)", ImmutableList.of("1", "2")); } + + @DataProvider + public Object[][] fixedJoinDistributionTypes() + { + return new Object[][] {{BROADCAST}, {PARTITIONED}}; + } + + @Test(timeOut = 60_000, dataProvider = "fixedJoinDistributionTypes") + public void testDynamicFiltering(JoinDistributionType joinDistributionType) + { + skipTestUnless(hasBehavior(SUPPORTS_DYNAMIC_FILTER_PUSHDOWN)); + assertDynamicFiltering( + "SELECT * FROM orders a JOIN orders b ON a.orderkey = b.orderkey AND b.totalprice < 1000", + joinDistributionType); + } + + @Test(timeOut = 60_000) + public void testDynamicFilteringWithAggregationGroupingColumn() + { + skipTestUnless(hasBehavior(SUPPORTS_DYNAMIC_FILTER_PUSHDOWN)); + assertDynamicFiltering( + "SELECT * FROM (SELECT orderkey, count(*) FROM orders GROUP BY orderkey) a JOIN orders b " + + "ON a.orderkey = b.orderkey AND b.totalprice < 1000", + PARTITIONED); + } + + @Test(timeOut = 60_000) + public void testDynamicFilteringWithAggregationAggregateColumn() + { + skipTestUnless(hasBehavior(SUPPORTS_DYNAMIC_FILTER_PUSHDOWN)); + MaterializedResultWithQueryId resultWithQueryId = getDistributedQueryRunner() + .executeWithQueryId(getSession(), "SELECT custkey, count(*) count FROM orders GROUP BY custkey"); + // Detecting whether above aggregation is fully pushed down explicitly as there are cases where SUPPORTS_AGGREGATION_PUSHDOWN + // is false as not all aggregations are pushed down but the above aggregation is. + boolean isAggregationPushedDown = getPhysicalInputPositions(resultWithQueryId.getQueryId()) == 1000; + assertDynamicFiltering( + "SELECT * FROM (SELECT custkey, count(*) count FROM orders GROUP BY custkey) a JOIN orders b " + + "ON a.count = b.custkey AND b.totalprice < 1000", + PARTITIONED, + isAggregationPushedDown); + } + + @Test(timeOut = 60_000) + public void testDynamicFilteringWithAggregationGroupingSet() + { + skipTestUnless(hasBehavior(SUPPORTS_DYNAMIC_FILTER_PUSHDOWN)); + // DF pushdown is not supported for grouping column that is not part of every grouping set + assertNoDynamicFiltering( + "SELECT * FROM (SELECT orderkey, count(*) FROM orders GROUP BY GROUPING SETS ((orderkey), ())) a JOIN orders b " + + "ON a.orderkey = b.orderkey AND b.totalprice < 1000"); + } + + @Test(timeOut = 60_000) + public void testDynamicFilteringWithLimit() + { + skipTestUnless(hasBehavior(SUPPORTS_DYNAMIC_FILTER_PUSHDOWN)); + // DF pushdown is not supported for limit queries + assertNoDynamicFiltering( + "SELECT * FROM (SELECT orderkey FROM orders LIMIT 10000000) a JOIN orders b " + + "ON a.orderkey = b.orderkey AND b.totalprice < 1000"); + } + + @Test(timeOut = 60_000) + public void testDynamicFilteringDomainCompactionThreshold() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE_WITH_DATA)); + skipTestUnless(hasBehavior(SUPPORTS_DYNAMIC_FILTER_PUSHDOWN)); + String tableName = "orderkeys_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (orderkey) AS VALUES 30000, 60000", 2); + @Language("SQL") String query = "SELECT * FROM orders a JOIN " + tableName + " b ON a.orderkey = b.orderkey"; + + MaterializedResultWithQueryId dynamicFilteringResult = getDistributedQueryRunner().executeWithQueryId( + dynamicFiltering(true), + query); + long filteredInputPositions = getPhysicalInputPositions(dynamicFilteringResult.getQueryId()); + + MaterializedResultWithQueryId dynamicFilteringWithCompactionThresholdResult = getDistributedQueryRunner().executeWithQueryId( + dynamicFilteringWithCompactionThreshold(1), + query); + long smallCompactionInputPositions = getPhysicalInputPositions(dynamicFilteringWithCompactionThresholdResult.getQueryId()); + assertEqualsIgnoreOrder( + dynamicFilteringResult.getResult(), + dynamicFilteringWithCompactionThresholdResult.getResult(), + "For query: \n " + query); + + MaterializedResultWithQueryId noDynamicFilteringResult = getDistributedQueryRunner().executeWithQueryId( + dynamicFiltering(false), + query); + long unfilteredInputPositions = getPhysicalInputPositions(noDynamicFilteringResult.getQueryId()); + assertEqualsIgnoreOrder( + dynamicFilteringWithCompactionThresholdResult.getResult(), + noDynamicFilteringResult.getResult(), + "For query: \n " + query); + + assertThat(unfilteredInputPositions) + .as("unfiltered input positions") + .isGreaterThan(smallCompactionInputPositions); + + assertThat(smallCompactionInputPositions) + .as("small compaction input positions") + .isGreaterThan(filteredInputPositions); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test(timeOut = 60_000) + public void testDynamicFilteringCaseInsensitiveDomainCompaction() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE_WITH_DATA)); + skipTestUnless(hasBehavior(SUPPORTS_DYNAMIC_FILTER_PUSHDOWN)); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_caseinsensitive", + "(id varchar(1))", + ImmutableList.of("'0'", "'a'", "'B'"))) { + assertThat(computeActual( + // Force conversion to a range predicate which would exclude the row corresponding to 'B' + // if the range predicate were pushed into a case insensitive connector + dynamicFilteringWithCompactionThreshold(1), + "SELECT COUNT(*) FROM " + table.getName() + " a JOIN " + table.getName() + " b ON a.id = b.id") + .getOnlyValue()) + .isEqualTo(3L); + } + } + + protected void assertDynamicFiltering(@Language("SQL") String sql, JoinDistributionType joinDistributionType) + { + assertDynamicFiltering(sql, joinDistributionType, true); + } + + private void assertNoDynamicFiltering(@Language("SQL") String sql) + { + assertDynamicFiltering(sql, PARTITIONED, false); + } + + private void assertDynamicFiltering(@Language("SQL") String sql, JoinDistributionType joinDistributionType, boolean expectDynamicFiltering) + { + MaterializedResultWithQueryId dynamicFilteringResultWithQueryId = getDistributedQueryRunner().executeWithQueryId( + dynamicFiltering(joinDistributionType, true), + sql); + + MaterializedResultWithQueryId noDynamicFilteringResultWithQueryId = getDistributedQueryRunner().executeWithQueryId( + dynamicFiltering(joinDistributionType, false), + sql); + + // ensure results are the same + assertEqualsIgnoreOrder( + dynamicFilteringResultWithQueryId.getResult(), + noDynamicFilteringResultWithQueryId.getResult(), + "For query: \n " + sql); + + long dynamicFilteringInputPositions = getPhysicalInputPositions(dynamicFilteringResultWithQueryId.getQueryId()); + long noDynamicFilteringInputPositions = getPhysicalInputPositions(noDynamicFilteringResultWithQueryId.getQueryId()); + + if (expectDynamicFiltering) { + // Physical input positions is smaller in dynamic filtering case than in no dynamic filtering case + assertThat(dynamicFilteringInputPositions) + .as("filtered input positions") + .isLessThan(noDynamicFilteringInputPositions); + } + else { + assertThat(dynamicFilteringInputPositions) + .as("filtered input positions") + .isEqualTo(noDynamicFilteringInputPositions); + } + } + + private Session dynamicFiltering(boolean enabled) + { + return dynamicFiltering(PARTITIONED, enabled); + } + + private Session dynamicFilteringWithCompactionThreshold(int compactionThreshold) + { + return Session.builder(dynamicFiltering(true)) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), DOMAIN_COMPACTION_THRESHOLD, Integer.toString(compactionThreshold)) + .build(); + } + + private Session dynamicFiltering(JoinDistributionType joinDistributionType, boolean enabled) + { + String catalogName = getSession().getCatalog().orElseThrow(); + return Session.builder(noJoinReordering(joinDistributionType)) + .setCatalogSessionProperty(catalogName, DYNAMIC_FILTERING_ENABLED, Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, DYNAMIC_FILTERING_WAIT_TIMEOUT, "1h") + // test assertions assume join pushdown is not happening so we disable it here + .setCatalogSessionProperty(catalogName, JOIN_PUSHDOWN_ENABLED, "false") + .build(); + } + + private long getPhysicalInputPositions(QueryId queryId) + { + return getDistributedQueryRunner().getCoordinator() + .getQueryManager() + .getFullQueryInfo(queryId) + .getQueryStats() + .getPhysicalInputPositions(); + } } diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFilteringConfig.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFilteringConfig.java new file mode 100644 index 000000000000..db0cc50f995e --- /dev/null +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFilteringConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; +import org.testng.annotations.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class TestJdbcDynamicFilteringConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(JdbcDynamicFilteringConfig.class) + .setDynamicFilteringEnabled(true) + .setDynamicFilteringWaitTimeout(new Duration(20, SECONDS))); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("dynamic-filtering.enabled", "false") + .put("dynamic-filtering.wait-timeout", "10m") + .buildOrThrow(); + + JdbcDynamicFilteringConfig expected = new JdbcDynamicFilteringConfig() + .setDynamicFilteringEnabled(false) + .setDynamicFilteringWaitTimeout(new Duration(10, MINUTES)); + + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFilteringSplitManager.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFilteringSplitManager.java new file mode 100644 index 000000000000..4615bbd02894 --- /dev/null +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFilteringSplitManager.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.predicate.TupleDomain; +import io.trino.testing.TestingConnectorSession; +import io.trino.testing.TestingSplitManager; +import io.trino.testing.TestingTransactionHandle; +import org.testng.annotations.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static io.airlift.concurrent.MoreFutures.unmodifiableFuture; +import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.DYNAMIC_FILTERING_ENABLED; +import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.DYNAMIC_FILTERING_WAIT_TIMEOUT; +import static io.trino.spi.connector.Constraint.alwaysTrue; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class TestJdbcDynamicFilteringSplitManager +{ + private static final ConnectorTransactionHandle TRANSACTION_HANDLE = TestingTransactionHandle.create(); + private static final ConnectorSession SESSION = TestingConnectorSession.builder() + .setPropertyMetadata(new JdbcDynamicFilteringSessionProperties(new JdbcDynamicFilteringConfig()).getSessionProperties()) + .setPropertyValues(ImmutableMap.of( + DYNAMIC_FILTERING_WAIT_TIMEOUT, "3s", + DYNAMIC_FILTERING_ENABLED, true)) + .build(); + private static final JdbcTableHandle TABLE_HANDLE = new JdbcTableHandle( + new SchemaTableName("schema", "table"), + new RemoteTableName(Optional.empty(), Optional.empty(), "table"), + Optional.empty()); + private static final DynamicFilter BLOCKED_DYNAMIC_FILTER = new DynamicFilter() + { + @Override + public Set getColumnsCovered() + { + return ImmutableSet.of(); + } + + @Override + public CompletableFuture isBlocked() + { + return unmodifiableFuture(new CompletableFuture<>()); + } + + @Override + public boolean isComplete() + { + return false; + } + + @Override + public boolean isAwaitable() + { + return true; + } + + @Override + public TupleDomain getCurrentPredicate() + { + return TupleDomain.all(); + } + }; + + @Test + public void testBlockingTimeout() + throws Exception + { + JdbcDynamicFilteringSplitManager manager = new JdbcDynamicFilteringSplitManager( + new TestingSplitManager(ImmutableList.of()), + new DynamicFilteringStats()); + ConnectorSplitSource splitSource = manager.getSplits( + TRANSACTION_HANDLE, + SESSION, + TABLE_HANDLE, + BLOCKED_DYNAMIC_FILTER, + alwaysTrue()); + + // verify that getNextBatch() future completes after a timeout + CompletableFuture future = splitSource.getNextBatch(100); + assertFalse(future.isDone()); + future.get(10, SECONDS); + assertTrue(splitSource.isFinished()); + splitSource.close(); + } +} diff --git a/plugin/trino-phoenix5/pom.xml b/plugin/trino-phoenix5/pom.xml index f4f723faacf2..96d3c90d817a 100644 --- a/plugin/trino-phoenix5/pom.xml +++ b/plugin/trino-phoenix5/pom.xml @@ -107,6 +107,11 @@ modernizer-maven-annotations + + org.weakref + jmxutils + + io.airlift diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java index 1ababe2cf712..d9119b2e52e2 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java @@ -29,11 +29,16 @@ import io.trino.plugin.jdbc.DecimalModule; import io.trino.plugin.jdbc.DefaultQueryBuilder; import io.trino.plugin.jdbc.DriverConnectionFactory; +import io.trino.plugin.jdbc.DynamicFilteringStats; import io.trino.plugin.jdbc.ForBaseJdbc; +import io.trino.plugin.jdbc.ForJdbcDynamicFiltering; import io.trino.plugin.jdbc.ForLazyConnectionFactory; import io.trino.plugin.jdbc.ForRecordCursor; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.JdbcDiagnosticModule; +import io.trino.plugin.jdbc.JdbcDynamicFilteringConfig; +import io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties; +import io.trino.plugin.jdbc.JdbcDynamicFilteringSplitManager; import io.trino.plugin.jdbc.JdbcMetadataConfig; import io.trino.plugin.jdbc.JdbcMetadataSessionProperties; import io.trino.plugin.jdbc.JdbcPageSinkProvider; @@ -73,14 +78,24 @@ import static io.trino.plugin.jdbc.JdbcModule.bindTablePropertiesProvider; import static io.trino.plugin.phoenix5.ConfigurationInstantiator.newEmptyConfiguration; import static io.trino.plugin.phoenix5.PhoenixErrorCode.PHOENIX_CONFIG_ERROR; +import static java.util.Objects.requireNonNull; +import static org.weakref.jmx.guice.ExportBinder.newExporter; public class PhoenixClientModule extends AbstractConfigurationAwareModule { + private final String catalogName; + + public PhoenixClientModule(String catalogName) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + } + @Override protected void setup(Binder binder) { - binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(PhoenixSplitManager.class).in(Scopes.SINGLETON); + binder.bind(ConnectorSplitManager.class).annotatedWith(ForJdbcDynamicFiltering.class).to(PhoenixSplitManager.class).in(Scopes.SINGLETON); + binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcDynamicFilteringSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorRecordSetProvider.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorRecordSetProvider.class).to(ClassLoaderSafeConnectorRecordSetProvider.class).in(Scopes.SINGLETON); @@ -94,9 +109,15 @@ protected void setup(Binder binder) bindSessionPropertiesProvider(binder, JdbcMetadataSessionProperties.class); bindSessionPropertiesProvider(binder, JdbcWriteSessionProperties.class); bindSessionPropertiesProvider(binder, PhoenixSessionProperties.class); + bindSessionPropertiesProvider(binder, JdbcDynamicFilteringSessionProperties.class); + + binder.bind(DynamicFilteringStats.class).in(Scopes.SINGLETON); + newExporter(binder).export(DynamicFilteringStats.class) + .as(generator -> generator.generatedNameOf(DynamicFilteringStats.class, catalogName)); configBinder(binder).bindConfig(JdbcMetadataConfig.class); configBinder(binder).bindConfig(JdbcWriteConfig.class); + configBinder(binder).bindConfig(JdbcDynamicFilteringConfig.class); binder.bind(PhoenixClient.class).in(Scopes.SINGLETON); binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(Key.get(PhoenixClient.class)).in(Scopes.SINGLETON); diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixConnectorFactory.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixConnectorFactory.java index f3ef88e4b0c3..73ac67812d56 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixConnectorFactory.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixConnectorFactory.java @@ -53,7 +53,7 @@ public Connector create(String catalogName, Map requiredConfig, try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { Bootstrap app = new Bootstrap( new JsonModule(), - new PhoenixClientModule(), + new PhoenixClientModule(catalogName), binder -> { binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName)); binder.bind(ClassLoader.class).toInstance(PhoenixConnectorFactory.class.getClassLoader()); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/mysql/TestJdbcDynamicFilteringJmx.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/mysql/TestJdbcDynamicFilteringJmx.java new file mode 100644 index 000000000000..3d357d00f3b3 --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/mysql/TestJdbcDynamicFilteringJmx.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.mysql; + +import io.trino.tempto.AfterTestWithContext; +import io.trino.tempto.BeforeTestWithContext; +import io.trino.tempto.ProductTest; +import org.testng.annotations.Test; + +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tempto.assertions.QueryAssert.assertThat; +import static io.trino.tests.product.TestGroups.MYSQL; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.utils.QueryExecutors.onMySql; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.lang.String.format; + +public class TestJdbcDynamicFilteringJmx + extends ProductTest +{ + private static final String TABLE_NAME = "test.nation_tmp"; + + @BeforeTestWithContext + @AfterTestWithContext + public void dropTestTable() + { + onMySql().executeQuery(format("DROP TABLE IF EXISTS %s", TABLE_NAME)); + } + + @Test(groups = {MYSQL, PROFILE_SPECIFIC_TESTS}) + public void testDynamicFilteringStats() + { + assertThat(onTrino().executeQuery(format("CREATE TABLE mysql.%s AS SELECT * FROM tpch.tiny.nation", TABLE_NAME))) + .containsOnly(row(25)); + + onTrino().executeQuery("SET SESSION mysql.dynamic_filtering_wait_timeout = '1h'"); + onTrino().executeQuery("SET SESSION join_reordering_strategy = 'NONE'"); + onTrino().executeQuery("SET SESSION join_distribution_type = 'BROADCAST'"); + assertThat(onTrino().executeQuery(format("SELECT COUNT(*) FROM mysql.%s a JOIN tpch.tiny.nation b ON a.nationkey = b.nationkey AND b.name = 'INDIA'", TABLE_NAME))) + .containsOnly(row(1)); + + assertThat(onTrino().executeQuery("SELECT \"completeddynamicfilters.totalcount\" FROM jmx.current.\"io.trino.plugin.jdbc:name=mysql,type=dynamicfilteringstats\"")) + .containsOnly(row(1)); + assertThat(onTrino().executeQuery("SELECT \"totaldynamicfilters.totalcount\" FROM jmx.current.\"io.trino.plugin.jdbc:name=mysql,type=dynamicfilteringstats\"")) + .containsOnly(row(1)); + } +} diff --git a/testing/trino-testing/src/main/java/io/trino/testing/TestingConnectorBehavior.java b/testing/trino-testing/src/main/java/io/trino/testing/TestingConnectorBehavior.java index 5e21418142e5..0684c84cc375 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/TestingConnectorBehavior.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/TestingConnectorBehavior.java @@ -26,6 +26,8 @@ public enum TestingConnectorBehavior SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN(SUPPORTS_PREDICATE_PUSHDOWN), SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN_WITH_LIKE(SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN), + SUPPORTS_DYNAMIC_FILTER_PUSHDOWN(false), + SUPPORTS_LIMIT_PUSHDOWN, SUPPORTS_TOPN_PUSHDOWN,