Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.CharType;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.UuidType;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.plugin.jdbc.mapping.ColumnMapping;
import com.facebook.presto.plugin.jdbc.mapping.WriteMapping;
import com.facebook.presto.plugin.jdbc.mapping.functions.BooleanWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.DoubleWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.LongWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.SliceWriteFunction;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
Expand Down Expand Up @@ -57,21 +64,41 @@

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.CharType.MAX_LENGTH;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimeType.TIME;
import static com.facebook.presto.common.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we miss adding support for this since you have added support for TIME, TIME_WITH_TIME_ZONE and TIMESTAMP_WITH_TIME_ZONE.?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I didn't see any implementations for them in the original (although I don't think they had proper support back then so it probably was not missed, just not needed).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, all of them were not supported back then. Since you added support for some of them now, was just checking to see if you missed TIMESTAMP somehow

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see - Timestamp types require some type info to get the correct write function so the implementation for timestamp is here:
https://github.com/prestodb/presto/pull/25124/files#diff-a585323eb9886bcb1569de59ffc9ba78fb29e5da66b43666aa0072a8de747172R816

import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static com.facebook.presto.plugin.jdbc.JdbcWarningCode.USE_OF_DEPRECATED_CONFIGURATION_PROPERTY;
import static com.facebook.presto.plugin.jdbc.StandardReadMappings.jdbcTypeToPrestoType;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.bigintColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.booleanColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.charColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.dateColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.decimalColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.doubleColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.integerColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.jdbcTypeToPrestoType;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.realColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.smallintColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.timeColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.timeWithTimeZoneColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.timestampColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.timestampWithTimeZoneColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.tinyintColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.uuidColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.varbinaryColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.varcharColumnMapping;
import static com.facebook.presto.plugin.jdbc.mapping.WriteMapping.booleanMapping;
import static com.facebook.presto.plugin.jdbc.mapping.WriteMapping.longMapping;
import static com.facebook.presto.plugin.jdbc.mapping.WriteMapping.sliceMapping;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.MoreObjects.firstNonNull;
Expand All @@ -94,21 +121,21 @@ public class BaseJdbcClient
{
private static final Logger log = Logger.get(BaseJdbcClient.class);

private static final Map<Type, String> SQL_TYPES = ImmutableMap.<Type, String>builder()
.put(BOOLEAN, "boolean")
.put(BIGINT, "bigint")
.put(INTEGER, "integer")
.put(SMALLINT, "smallint")
.put(TINYINT, "tinyint")
.put(DOUBLE, "double precision")
.put(REAL, "real")
.put(VARBINARY, "varbinary")
.put(DATE, "date")
.put(TIME, "time")
.put(TIME_WITH_TIME_ZONE, "time with timezone")
.put(TIMESTAMP, "timestamp")
.put(TIMESTAMP_WITH_TIME_ZONE, "timestamp with timezone")
.put(UuidType.UUID, "uuid")
private static final Map<Type, WriteMapping> TYPE_MAPPINGS = ImmutableMap.<Type, WriteMapping>builder()
.put(BOOLEAN, booleanMapping("boolean", (BooleanWriteFunction) booleanColumnMapping().getWriteFunction()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these explicit casts to respective writeFunctions needed?

Copy link
Contributor Author

@infvg infvg May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes these are the default write functions for these types & will be used when getting a write function for a specific type.

.put(BIGINT, longMapping("bigint", (LongWriteFunction) bigintColumnMapping().getWriteFunction()))
.put(INTEGER, longMapping("integer", (LongWriteFunction) integerColumnMapping().getWriteFunction()))
.put(SMALLINT, longMapping("smallint", (LongWriteFunction) smallintColumnMapping().getWriteFunction()))
.put(TINYINT, longMapping("tinyint", (LongWriteFunction) tinyintColumnMapping().getWriteFunction()))
.put(DOUBLE, WriteMapping.doubleMapping("double precision", (DoubleWriteFunction) doubleColumnMapping().getWriteFunction()))
.put(REAL, longMapping("real", (LongWriteFunction) realColumnMapping().getWriteFunction()))
.put(VARBINARY, sliceMapping("varbinary", (SliceWriteFunction) varbinaryColumnMapping().getWriteFunction()))
.put(DATE, longMapping("date", (LongWriteFunction) dateColumnMapping().getWriteFunction()))
.put(TIME, longMapping("time", (LongWriteFunction) timeColumnMapping().getWriteFunction()))
.put(UuidType.UUID, sliceMapping("uuid", (SliceWriteFunction) uuidColumnMapping().getWriteFunction()))

.put(TIME_WITH_TIME_ZONE, longMapping("time with timezone", (LongWriteFunction) timeWithTimeZoneColumnMapping().getWriteFunction()))
.put(TIMESTAMP_WITH_TIME_ZONE, longMapping("timestamp with timezone", (LongWriteFunction) timestampWithTimeZoneColumnMapping().getWriteFunction()))
.build();

protected final String connectorId;
Expand Down Expand Up @@ -246,7 +273,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
resultSet.getString("TYPE_NAME"),
resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"));
Optional<ReadMapping> columnMapping = toPrestoType(session, typeHandle);
Optional<ColumnMapping> columnMapping = toPrestoType(session, typeHandle);
// skip unsupported column types
if (columnMapping.isPresent()) {
String columnName = resultSet.getString("COLUMN_NAME");
Expand All @@ -272,7 +299,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
}

@Override
public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
public Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
{
return jdbcTypeToPrestoType(typeHandle);
}
Expand Down Expand Up @@ -408,7 +435,7 @@ private String getColumnString(ColumnMetadata column, String columnName)
StringBuilder sb = new StringBuilder()
.append(quoted(columnName))
.append(" ")
.append(toSqlType(column.getType()));
.append(toWriteMapping(column.getType()).getDataType());
if (!column.isNullable()) {
sb.append(" NOT NULL");
}
Expand Down Expand Up @@ -752,28 +779,45 @@ protected void execute(Connection connection, String query)
}
}

protected String toSqlType(Type type)
public WriteMapping toWriteMapping(Type type)
{
String dataType;
if (isVarcharType(type)) {
VarcharType varcharType = (VarcharType) type;
if (varcharType.isUnbounded()) {
return "varchar";
dataType = "varchar";
}
return "varchar(" + varcharType.getLengthSafe() + ")";
else {
dataType = "varchar(" + varcharType.getLengthSafe() + ")";
}
return sliceMapping(dataType, (SliceWriteFunction) varcharColumnMapping(varcharType).getWriteFunction());
}
else if (type instanceof CharType) {
CharType charType = (CharType) type;
if (charType.getLength() == MAX_LENGTH) {
dataType = "char";
}
else {
dataType = "char(" + ((CharType) type).getLength() + ")";
}
return sliceMapping(dataType, (SliceWriteFunction) charColumnMapping(charType).getWriteFunction());
}
if (type instanceof CharType) {
if (((CharType) type).getLength() == CharType.MAX_LENGTH) {
return "char";
else if (type instanceof DecimalType) {
DecimalType decimalType = (DecimalType) type;
dataType = format("decimal(%s, %s)", ((DecimalType) type).getPrecision(), ((DecimalType) type).getScale());
if (decimalType.isShort()) {
return longMapping(dataType, (LongWriteFunction) decimalColumnMapping(decimalType).getWriteFunction());
}
else {
return sliceMapping(dataType, (SliceWriteFunction) decimalColumnMapping(decimalType).getWriteFunction());
}
return "char(" + ((CharType) type).getLength() + ")";
}
if (type instanceof DecimalType) {
return format("decimal(%s, %s)", ((DecimalType) type).getPrecision(), ((DecimalType) type).getScale());
else if (type instanceof TimestampType) {
return longMapping("timestamp", (LongWriteFunction) timestampColumnMapping((TimestampType) type).getWriteFunction());
}

String sqlType = SQL_TYPES.get(type);
if (sqlType != null) {
return sqlType;
WriteMapping writeMapping = TYPE_MAPPINGS.get(type);
if (writeMapping != null) {
return writeMapping;
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package com.facebook.presto.plugin.jdbc;

import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.plugin.jdbc.mapping.ColumnMapping;
import com.facebook.presto.plugin.jdbc.mapping.WriteMapping;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
Expand Down Expand Up @@ -49,7 +52,9 @@ default boolean schemaExists(ConnectorSession session, JdbcIdentity identity, St

List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle);

Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);
Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);

WriteMapping toWriteMapping(Type type);

ConnectorSplitSource getSplits(ConnectorSession session, JdbcIdentity identity, JdbcTableLayoutHandle layoutHandle);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,34 @@
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.UuidType;
import com.facebook.presto.plugin.jdbc.mapping.WriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.BooleanWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.DoubleWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.LongWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.ObjectWriteFunction;
import com.facebook.presto.plugin.jdbc.mapping.functions.SliceWriteFunction;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Shorts;
import com.google.common.primitives.SignedBytes;
import io.airlift.slice.Slice;
import org.joda.time.DateTimeZone;

import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.Chars.isCharType;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.Decimals.readBigDecimal;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.UuidType.prestoUuidToJavaUuid;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_NON_TRANSIENT_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.toIntExact;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.DAYS;
import static org.joda.time.chrono.ISOChronology.getInstanceUTC;

public class JdbcPageSink
implements ConnectorPageSink
Expand All @@ -71,6 +54,7 @@ public class JdbcPageSink
private final PreparedStatement statement;

private final List<Type> columnTypes;
private final List<WriteFunction> columnWriters;
private int batchSize;

public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, JdbcClient jdbcClient)
Expand All @@ -92,6 +76,12 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
}

columnTypes = handle.getColumnTypes();
columnWriters = columnTypes.stream().map(type -> {
WriteFunction writeFunction = jdbcClient.toWriteMapping(type).getWriteFunction();
verify(type.getJavaType() == writeFunction.getJavaType(),
format("Presto type %s is not compatible with write function %s accepting %s", type, writeFunction, writeFunction.getJavaType()));
return writeFunction;
}).collect(toImmutableList());
}

@Override
Expand Down Expand Up @@ -132,55 +122,27 @@ private void appendColumn(Page page, int position, int channel)
}

Type type = columnTypes.get(channel);
if (BOOLEAN.equals(type)) {
statement.setBoolean(parameter, type.getBoolean(block, position));
Class<?> javaType = type.getJavaType();
WriteFunction writeFunction = columnWriters.get(channel);
if (javaType == boolean.class) {
((BooleanWriteFunction) writeFunction).set(statement, parameter, type.getBoolean(block, position));
}
else if (BIGINT.equals(type)) {
statement.setLong(parameter, type.getLong(block, position));
else if (javaType == long.class) {
((LongWriteFunction) writeFunction).set(statement, parameter, type.getLong(block, position));
}
else if (INTEGER.equals(type)) {
statement.setInt(parameter, toIntExact(type.getLong(block, position)));
else if (javaType == double.class) {
((DoubleWriteFunction) writeFunction).set(statement, parameter, type.getDouble(block, position));
}
else if (SMALLINT.equals(type)) {
statement.setShort(parameter, Shorts.checkedCast(type.getLong(block, position)));
}
else if (TINYINT.equals(type)) {
statement.setByte(parameter, SignedBytes.checkedCast(type.getLong(block, position)));
}
else if (DOUBLE.equals(type)) {
statement.setDouble(parameter, type.getDouble(block, position));
}
else if (REAL.equals(type)) {
statement.setFloat(parameter, intBitsToFloat(toIntExact(type.getLong(block, position))));
}
else if (type instanceof DecimalType) {
statement.setBigDecimal(parameter, readBigDecimal((DecimalType) type, block, position));
}
else if (isVarcharType(type) || isCharType(type)) {
statement.setString(parameter, type.getSlice(block, position).toStringUtf8());
}
else if (VARBINARY.equals(type)) {
statement.setBytes(parameter, type.getSlice(block, position).getBytes());
}
else if (DATE.equals(type)) {
// convert to midnight in default time zone
long utcMillis = DAYS.toMillis(type.getLong(block, position));
long localMillis = getInstanceUTC().getZone().getMillisKeepLocal(DateTimeZone.getDefault(), utcMillis);
statement.setDate(parameter, new Date(localMillis));
}
else if (type instanceof TimestampType) {
long timestampValue = type.getLong(block, position);
statement.setTimestamp(parameter,
Timestamp.from(Instant.ofEpochSecond(
((TimestampType) type).getEpochSecond(timestampValue),
((TimestampType) type).getNanos(timestampValue))));
}
else if (UuidType.UUID.equals(type)) {
Slice slice = type.getSlice(block, position);
statement.setObject(parameter, prestoUuidToJavaUuid(slice));
else if (javaType == Slice.class) {
((SliceWriteFunction) writeFunction).set(statement, parameter, type.getSlice(block, position));
}
else {
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
try {
((ObjectWriteFunction) writeFunction).set(statement, parameter, type.getObject(block, position));
}
catch (SQLException e) {
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}
}
}

Expand Down
Loading
Loading