Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.prestosql.plugin.jdbc.BaseJdbcPropertiesProvider.getUnsupportedTypeHandling;
import static io.prestosql.plugin.jdbc.ColumnMapping.DISABLE_PUSHDOWN;
import static io.prestosql.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
Expand All @@ -80,6 +81,8 @@
import static io.prestosql.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.varcharReadFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
import static io.prestosql.plugin.jdbc.UnsupportedTypeHandling.CONVERT_TO_VARCHAR;
import static io.prestosql.plugin.jdbc.UnsupportedTypeHandling.IGNORE;
import static io.prestosql.spi.StandardErrorCode.NOT_FOUND;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -257,10 +260,10 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
Optional<ColumnMapping> columnMapping = toPrestoType(session, connection, typeHandle);
log.debug("Mapping data type of '%s' column '%s': %s mapped to %s", tableHandle.getSchemaTableName(), columnName, typeHandle, columnMapping);
// skip unsupported column types
boolean nullable = (resultSet.getInt("NULLABLE") != columnNoNulls);
// Note: some databases (e.g. SQL Server) do not return column remarks/comment here.
Optional<String> comment = Optional.ofNullable(emptyToNull(resultSet.getString("REMARKS")));
if (columnMapping.isPresent()) {
boolean nullable = (resultSet.getInt("NULLABLE") != columnNoNulls);
// Note: some databases (e.g. SQL Server) do not return column remarks/comment here.
Optional<String> comment = Optional.ofNullable(emptyToNull(resultSet.getString("REMARKS")));
columns.add(JdbcColumnHandle.builder()
.setColumnName(columnName)
.setJdbcTypeHandle(typeHandle)
Expand All @@ -269,6 +272,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
.setComment(comment)
.build());
}
verify(columnMapping.isPresent() || getUnsupportedTypeHandling(session) == IGNORE, "Unsupported type handling is set to %s, but toPrestoType() returned empty");
}
if (columns.isEmpty()) {
// A table may have no supported columns. In rare cases (e.g. PostgreSQL) a table might have no columns at all.
Expand Down Expand Up @@ -300,26 +304,37 @@ public Optional<ColumnMapping> toPrestoType(ConnectorSession session, Connection
if (mapping.isPresent()) {
return mapping;
}
return jdbcTypeToPrestoType(session, typeHandle);
Optional<ColumnMapping> connectorMapping = jdbcTypeToPrestoType(session, typeHandle);
if (connectorMapping.isPresent()) {
return connectorMapping;
}
if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
return mapToUnboundedVarchar(typeHandle);
}
return Optional.empty();
}

protected Optional<ColumnMapping> getForcedMappingToVarchar(JdbcTypeHandle typeHandle)
{
if (typeHandle.getJdbcTypeName().isPresent() && jdbcTypesMappedToVarchar.contains(typeHandle.getJdbcTypeName().get())) {
return Optional.of(ColumnMapping.sliceMapping(
createUnboundedVarcharType(),
varcharReadFunction(),
(statement, index, value) -> {
// TODO this should be handled during planning phase
throw new PrestoException(
NOT_SUPPORTED,
"Underlying type that is force mapped to VARCHAR is not supported for INSERT: " + typeHandle.getJdbcTypeName().get());
},
DISABLE_PUSHDOWN));
return mapToUnboundedVarchar(typeHandle);
}
return Optional.empty();
}

private static Optional<ColumnMapping> mapToUnboundedVarchar(JdbcTypeHandle typeHandle)
{
return Optional.of(ColumnMapping.sliceMapping(
createUnboundedVarcharType(),
varcharReadFunction(),
(statement, index, value) -> {
throw new PrestoException(
NOT_SUPPORTED,
"Underlying type that is mapped to VARCHAR is not supported for INSERT: " + typeHandle.getJdbcTypeName().get());
},
DISABLE_PUSHDOWN));
}

@Override
public ConnectorSplitSource getSplits(JdbcIdentity identity, JdbcTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

Expand All @@ -32,6 +33,7 @@ public class BaseJdbcConfig
private boolean caseInsensitiveNameMatching;
private Duration caseInsensitiveNameMatchingCacheTtl = new Duration(1, MINUTES);
private Set<String> jdbcTypesMappedToVarchar = ImmutableSet.of();
private UnsupportedTypeHandling unsupportedTypeHandling = UnsupportedTypeHandling.IGNORE;

@NotNull
public String getConnectionUrl()
Expand Down Expand Up @@ -83,4 +85,18 @@ public BaseJdbcConfig setJdbcTypesMappedToVarchar(String jdbcTypesMappedToVarcha
this.jdbcTypesMappedToVarchar = ImmutableSet.copyOf(Splitter.on(",").omitEmptyStrings().trimResults().split(nullToEmpty(jdbcTypesMappedToVarchar)));
return this;
}

@NotNull
public UnsupportedTypeHandling getUnsupportedTypeHandling()
{
return unsupportedTypeHandling;
}

@Config("unsupported-type-handling")
@ConfigDescription("Unsupported type handling strategy")
public BaseJdbcConfig setUnsupportedTypeHandling(UnsupportedTypeHandling unsupportedTypeHandling)
{
this.unsupportedTypeHandling = unsupportedTypeHandling;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.jdbc;

import com.google.common.collect.ImmutableList;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.session.PropertyMetadata;

import javax.inject.Inject;

import java.util.List;

import static io.prestosql.spi.session.PropertyMetadata.enumProperty;

public class BaseJdbcPropertiesProvider
implements SessionPropertiesProvider
{
public static final String UNSUPPORTED_TYPE_HANDLING = "unsupported_type_handling";

private final List<PropertyMetadata<?>> properties;

@Inject
public BaseJdbcPropertiesProvider(BaseJdbcConfig baseJdbcConfig)
{
properties = ImmutableList.of(
enumProperty(
UNSUPPORTED_TYPE_HANDLING,
"Unsupported type handling strategy",
UnsupportedTypeHandling.class,
baseJdbcConfig.getUnsupportedTypeHandling(),
false));
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return properties;
}

public static UnsupportedTypeHandling getUnsupportedTypeHandling(ConnectorSession session)
{
return session.getProperty(UNSUPPORTED_TYPE_HANDLING, UnsupportedTypeHandling.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void configure(Binder binder)

newOptionalBinder(binder, ConnectorAccessControl.class);
newSetBinder(binder, Procedure.class);
newSetBinder(binder, SessionPropertiesProvider.class);
newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(BaseJdbcPropertiesProvider.class);
binder.bind(JdbcMetadataFactory.class).in(Scopes.SINGLETON);
binder.bind(JdbcSplitManager.class).in(Scopes.SINGLETON);
binder.bind(JdbcRecordSetProvider.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@
import java.sql.SQLNonTransientException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.prestosql.plugin.jdbc.JdbcErrorCode.JDBC_NON_TRANSIENT_ERROR;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;

Expand Down Expand Up @@ -96,11 +95,8 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
}
else {
List<ColumnMapping> columnMappings = handle.getJdbcColumnTypes().get().stream()
.map(typeHandle -> {
Optional<ColumnMapping> columnMapping = jdbcClient.toPrestoType(session, connection, typeHandle);
checkState(columnMapping.isPresent(), "missing column mapping");
return columnMapping.get();
})
.map(typeHandle -> jdbcClient.toPrestoType(session, connection, typeHandle)
.orElseThrow(() -> new PrestoException(NOT_SUPPORTED, "Underlying type is not supported for INSERT: " + typeHandle)))
.collect(toImmutableList());

columnWriters = columnMappings.stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.jdbc;

public enum UnsupportedTypeHandling
{
IGNORE,
CONVERT_TO_VARCHAR,
/**/;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public static DistributedQueryRunner createH2QueryRunner(TpchTable<?>... tables)

public static DistributedQueryRunner createH2QueryRunner(Iterable<TpchTable<?>> tables)
throws Exception

{
return createH2QueryRunner(tables, TestingH2JdbcModule.createProperties());
}

public static DistributedQueryRunner createH2QueryRunner(Iterable<TpchTable<?>> tables, Map<String, String> properties)
throws Exception
{
DistributedQueryRunner queryRunner = null;
try {
Expand All @@ -52,7 +59,6 @@ public static DistributedQueryRunner createH2QueryRunner(Iterable<TpchTable<?>>
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

Map<String, String> properties = TestingH2JdbcModule.createProperties();
createSchema(properties, "tpch");

queryRunner.installPlugin(new JdbcPlugin("base-jdbc", new TestingH2JdbcModule()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public void testDefaults()
.setConnectionUrl(null)
.setCaseInsensitiveNameMatching(false)
.setCaseInsensitiveNameMatchingCacheTtl(new Duration(1, MINUTES))
.setJdbcTypesMappedToVarchar(null));
.setJdbcTypesMappedToVarchar(null)
.setUnsupportedTypeHandling(UnsupportedTypeHandling.IGNORE));
}

@Test
Expand All @@ -47,13 +48,15 @@ public void testExplicitPropertyMappings()
.put("case-insensitive-name-matching", "true")
.put("case-insensitive-name-matching.cache-ttl", "1s")
.put("jdbc-types-mapped-to-varchar", "mytype,struct_type1")
.put("unsupported-type-handling", "CONVERT_TO_VARCHAR")
.build();

BaseJdbcConfig expected = new BaseJdbcConfig()
.setConnectionUrl("jdbc:h2:mem:config")
.setCaseInsensitiveNameMatching(true)
.setCaseInsensitiveNameMatchingCacheTtl(new Duration(1, SECONDS))
.setJdbcTypesMappedToVarchar("mytype, struct_type1");
.setJdbcTypesMappedToVarchar("mytype, struct_type1")
.setUnsupportedTypeHandling(UnsupportedTypeHandling.CONVERT_TO_VARCHAR);

assertFullMapping(properties, expected);

Expand Down
Loading