@@ -14,6 +14,8 @@
package io.trino.connector.system;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.base.MappedPageSource;
import io.trino.plugin.base.MappedRecordSet;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
@@ -32,8 +34,6 @@
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
import io.trino.split.MappedPageSource;
import io.trino.split.MappedRecordSet;

import java.util.HashMap;
import java.util.List;
@@ -149,6 +149,7 @@
import io.trino.operator.window.pattern.PhysicalValueAccessor;
import io.trino.operator.window.pattern.PhysicalValuePointer;
import io.trino.operator.window.pattern.SetEvaluator.SetEvaluatorSupplier;
import io.trino.plugin.base.MappedRecordSet;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
@@ -172,7 +173,6 @@
import io.trino.spiller.PartitioningSpillerFactory;
import io.trino.spiller.SingleStreamSpillerFactory;
import io.trino.spiller.SpillerFactory;
import io.trino.split.MappedRecordSet;
import io.trino.split.PageSinkManager;
import io.trino.split.PageSourceProvider;
import io.trino.sql.DynamicFilters;
40 changes: 40 additions & 0 deletions docs/src/main/sphinx/connector/sqlserver.rst
@@ -367,6 +367,46 @@ nations by population::
)
);

.. _sqlserver-procedure-function:

``procedure(varchar) -> table``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The ``procedure`` function allows you to run stored procedures directly on the
underlying database.

.. note::

    The ``procedure`` function does not support stored procedures that return
    multiple statements, use non-select statements, use output parameters, or
    use conditional statements.


The following example runs the stored procedure ``employee_sp`` in the
``example`` catalog and the ``example_schema`` schema in the underlying SQL
Server database::

    SELECT
      *
    FROM
      TABLE(
        example.system.procedure(
          schema => 'example_schema',
          procedure => 'employee_sp'
        )
      );

If the stored procedure ``employee_sp`` requires any input, it can be specified
via the ``inputs`` argument as an ``array(varchar)``::
    SELECT
      *
    FROM
      TABLE(
        example.system.procedure(
          schema => 'example_schema',
          procedure => 'employee_sp',
          inputs => ARRAY['0']
        )
      );

Review discussion on the ``inputs`` argument (Praveen2112 marked this conversation as resolved):

Member: What if the procedure expects an integer?

Member (author): For now we pass the arguments as strings: without quotes for an integer, and with additional quotes for a string (if requested by the underlying data source).

Member: Should we document this?

Member (author): SQL Server doesn't expect additional quotes, so do we need to document it?

Member: It'd be better to pass the arguments as a ROW, so each one has the proper type. You get type safety and avoid conversions that might not be possible to do easily or reliably.

Member (author): Can you provide a pointer on where/how this is established during the analysis step? Should we use a different argument specification for this?

Member: The type of a scalar argument is established at table function declaration.

Member: Even if we could pass arbitrary scalars, it seems right to use an array of varchar in this case. It shifts the responsibility of formatting them the right way to the user, which is in the spirit of query pass-through. This way it is harder for the user to pass parameters or non-literal expressions, but I don't know if a real use case would need those capabilities. Like @kokosing noticed, we must watch out for possible abuse (injection).

Member: The table argument is the only polymorphic argument type of table functions. Technically, we could make use of it and pass a scalar subquery containing the parameters, but then we'd have to refactor the PTF to use operator-based execution.
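JDBC executes stored procedures through the call escape syntax (``{call name(?, ?)}``) on a ``CallableStatement``. The helper below is a hypothetical sketch of how a call clause could be assembled from the ``schema``, ``procedure``, and ``inputs`` arguments shown above; it is not the connector's actual implementation, and the SQL the connector really generates may differ.

```java
import java.util.Collections;
import java.util.List;

public class ProcedureCallBuilder
{
    // Builds a JDBC call escape clause such as "{call example_schema.employee_sp(?)}".
    // One "?" placeholder is emitted per input value; the values themselves are
    // bound on the CallableStatement, which keeps them out of the SQL text.
    public static String buildCallClause(String schema, String procedure, List<String> inputs)
    {
        String placeholders = String.join(", ", Collections.nCopies(inputs.size(), "?"));
        return "{call " + schema + "." + procedure + "(" + placeholders + ")}";
    }

    public static void main(String[] args)
    {
        // prints {call example_schema.employee_sp(?)}
        System.out.println(buildCallClause("example_schema", "employee_sp", List.of("0")));
    }
}
```

With a live connection you would then call ``connection.prepareCall(clause)``, bind each input with ``setString``, and run ``executeQuery()``; binding rather than inlining the values is what guards against the injection concern raised above.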

Performance
-----------
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.split;
package io.trino.plugin.base;

import com.google.common.primitives.Ints;
import io.trino.spi.Page;
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.split;
package io.trino.plugin.base;

import com.google.common.primitives.Ints;
import io.airlift.slice.Slice;
@@ -22,6 +22,7 @@
import io.airlift.log.Logger;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.plugin.jdbc.mapping.IdentifierMapping;
import io.trino.plugin.jdbc.ptf.Procedure.ProcedureInformation;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
@@ -45,6 +46,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
@@ -245,44 +247,55 @@ public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, Schema
@Override
public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery)
{
ImmutableList.Builder<JdbcColumnHandle> columns = ImmutableList.builder();
try (Connection connection = connectionFactory.openConnection(session);
PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery)) {
ResultSetMetaData metadata = preparedStatement.getMetaData();
if (metadata == null) {
throw new UnsupportedOperationException("Query not supported: ResultSetMetaData not available for query: " + preparedQuery.getQuery());
}
for (int column = 1; column <= metadata.getColumnCount(); column++) {
// Use getColumnLabel method because query pass-through table function may contain column aliases
String name = metadata.getColumnLabel(column);
JdbcTypeHandle jdbcTypeHandle = new JdbcTypeHandle(
metadata.getColumnType(column),
Optional.ofNullable(metadata.getColumnTypeName(column)),
Optional.of(metadata.getPrecision(column)),
Optional.of(metadata.getScale(column)),
Optional.empty(), // TODO support arrays
Optional.of(metadata.isCaseSensitive(column) ? CASE_SENSITIVE : CASE_INSENSITIVE));
Type type = toColumnMapping(session, connection, jdbcTypeHandle)
.orElseThrow(() -> new UnsupportedOperationException(format("Unsupported type: %s of column: %s", jdbcTypeHandle, name)))
.getType();
columns.add(new JdbcColumnHandle(name, jdbcTypeHandle, type));
}
return new JdbcTableHandle(
new JdbcQueryRelationHandle(preparedQuery),
TupleDomain.all(),
ImmutableList.of(),
Optional.empty(),
OptionalLong.empty(),
Optional.of(getColumns(session, connection, metadata)),
// The query is opaque, so we don't know referenced tables
Optional.empty(),
0,
Optional.empty());
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, "Failed to get table handle for prepared query. " + firstNonNull(e.getMessage(), e), e);
}
}

@Override
public JdbcProcedureHandle getProcedureHandle(ConnectorSession session, ProcedureInformation procedureInformation)
{
throw new TrinoException(NOT_SUPPORTED, "Procedure is not supported");
}

return new JdbcTableHandle(
new JdbcQueryRelationHandle(preparedQuery),
TupleDomain.all(),
ImmutableList.of(),
Optional.empty(),
OptionalLong.empty(),
Optional.of(columns.build()),
// The query is opaque, so we don't know referenced tables
Optional.empty(),
0,
Optional.empty());
protected List<JdbcColumnHandle> getColumns(ConnectorSession session, Connection connection, ResultSetMetaData metadata)
throws SQLException
{
ImmutableList.Builder<JdbcColumnHandle> columns = ImmutableList.builder();
for (int column = 1; column <= metadata.getColumnCount(); column++) {
// Use getColumnLabel method because query pass-through table function may contain column aliases
String name = metadata.getColumnLabel(column);
JdbcTypeHandle jdbcTypeHandle = new JdbcTypeHandle(
metadata.getColumnType(column),
Optional.ofNullable(metadata.getColumnTypeName(column)),
Optional.of(metadata.getPrecision(column)),
Optional.of(metadata.getScale(column)),
Optional.empty(), // TODO support arrays
Optional.of(metadata.isCaseSensitive(column) ? CASE_SENSITIVE : CASE_INSENSITIVE));
Type type = toColumnMapping(session, connection, jdbcTypeHandle)
.orElseThrow(() -> new UnsupportedOperationException(format("Unsupported type: %s of column: %s", jdbcTypeHandle, name)))
.getType();
columns.add(new JdbcColumnHandle(name, jdbcTypeHandle, type));
}
return columns.build();
}

@Override
@@ -418,11 +431,30 @@ public ConnectorSplitSource getSplits(ConnectorSession session, JdbcTableHandle
return new FixedSplitSource(new JdbcSplit(Optional.empty()));
}

@Override
public ConnectorSplitSource getSplits(ConnectorSession session, JdbcProcedureHandle procedureHandle)
{
return new FixedSplitSource(new JdbcSplit(Optional.empty()));
}

@Override
public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcTableHandle tableHandle)
throws SQLException
{
verify(tableHandle.getAuthorization().isEmpty(), "Unexpected authorization is required for table: %s".formatted(tableHandle));
return getConnection(session);
}

@Override
public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcProcedureHandle procedureHandle)
throws SQLException
{
return getConnection(session);
}

private Connection getConnection(ConnectorSession session)
throws SQLException
{
Connection connection = connectionFactory.openConnection(session);
try {
connection.setReadOnly(true);
@@ -459,6 +491,13 @@ public PreparedStatement buildSql(ConnectorSession session, Connection connectio
return queryBuilder.prepareStatement(this, session, connection, preparedQuery);
}

@Override
public CallableStatement buildProcedure(ConnectorSession session, Connection connection, JdbcSplit split, JdbcProcedureHandle procedureHandle)
throws SQLException
{
return queryBuilder.callProcedure(this, session, connection, procedureHandle.getProcedureQuery());
}

protected PreparedQuery prepareQuery(
ConnectorSession session,
Connection connection,
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import io.trino.spi.connector.ConnectorTableHandle;

import java.util.List;
import java.util.Optional;

public abstract class BaseJdbcConnectorTableHandle
implements ConnectorTableHandle
{
public abstract Optional<List<JdbcColumnHandle>> getColumns();
}
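The new ``BaseJdbcConnectorTableHandle`` gives table handles and procedure handles a common supertype, so code that only needs the column list can accept either. The following is a simplified, hypothetical sketch of that pattern; the real ``JdbcTableHandle`` and ``JdbcProcedureHandle`` carry much more state than these stand-ins.

```java
import java.util.List;
import java.util.Optional;

public class HandleHierarchyDemo
{
    // Common supertype: callers that only need the column list accept this.
    abstract static class BaseHandle
    {
        public abstract Optional<List<String>> getColumns();
    }

    static class TableHandle extends BaseHandle
    {
        @Override
        public Optional<List<String>> getColumns()
        {
            // Columns for a plain table scan may not be resolved yet.
            return Optional.empty();
        }
    }

    static class ProcedureHandle extends BaseHandle
    {
        private final List<String> columns;

        ProcedureHandle(List<String> columns)
        {
            this.columns = List.copyOf(columns);
        }

        @Override
        public Optional<List<String>> getColumns()
        {
            // A procedure's result columns are discovered when the handle is built.
            return Optional.of(columns);
        }
    }

    public static void main(String[] args)
    {
        List<BaseHandle> handles = List.of(new TableHandle(), new ProcedureHandle(List.of("id", "name")));
        for (BaseHandle handle : handles) {
            // prints [] then [id, name]
            System.out.println(handle.getColumns().orElse(List.of()));
        }
    }
}
```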
@@ -23,6 +23,7 @@
import io.trino.collect.cache.EvictableCacheBuilder;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.jdbc.IdentityCacheMapping.IdentityCacheKey;
import io.trino.plugin.jdbc.ptf.Procedure.ProcedureInformation;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.ColumnHandle;
@@ -45,6 +46,7 @@

import javax.inject.Inject;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -83,6 +85,7 @@ public class CachingJdbcClient
private final Cache<TableNamesCacheKey, List<SchemaTableName>> tableNamesCache;
private final Cache<TableHandlesByNameCacheKey, Optional<JdbcTableHandle>> tableHandlesByNameCache;
private final Cache<TableHandlesByQueryCacheKey, JdbcTableHandle> tableHandlesByQueryCache;
private final Cache<ProcedureHandlesByQueryCacheKey, JdbcProcedureHandle> procedureHandlesByQueryCache;
private final Cache<ColumnsCacheKey, List<JdbcColumnHandle>> columnsCache;
private final Cache<JdbcTableHandle, TableStatistics> statisticsCache;

@@ -148,6 +151,7 @@ public CachingJdbcClient(
tableNamesCache = buildCache(cacheSize, tableNamesCachingTtl);
tableHandlesByNameCache = buildCache(cacheSize, metadataCachingTtl);
tableHandlesByQueryCache = buildCache(cacheSize, metadataCachingTtl);
procedureHandlesByQueryCache = buildCache(cacheSize, metadataCachingTtl);
columnsCache = buildCache(cacheSize, metadataCachingTtl);
statisticsCache = buildCache(cacheSize, metadataCachingTtl);
}
@@ -235,13 +239,26 @@ public ConnectorSplitSource getSplits(ConnectorSession session, JdbcTableHandle
return delegate.getSplits(session, tableHandle);
}

@Override
public ConnectorSplitSource getSplits(ConnectorSession session, JdbcProcedureHandle procedureHandle)
{
return delegate.getSplits(session, procedureHandle);
}

@Override
public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcTableHandle tableHandle)
throws SQLException
{
return delegate.getConnection(session, split, tableHandle);
}

@Override
public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcProcedureHandle procedureHandle)
throws SQLException
{
return delegate.getConnection(session, split, procedureHandle);
}

@Override
public void abortReadConnection(Connection connection, ResultSet resultSet)
throws SQLException
@@ -267,6 +284,13 @@ public PreparedStatement buildSql(ConnectorSession session, Connection connectio
return delegate.buildSql(session, connection, split, table, columns);
}

@Override
public CallableStatement buildProcedure(ConnectorSession session, Connection connection, JdbcSplit split, JdbcProcedureHandle procedureHandle)
throws SQLException
{
return delegate.buildProcedure(session, connection, split, procedureHandle);
}

@Override
public Optional<PreparedQuery> implementJoin(
ConnectorSession session,
@@ -327,6 +351,13 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery pr
return get(tableHandlesByQueryCache, key, () -> delegate.getTableHandle(session, preparedQuery));
}

@Override
public JdbcProcedureHandle getProcedureHandle(ConnectorSession session, ProcedureInformation procedureInformation)
{
ProcedureHandlesByQueryCacheKey key = new ProcedureHandlesByQueryCacheKey(getIdentityKey(session), procedureInformation);
return get(procedureHandlesByQueryCache, key, () -> delegate.getProcedureHandle(session, procedureInformation));
}

@Override
public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set<Long> pageSinkIds)
{
@@ -637,6 +668,12 @@ CacheStats getTableHandlesByQueryCacheStats()
return tableHandlesByQueryCache.stats();
}

@VisibleForTesting
CacheStats getProcedureHandlesByQueryCacheStats()
{
return procedureHandlesByQueryCache.stats();
}

@VisibleForTesting
CacheStats getColumnsCacheStats()
{
@@ -687,6 +724,15 @@ private record TableHandlesByQueryCacheKey(IdentityCacheKey identity, PreparedQu
}
}

private record ProcedureHandlesByQueryCacheKey(IdentityCacheKey identity, ProcedureInformation procedureInformation)
{
private ProcedureHandlesByQueryCacheKey
{
requireNonNull(identity, "identity is null");
requireNonNull(procedureInformation, "procedureInformation is null");
}
}

private record TableNamesCacheKey(IdentityCacheKey identity, Optional<String> schemaName)
{
private TableNamesCacheKey
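``CachingJdbcClient`` memoizes procedure handles per (identity, procedure) pair, mirroring the existing per-query table handle cache. A minimal sketch of the idea, using a plain ``ConcurrentHashMap`` instead of the evictable Guava-style cache the real class uses (which also applies TTL and size limits); ``HandleCache`` and its string-valued handles are illustrative stand-ins, not the PR's types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

public class HandleCache
{
    // Composite cache key: the record supplies equals/hashCode, and the compact
    // constructor rejects nulls, like ProcedureHandlesByQueryCacheKey does.
    public record CacheKey(String identity, String procedureName)
    {
        public CacheKey
        {
            requireNonNull(identity, "identity is null");
            requireNonNull(procedureName, "procedureName is null");
        }
    }

    private final Map<CacheKey, String> handles = new ConcurrentHashMap<>();
    private int loads;

    // Returns the cached handle, invoking the loader only on a cache miss.
    public String get(CacheKey key, Function<CacheKey, String> loader)
    {
        return handles.computeIfAbsent(key, k -> {
            loads++;
            return loader.apply(k);
        });
    }

    public int loadCount()
    {
        return loads;
    }

    public static void main(String[] args)
    {
        HandleCache cache = new HandleCache();
        CacheKey key = new CacheKey("user", "employee_sp");
        cache.get(key, k -> "handle");
        cache.get(key, k -> "recomputed"); // served from cache; loader not called
        System.out.println(cache.loadCount()); // prints 1
    }
}
```

Caching the handle matters because building one requires a round trip to the remote database to discover the procedure's result columns.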