Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
Expand All @@ -68,6 +69,8 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.jdbc.CaseSensitivity.CASE_INSENSITIVE;
import static io.trino.plugin.jdbc.CaseSensitivity.CASE_SENSITIVE;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.trino.plugin.jdbc.JdbcWriteSessionProperties.isNonTransactionalInsert;
import static io.trino.plugin.jdbc.PredicatePushdownController.DISABLE_PUSHDOWN;
Expand Down Expand Up @@ -221,6 +224,45 @@ public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, Schema
}
}

@Override
public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery)
{
ImmutableList.Builder<JdbcColumnHandle> columns = ImmutableList.builder();
try (Connection connection = connectionFactory.openConnection(session);
PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery)) {
ResultSetMetaData metaData = preparedStatement.getMetaData();
if (metaData == null) {
throw new UnsupportedOperationException("ResultSetMetaData not provided for query");
}
for (int column = 1; column <= metaData.getColumnCount(); column++) {
JdbcTypeHandle jdbcTypeHandle = new JdbcTypeHandle(
metaData.getColumnType(column),
Optional.ofNullable(metaData.getColumnTypeName(column)),
Optional.of(metaData.getPrecision(column)),
Optional.of(metaData.getScale(column)),
Optional.empty(), // TODO support arrays
Optional.of(metaData.isCaseSensitive(column) ? CASE_SENSITIVE : CASE_INSENSITIVE));
Type type = toColumnMapping(session, connection, jdbcTypeHandle)
.orElseThrow(() -> new IllegalArgumentException("Unsupported type: " + jdbcTypeHandle))
.getType();
columns.add(new JdbcColumnHandle(metaData.getColumnName(column), jdbcTypeHandle, type));
}
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}

return new JdbcTableHandle(
new JdbcQueryRelationHandle(preparedQuery),
TupleDomain.all(),
ImmutableList.of(),
Optional.empty(),
OptionalLong.empty(),
Optional.of(columns.build()),
ImmutableSet.of(), // TODO return other tables referenced by the query
0);
}

@Override
public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, Schema
return get(tableHandleCache, key, () -> delegate.getTableHandle(session, schemaTableName));
}

@Override
public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery)
{
return delegate.getTableHandle(session, preparedQuery); // TODO add caching?
}

@Override
public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import io.trino.plugin.jdbc.PredicatePushdownController.DomainPushdownResult;
import io.trino.plugin.jdbc.ptf.RemoteQuery.RemoteQueryHandle;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnSchema;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
Expand All @@ -45,6 +47,7 @@
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SortItem;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
import io.trino.spi.connector.TopNApplicationResult;
Expand All @@ -53,6 +56,7 @@
import io.trino.spi.expression.Variable;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.security.AccessDeniedException;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;
Expand Down Expand Up @@ -123,6 +127,12 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName
.orElse(null);
}

@Override
public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery)
{
return jdbcClient.getTableHandle(session, preparedQuery);
}

@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
Expand Down Expand Up @@ -318,10 +328,10 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega
.flatMap(List::stream)
.distinct()
.peek(handle.getColumns().<Consumer<JdbcColumnHandle>>map(
columns -> groupKey -> verify(columns.contains(groupKey),
"applyAggregation called with a grouping column %s which was not included in the table columns: %s",
groupKey,
tableColumns))
columns -> groupKey -> verify(columns.contains(groupKey),
"applyAggregation called with a grouping column %s which was not included in the table columns: %s",
groupKey,
tableColumns))
.orElse(groupKey -> {}))
.forEach(newColumns::add);

Expand Down Expand Up @@ -559,6 +569,24 @@ public Optional<TopNApplicationResult<ConnectorTableHandle>> applyTopN(
return Optional.of(new TopNApplicationResult<>(sortedTableHandle, jdbcClient.isTopNGuaranteed(session), precalculateStatisticsForPushdown));
}

@Override
public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
if (!(handle instanceof RemoteQueryHandle)) {
return Optional.empty();
}

ConnectorTableHandle tableHandle = ((RemoteQueryHandle) handle).getTableHandle();
ConnectorTableSchema tableSchema = getTableSchema(session, tableHandle);
Map<String, ColumnHandle> columnHandlesByName = getColumnHandles(session, tableHandle);
List<ColumnHandle> columnHandles = tableSchema.getColumns().stream()
.map(ColumnSchema::getName)
.map(columnHandlesByName::get)
.collect(toImmutableList());

return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles));
}

@Override
public Optional<TableScanRedirectApplicationResult> applyTableScanRedirect(ConnectorSession session, ConnectorTableHandle table)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, Schema
return delegate().getTableHandle(session, schemaTableName);
}

@Override
public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery)
{
return delegate().getTableHandle(session, preparedQuery);
}

@Override
public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ default boolean schemaExists(ConnectorSession session, String schema)

Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName);

JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery);

List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle);

Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;

Expand All @@ -50,6 +51,7 @@ public class JdbcConnector
private final ConnectorPageSinkProvider jdbcPageSinkProvider;
private final Optional<ConnectorAccessControl> accessControl;
private final Set<Procedure> procedures;
private final Set<ConnectorTableFunction> connectorTableFunctions;
private final List<PropertyMetadata<?>> sessionProperties;
private final List<PropertyMetadata<?>> tableProperties;
private final JdbcTransactionManager transactionManager;
Expand All @@ -62,6 +64,7 @@ public JdbcConnector(
ConnectorPageSinkProvider jdbcPageSinkProvider,
Optional<ConnectorAccessControl> accessControl,
Set<Procedure> procedures,
Set<ConnectorTableFunction> connectorTableFunctions,
Set<SessionPropertiesProvider> sessionProperties,
Set<TablePropertiesProvider> tableProperties,
JdbcTransactionManager transactionManager)
Expand All @@ -72,6 +75,7 @@ public JdbcConnector(
this.jdbcPageSinkProvider = requireNonNull(jdbcPageSinkProvider, "jdbcPageSinkProvider is null");
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.connectorTableFunctions = ImmutableSet.copyOf(requireNonNull(connectorTableFunctions, "tableFunctions is null"));
this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null").stream()
.flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
.collect(toImmutableList());
Expand Down Expand Up @@ -135,6 +139,12 @@ public Set<Procedure> getProcedures()
return procedures;
}

@Override
public Set<ConnectorTableFunction> getTableFunctions()
{
return connectorTableFunctions;
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package io.trino.plugin.jdbc;

import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorSession;

public interface JdbcMetadata
extends ConnectorMetadata
{
JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery);

void rollback();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.ptf.ConnectorTableFunction;

import javax.annotation.PreDestroy;
import javax.inject.Provider;
Expand Down Expand Up @@ -83,6 +84,8 @@ public void setup(Binder binder)

newSetBinder(binder, Procedure.class).addBinding().toProvider(FlushJdbcMetadataCacheProcedure.class).in(Scopes.SINGLETON);

newSetBinder(binder, ConnectorTableFunction.class);

binder.bind(ConnectionFactory.class)
.annotatedWith(ForLazyConnectionFactory.class)
.to(Key.get(ConnectionFactory.class, StatsCollecting.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, Schema
return stats.getGetTableHandle().wrap(() -> delegate().getTableHandle(session, schemaTableName));
}

@Override
public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery)
{
return delegate().getTableHandle(session, preparedQuery); // TODO add stats?
}

@Override
public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc.ptf;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcMetadata;
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.plugin.jdbc.JdbcTransactionManager;
import io.trino.plugin.jdbc.PreparedQuery;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.ptf.Argument;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.ptf.Descriptor;
import io.trino.spi.ptf.Descriptor.Field;
import io.trino.spi.ptf.ScalarArgument;
import io.trino.spi.ptf.ScalarArgumentSpecification;

import javax.inject.Inject;
import javax.inject.Provider;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.spi.ptf.DescriptorMapping.EMPTY_MAPPING;
import static io.trino.spi.ptf.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

public class RemoteQuery
implements Provider<ConnectorTableFunction>
{
public static final String NAME = "remote_query";

private final JdbcTransactionManager transactionManager;

@Inject
public RemoteQuery(JdbcTransactionManager transactionManager)
{
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
}

@Override
public ConnectorTableFunction get()
{
// TODO wrap in ClassLoaderSafeConnectorTableFunction? (see also TestClassLoaderSafeWrappers)
return new RemoteQueryFunction(transactionManager);
}

public static class RemoteQueryFunction
extends ConnectorTableFunction
{
public static final String SCHEMA_NAME = "system";

private final JdbcTransactionManager transactionManager;

public RemoteQueryFunction(JdbcTransactionManager transactionManager)
{
super(SCHEMA_NAME, NAME, List.of(new ScalarArgumentSpecification("query", VARCHAR)), GENERIC_TABLE);
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
}

@Override
public Analysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map<String, Argument> arguments)
{
ScalarArgument argument = (ScalarArgument) getOnlyElement(arguments.values());
String query = ((Slice) argument.getValue()).toStringUtf8();
PreparedQuery preparedQuery = new PreparedQuery(query, ImmutableList.of());

JdbcMetadata metadata = transactionManager.getMetadata(transaction);
JdbcTableHandle tableHandle = metadata.getTableHandle(session, preparedQuery);
List<JdbcColumnHandle> columns = tableHandle.getColumns().orElseThrow(() -> new IllegalStateException("Query result has no columns"));
Descriptor returnedType = new Descriptor(columns.stream()
.map(column -> new Field(column.getColumnName(), Optional.of(column.getColumnType())))
.collect(toList()));

RemoteQueryHandle handle = new RemoteQueryHandle(tableHandle);

return new Analysis(Optional.of(returnedType), EMPTY_MAPPING, handle);
}
}

public static class RemoteQueryHandle
implements ConnectorTableFunctionHandle
{
private final JdbcTableHandle tableHandle;

@JsonCreator
public RemoteQueryHandle(@JsonProperty("tableHandle") JdbcTableHandle tableHandle)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
}

@JsonProperty
public ConnectorTableHandle getTableHandle()
{
return tableHandle;
}
}
}
Loading