Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.UuidType;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.plugin.jdbc.mapping.ReadMapping;
import com.facebook.presto.plugin.jdbc.mapping.WriteMapping;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
Expand Down Expand Up @@ -71,7 +73,8 @@
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static com.facebook.presto.plugin.jdbc.JdbcWarningCode.USE_OF_DEPRECATED_CONFIGURATION_PROPERTY;
import static com.facebook.presto.plugin.jdbc.StandardReadMappings.jdbcTypeToPrestoType;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.jdbcTypeToReadMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.prestoTypeToWriteMapping;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.MoreObjects.firstNonNull;
Expand Down Expand Up @@ -246,13 +249,13 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
resultSet.getString("TYPE_NAME"),
resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"));
Optional<ReadMapping> columnMapping = toPrestoType(session, typeHandle);
Optional<ReadMapping> readMapping = toPrestoType(session, typeHandle);
// skip unsupported column types
if (columnMapping.isPresent()) {
if (readMapping.isPresent()) {
String columnName = resultSet.getString("COLUMN_NAME");
boolean nullable = columnNullable == resultSet.getInt("NULLABLE");
Optional<String> comment = Optional.ofNullable(emptyToNull(resultSet.getString("REMARKS")));
columns.add(new JdbcColumnHandle(connectorId, columnName, typeHandle, columnMapping.get().getType(), nullable, comment));
columns.add(new JdbcColumnHandle(connectorId, columnName, typeHandle, readMapping.get().getType(), nullable, comment));
}
}
if (columns.isEmpty()) {
Expand All @@ -274,7 +277,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
@Override
public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
{
return jdbcTypeToPrestoType(typeHandle);
return jdbcTypeToReadMapping(typeHandle);
}

@Override
Expand Down Expand Up @@ -751,7 +754,6 @@ protected void execute(Connection connection, String query)
statement.execute(query);
}
}

protected String toSqlType(Type type)
{
if (isVarcharType(type)) {
Expand All @@ -778,6 +780,15 @@ protected String toSqlType(Type type)
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}

public WriteMapping toWriteMapping(Type type)
{
Optional<WriteMapping> writeMapping = prestoTypeToWriteMapping(type);
if (writeMapping.isPresent()) {
return writeMapping.get();
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}

protected String quoted(String name)
{
name = name.replace(identifierQuote, identifierQuote + identifierQuote);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package com.facebook.presto.plugin.jdbc;

import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.plugin.jdbc.mapping.ReadMapping;
import com.facebook.presto.plugin.jdbc.mapping.WriteMapping;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
Expand Down Expand Up @@ -51,6 +54,8 @@ default boolean schemaExists(ConnectorSession session, JdbcIdentity identity, St

Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);

WriteMapping toWriteMapping(Type type);

ConnectorSplitSource getSplits(ConnectorSession session, JdbcIdentity identity, JdbcTableLayoutHandle layoutHandle);

Connection getConnection(ConnectorSession session, JdbcIdentity identity, JdbcSplit split)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,34 @@
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.UuidType;
import com.facebook.presto.plugin.jdbc.mapping.WriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.BooleanWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.DoubleWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.LongWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.ObjectWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.SliceWriteFunction;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Shorts;
import com.google.common.primitives.SignedBytes;
import io.airlift.slice.Slice;
import org.joda.time.DateTimeZone;

import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.Chars.isCharType;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.Decimals.readBigDecimal;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.UuidType.prestoUuidToJavaUuid;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_NON_TRANSIENT_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.toIntExact;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.DAYS;
import static org.joda.time.chrono.ISOChronology.getInstanceUTC;

public class JdbcPageSink
implements ConnectorPageSink
Expand All @@ -71,6 +54,7 @@ public class JdbcPageSink
private final PreparedStatement statement;

private final List<Type> columnTypes;
private final List<WriteFunction> columnWriters;
private int batchSize;

public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, JdbcClient jdbcClient)
Expand All @@ -92,6 +76,12 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
}

columnTypes = handle.getColumnTypes();
columnWriters = columnTypes.stream().map(type -> {
WriteFunction writeFunction = jdbcClient.toWriteMapping(type).getWriteFunction();
verify(type.getJavaType() == writeFunction.getJavaType(),
format("Presto type %s is not compatible with write function %s accepting %s", type, writeFunction, writeFunction.getJavaType()));
return writeFunction;
}).collect(toImmutableList());
}

@Override
Expand Down Expand Up @@ -132,55 +122,27 @@ private void appendColumn(Page page, int position, int channel)
}

Type type = columnTypes.get(channel);
if (BOOLEAN.equals(type)) {
statement.setBoolean(parameter, type.getBoolean(block, position));
Class<?> javaType = type.getJavaType();
WriteFunction writeFunction = columnWriters.get(channel);
if (javaType == boolean.class) {
((BooleanWriteFunction) writeFunction).set(statement, parameter, type.getBoolean(block, position));
}
else if (BIGINT.equals(type)) {
statement.setLong(parameter, type.getLong(block, position));
else if (javaType == long.class) {
((LongWriteFunction) writeFunction).set(statement, parameter, type.getLong(block, position));
}
else if (INTEGER.equals(type)) {
statement.setInt(parameter, toIntExact(type.getLong(block, position)));
else if (javaType == double.class) {
((DoubleWriteFunction) writeFunction).set(statement, parameter, type.getDouble(block, position));
}
else if (SMALLINT.equals(type)) {
statement.setShort(parameter, Shorts.checkedCast(type.getLong(block, position)));
}
else if (TINYINT.equals(type)) {
statement.setByte(parameter, SignedBytes.checkedCast(type.getLong(block, position)));
}
else if (DOUBLE.equals(type)) {
statement.setDouble(parameter, type.getDouble(block, position));
}
else if (REAL.equals(type)) {
statement.setFloat(parameter, intBitsToFloat(toIntExact(type.getLong(block, position))));
}
else if (type instanceof DecimalType) {
statement.setBigDecimal(parameter, readBigDecimal((DecimalType) type, block, position));
}
else if (isVarcharType(type) || isCharType(type)) {
statement.setString(parameter, type.getSlice(block, position).toStringUtf8());
}
else if (VARBINARY.equals(type)) {
statement.setBytes(parameter, type.getSlice(block, position).getBytes());
}
else if (DATE.equals(type)) {
// convert to midnight in default time zone
long utcMillis = DAYS.toMillis(type.getLong(block, position));
long localMillis = getInstanceUTC().getZone().getMillisKeepLocal(DateTimeZone.getDefault(), utcMillis);
statement.setDate(parameter, new Date(localMillis));
}
else if (type instanceof TimestampType) {
long timestampValue = type.getLong(block, position);
statement.setTimestamp(parameter,
Timestamp.from(Instant.ofEpochSecond(
((TimestampType) type).getEpochSecond(timestampValue),
((TimestampType) type).getNanos(timestampValue))));
}
else if (UuidType.UUID.equals(type)) {
Slice slice = type.getSlice(block, position);
statement.setObject(parameter, prestoUuidToJavaUuid(slice));
else if (javaType == Slice.class) {
((SliceWriteFunction) writeFunction).set(statement, parameter, type.getSlice(block, position));
}
else {
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
try {
((ObjectWriteFunction) writeFunction).set(statement, parameter, type.getObject(block, position));
}
catch (SQLException e) {
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.plugin.jdbc.mapping.ReadFunction;
import com.facebook.presto.plugin.jdbc.mapping.ReadMapping;
import com.facebook.presto.plugin.jdbc.mapping.functions.BooleanReadFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.DoubleReadFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.LongReadFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.ObjectReadFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.SliceReadFunction;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
Expand All @@ -31,7 +38,6 @@
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class JdbcRecordCursor
Expand All @@ -44,6 +50,7 @@ public class JdbcRecordCursor
private final DoubleReadFunction[] doubleReadFunctions;
private final LongReadFunction[] longReadFunctions;
private final SliceReadFunction[] sliceReadFunctions;
private final ObjectReadFunction[] objectReadFunctions;

private final JdbcClient jdbcClient;
private final Connection connection;
Expand All @@ -61,6 +68,7 @@ public JdbcRecordCursor(JdbcClient jdbcClient, ConnectorSession session, JdbcSpl
doubleReadFunctions = new DoubleReadFunction[columnHandles.size()];
longReadFunctions = new LongReadFunction[columnHandles.size()];
sliceReadFunctions = new SliceReadFunction[columnHandles.size()];
objectReadFunctions = new ObjectReadFunction[columnHandles.size()];

for (int i = 0; i < this.columnHandles.length; i++) {
ReadMapping readMapping = jdbcClient.toPrestoType(session, columnHandles.get(i).getJdbcTypeHandle())
Expand All @@ -81,7 +89,12 @@ else if (javaType == Slice.class) {
sliceReadFunctions[i] = (SliceReadFunction) readFunction;
}
else {
throw new IllegalStateException(format("Unsupported java type %s", javaType));
try {
objectReadFunctions[i] = (ObjectReadFunction) readFunction;
}
catch (NullPointerException e) {
throw new UnsupportedOperationException();
}
}
}

Expand Down Expand Up @@ -180,7 +193,13 @@ public Slice getSlice(int field)
@Override
public Object getObject(int field)
{
throw new UnsupportedOperationException();
checkState(!closed, "cursor is closed");
try {
return objectReadFunctions[field].readObject(resultSet, field + 1);
}
catch (SQLException | RuntimeException e) {
throw handleSqlException(e);
}
}

@Override
Expand Down
Loading
Loading