Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
*/
package io.trino.plugin.singlestore;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import io.trino.plugin.jdbc.BaseJdbcClient;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ColumnMapping;
Expand All @@ -29,8 +27,6 @@
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.PreparedQuery;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.plugin.jdbc.UnsupportedTypeHandling;
import io.trino.plugin.jdbc.WriteMapping;
import io.trino.plugin.jdbc.mapping.IdentifierMapping;
import io.trino.spi.TrinoException;
Expand All @@ -41,7 +37,6 @@
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.type.CharType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
Expand All @@ -57,27 +52,22 @@

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.emptyToNull;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.base.util.JsonTypeUtil.jsonParse;
Expand Down Expand Up @@ -115,7 +105,6 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.getUnsupportedTypeHandling;
import static io.trino.plugin.jdbc.UnsupportedTypeHandling.CONVERT_TO_VARCHAR;
import static io.trino.plugin.jdbc.UnsupportedTypeHandling.IGNORE;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
Expand All @@ -138,20 +127,24 @@
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.sql.DatabaseMetaData.columnNoNulls;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

public class SingleStoreClient
extends BaseJdbcClient
{
private static final Logger log = Logger.get(SingleStoreClient.class);

static final int MEMSQL_DATE_TIME_MAX_PRECISION = 6;
static final int MEMSQL_VARCHAR_MAX_LENGTH = 21844;
static final int MEMSQL_TEXT_MAX_LENGTH = 65535;
static final int MEMSQL_MEDIUMTEXT_MAX_LENGTH = 16777215;
// Singlestore driver returns width of timestamp types instead of precision.
// 19 characters are used for zero-precision timestamps while others
// require 19 + precision + 1 characters with the additional character for decimal separator
private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
// Singlestore driver returns width of time types instead of precision, same as the above timestamp type.
// 10 characters are used for zero-precision time
private static final int ZERO_PRECISION_TIME_COLUMN_SIZE = 10;
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("uuuu-MM-dd");
private static final Pattern UNSIGNED_TYPE_REGEX = Pattern.compile("(?i).*unsigned$");

Expand Down Expand Up @@ -201,110 +194,6 @@ protected boolean filterSchema(String schemaName)
return super.filterSchema(schemaName);
}

@Override
public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle)
{
if (tableHandle.getColumns().isPresent()) {
return tableHandle.getColumns().get();
}
checkArgument(tableHandle.isNamedRelation(), "Cannot get columns for %s", tableHandle);
SchemaTableName schemaTableName = tableHandle.getRequiredNamedRelation().getSchemaTableName();
RemoteTableName remoteTableName = tableHandle.getRequiredNamedRelation().getRemoteTableName();

try (Connection connection = connectionFactory.openConnection(session);
ResultSet resultSet = getColumns(tableHandle, connection.getMetaData())) {
Map<String, Integer> timestampPrecisions = getTimestampPrecisions(connection, tableHandle);
int allColumns = 0;
List<JdbcColumnHandle> columns = new ArrayList<>();
while (resultSet.next()) {
// skip if table doesn't match expected
if (!(Objects.equals(remoteTableName, getRemoteTable(resultSet)))) {
continue;
}
allColumns++;
String columnName = resultSet.getString("COLUMN_NAME");
Optional<Integer> decimalDigits = getInteger(resultSet, "DECIMAL_DIGITS");
if (timestampPrecisions.containsKey(columnName)) {
decimalDigits = Optional.of(timestampPrecisions.get(columnName));
}

JdbcTypeHandle typeHandle = new JdbcTypeHandle(
getInteger(resultSet, "DATA_TYPE").orElseThrow(() -> new IllegalStateException("DATA_TYPE is null")),
Optional.ofNullable(resultSet.getString("TYPE_NAME")),
getInteger(resultSet, "COLUMN_SIZE"),
decimalDigits,
Optional.empty(),
Optional.empty());
Optional<ColumnMapping> columnMapping = toColumnMapping(session, connection, typeHandle);
log.debug("Mapping data type of '%s' column '%s': %s mapped to %s", schemaTableName, columnName, typeHandle, columnMapping);
// skip unsupported column types
boolean nullable = (resultSet.getInt("NULLABLE") != columnNoNulls);
Optional<String> comment = Optional.ofNullable(emptyToNull(resultSet.getString("REMARKS")));
if (columnMapping.isPresent()) {
columns.add(JdbcColumnHandle.builder()
.setColumnName(columnName)
.setJdbcTypeHandle(typeHandle)
.setColumnType(columnMapping.get().getType())
.setNullable(nullable)
.setComment(comment)
.build());
}
if (columnMapping.isEmpty()) {
UnsupportedTypeHandling unsupportedTypeHandling = getUnsupportedTypeHandling(session);
verify(
unsupportedTypeHandling == IGNORE,
"Unsupported type handling is set to %s, but toColumnMapping() returned empty for %s",
unsupportedTypeHandling,
typeHandle);
}
}
if (columns.isEmpty()) {
throw new TableNotFoundException(
schemaTableName,
format("Table '%s' has no supported columns (all %s columns are not supported)", schemaTableName, allColumns));
}
return ImmutableList.copyOf(columns);
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

private static RemoteTableName getRemoteTable(ResultSet resultSet)
throws SQLException
{
return new RemoteTableName(
Optional.ofNullable(resultSet.getString("TABLE_CAT")),
Optional.ofNullable(resultSet.getString("TABLE_SCHEM")),
resultSet.getString("TABLE_NAME"));
}

private static Map<String, Integer> getTimestampPrecisions(Connection connection, JdbcTableHandle tableHandle)
throws SQLException
{
// SingleStore JDBC driver doesn't expose timestamp precision when connecting to MemSQL cluster
String sql = "" +
"SELECT column_name, column_type " +
"FROM information_schema.columns " +
"WHERE table_schema = ? " +
"AND table_name = ? " +
"AND column_type IN ('datetime', 'datetime(6)', 'time', 'time(6)', 'timestamp', 'timestamp(6)')";
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setString(1, tableHandle.getCatalogName());
statement.setString(2, tableHandle.getTableName());

Map<String, Integer> timestampColumnPrecisions = new HashMap<>();
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
String columnType = resultSet.getString("column_type");
int size = columnType.equals("datetime") || columnType.equals("time") || columnType.equals("timestamp") ? 0 : 6;
timestampColumnPrecisions.put(resultSet.getString("column_name"), size);
}
}
return timestampColumnPrecisions;
}
}

@Override
public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
{
Expand Down Expand Up @@ -373,14 +262,14 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
dateReadFunctionUsingLocalDate(),
dateWriteFunction()));
case Types.TIME:
TimeType timeType = createTimeType(typeHandle.getRequiredDecimalDigits());
TimeType timeType = createTimeType(getTimePrecision(typeHandle.getRequiredColumnSize()));
return Optional.of(ColumnMapping.longMapping(
timeType,
memsqlTimeReadFunction(timeType),
timeWriteFunction(timeType.getPrecision())));
case Types.TIMESTAMP:
// TODO (https://github.com/trinodb/trino/issues/5450) Fix DST handling
TimestampType timestampType = createTimestampType(typeHandle.getRequiredDecimalDigits());
TimestampType timestampType = createTimestampType(getTimestampPrecision(typeHandle.getRequiredColumnSize()));
return Optional.of(timestampColumnMapping(timestampType));
}

Expand All @@ -390,6 +279,26 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
return Optional.empty();
}

private static int getTimePrecision(int timeColumnSize)
{
if (timeColumnSize == ZERO_PRECISION_TIME_COLUMN_SIZE) {
return 0;
}
int timePrecision = timeColumnSize - ZERO_PRECISION_TIME_COLUMN_SIZE - 1;
verify(1 <= timePrecision && timePrecision <= MEMSQL_DATE_TIME_MAX_PRECISION, "Unexpected time precision %s calculated from time column size %s", timePrecision, timeColumnSize);
return timePrecision;
}

private static int getTimestampPrecision(int timestampColumnSize)
{
if (timestampColumnSize == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
return 0;
}
int timestampPrecision = timestampColumnSize - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1;
verify(1 <= timestampPrecision && timestampPrecision <= MEMSQL_DATE_TIME_MAX_PRECISION, "Unexpected timestamp precision %s calculated from timestamp column size %s", timestampPrecision, timestampColumnSize);
return timestampPrecision;
}

@Override
public ResultSet getTables(Connection connection, Optional<String> schemaName, Optional<String> tableName)
throws SQLException
Expand Down