From 811ac4ad51d9fb219b962c38d99f7140936f55bc Mon Sep 17 00:00:00 2001 From: Liu Yang Date: Wed, 8 Jun 2022 15:17:45 +0800 Subject: [PATCH 1/2] enable dynamic filtering in JDBC connector --- .../trino/split/RecordPageSourceProvider.java | 2 +- .../connector/ConnectorRecordSetProvider.java | 13 + ...sLoaderSafeConnectorRecordSetProvider.java | 7 + .../jdbc/DynamicFilteringJdbcConfig.java | 57 +++++ ...DynamicFilteringJdbcSessionProperties.java | 69 +++++ .../java/io/trino/plugin/jdbc/JdbcModule.java | 2 + .../plugin/jdbc/JdbcRecordSetProvider.java | 29 +++ .../io/trino/plugin/jdbc/JdbcTableHandle.java | 10 + .../plugin/jdbc/TestJdbcDynamicFiltering.java | 242 ++++++++++++++++++ .../plugin/phoenix/PhoenixClientModule.java | 5 + .../plugin/phoenix5/PhoenixClientModule.java | 5 + 11 files changed, 440 insertions(+), 1 deletion(-) create mode 100644 plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcConfig.java create mode 100644 plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcSessionProperties.java create mode 100644 plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFiltering.java diff --git a/core/trino-main/src/main/java/io/trino/split/RecordPageSourceProvider.java b/core/trino-main/src/main/java/io/trino/split/RecordPageSourceProvider.java index 1a905b23db5e..a874fbc052a9 100644 --- a/core/trino-main/src/main/java/io/trino/split/RecordPageSourceProvider.java +++ b/core/trino-main/src/main/java/io/trino/split/RecordPageSourceProvider.java @@ -47,6 +47,6 @@ public ConnectorPageSource createPageSource( List columns, DynamicFilter dynamicFilter) { - return new RecordPageSource(recordSetProvider.getRecordSet(transaction, session, split, table, columns)); + return new RecordPageSource(recordSetProvider.getRecordSet(transaction, session, split, table, columns, dynamicFilter)); } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorRecordSetProvider.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorRecordSetProvider.java index 73d3980f6641..1c0111659367 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorRecordSetProvider.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorRecordSetProvider.java @@ -17,6 +17,19 @@ public interface ConnectorRecordSetProvider { + default RecordSet getRecordSet( + ConnectorTransactionHandle transaction, + ConnectorSession session, + ConnectorSplit split, + ConnectorTableHandle table, + List columns, + DynamicFilter dynamicFilter) + { + // By default, ignore dynamic filter (as it is an optimization and doesn't affect correctness). + return getRecordSet(transaction, session, split, table, columns); + } + + @Deprecated default RecordSet getRecordSet( ConnectorTransactionHandle transaction, ConnectorSession session, diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorRecordSetProvider.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorRecordSetProvider.java index da66239f7d30..8a2fd220154a 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorRecordSetProvider.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorRecordSetProvider.java @@ -21,6 +21,7 @@ import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.RecordSet; import javax.inject.Inject; @@ -42,6 +43,12 @@ public ClassLoaderSafeConnectorRecordSetProvider(@ForClassLoaderSafe ConnectorRe this.classLoader = requireNonNull(classLoader, "classLoader is null"); } + @Override + public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List columns, DynamicFilter dynamicFilter) + { + return getRecordSet(transaction, session, split, table, columns); + } + @Override public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List columns) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcConfig.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcConfig.java new file mode 100644 index 000000000000..2cccd3913f41 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcConfig.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.trino.plugin.jdbc; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.units.Duration; +import io.airlift.units.MinDuration; + +import javax.validation.constraints.NotNull; + +import java.util.concurrent.TimeUnit; + +public class DynamicFilteringJdbcConfig +{ + private boolean enableDynamicFiltering; + private Duration dynamicFilteringWaitTimeout = new Duration(0, TimeUnit.MINUTES); + + public boolean isEnableDynamicFiltering() + { + return enableDynamicFiltering; + } + + @Config("dynamic-filtering.enabled") + public DynamicFilteringJdbcConfig setEnableDynamicFiltering(boolean enableDynamicFiltering) + { + this.enableDynamicFiltering = enableDynamicFiltering; + return this; + } + + @NotNull + @MinDuration("0ms") + public Duration getDynamicFilteringWaitTimeout() + { + return dynamicFilteringWaitTimeout; + } + + @Config("dynamic-filtering.wait-timeout") + @ConfigDescription("Duration to wait for completion of dynamic filters") + public DynamicFilteringJdbcConfig setDynamicFilteringWaitTimeout(Duration timeout) + { + this.dynamicFilteringWaitTimeout = timeout; + return this; + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcSessionProperties.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcSessionProperties.java new file mode 100644 index 000000000000..4d689ebc021d --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcSessionProperties.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.trino.plugin.jdbc; + +import com.google.common.collect.ImmutableList; +import io.airlift.units.Duration; +import io.trino.plugin.base.session.SessionPropertiesProvider; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.session.PropertyMetadata; + +import javax.inject.Inject; + +import java.util.List; + +import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty; +import static io.trino.spi.session.PropertyMetadata.booleanProperty; + +public class DynamicFilteringJdbcSessionProperties + implements SessionPropertiesProvider +{ + private static final String DYNAMIC_FILTERING_ENABLED = "dynamic_filtering_enabled"; + private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout"; + + private final List> properties; + + @Inject + public DynamicFilteringJdbcSessionProperties(DynamicFilteringJdbcConfig config) + { + properties = ImmutableList.of( + booleanProperty( + DYNAMIC_FILTERING_ENABLED, + "If dynamic filtering is enabled", + config.isEnableDynamicFiltering(), + false), + durationProperty( + DYNAMIC_FILTERING_WAIT_TIMEOUT, + "Duration to wait for completion of dynamic filters", + config.getDynamicFilteringWaitTimeout(), + false)); + } + + public static boolean isEnableDynamicFiltering(ConnectorSession session) + { + return session.getProperty(DYNAMIC_FILTERING_ENABLED, Boolean.class); + } + + public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session) + { + return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class); + } + + @Override + public List> getSessionProperties() + { + return properties; + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index 55af51537629..722885305262 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -75,9 +75,11 @@ public void setup(Binder binder) configBinder(binder).bindConfig(BaseJdbcConfig.class); configBinder(binder).bindConfig(TypeHandlingJdbcConfig.class); + configBinder(binder).bindConfig(DynamicFilteringJdbcConfig.class); bindSessionPropertiesProvider(binder, TypeHandlingJdbcSessionProperties.class); bindSessionPropertiesProvider(binder, JdbcMetadataSessionProperties.class); bindSessionPropertiesProvider(binder, JdbcWriteSessionProperties.class); + bindSessionPropertiesProvider(binder, DynamicFilteringJdbcSessionProperties.class); binder.bind(CachingJdbcClient.class).in(Scopes.SINGLETON); binder.bind(JdbcClient.class).to(Key.get(CachingJdbcClient.class)).in(Scopes.SINGLETON); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java index 45002e05c4cc..2233368f08da 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java @@ -21,6 +21,7 @@ import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.RecordSet; import javax.inject.Inject; @@ -29,7 +30,10 @@ import java.util.concurrent.ExecutorService; import static com.google.common.base.Verify.verify; +import static io.trino.plugin.jdbc.DynamicFilteringJdbcSessionProperties.getDynamicFilteringWaitTimeout; +import static io.trino.plugin.jdbc.DynamicFilteringJdbcSessionProperties.isEnableDynamicFiltering; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; public class JdbcRecordSetProvider implements ConnectorRecordSetProvider @@ -44,6 +48,31 @@ public JdbcRecordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorSer this.executor = requireNonNull(executor, "executor is null"); } + @Override + public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List columns, DynamicFilter dynamicFilter) + { + if (isEnableDynamicFiltering(session)) { + long timeoutMillis = getDynamicFilteringWaitTimeout(session).toMillis(); + + // no time for further narrow down or done already + if (timeoutMillis == 0 || !dynamicFilter.isAwaitable()) { + JdbcTableHandle handle = ((JdbcTableHandle) table).withConstraint(dynamicFilter.getCurrentPredicate()); + return getRecordSet(transaction, session, split, handle, columns); + } + + try { + dynamicFilter.isBlocked().get(timeoutMillis, MILLISECONDS); + JdbcTableHandle handle = ((JdbcTableHandle) table).withConstraint(dynamicFilter.getCurrentPredicate()); + return getRecordSet(transaction, session, split, handle, columns); + } + catch (Exception e) { + return getRecordSet(transaction, session, split, table, columns); + } + } + + return getRecordSet(transaction, session, split, table, columns); + } + @Override public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List columns) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java index 48d08c453c81..3794a8a9a691 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java @@ -103,6 +103,16 @@ public JdbcTableHandle( this.nextSyntheticColumnId = nextSyntheticColumnId; } + public JdbcTableHandle withConstraint(TupleDomain newConstraint) + { + TupleDomain newDomain = constraint.intersect(newConstraint); + if (newDomain == constraint) { + return this; + } + + return new JdbcTableHandle(relationHandle, newDomain, sortOrder, limit, columns, otherReferencedTables, nextSyntheticColumnId); + } + /** * @deprecated Use {@code asPlainTable().getSchemaTableName()} instead, but see those methods for more information, as this is not a drop-in replacement. */ diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFiltering.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFiltering.java new file mode 100644 index 000000000000..eadde612b5b0 --- /dev/null +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFiltering.java @@ -0,0 +1,242 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.common.collect.ImmutableMap; +import com.google.common.primitives.Ints; +import io.trino.Session; +import io.trino.execution.QueryStats; +import io.trino.operator.OperatorStats; +import io.trino.spi.QueryId; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedRow; +import io.trino.testing.QueryRunner; +import io.trino.testing.ResultWithQueryId; +import io.trino.tpch.TpchTable; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; +import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY; +import static io.trino.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST; +import static io.trino.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED; +import static io.trino.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.NONE; +import static java.lang.String.format; +import static org.testng.Assert.assertEquals; + +@Test(singleThreaded = true) +public class TestJdbcDynamicFiltering + extends AbstractTestQueryFramework +{ + private static final int LINEITEM_COUNT = 60175; + + private static List getOperatorRowsRead(DistributedQueryRunner runner, QueryId queryId) + { + QueryStats stats = runner.getCoordinator().getQueryManager().getFullQueryInfo(queryId).getQueryStats(); + return stats.getOperatorSummaries() + .stream() + .filter(summary -> summary.getOperatorType().equals("ScanFilterAndProjectOperator")) + .map(OperatorStats::getInputPositions) + .map(Math::toIntExact) + .collect(toImmutableList()); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Map properties = ImmutableMap.builder() + .put("connection-url", format("jdbc:h2:mem:test%s;DB_CLOSE_DELAY=-1", System.nanoTime() + ThreadLocalRandom.current().nextLong())) + .put("dynamic-filtering.enabled", "true") + .put("dynamic-filtering.wait-timeout", "30s") + .build(); + return H2QueryRunner.createH2QueryRunner(List.of(TpchTable.LINE_ITEM, TpchTable.ORDERS, TpchTable.PART, TpchTable.CUSTOMER), properties); + } + + @Test + public void testJoinDynamicFilteringNone() + { + // Probe-side is not scanned at all, due to dynamic filtering: + assertDynamicFiltering( + "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice < 0", + withBroadcastJoin(), + 0, + 0, 0); + } + + @Test + public void testPartitionedJoinNoDynamicFiltering() + { + // Probe-side is fully scanned, because local dynamic filtering does not work for partitioned joins: + assertDynamicFiltering( + "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice < 0", + withPartitionedJoin(), + 0, + LINEITEM_COUNT, 0); + } + + @Test + public void testJoinDynamicFilteringWithMixedConstraint() + { + // Probe-side is fully scanned, because local dynamic filtering does not work for partitioned joins: + assertDynamicFiltering( + "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.comment = 'nstructions sleep furiously among ' AND lineitem.linenumber < 0", + withBroadcastJoin(), + 0, + 0, 1); + } + + @Test + public void testJoinDynamicFilteringSingleValue() + { + assertQueryResult("SELECT orderkey FROM orders WHERE comment = 'nstructions sleep furiously among '", 1L); + assertQueryResult("SELECT COUNT() FROM lineitem WHERE orderkey = 1", 6L); + + assertQueryResult("SELECT partkey FROM part WHERE comment = 'onic deposits'", 1552L); + assertQueryResult("SELECT COUNT() FROM lineitem WHERE partkey = 1552", 39L); + + // Join lineitem with a single row of orders + assertDynamicFiltering( + "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.comment = 'nstructions sleep furiously among '", + withBroadcastJoin(), + 6, + 6, 1); + + // Join lineitem with a single row of part + assertDynamicFiltering( + "SELECT l.comment FROM lineitem l, part p WHERE p.partkey = l.partkey AND p.comment = 'onic deposits'", + withBroadcastJoin(), + 39, + 39, 1); + } + + @Test + public void testJoinDynamicFilteringBlockProbeSide() + { + // Wait for both build sides to finish before starting the scan of 'lineitem' table (should be very selective given the dynamic filters). + assertDynamicFiltering( + "SELECT l.comment" + + " FROM lineitem l, part p, orders o" + + " WHERE l.orderkey = o.orderkey AND o.comment = 'nstructions sleep furiously among '" + + " AND p.partkey = l.partkey AND p.comment = 'onic deposits'", + withBroadcastJoinNonReordering(), + 1, + 1, 1, 1); + } + + @Test + public void testSemiJoinDynamicFilteringNone() + { + // Probe-side is not scanned at all, due to dynamic filtering: + assertDynamicFiltering( + "SELECT * FROM lineitem WHERE lineitem.orderkey IN (SELECT orders.orderkey FROM orders WHERE orders.totalprice < 0)", + withBroadcastJoin(), + 0, + 0, 0); + } + + @Test + public void testPartitionedSemiJoinNoDynamicFiltering() + { + // Probe-side is fully scanned, because local dynamic filtering does not work for partitioned joins: + assertDynamicFiltering( + "SELECT * FROM lineitem WHERE lineitem.orderkey IN (SELECT orders.orderkey FROM orders WHERE orders.totalprice < 0)", + withPartitionedJoin(), + 0, + LINEITEM_COUNT, 0); + } + + @Test + public void testSemiJoinDynamicFilteringSingleValue() + { + // Join lineitem with a single row of orders + assertDynamicFiltering( + "SELECT * FROM lineitem WHERE lineitem.orderkey IN (SELECT orders.orderkey FROM orders WHERE orders.comment = 'nstructions sleep furiously among ')", + withBroadcastJoin(), + 6, + 6, 1); + + // Join lineitem with a single row of part + assertDynamicFiltering( + "SELECT l.comment FROM lineitem l WHERE l.partkey IN (SELECT p.partkey FROM part p WHERE p.comment = 'onic deposits')", + withBroadcastJoin(), + 39, + 39, 1); + } + + @Test + public void testSemiJoinDynamicFilteringBlockProbeSide() + { + // Wait for both build sides to finish before starting the scan of 'lineitem' table (should be very selective given the dynamic filters). + assertDynamicFiltering( + "SELECT t.comment FROM " + + "(SELECT * FROM lineitem l WHERE l.orderkey IN (SELECT o.orderkey FROM orders o WHERE o.comment = 'nstructions sleep furiously among ')) t " + + "WHERE t.partkey IN (SELECT p.partkey FROM part p WHERE p.comment = 'onic deposits')", + withBroadcastJoinNonReordering(), + 1, + 1, 1, 1); + } + + private void assertDynamicFiltering(@Language("SQL") String selectQuery, Session session, int expectedRowCount, int... expectedOperatorRowsRead) + { + ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(session, selectQuery); + + assertEquals(result.getResult().getRowCount(), expectedRowCount); + assertEquals(getOperatorRowsRead(getDistributedQueryRunner(), result.getQueryId()), Ints.asList(expectedOperatorRowsRead)); + } + + private Session withBroadcastJoin() + { + return Session.builder(getSession()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name()) + .build(); + } + + private Session withBroadcastJoinNonReordering() + { + return Session.builder(getSession()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name()) + .setSystemProperty(JOIN_REORDERING_STRATEGY, NONE.name()) + .build(); + } + + private Session withPartitionedJoin() + { + return Session.builder(getSession()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, PARTITIONED.name()) + .build(); + } + + private void assertQueryResult(@Language("SQL") String sql, Object... expected) + { + MaterializedResult rows = computeActual(sql); + assertEquals(rows.getRowCount(), expected.length); + + for (int i = 0; i < expected.length; i++) { + MaterializedRow materializedRow = rows.getMaterializedRows().get(i); + int fieldCount = materializedRow.getFieldCount(); + assertEquals(fieldCount, 1, format("Expected only one column, but got '%d'", fieldCount)); + Object value = materializedRow.getField(0); + assertEquals(value, expected[i]); + assertEquals(materializedRow.getFieldCount(), 1); + } + } +} diff --git a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixClientModule.java b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixClientModule.java index 29e3353fd219..fa5eed5cc842 100644 --- a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixClientModule.java +++ b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixClientModule.java @@ -29,6 +29,8 @@ import io.trino.plugin.jdbc.DecimalModule; import io.trino.plugin.jdbc.DefaultQueryBuilder; import io.trino.plugin.jdbc.DriverConnectionFactory; +import io.trino.plugin.jdbc.DynamicFilteringJdbcConfig; +import io.trino.plugin.jdbc.DynamicFilteringJdbcSessionProperties; import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.ForLazyConnectionFactory; import io.trino.plugin.jdbc.ForRecordCursor; @@ -97,6 +99,9 @@ protected void setup(Binder binder) configBinder(binder).bindConfig(JdbcMetadataConfig.class); configBinder(binder).bindConfig(JdbcWriteConfig.class); + configBinder(binder).bindConfig(DynamicFilteringJdbcConfig.class); + bindSessionPropertiesProvider(binder, DynamicFilteringJdbcSessionProperties.class); + binder.bind(PhoenixClient.class).in(Scopes.SINGLETON); binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(Key.get(PhoenixClient.class)).in(Scopes.SINGLETON); binder.bind(JdbcClient.class).to(Key.get(JdbcClient.class, StatsCollecting.class)).in(Scopes.SINGLETON); diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java index 780746a44e15..5fc3be37cda7 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java @@ -29,6 +29,8 @@ import io.trino.plugin.jdbc.DecimalModule; import io.trino.plugin.jdbc.DefaultQueryBuilder; import io.trino.plugin.jdbc.DriverConnectionFactory; +import io.trino.plugin.jdbc.DynamicFilteringJdbcConfig; +import io.trino.plugin.jdbc.DynamicFilteringJdbcSessionProperties; import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.ForLazyConnectionFactory; import io.trino.plugin.jdbc.ForRecordCursor; @@ -94,6 +96,9 @@ protected void setup(Binder binder) bindSessionPropertiesProvider(binder, JdbcWriteSessionProperties.class); bindSessionPropertiesProvider(binder, PhoenixSessionProperties.class); + configBinder(binder).bindConfig(DynamicFilteringJdbcConfig.class); + bindSessionPropertiesProvider(binder, DynamicFilteringJdbcSessionProperties.class); + configBinder(binder).bindConfig(JdbcMetadataConfig.class); configBinder(binder).bindConfig(JdbcWriteConfig.class); From cd1674c7772affa27253c8032e03ce1e3b878748 Mon Sep 17 00:00:00 2001 From: Liu Yang Date: Wed, 8 Jun 2022 16:13:39 +0800 Subject: [PATCH 2/2] update code and fix styles --- .../main/java/io/trino/plugin/jdbc/JdbcTableHandle.java | 2 +- .../io/trino/plugin/jdbc/TestJdbcDynamicFiltering.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java index 3794a8a9a691..6e415f5b2235 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java @@ -110,7 +110,7 @@ public JdbcTableHandle withConstraint(TupleDomain newConstraint) return this; } - return new JdbcTableHandle(relationHandle, newDomain, sortOrder, limit, columns, otherReferencedTables, nextSyntheticColumnId); + return new JdbcTableHandle(relationHandle, newDomain, constraintExpressions, sortOrder, limit, columns, otherReferencedTables, nextSyntheticColumnId); } /** diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFiltering.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFiltering.java index eadde612b5b0..66227a008f5c 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFiltering.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcDynamicFiltering.java @@ -36,9 +36,9 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY; -import static io.trino.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST; -import static io.trino.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED; -import static io.trino.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.NONE; +import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST; +import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.PARTITIONED; +import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy.NONE; import static java.lang.String.format; import static org.testng.Assert.assertEquals; @@ -67,7 +67,7 @@ protected QueryRunner createQueryRunner() .put("connection-url", format("jdbc:h2:mem:test%s;DB_CLOSE_DELAY=-1", System.nanoTime() + ThreadLocalRandom.current().nextLong())) .put("dynamic-filtering.enabled", "true") .put("dynamic-filtering.wait-timeout", "30s") - .build(); + .buildOrThrow(); return H2QueryRunner.createH2QueryRunner(List.of(TpchTable.LINE_ITEM, TpchTable.ORDERS, TpchTable.PART, TpchTable.CUSTOMER), properties); }