docs/src/main/sphinx/connector/delta-lake.md (2 additions, 0 deletions)
@@ -302,6 +302,8 @@ this table:
- ``BINARY``
* - ``DATE``
- ``DATE``
* - ``TIMESTAMP``
- ``TIMESTAMPNTZ`` (``TIMESTAMP_NTZ``)
* - ``TIMESTAMP(3) WITH TIME ZONE``
- ``TIMESTAMP``
* - ``ARRAY``
@@ -131,6 +131,7 @@
import io.trino.spi.type.HyperLogLogType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
@@ -146,6 +147,7 @@
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -336,6 +338,8 @@ public class DeltaLakeMetadata
private static final int CDF_SUPPORTED_WRITER_VERSION = 4;
private static final int COLUMN_MAPPING_MODE_SUPPORTED_READER_VERSION = 2;
private static final int COLUMN_MAPPING_MODE_SUPPORTED_WRITER_VERSION = 5;
private static final int TIMESTAMP_NTZ_SUPPORTED_READER_VERSION = 3;
private static final int TIMESTAMP_NTZ_SUPPORTED_WRITER_VERSION = 7;

// Matches the dummy column Databricks stores in the metastore
private static final List<Column> DUMMY_DATA_COLUMNS = ImmutableList.of(
@@ -879,10 +883,14 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(tableMetadata.getColumns().size());
ImmutableMap.Builder<String, Object> columnTypes = ImmutableMap.builderWithExpectedSize(tableMetadata.getColumns().size());
ImmutableMap.Builder<String, Map<String, Object>> columnsMetadata = ImmutableMap.builderWithExpectedSize(tableMetadata.getColumns().size());
boolean containsTimestampType = false;
for (ColumnMetadata column : tableMetadata.getColumns()) {
columnNames.add(column.getName());
columnTypes.put(column.getName(), serializeColumnType(columnMappingMode, fieldId, column.getType()));
columnsMetadata.put(column.getName(), generateColumnMetadata(columnMappingMode, fieldId));
if (!containsTimestampType) {
containsTimestampType = containsTimestampType(column.getType());
}
}
Map<String, String> columnComments = tableMetadata.getColumns().stream()
.filter(column -> column.getComment() != null)
@@ -909,7 +917,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
CREATE_TABLE_OPERATION,
session,
tableMetadata.getComment(),
protocolEntryForNewTable(tableMetadata.getProperties()));
protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties()));

setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory));
transactionLogWriter.flush();
@@ -1009,6 +1017,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
setRollback(() -> deleteRecursivelyIfExists(fileSystemFactory.create(session), finalLocation));

boolean usePhysicalName = columnMappingMode == ID || columnMappingMode == NAME;
boolean containsTimestampType = false;
int columnSize = tableMetadata.getColumns().size();
ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(columnSize);
ImmutableMap.Builder<String, Object> columnTypes = ImmutableMap.builderWithExpectedSize(columnSize);
@@ -1018,6 +1027,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
for (ColumnMetadata column : tableMetadata.getColumns()) {
columnNames.add(column.getName());
columnNullabilities.put(column.getName(), column.isNullable());
containsTimestampType |= containsTimestampType(column.getType());

Object serializedType = serializeColumnType(columnMappingMode, fieldId, column.getType());
Type physicalType = deserializeType(typeManager, serializedType, usePhysicalName);
@@ -1067,7 +1077,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
schemaString,
columnMappingMode,
maxFieldId,
protocolEntryForNewTable(tableMetadata.getProperties()));
protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties()));
}

private Optional<String> getSchemaLocation(Database database)
@@ -1156,6 +1166,21 @@ private static void deleteRecursivelyIfExists(TrinoFileSystem fileSystem, Locati
}
}

private static boolean containsTimestampType(Type type)
{
if (type instanceof ArrayType arrayType) {
return containsTimestampType(arrayType.getElementType());
}
if (type instanceof MapType mapType) {
return containsTimestampType(mapType.getKeyType()) || containsTimestampType(mapType.getValueType());
}
if (type instanceof RowType rowType) {
return rowType.getFields().stream().anyMatch(field -> containsTimestampType(field.getType()));
}
checkArgument(type.getTypeParameters().isEmpty(), "Unexpected type parameters for type %s", type);
return type instanceof TimestampType;
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(
ConnectorSession session,
@@ -2243,10 +2268,12 @@ private TableSnapshot getSnapshot(SchemaTableName schemaTableName, String tableL
}
}

private ProtocolEntry protocolEntryForNewTable(Map<String, Object> properties)
private ProtocolEntry protocolEntryForNewTable(boolean containsTimestampType, Map<String, Object> properties)
{
int readerVersion = DEFAULT_READER_VERSION;
int writerVersion = DEFAULT_WRITER_VERSION;
Set<String> readerFeatures = new HashSet<>();
Set<String> writerFeatures = new HashSet<>();
Optional<Boolean> changeDataFeedEnabled = getChangeDataFeedEnabled(properties);
if (changeDataFeedEnabled.isPresent() && changeDataFeedEnabled.get()) {
// Enabling cdf (change data feed) requires setting the writer version to 4
@@ -2258,7 +2285,17 @@ private ProtocolEntry protocolEntryForNewTable(Map<String, Object> properties)
readerVersion = max(readerVersion, COLUMN_MAPPING_MODE_SUPPORTED_READER_VERSION);
writerVersion = max(writerVersion, COLUMN_MAPPING_MODE_SUPPORTED_WRITER_VERSION);
}
return new ProtocolEntry(readerVersion, writerVersion, Optional.empty(), Optional.empty());
if (containsTimestampType) {
readerVersion = max(readerVersion, TIMESTAMP_NTZ_SUPPORTED_READER_VERSION);
writerVersion = max(writerVersion, TIMESTAMP_NTZ_SUPPORTED_WRITER_VERSION);
readerFeatures.add("timestampNtz");
writerFeatures.add("timestampNtz");
}
return new ProtocolEntry(
readerVersion,
writerVersion,
readerFeatures.isEmpty() ? Optional.empty() : Optional.of(readerFeatures),
Member:

Why discern between empty Set of features and no set of features?

Member Author:

ProtocolEntry has the following logic, so Optional.of(Set.of()) throws an exception for lower reader & writer versions. We could change the logic alternatively.

        if (minReaderVersion < MIN_VERSION_SUPPORTS_READER_FEATURES && readerFeatures.isPresent()) {
            throw new IllegalArgumentException("readerFeatures must not exist when minReaderVersion is less than " + MIN_VERSION_SUPPORTS_READER_FEATURES);
        }
        if (minWriterVersion < MIN_VERSION_SUPPORTS_WRITER_FEATURES && writerFeatures.isPresent()) {
            throw new IllegalArgumentException("writerFeatures must not exist when minWriterVersion is less than " + MIN_VERSION_SUPPORTS_WRITER_FEATURES);
        }

writerFeatures.isEmpty() ? Optional.empty() : Optional.of(writerFeatures));
}

private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, String tableLocation, Optional<Long> checkpointInterval, long newVersion)
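
To make the review thread above concrete, here is a minimal, hypothetical sketch of why the empty-versus-absent distinction matters when constructing ProtocolEntry. It is not part of this change; the package of ProtocolEntry, the sketch class name, and the assumption that reader features require reader version 3 and writer features require writer version 7 are illustrative guesses based on the validation quoted by the author and on the TIMESTAMP_NTZ_SUPPORTED_* constants in the diff.

// Sketch only: the import path and version thresholds are assumptions, not verified against this PR.
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; // assumed location of ProtocolEntry

import java.util.Optional;
import java.util.Set;

public class ProtocolEntryFeatureSketch
{
    public static void main(String[] args)
    {
        // No feature sets at all: accepted regardless of protocol versions
        new ProtocolEntry(1, 2, Optional.empty(), Optional.empty());

        // Feature sets present and versions high enough (assuming reader 3 / writer 7 are the minimums)
        new ProtocolEntry(3, 7,
                Optional.of(Set.of("timestampNtz")),
                Optional.of(Set.of("timestampNtz")));

        try {
            // A present-but-empty reader feature set with a low reader version trips the quoted
            // validation, which is why protocolEntryForNewTable maps empty sets to Optional.empty()
            new ProtocolEntry(1, 2, Optional.of(Set.of()), Optional.empty());
        }
        catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}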
@@ -53,6 +53,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
@@ -266,6 +267,10 @@ else if (trinoDecimalType.isShort()) {
typeBuilder = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MILLIS));
trinoType = TIMESTAMP_MILLIS;
}
case "timestamp_ntz" -> {
typeBuilder = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS));
trinoType = TIMESTAMP_MICROS;
}
default -> throw new TrinoException(NOT_SUPPORTED, format("Unsupported primitive type: %s", primitiveType));
}

@@ -35,6 +35,7 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
@@ -104,6 +105,7 @@ private DeltaLakeSchemaSupport() {}
.add("checkConstraints")
.add("changeDataFeed")
.add("columnMapping")
.add("timestampNtz")
.build();

public enum ColumnMappingMode
@@ -328,6 +330,9 @@ private static String serializePrimitiveType(Type type)

private static Optional<String> serializeSupportedPrimitiveType(Type type)
{
if (type instanceof TimestampType) {
return Optional.of("timestamp_ntz");
}
if (type instanceof TimestampWithTimeZoneType) {
return Optional.of("timestamp");
}
@@ -374,6 +379,7 @@ private static void validateStructuralType(Optional<Type> rootType, Type type)
private static void validatePrimitiveType(Type type)
{
if (serializeSupportedPrimitiveType(type).isEmpty() ||
(type instanceof TimestampType && ((TimestampType) type).getPrecision() != 6) ||
(type instanceof TimestampWithTimeZoneType && ((TimestampWithTimeZoneType) type).getPrecision() != 3)) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Unsupported type: " + type);
}
@@ -44,6 +44,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
@@ -112,7 +113,7 @@ private static String toPartitionValue(Type type, Block block, int position)
if (DATE.equals(type)) {
return LocalDate.ofEpochDay(DATE.getInt(block, position)).format(DELTA_DATE_FORMATTER);
}
if (TIMESTAMP_MILLIS.equals(type)) {
if (TIMESTAMP_MILLIS.equals(type) || TIMESTAMP_MICROS.equals(type)) {
long epochMicros = type.getLong(block, position);
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanosOfSecond = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;