-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Add table function to execute stored procedure in MySQL & SQLServer #15813
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -245,27 +245,14 @@ public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, Schema | |
| @Override | ||
| public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) | ||
| { | ||
| ImmutableList.Builder<JdbcColumnHandle> columns = ImmutableList.builder(); | ||
| List<JdbcColumnHandle> columns; | ||
| try (Connection connection = connectionFactory.openConnection(session); | ||
| PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery)) { | ||
| ResultSetMetaData metadata = preparedStatement.getMetaData(); | ||
| if (metadata == null) { | ||
| throw new UnsupportedOperationException("Query not supported: ResultSetMetaData not available for query: " + preparedQuery.getQuery()); | ||
| } | ||
| for (int column = 1; column <= metadata.getColumnCount(); column++) { | ||
| String name = metadata.getColumnName(column); | ||
| JdbcTypeHandle jdbcTypeHandle = new JdbcTypeHandle( | ||
| metadata.getColumnType(column), | ||
| Optional.ofNullable(metadata.getColumnTypeName(column)), | ||
| Optional.of(metadata.getPrecision(column)), | ||
| Optional.of(metadata.getScale(column)), | ||
| Optional.empty(), // TODO support arrays | ||
| Optional.of(metadata.isCaseSensitive(column) ? CASE_SENSITIVE : CASE_INSENSITIVE)); | ||
| Type type = toColumnMapping(session, connection, jdbcTypeHandle) | ||
| .orElseThrow(() -> new UnsupportedOperationException(format("Unsupported type: %s of column: %s", jdbcTypeHandle, name))) | ||
| .getType(); | ||
| columns.add(new JdbcColumnHandle(name, jdbcTypeHandle, type)); | ||
| } | ||
| columns = getColumns(session, connection, metadata); | ||
| } | ||
| catch (SQLException e) { | ||
| throw new TrinoException(JDBC_ERROR, "Failed to get table handle for prepared query. " + firstNonNull(e.getMessage(), e), e); | ||
|
|
@@ -277,13 +264,63 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery pr | |
| ImmutableList.of(), | ||
| Optional.empty(), | ||
| OptionalLong.empty(), | ||
| Optional.of(columns.build()), | ||
| Optional.of(columns), | ||
| // The query is opaque, so we don't know referenced tables | ||
| Optional.empty(), | ||
| 0, | ||
| Optional.empty()); | ||
| } | ||
|
|
||
| @Override | ||
| public JdbcTableHandle getTableHandle(ConnectorSession session, ProcedureQuery procedureQuery) | ||
| { | ||
| List<JdbcColumnHandle> columns; | ||
| try (Connection connection = connectionFactory.openConnection(session); | ||
| PreparedStatement preparedStatement = queryBuilder.callProcedure(this, session, connection, procedureQuery)) { | ||
| ResultSetMetaData metadata = preparedStatement.getMetaData(); | ||
| if (metadata == null) { | ||
| throw new UnsupportedOperationException("Query not supported: ResultSetMetaData not available for query: " + procedureQuery.getQuery()); | ||
| } | ||
| columns = getColumns(session, connection, metadata); | ||
| } | ||
| catch (SQLException e) { | ||
| throw new TrinoException(JDBC_ERROR, "Failed to get table handle for prepared query. " + firstNonNull(e.getMessage(), e), e); | ||
| } | ||
|
|
||
| return new JdbcTableHandle( | ||
| new JdbcProcedureRelationHandle(procedureQuery), | ||
| TupleDomain.all(), | ||
| ImmutableList.of(), | ||
| Optional.empty(), | ||
| OptionalLong.empty(), | ||
| Optional.of(columns), | ||
| // The query is opaque, so we don't know referenced tables | ||
| Optional.empty(), | ||
| 0, | ||
| Optional.empty()); | ||
| } | ||
|
|
||
| private List<JdbcColumnHandle> getColumns(ConnectorSession session, Connection connection, ResultSetMetaData metadata) | ||
| throws SQLException | ||
| { | ||
| ImmutableList.Builder<JdbcColumnHandle> columns = ImmutableList.builder(); | ||
| for (int column = 1; column <= metadata.getColumnCount(); column++) { | ||
| String name = metadata.getColumnName(column); | ||
| JdbcTypeHandle jdbcTypeHandle = new JdbcTypeHandle( | ||
| metadata.getColumnType(column), | ||
| Optional.ofNullable(metadata.getColumnTypeName(column)), | ||
| Optional.of(metadata.getPrecision(column)), | ||
| Optional.of(metadata.getScale(column)), | ||
| Optional.empty(), // TODO support arrays | ||
| Optional.of(metadata.isCaseSensitive(column) ? CASE_SENSITIVE : CASE_INSENSITIVE)); | ||
| Type type = toColumnMapping(session, connection, jdbcTypeHandle) | ||
| .orElseThrow(() -> new UnsupportedOperationException(format("Unsupported type: %s of column: %s", jdbcTypeHandle, name))) | ||
| .getType(); | ||
| columns.add(new JdbcColumnHandle(name, jdbcTypeHandle, type)); | ||
| } | ||
| return columns.build(); | ||
| } | ||
|
|
||
| @Override | ||
| public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle) | ||
| { | ||
|
|
@@ -448,6 +485,9 @@ public PreparedQuery prepareQuery( | |
| public PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, JdbcTableHandle table, List<JdbcColumnHandle> columns) | ||
| throws SQLException | ||
| { | ||
| if (table.getRelationHandle() instanceof JdbcProcedureRelationHandle jdbcProcedureRelationHandle) { | ||
| return queryBuilder.callProcedure(this, session, connection, jdbcProcedureRelationHandle.getProcedureQuery()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this ignore all the attributes that may be set on |
||
| } | ||
| PreparedQuery preparedQuery = prepareQuery(session, connection, table, Optional.empty(), columns, ImmutableMap.of(), Optional.of(split)); | ||
| return queryBuilder.prepareStatement(this, session, connection, preparedQuery); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,6 +85,7 @@ public class CachingJdbcClient | |
| private final Cache<TableNamesCacheKey, List<SchemaTableName>> tableNamesCache; | ||
| private final Cache<TableHandlesByNameCacheKey, Optional<JdbcTableHandle>> tableHandlesByNameCache; | ||
| private final Cache<TableHandlesByQueryCacheKey, JdbcTableHandle> tableHandlesByQueryCache; | ||
| private final Cache<TableHandlesByProcedureCacheKey, JdbcTableHandle> tableHandlesByProcedureCache; | ||
| private final Cache<ColumnsCacheKey, List<JdbcColumnHandle>> columnsCache; | ||
| private final Cache<JdbcTableHandle, TableStatistics> statisticsCache; | ||
|
|
||
|
|
@@ -130,6 +131,7 @@ public CachingJdbcClient( | |
| tableNamesCache = cacheBuilder.build(); | ||
| tableHandlesByNameCache = cacheBuilder.build(); | ||
| tableHandlesByQueryCache = cacheBuilder.build(); | ||
| tableHandlesByProcedureCache = cacheBuilder.build(); | ||
| columnsCache = cacheBuilder.build(); | ||
| statisticsCache = cacheBuilder.build(); | ||
| } | ||
|
|
@@ -299,6 +301,13 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery pr | |
| return get(tableHandlesByQueryCache, key, () -> delegate.getTableHandle(session, preparedQuery)); | ||
| } | ||
|
|
||
| @Override | ||
| public JdbcTableHandle getTableHandle(ConnectorSession session, ProcedureQuery procedureQuery) | ||
| { | ||
| TableHandlesByProcedureCacheKey key = new TableHandlesByProcedureCacheKey(getIdentityKey(session), procedureQuery); | ||
| return get(tableHandlesByProcedureCache, key, () -> delegate.getTableHandle(session, procedureQuery)); | ||
| } | ||
|
|
||
| @Override | ||
| public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set<Long> pageSinkIds) | ||
| { | ||
|
|
@@ -738,6 +747,38 @@ public int hashCode() | |
| } | ||
| } | ||
|
|
||
| private static final class TableHandlesByProcedureCacheKey | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make it a record |
||
| { | ||
| private final IdentityCacheKey identity; | ||
| private final ProcedureQuery procedureQuery; | ||
|
|
||
| private TableHandlesByProcedureCacheKey(IdentityCacheKey identity, ProcedureQuery procedureQuery) | ||
| { | ||
| this.identity = requireNonNull(identity, "identity is null"); | ||
| this.procedureQuery = requireNonNull(procedureQuery, "procedure is null"); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) | ||
| { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (o == null || getClass() != o.getClass()) { | ||
| return false; | ||
| } | ||
| TableHandlesByProcedureCacheKey that = (TableHandlesByProcedureCacheKey) o; | ||
| return Objects.equals(identity, that.identity) && | ||
| Objects.equals(procedureQuery, that.procedureQuery); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() | ||
| { | ||
| return Objects.hash(identity, procedureQuery); | ||
| } | ||
| } | ||
|
|
||
| private static final class TableNamesCacheKey | ||
| { | ||
| private final IdentityCacheKey identity; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| import com.google.common.collect.ImmutableSet; | ||
| import io.airlift.slice.Slice; | ||
| import io.trino.plugin.jdbc.PredicatePushdownController.DomainPushdownResult; | ||
| import io.trino.plugin.jdbc.ptf.Procedure.ProcedureFunctionHandle; | ||
| import io.trino.plugin.jdbc.ptf.Query.QueryFunctionHandle; | ||
| import io.trino.spi.TrinoException; | ||
| import io.trino.spi.connector.AggregateFunction; | ||
|
|
@@ -141,6 +142,12 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery pr | |
| return jdbcClient.getTableHandle(session, preparedQuery); | ||
| } | ||
|
|
||
| @Override | ||
| public JdbcTableHandle getTableHandle(ConnectorSession session, ProcedureQuery procedureQuery) | ||
| { | ||
| return jdbcClient.getTableHandle(session, procedureQuery); | ||
| } | ||
|
|
||
| @Override | ||
| public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName) | ||
| { | ||
|
|
@@ -151,6 +158,10 @@ public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTabl | |
| public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle table, Constraint constraint) | ||
| { | ||
| JdbcTableHandle handle = (JdbcTableHandle) table; | ||
| if (handle.isProcedure()) { | ||
| return Optional.empty(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That means |
||
| } | ||
|
|
||
| if (handle.getSortOrder().isPresent() && handle.getLimit().isPresent()) { | ||
| handle = flushAttributesAsQuery(session, handle); | ||
| } | ||
|
|
@@ -261,6 +272,10 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjecti | |
| { | ||
| JdbcTableHandle handle = (JdbcTableHandle) table; | ||
|
|
||
| if (handle.isProcedure()) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| List<JdbcColumnHandle> newColumns = assignments.values().stream() | ||
| .map(JdbcColumnHandle.class::cast) | ||
| .collect(toImmutableList()); | ||
|
|
@@ -315,6 +330,10 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega | |
|
|
||
| JdbcTableHandle handle = (JdbcTableHandle) table; | ||
|
|
||
| if (handle.isProcedure()) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| // Global aggregation is represented by [[]] | ||
| verify(!groupingSets.isEmpty(), "No grouping sets provided"); | ||
|
|
||
|
|
@@ -416,6 +435,10 @@ public Optional<JoinApplicationResult<ConnectorTableHandle>> applyJoin( | |
| return Optional.empty(); | ||
| } | ||
|
|
||
| if (((JdbcTableHandle) left).isProcedure() || ((JdbcTableHandle) right).isProcedure()) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| JdbcTableHandle leftHandle = flushAttributesAsQuery(session, (JdbcTableHandle) left); | ||
| JdbcTableHandle rightHandle = flushAttributesAsQuery(session, (JdbcTableHandle) right); | ||
|
|
||
|
|
@@ -523,6 +546,10 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect | |
| { | ||
| JdbcTableHandle handle = (JdbcTableHandle) table; | ||
|
|
||
| if (handle.isProcedure()) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| if (limit > Integer.MAX_VALUE) { | ||
| // Some databases, e.g. Phoenix, Redshift, do not support limit exceeding 2147483647. | ||
| return Optional.empty(); | ||
|
|
@@ -565,6 +592,10 @@ public Optional<TopNApplicationResult<ConnectorTableHandle>> applyTopN( | |
| verify(!sortItems.isEmpty(), "sortItems are empty"); | ||
| JdbcTableHandle handle = (JdbcTableHandle) table; | ||
|
|
||
| if (handle.isProcedure()) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| List<JdbcSortItem> resultSortOrder = sortItems.stream() | ||
| .map(sortItem -> { | ||
| verify(assignments.containsKey(sortItem.getName()), "assignments does not contain sortItem: %s", sortItem.getName()); | ||
|
|
@@ -603,18 +634,23 @@ public Optional<TopNApplicationResult<ConnectorTableHandle>> applyTopN( | |
| @Override | ||
| public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle) | ||
| { | ||
| if (!(handle instanceof QueryFunctionHandle)) { | ||
| if (!(handle instanceof QueryFunctionHandle || handle instanceof ProcedureFunctionHandle)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
| return Optional.empty(); | ||
| } | ||
| ConnectorTableHandle tableHandle; | ||
| if (handle instanceof QueryFunctionHandle queryFunctionHandle) { | ||
| tableHandle = queryFunctionHandle.getTableHandle(); | ||
| } | ||
| else { | ||
| tableHandle = ((ProcedureFunctionHandle) handle).getTableHandle(); | ||
| } | ||
|
|
||
| ConnectorTableHandle tableHandle = ((QueryFunctionHandle) handle).getTableHandle(); | ||
| ConnectorTableSchema tableSchema = getTableSchema(session, tableHandle); | ||
| Map<String, ColumnHandle> columnHandlesByName = getColumnHandles(session, tableHandle); | ||
| List<ColumnHandle> columnHandles = tableSchema.getColumns().stream() | ||
| .map(ColumnSchema::getName) | ||
| .map(columnHandlesByName::get) | ||
| .collect(toImmutableList()); | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rollback |
||
| return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles)); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.trino.plugin.jdbc; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
|
|
||
| import static java.lang.String.format; | ||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class JdbcProcedureRelationHandle | ||
findepi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| extends JdbcRelationHandle | ||
| { | ||
| private final ProcedureQuery procedure; | ||
|
|
||
| @JsonCreator | ||
| public JdbcProcedureRelationHandle(ProcedureQuery procedureQuery) | ||
| { | ||
| this.procedure = requireNonNull(procedureQuery, "procedureQuery is null"); | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public ProcedureQuery getProcedureQuery() | ||
| { | ||
| return procedure; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() | ||
| { | ||
| return format("Procedure[%s]", procedure.getQuery()); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will extract it to a new commit.