diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java index b7ff37e51f92..67854da168c9 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java @@ -45,6 +45,7 @@ import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -68,6 +69,8 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.plugin.jdbc.CaseSensitivity.CASE_INSENSITIVE; +import static io.trino.plugin.jdbc.CaseSensitivity.CASE_SENSITIVE; import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR; import static io.trino.plugin.jdbc.JdbcWriteSessionProperties.isNonTransactionalInsert; import static io.trino.plugin.jdbc.PredicatePushdownController.DISABLE_PUSHDOWN; @@ -221,6 +224,45 @@ public Optional getTableHandle(ConnectorSession session, Schema } } + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) + { + ImmutableList.Builder columns = ImmutableList.builder(); + try (Connection connection = connectionFactory.openConnection(session); + PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery)) { + ResultSetMetaData metaData = preparedStatement.getMetaData(); + if (metaData == null) { + throw new UnsupportedOperationException("ResultSetMetaData not provided for query"); + } + for (int column = 1; column <= metaData.getColumnCount(); column++) { + JdbcTypeHandle jdbcTypeHandle = new JdbcTypeHandle( + metaData.getColumnType(column), + Optional.ofNullable(metaData.getColumnTypeName(column)), + Optional.of(metaData.getPrecision(column)), + Optional.of(metaData.getScale(column)), + Optional.empty(), // TODO support arrays + Optional.of(metaData.isCaseSensitive(column) ? CASE_SENSITIVE : CASE_INSENSITIVE)); + Type type = toColumnMapping(session, connection, jdbcTypeHandle) + .orElseThrow(() -> new IllegalArgumentException("Unsupported type: " + jdbcTypeHandle)) + .getType(); + columns.add(new JdbcColumnHandle(metaData.getColumnName(column), jdbcTypeHandle, type)); + } + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + + return new JdbcTableHandle( + new JdbcQueryRelationHandle(preparedQuery), + TupleDomain.all(), + ImmutableList.of(), + Optional.empty(), + OptionalLong.empty(), + Optional.of(columns.build()), + ImmutableSet.of(), // TODO return other tables referenced by the query + 0); + } + @Override public List getColumns(ConnectorSession session, JdbcTableHandle tableHandle) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java index 7979f86440c6..f05eedb9e1a2 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java @@ -288,6 +288,12 @@ public Optional getTableHandle(ConnectorSession session, Schema return get(tableHandleCache, key, () -> delegate.getTableHandle(session, schemaTableName)); } + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) + { + return delegate.getTableHandle(session, preparedQuery); // TODO add caching? + } + @Override public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java index 969848f7b4a1..a3f8309a2d9e 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java @@ -18,12 +18,14 @@ import com.google.common.collect.ImmutableSet; import io.airlift.slice.Slice; import io.trino.plugin.jdbc.PredicatePushdownController.DomainPushdownResult; +import io.trino.plugin.jdbc.ptf.RemoteQuery.RemoteQueryHandle; import io.trino.spi.TrinoException; import io.trino.spi.connector.AggregateFunction; import io.trino.spi.connector.AggregationApplicationResult; import io.trino.spi.connector.Assignment; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ColumnSchema; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorOutputTableHandle; @@ -45,6 +47,7 @@ import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.connector.SortItem; import io.trino.spi.connector.SystemTable; +import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; @@ -53,6 +56,7 @@ import io.trino.spi.expression.Variable; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; import io.trino.spi.security.AccessDeniedException; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.statistics.ComputedStatistics; @@ -123,6 +127,12 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName .orElse(null); } + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) + { + return jdbcClient.getTableHandle(session, preparedQuery); + } + @Override public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) { @@ -318,10 +328,10 @@ public Optional> applyAggrega .flatMap(List::stream) .distinct() .peek(handle.getColumns().>map( - columns -> groupKey -> verify(columns.contains(groupKey), - "applyAggregation called with a grouping column %s which was not included in the table columns: %s", - groupKey, - tableColumns)) + columns -> groupKey -> verify(columns.contains(groupKey), + "applyAggregation called with a grouping column %s which was not included in the table columns: %s", + groupKey, + tableColumns)) .orElse(groupKey -> {})) .forEach(newColumns::add); @@ -559,6 +569,24 @@ public Optional> applyTopN( return Optional.of(new TopNApplicationResult<>(sortedTableHandle, jdbcClient.isTopNGuaranteed(session), precalculateStatisticsForPushdown)); } + @Override + public Optional> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle) + { + if (!(handle instanceof RemoteQueryHandle)) { + return Optional.empty(); + } + + ConnectorTableHandle tableHandle = ((RemoteQueryHandle) handle).getTableHandle(); + ConnectorTableSchema tableSchema = getTableSchema(session, tableHandle); + Map columnHandlesByName = getColumnHandles(session, tableHandle); + List columnHandles = tableSchema.getColumns().stream() + .map(ColumnSchema::getName) + .map(columnHandlesByName::get) + .collect(toImmutableList()); + + return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles)); + } + @Override public Optional applyTableScanRedirect(ConnectorSession session, ConnectorTableHandle table) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java index ac7488fbd11a..d1ed89adc02d 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java @@ -84,6 +84,12 @@ public Optional getTableHandle(ConnectorSession session, Schema return delegate().getTableHandle(session, schemaTableName); } + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) + { + return delegate().getTableHandle(session, preparedQuery); + } + @Override public List getColumns(ConnectorSession session, JdbcTableHandle tableHandle) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java index 223fbcc1b283..73a8d738c358 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java @@ -55,6 +55,8 @@ default boolean schemaExists(ConnectorSession session, String schema) Optional getTableHandle(ConnectorSession session, SchemaTableName schemaTableName); + JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery); + List getColumns(ConnectorSession session, JdbcTableHandle tableHandle); Optional toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcConnector.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcConnector.java index 616d955dfcd3..51c414a05e31 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcConnector.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcConnector.java @@ -27,6 +27,7 @@ import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.procedure.Procedure; +import io.trino.spi.ptf.ConnectorTableFunction; import io.trino.spi.session.PropertyMetadata; import io.trino.spi.transaction.IsolationLevel; @@ -50,6 +51,7 @@ public class JdbcConnector private final ConnectorPageSinkProvider jdbcPageSinkProvider; private final Optional accessControl; private final Set procedures; + private final Set connectorTableFunctions; private final List> sessionProperties; private final List> tableProperties; private final JdbcTransactionManager transactionManager; @@ -62,6 +64,7 @@ public JdbcConnector( ConnectorPageSinkProvider jdbcPageSinkProvider, Optional accessControl, Set procedures, + Set connectorTableFunctions, Set sessionProperties, Set tableProperties, JdbcTransactionManager transactionManager) @@ -72,6 +75,7 @@ public JdbcConnector( this.jdbcPageSinkProvider = requireNonNull(jdbcPageSinkProvider, "jdbcPageSinkProvider is null"); this.accessControl = requireNonNull(accessControl, "accessControl is null"); this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null")); + this.connectorTableFunctions = ImmutableSet.copyOf(requireNonNull(connectorTableFunctions, "tableFunctions is null")); this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null").stream() .flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream()) .collect(toImmutableList()); @@ -135,6 +139,12 @@ public Set getProcedures() return procedures; } + @Override + public Set getTableFunctions() + { + return connectorTableFunctions; + } + @Override public List> getSessionProperties() { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadata.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadata.java index 360aa8ba1e29..10104eceec50 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadata.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadata.java @@ -14,9 +14,12 @@ package io.trino.plugin.jdbc; import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorSession; public interface JdbcMetadata extends ConnectorMetadata { + JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery); + void rollback(); } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index 054efe37cc6a..55af51537629 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -28,6 +28,7 @@ import io.trino.spi.connector.ConnectorRecordSetProvider; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.procedure.Procedure; +import io.trino.spi.ptf.ConnectorTableFunction; import javax.annotation.PreDestroy; import javax.inject.Provider; @@ -83,6 +84,8 @@ public void setup(Binder binder) newSetBinder(binder, Procedure.class).addBinding().toProvider(FlushJdbcMetadataCacheProcedure.class).in(Scopes.SINGLETON); + newSetBinder(binder, ConnectorTableFunction.class); + binder.bind(ConnectionFactory.class) .annotatedWith(ForLazyConnectionFactory.class) .to(Key.get(ConnectionFactory.class, StatsCollecting.class)) diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java index f6b58fde5e02..b6c3fa1df277 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java @@ -104,6 +104,12 @@ public Optional getTableHandle(ConnectorSession session, Schema return stats.getGetTableHandle().wrap(() -> delegate().getTableHandle(session, schemaTableName)); } + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) + { + return delegate().getTableHandle(session, preparedQuery); // TODO add stats? + } + @Override public List getColumns(ConnectorSession session, JdbcTableHandle tableHandle) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ptf/RemoteQuery.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ptf/RemoteQuery.java new file mode 100644 index 000000000000..dad34b6d1d21 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ptf/RemoteQuery.java @@ -0,0 +1,120 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc.ptf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; +import io.trino.plugin.jdbc.JdbcColumnHandle; +import io.trino.plugin.jdbc.JdbcMetadata; +import io.trino.plugin.jdbc.JdbcTableHandle; +import io.trino.plugin.jdbc.JdbcTransactionManager; +import io.trino.plugin.jdbc.PreparedQuery; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.ptf.Argument; +import io.trino.spi.ptf.ConnectorTableFunction; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; +import io.trino.spi.ptf.Descriptor; +import io.trino.spi.ptf.Descriptor.Field; +import io.trino.spi.ptf.ScalarArgument; +import io.trino.spi.ptf.ScalarArgumentSpecification; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.spi.ptf.DescriptorMapping.EMPTY_MAPPING; +import static io.trino.spi.ptf.ReturnTypeSpecification.GenericTable.GENERIC_TABLE; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; + +public class RemoteQuery + implements Provider +{ + public static final String NAME = "remote_query"; + + private final JdbcTransactionManager transactionManager; + + @Inject + public RemoteQuery(JdbcTransactionManager transactionManager) + { + this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); + } + + @Override + public ConnectorTableFunction get() + { + // TODO wrap in ClassLoaderSafeConnectorTableFunction? (see also TestClassLoaderSafeWrappers) + return new RemoteQueryFunction(transactionManager); + } + + public static class RemoteQueryFunction + extends ConnectorTableFunction + { + public static final String SCHEMA_NAME = "system"; + + private final JdbcTransactionManager transactionManager; + + public RemoteQueryFunction(JdbcTransactionManager transactionManager) + { + super(SCHEMA_NAME, NAME, List.of(new ScalarArgumentSpecification("query", VARCHAR)), GENERIC_TABLE); + this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); + } + + @Override + public Analysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) + { + ScalarArgument argument = (ScalarArgument) getOnlyElement(arguments.values()); + String query = ((Slice) argument.getValue()).toStringUtf8(); + PreparedQuery preparedQuery = new PreparedQuery(query, ImmutableList.of()); + + JdbcMetadata metadata = transactionManager.getMetadata(transaction); + JdbcTableHandle tableHandle = metadata.getTableHandle(session, preparedQuery); + List columns = tableHandle.getColumns().orElseThrow(() -> new IllegalStateException("Query result has no columns")); + Descriptor returnedType = new Descriptor(columns.stream() + .map(column -> new Field(column.getColumnName(), Optional.of(column.getColumnType()))) + .collect(toList())); + + RemoteQueryHandle handle = new RemoteQueryHandle(tableHandle); + + return new Analysis(Optional.of(returnedType), EMPTY_MAPPING, handle); + } + } + + public static class RemoteQueryHandle + implements ConnectorTableFunctionHandle + { + private final JdbcTableHandle tableHandle; + + @JsonCreator + public RemoteQueryHandle(@JsonProperty("tableHandle") JdbcTableHandle tableHandle) + { + this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); + } + + @JsonProperty + public ConnectorTableHandle getTableHandle() + { + return tableHandle; + } + } +} diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java index 59f1e8e1d028..11c6529fca0e 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java @@ -1548,4 +1548,11 @@ public static Object[][] batchSizeAndTotalNumberOfRowsToInsertDataProvider() {10, 52}, // number of rows > n * batch size }; } + + @Test + public void testRemoteQueryTableFunction() + { + assertThat(query("SELECT * FROM TABLE(" + getSession().getCatalog().orElseThrow() + ".system.remote_query(\"query\" => 'SELECT 1'))")) + .matches("VALUES 1"); + } } diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestingH2JdbcModule.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestingH2JdbcModule.java index ec99a4989f8c..412bc3e06bc0 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestingH2JdbcModule.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestingH2JdbcModule.java @@ -17,14 +17,18 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.Scopes; import com.google.inject.Singleton; import io.trino.plugin.jdbc.credential.CredentialProvider; import io.trino.plugin.jdbc.mapping.IdentifierMapping; +import io.trino.plugin.jdbc.ptf.RemoteQuery; +import io.trino.spi.ptf.ConnectorTableFunction; import org.h2.Driver; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -44,7 +48,10 @@ public TestingH2JdbcModule(TestingH2JdbcClientFactory testingH2JdbcClientFactory } @Override - public void configure(Binder binder) {} + public void configure(Binder binder) + { + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RemoteQuery.class).in(Scopes.SINGLETON); + } @Provides @Singleton diff --git a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java index e2bed521d989..1867cb09abf9 100644 --- a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java +++ b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java @@ -593,6 +593,14 @@ public void testCharTrailingSpace() throw new SkipException("Implement test for ClickHouse"); } + @Override + public void testRemoteQueryTableFunction() + { + // table function disabled for ClickHouse, because it doesn't provide ResultSetMetaData, so the result relation type cannot be determined + assertThatThrownBy(super::testRemoteQueryTableFunction) + .hasMessage("line 1:21: Table function clickhouse.system.remote_query not registered"); + } + @Override protected SqlExecutor onRemoteDatabase() { diff --git a/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClientModule.java b/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClientModule.java index bc781591366f..fd3c6b041709 100644 --- a/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClientModule.java +++ b/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClientModule.java @@ -24,10 +24,14 @@ import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.credential.CredentialProvider; +import io.trino.plugin.jdbc.ptf.RemoteQuery; +import io.trino.spi.ptf.ConnectorTableFunction; import org.apache.calcite.avatica.remote.Driver; import java.util.Properties; +import static com.google.inject.multibindings.Multibinder.newSetBinder; + public class DruidJdbcClientModule implements Module { @@ -35,6 +39,7 @@ public class DruidJdbcClientModule public void configure(Binder binder) { binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(DruidJdbcClient.class).in(Scopes.SINGLETON); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RemoteQuery.class).in(Scopes.SINGLETON); } @Provides diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConnector.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConnector.java index 98cf2a640f19..3e22266d36d7 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConnector.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConnector.java @@ -22,6 +22,7 @@ import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.SystemTable; +import io.trino.spi.ptf.ConnectorTableFunction; import io.trino.spi.transaction.IsolationLevel; import javax.inject.Inject; @@ -40,6 +41,7 @@ public class ElasticsearchConnector private final ElasticsearchSplitManager splitManager; private final ElasticsearchPageSourceProvider pageSourceProvider; private final NodesSystemTable nodesSystemTable; + private final Set connectorTableFunctions; @Inject public ElasticsearchConnector( @@ -47,13 +49,15 @@ public ElasticsearchConnector( ElasticsearchMetadata metadata, ElasticsearchSplitManager splitManager, ElasticsearchPageSourceProvider pageSourceProvider, - NodesSystemTable nodesSystemTable) + NodesSystemTable nodesSystemTable, + Set connectorTableFunctions) { this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); this.metadata = requireNonNull(metadata, "metadata is null"); this.splitManager = requireNonNull(splitManager, "splitManager is null"); this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null"); this.nodesSystemTable = requireNonNull(nodesSystemTable, "nodesSystemTable is null"); + this.connectorTableFunctions = ImmutableSet.copyOf(requireNonNull(connectorTableFunctions, "tableFunctions is null")); } @Override @@ -87,6 +91,12 @@ public Set getSystemTables() return ImmutableSet.of(nodesSystemTable); } + @Override + public Set getTableFunctions() + { + return connectorTableFunctions; + } + @Override public final void shutdown() { diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConnectorModule.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConnectorModule.java index d8bf4b832cc7..7689a73058e1 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConnectorModule.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConnectorModule.java @@ -17,7 +17,10 @@ import com.google.inject.Scopes; import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.plugin.elasticsearch.client.ElasticsearchClient; +import io.trino.plugin.elasticsearch.ptf.RemoteQuery; +import io.trino.spi.ptf.ConnectorTableFunction; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.airlift.configuration.ConfigBinder.configBinder; @@ -46,6 +49,8 @@ protected void setup(Binder binder) newOptionalBinder(binder, AwsSecurityConfig.class); newOptionalBinder(binder, PasswordConfig.class); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RemoteQuery.class).in(Scopes.SINGLETON); + install(conditionalModule( ElasticsearchConfig.class, config -> config.getSecurity() diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java index 46bb2322465f..2de3b5fbfcb7 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java @@ -42,25 +42,30 @@ import io.trino.plugin.elasticsearch.decoders.TinyintDecoder; import io.trino.plugin.elasticsearch.decoders.VarbinaryDecoder; import io.trino.plugin.elasticsearch.decoders.VarcharDecoder; +import io.trino.plugin.elasticsearch.ptf.RemoteQuery.RemoteQueryHandle; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ColumnSchema; import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorTableProperties; +import io.trino.spi.connector.ConnectorTableSchema; import io.trino.spi.connector.Constraint; import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; +import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.expression.Call; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.Constant; import io.trino.spi.expression.Variable; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; import io.trino.spi.type.ArrayType; import io.trino.spi.type.RowType; import io.trino.spi.type.StandardTypes; @@ -563,8 +568,8 @@ public Optional> applyFilter(C if (!newRegexes.containsKey(columnName) && pattern instanceof Slice) { IndexMetadata metadata = client.getIndexMetadata(handle.getIndex()); if (metadata.getSchema() - .getFields().stream() - .anyMatch(field -> columnName.equals(field.getName()) && field.getType() instanceof PrimitiveType && "keyword".equals(((PrimitiveType) field.getType()).getName()))) { + .getFields().stream() + .anyMatch(field -> columnName.equals(field.getName()) && field.getType() instanceof PrimitiveType && "keyword".equals(((PrimitiveType) field.getType()).getName()))) { newRegexes.put(columnName, likeToRegexp(((Slice) pattern), escape)); continue; } @@ -676,6 +681,24 @@ private static boolean isPassthroughQuery(ElasticsearchTableHandle table) return table.getType().equals(QUERY); } + @Override + public Optional> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle) + { + if (!(handle instanceof RemoteQueryHandle)) { + return Optional.empty(); + } + + ConnectorTableHandle tableHandle = ((RemoteQueryHandle) handle).getTableHandle(); + ConnectorTableSchema tableSchema = getTableSchema(session, tableHandle); + Map columnHandlesByName = getColumnHandles(session, tableHandle); + List columnHandles = tableSchema.getColumns().stream() + .map(ColumnSchema::getName) + .map(columnHandlesByName::get) + .collect(toImmutableList()); + + return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles)); + } + private static class InternalTableMetadata { private final SchemaTableName tableName; diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ptf/RemoteQuery.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ptf/RemoteQuery.java new file mode 100644 index 000000000000..2a25f6543934 --- /dev/null +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ptf/RemoteQuery.java @@ -0,0 +1,132 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.elasticsearch.ptf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.slice.Slice; +import io.trino.plugin.elasticsearch.ElasticsearchColumnHandle; +import io.trino.plugin.elasticsearch.ElasticsearchMetadata; +import io.trino.plugin.elasticsearch.ElasticsearchTableHandle; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnSchema; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTableSchema; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.ptf.Argument; +import io.trino.spi.ptf.ConnectorTableFunction; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; +import io.trino.spi.ptf.Descriptor; +import io.trino.spi.ptf.ScalarArgument; +import io.trino.spi.ptf.ScalarArgumentSpecification; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.elasticsearch.ElasticsearchTableHandle.Type.QUERY; +import static io.trino.spi.ptf.DescriptorMapping.EMPTY_MAPPING; +import static io.trino.spi.ptf.ReturnTypeSpecification.GenericTable.GENERIC_TABLE; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; + +public class RemoteQuery + implements Provider +{ + public static final String NAME = "remote_query"; + + private final ElasticsearchMetadata metadata; + + @Inject + public RemoteQuery(ElasticsearchMetadata metadata) + { + this.metadata = requireNonNull(metadata, "metadata is null"); + } + + @Override + public ConnectorTableFunction get() + { + return new RemoteQueryFunction(metadata); + } + + public static class RemoteQueryFunction + extends ConnectorTableFunction + { + public static final String SCHEMA_NAME = "system"; + + private final ElasticsearchMetadata metadata; + + public RemoteQueryFunction(ElasticsearchMetadata metadata) + { + super( + SCHEMA_NAME, + NAME, + List.of( + new ScalarArgumentSpecification("schema", VARCHAR), + new ScalarArgumentSpecification("index", VARCHAR), + new ScalarArgumentSpecification("query", VARCHAR)), + GENERIC_TABLE); + this.metadata = requireNonNull(metadata, "metadata is null"); + } + + @Override + public Analysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) + { + String schema = ((Slice) ((ScalarArgument) arguments.get("schema")).getValue()).toStringUtf8(); + String index = ((Slice) ((ScalarArgument) arguments.get("index")).getValue()).toStringUtf8(); + String query = ((Slice) ((ScalarArgument) arguments.get("query")).getValue()).toStringUtf8(); + + ElasticsearchTableHandle tableHandle = new ElasticsearchTableHandle(QUERY, schema, index, Optional.of(query)); + ConnectorTableSchema tableSchema = metadata.getTableSchema(session, tableHandle); + Map columnsByName = metadata.getColumnHandles(session, tableHandle); + List columns = tableSchema.getColumns().stream() + .map(ColumnSchema::getName) + .map(columnsByName::get) + .collect(toImmutableList()); + + Descriptor returnedType = new Descriptor(columns.stream() + .map(ElasticsearchColumnHandle.class::cast) + .map(column -> new Descriptor.Field(column.getName(), Optional.of(column.getType()))) + .collect(toList())); + + RemoteQueryHandle handle = new RemoteQueryHandle(tableHandle); + + return new Analysis(Optional.of(returnedType), EMPTY_MAPPING, handle); + } + } + + public static class RemoteQueryHandle + implements ConnectorTableFunctionHandle + { + private final ElasticsearchTableHandle tableHandle; + + @JsonCreator + public RemoteQueryHandle(@JsonProperty("tableHandle") ElasticsearchTableHandle tableHandle) + { + this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); + } + + @JsonProperty + public ConnectorTableHandle getTableHandle() + { + return tableHandle; + } + } +} diff --git a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java index 05ef17e43ee1..d5570657c560 100644 --- a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java +++ b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java @@ -1860,6 +1860,12 @@ public void testMissingIndex() assertTableDoesNotExist("nonexistent_table"); } + @Test + public void testRemoteQueryTableFunction() + { + assertQuerySucceeds("SELECT * FROM TABLE(\"" + getSession().getCatalog().orElseThrow() + "\".\"system\".\"remote_query\"(\"schema\" => 'tpch', \"index\" => 'nation', \"query\" => '{\"query\": {\"match\": {\"name\": \"ALGERIA\"}}}'))"); + } + protected void assertTableDoesNotExist(String name) { assertQueryReturnsEmptyResult(format("SELECT * FROM information_schema.columns WHERE table_name = '%s'", name)); diff --git a/plugin/trino-mariadb/src/main/java/io/trino/plugin/mariadb/MariaDbClientModule.java b/plugin/trino-mariadb/src/main/java/io/trino/plugin/mariadb/MariaDbClientModule.java index e89d84cb2f69..a6fef042da46 100644 --- a/plugin/trino-mariadb/src/main/java/io/trino/plugin/mariadb/MariaDbClientModule.java +++ b/plugin/trino-mariadb/src/main/java/io/trino/plugin/mariadb/MariaDbClientModule.java @@ -25,10 +25,14 @@ import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.credential.CredentialProvider; +import io.trino.plugin.jdbc.ptf.RemoteQuery; +import io.trino.spi.ptf.ConnectorTableFunction; import org.mariadb.jdbc.Driver; import java.util.Properties; +import static com.google.inject.multibindings.Multibinder.newSetBinder; + public class MariaDbClientModule implements Module { @@ -37,6 +41,7 @@ public void configure(Binder binder) { binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(MariaDbClient.class).in(Scopes.SINGLETON); binder.install(new DecimalModule()); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RemoteQuery.class).in(Scopes.SINGLETON); } @Provides diff --git a/plugin/trino-mariadb/src/test/java/io/trino/plugin/mariadb/TestMariaDbConnectorTest.java b/plugin/trino-mariadb/src/test/java/io/trino/plugin/mariadb/TestMariaDbConnectorTest.java index 42db2c33a58e..2dc2149d5685 100644 --- a/plugin/trino-mariadb/src/test/java/io/trino/plugin/mariadb/TestMariaDbConnectorTest.java +++ b/plugin/trino-mariadb/src/test/java/io/trino/plugin/mariadb/TestMariaDbConnectorTest.java @@ -18,6 +18,7 @@ import io.trino.testing.sql.SqlExecutor; import static io.trino.plugin.mariadb.MariaDbQueryRunner.createMariaDbQueryRunner; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestMariaDbConnectorTest @@ -43,4 +44,12 @@ public void testRenameColumn() assertThatThrownBy(super::testRenameColumn) .hasMessageContaining("Rename column not supported for the MariaDB server version"); } + + @Override + public void testRemoteQueryTableFunction() + { + // override because mariaDB returns bigint + assertThat(query("SELECT * FROM TABLE(" + getSession().getCatalog().orElseThrow() + ".system.remote_query(\"query\" => 'SELECT 1'))")) + .matches("VALUES BIGINT '1'"); + } } diff --git a/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClientModule.java b/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClientModule.java index d237a43536a9..343067b01bd7 100644 --- a/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClientModule.java +++ b/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClientModule.java @@ -28,10 +28,13 @@ import io.trino.plugin.jdbc.JdbcJoinPushdownSupportModule; import io.trino.plugin.jdbc.JdbcStatisticsConfig; import io.trino.plugin.jdbc.credential.CredentialProvider; +import io.trino.plugin.jdbc.ptf.RemoteQuery; +import io.trino.spi.ptf.ConnectorTableFunction; import java.sql.SQLException; import java.util.Properties; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.configuration.ConfigBinder.configBinder; public class MySqlClientModule @@ -46,6 +49,7 @@ protected void setup(Binder binder) configBinder(binder).bindConfig(JdbcStatisticsConfig.class); install(new DecimalModule()); install(new JdbcJoinPushdownSupportModule()); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RemoteQuery.class).in(Scopes.SINGLETON); } @Provides diff --git a/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlConnectorTest.java b/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlConnectorTest.java index c12ca308f003..6f1986887b21 100644 --- a/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlConnectorTest.java +++ b/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlConnectorTest.java @@ -15,8 +15,10 @@ import com.google.common.collect.ImmutableMap; import io.trino.testing.QueryRunner; +import org.testng.annotations.Test; import static io.trino.plugin.mysql.MySqlQueryRunner.createMySqlQueryRunner; +import static org.assertj.core.api.Assertions.assertThat; public class TestMySqlConnectorTest extends BaseMySqlConnectorTest @@ -28,4 +30,13 @@ protected QueryRunner createQueryRunner() mySqlServer = closeAfterClass(new TestingMySqlServer(false)); return createMySqlQueryRunner(mySqlServer, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES); } + + @Test + @Override + public void testRemoteQueryTableFunction() + { + // override because MySql returns bigint + assertThat(query("SELECT * FROM TABLE(" + getSession().getCatalog().orElseThrow() + ".system.remote_query(\"query\" => 'SELECT 1'))")) + .matches("VALUES BIGINT '1'"); + } } diff --git a/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlLegacyConnectorTest.java b/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlLegacyConnectorTest.java index c7acbed191dd..bd3e5a3ab8d1 100644 --- a/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlLegacyConnectorTest.java +++ b/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlLegacyConnectorTest.java @@ -135,6 +135,15 @@ public void testRenameColumn() .hasStackTraceContaining("RENAME COLUMN x TO before_y"); } + @Test + @Override + public void testRemoteQueryTableFunction() + { + // override because MySql returns bigint + assertThat(query("SELECT * FROM TABLE(" + getSession().getCatalog().orElseThrow() + ".system.remote_query(\"query\" => 'SELECT 1'))")) + .matches("VALUES BIGINT '1'"); + } + @Override protected Optional filterDataMappingSmokeTestData(DataMappingTestSetup dataMappingTestSetup) { diff --git a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java index 119704fdfea7..c29a910179bb 100644 --- a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java +++ b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java @@ -27,12 +27,15 @@ import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; import io.trino.plugin.jdbc.RetryingConnectionFactory; import io.trino.plugin.jdbc.credential.CredentialProvider; +import io.trino.plugin.jdbc.ptf.RemoteQuery; +import io.trino.spi.ptf.ConnectorTableFunction; import oracle.jdbc.OracleConnection; import oracle.jdbc.OracleDriver; import java.sql.SQLException; import java.util.Properties; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.jdbc.JdbcModule.bindSessionPropertiesProvider; @@ -48,6 +51,7 @@ public void configure(Binder binder) bindSessionPropertiesProvider(binder, OracleSessionProperties.class); configBinder(binder).bindConfig(OracleConfig.class); newOptionalBinder(binder, Key.get(int.class, MaxDomainCompactionThreshold.class)).setBinding().toInstance(ORACLE_MAX_LIST_EXPRESSIONS); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RemoteQuery.class).in(Scopes.SINGLETON); } @Provides diff --git a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestOracleConnectorTest.java b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestOracleConnectorTest.java index 2db89aa7217a..d1c346633bab 100644 --- a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestOracleConnectorTest.java +++ b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestOracleConnectorTest.java @@ -28,6 +28,7 @@ import static java.lang.String.format; import static java.util.stream.Collectors.joining; import static java.util.stream.IntStream.range; +import static org.assertj.core.api.Assertions.assertThat; public class TestOracleConnectorTest extends BaseOracleConnectorTest @@ -85,4 +86,12 @@ protected SqlExecutor onRemoteDatabase() { return oracleServer::execute; } + + @Override + public void testRemoteQueryTableFunction() + { + // override because Oracle requires the FROM clause, and it needs explicit type + assertThat(query("SELECT * FROM TABLE(" + getSession().getCatalog().orElseThrow() + ".system.remote_query(\"query\" => 'SELECT CAST(1 AS number(2, 1)) FROM DUAL'))")) + .matches("VALUES 1.0"); + } } diff --git a/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java b/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java index a1080c6ea985..433f6a2fb727 100644 --- a/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java +++ b/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java @@ -428,6 +428,14 @@ public void testMissingColumnsOnInsert() assertQuery("SELECT * FROM test_col_insert", "SELECT 1, 'val1', 'val2'"); } + @Override + public void testRemoteQueryTableFunction() + { + // not implemented + assertThatThrownBy(super::testRemoteQueryTableFunction) + .hasMessage("line 1:21: Table function phoenix.system.remote_query not registered"); + } + @Override protected TestTable createTableWithDoubleAndRealColumns(String name, List rows) { diff --git a/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java b/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java index 715c2d1ab2e7..9433684eced3 100644 --- a/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java +++ b/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java @@ -559,6 +559,14 @@ public void testUseSortedPropertiesForPartialTopNElimination() assertUpdate("DROP TABLE " + tableName); } + @Override + public void testRemoteQueryTableFunction() + { + // not implemented + assertThatThrownBy(super::testRemoteQueryTableFunction) + .hasMessage("line 1:21: Table function phoenix.system.remote_query not registered"); + } + @Override protected TestTable createTableWithDoubleAndRealColumns(String name, List rows) { diff --git a/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClientModule.java b/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClientModule.java index d1056ed7ae84..6a27a3adc473 100644 --- a/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClientModule.java +++ b/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClientModule.java @@ -29,8 +29,11 @@ import io.trino.plugin.jdbc.QueryBuilder; import io.trino.plugin.jdbc.RemoteQueryCancellationModule; import io.trino.plugin.jdbc.credential.CredentialProvider; +import io.trino.plugin.jdbc.ptf.RemoteQuery; +import io.trino.spi.ptf.ConnectorTableFunction; import org.postgresql.Driver; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.jdbc.JdbcModule.bindSessionPropertiesProvider; @@ -48,6 +51,7 @@ public void setup(Binder binder) newOptionalBinder(binder, QueryBuilder.class).setBinding().to(CollationAwareQueryBuilder.class).in(Scopes.SINGLETON); install(new DecimalModule()); install(new JdbcJoinPushdownSupportModule()); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RemoteQuery.class).in(Scopes.SINGLETON); install(new RemoteQueryCancellationModule()); } diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java index 73747ebc7904..6ab76dab95c7 100644 --- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java +++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java @@ -25,6 +25,10 @@ import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.credential.CredentialProvider; +import io.trino.plugin.jdbc.ptf.RemoteQuery; +import io.trino.spi.ptf.ConnectorTableFunction; + +import static com.google.inject.multibindings.Multibinder.newSetBinder; public class RedshiftClientModule implements Module @@ -33,6 +37,7 @@ public class RedshiftClientModule public void configure(Binder binder) { binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(RedshiftClient.class).in(Scopes.SINGLETON); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RemoteQuery.class).in(Scopes.SINGLETON); } @Singleton diff --git a/plugin/trino-singlestore/src/main/java/io/trino/plugin/singlestore/SingleStoreClientModule.java b/plugin/trino-singlestore/src/main/java/io/trino/plugin/singlestore/SingleStoreClientModule.java index d82603d3f866..d01ec01f94ce 100644 --- a/plugin/trino-singlestore/src/main/java/io/trino/plugin/singlestore/SingleStoreClientModule.java +++ b/plugin/trino-singlestore/src/main/java/io/trino/plugin/singlestore/SingleStoreClientModule.java @@ -26,9 +26,12 @@ import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.credential.CredentialProvider; +import io.trino.plugin.jdbc.ptf.RemoteQuery; +import io.trino.spi.ptf.ConnectorTableFunction; import java.util.Properties; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.configuration.ConfigBinder.configBinder; public class SingleStoreClientModule @@ -40,6 +43,7 @@ public void configure(Binder binder) binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(SingleStoreClient.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(SingleStoreConfig.class); binder.install(new DecimalModule()); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RemoteQuery.class).in(Scopes.SINGLETON); } @Provides diff --git a/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClientModule.java b/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClientModule.java index 9ae27d7f1834..5a5d42821dcd 100644 --- a/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClientModule.java +++ b/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClientModule.java @@ -29,7 +29,10 @@ import io.trino.plugin.jdbc.JdbcStatisticsConfig; import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; import io.trino.plugin.jdbc.credential.CredentialProvider; +import io.trino.plugin.jdbc.ptf.RemoteQuery; +import io.trino.spi.ptf.ConnectorTableFunction; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.jdbc.JdbcModule.bindSessionPropertiesProvider; @@ -48,6 +51,7 @@ protected void setup(Binder binder) bindTablePropertiesProvider(binder, SqlServerTableProperties.class); bindSessionPropertiesProvider(binder, SqlServerSessionProperties.class); newOptionalBinder(binder, Key.get(int.class, MaxDomainCompactionThreshold.class)).setBinding().toInstance(SQL_SERVER_MAX_LIST_EXPRESSIONS); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RemoteQuery.class).in(Scopes.SINGLETON); install(new JdbcJoinPushdownSupportModule()); } diff --git a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestSqlServerConnectorTest.java b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestSqlServerConnectorTest.java index 7829463dad46..2cb763f916d0 100644 --- a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestSqlServerConnectorTest.java +++ b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestSqlServerConnectorTest.java @@ -195,4 +195,13 @@ public static Object[][] timestampTypes() {"timestamp(12)"} }; } + + @Test + @Override + public void testRemoteQueryTableFunction() + { + // override because SqlServer provides an empty String as the name for unnamed column + assertThat(query("SELECT * FROM TABLE(" + getSession().getCatalog().orElseThrow() + ".system.remote_query(\"query\" => 'SELECT 1 a'))")) + .matches("VALUES 1"); + } }