Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ jobs:
- suite-delta-lake-databricks91
- suite-delta-lake-databricks104
- suite-delta-lake-databricks113
- suite-delta-lake-databricks122
- suite-gcs
- suite-clients
- suite-functions
Expand Down Expand Up @@ -896,6 +897,11 @@ jobs:
- suite: suite-delta-lake-databricks113
ignore exclusion if: >-
${{ secrets.DATABRICKS_TOKEN != '' }}
- suite: suite-delta-lake-databricks122
config: hdp3
- suite: suite-delta-lake-databricks122
ignore exclusion if: >-
${{ secrets.DATABRICKS_TOKEN != '' }}

ignore exclusion if:
# Do not use this property outside of the matrix configuration.
Expand Down Expand Up @@ -992,6 +998,7 @@ jobs:
DATABRICKS_91_JDBC_URL:
DATABRICKS_104_JDBC_URL:
DATABRICKS_113_JDBC_URL:
DATABRICKS_122_JDBC_URL:
DATABRICKS_LOGIN:
DATABRICKS_TOKEN:
GCP_CREDENTIALS_KEY:
Expand Down Expand Up @@ -1062,6 +1069,7 @@ jobs:
DATABRICKS_91_JDBC_URL: ${{ secrets.DATABRICKS_91_JDBC_URL }}
DATABRICKS_104_JDBC_URL: ${{ secrets.DATABRICKS_104_JDBC_URL }}
DATABRICKS_113_JDBC_URL: ${{ secrets.DATABRICKS_113_JDBC_URL }}
DATABRICKS_122_JDBC_URL: ${{ secrets.DATABRICKS_122_JDBC_URL }}
DATABRICKS_LOGIN: token
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
GCP_CREDENTIALS_KEY: ${{ secrets.GCP_CREDENTIALS_KEY }}
Expand Down
8 changes: 4 additions & 4 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Requirements

To connect to Databricks Delta Lake, you need:

* Tables written by Databricks Runtime 7.3 LTS, 9.1 LTS, 10.4 LTS and 11.3 LTS are supported.
* Tables written by Databricks Runtime 7.3 LTS, 9.1 LTS, 10.4 LTS, 11.3 LTS and 12.2 LTS are supported.
* Deployments using AWS, HDFS, Azure Storage, and Google Cloud Storage (GCS) are
fully supported.
* Network access from the coordinator and workers to the Delta Lake storage.
Expand Down Expand Up @@ -298,8 +298,8 @@ No other types are supported.
Security
--------

The Delta Lake connector allows you to choose one of several means of providing
authorization at the catalog level. You can select a different type of
The Delta Lake connector allows you to choose one of several means of providing
authorization at the catalog level. You can select a different type of
authorization check in different Delta Lake catalog files.

.. _delta-lake-authorization:
Expand Down Expand Up @@ -935,4 +935,4 @@ connector.
property to ``false`` to disable the optimized parquet reader by default
for structural data types. The equivalent catalog session property is
``parquet_optimized_nested_reader_enabled``.
- ``true``
- ``true``
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isAppendOnly;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeSchemaAsJson;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedReaderFeatures;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.validateType;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.verifySupportedColumnMapping;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY;
Expand Down Expand Up @@ -283,8 +284,9 @@ public class DeltaLakeMetadata

public static final int DEFAULT_READER_VERSION = 1;
public static final int DEFAULT_WRITER_VERSION = 2;
// The highest reader and writer versions Trino supports writing to
public static final int MAX_WRITER_VERSION = 4;
// The highest reader and writer versions Trino supports
private static final int MAX_READER_VERSION = 3;
private static final int MAX_WRITER_VERSION = 4;
private static final int CDF_SUPPORTED_WRITER_VERSION = 4;

// Matches the dummy column Databricks stores in the metastore
Expand Down Expand Up @@ -437,6 +439,16 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
}
throw e;
}
ProtocolEntry protocolEntry = metastore.getProtocol(session, tableSnapshot);
if (protocolEntry.getMinReaderVersion() > MAX_READER_VERSION) {
LOG.debug("Skip %s because the reader version is unsupported: %d", dataTableName, protocolEntry.getMinReaderVersion());
return null;
}
Set<String> unsupportedReaderFeatures = unsupportedReaderFeatures(protocolEntry.getReaderFeatures().orElse(ImmutableSet.of()));
if (!unsupportedReaderFeatures.isEmpty()) {
LOG.debug("Skip %s because the table contains unsupported reader features: %s", dataTableName, unsupportedReaderFeatures);
return null;
}
Comment on lines +443 to +451
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should fail instead or reporting "table not found".

verifySupportedColumnMapping(getColumnMappingMode(metadataEntry));
return new DeltaLakeTableHandle(
dataTableName.getSchemaName(),
Expand Down Expand Up @@ -1742,7 +1754,7 @@ private ProtocolEntry protocolEntryForNewTable(Map<String, Object> properties)
// Enabling cdf (change data feed) requires setting the writer version to 4
writerVersion = CDF_SUPPORTED_WRITER_VERSION;
}
return new ProtocolEntry(DEFAULT_READER_VERSION, writerVersion);
return new ProtocolEntry(DEFAULT_READER_VERSION, writerVersion, Optional.empty(), Optional.empty());
}

private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, Optional<Long> checkpointInterval, long newVersion)
Expand Down Expand Up @@ -1886,7 +1898,7 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta

Optional<ProtocolEntry> protocolEntry = Optional.empty();
if (requiredWriterVersion != currentProtocolEntry.getMinWriterVersion()) {
protocolEntry = Optional.of(new ProtocolEntry(currentProtocolEntry.getMinReaderVersion(), requiredWriterVersion));
protocolEntry = Optional.of(new ProtocolEntry(currentProtocolEntry.getMinReaderVersion(), requiredWriterVersion, currentProtocolEntry.getReaderFeatures(), currentProtocolEntry.getWriterFeatures()));
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.common.base.Enums;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.json.ObjectMapperProvider;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
Expand Down Expand Up @@ -49,6 +51,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;

import static com.google.common.base.Strings.isNullOrEmpty;
Expand Down Expand Up @@ -81,6 +84,12 @@ private DeltaLakeSchemaSupport() {}
public static final String APPEND_ONLY_CONFIGURATION_KEY = "delta.appendOnly";
public static final String COLUMN_MAPPING_MODE_CONFIGURATION_KEY = "delta.columnMapping.mode";

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features
// TODO: Add support for 'deletionVectors' and 'timestampNTZ' reader features
private static final Set<String> SUPPORTED_READER_FEATURES = ImmutableSet.<String>builder()
.add("columnMapping")
.build();

public enum ColumnMappingMode
{
ID,
Expand Down Expand Up @@ -467,6 +476,11 @@ private static <T> Map<String, T> getColumnProperty(String json, Function<JsonNo
}
}

/**
 * Returns the table reader features that Trino does not support.
 *
 * @param features reader feature names declared in the table's protocol entry
 * @return the subset of {@code features} not contained in {@code SUPPORTED_READER_FEATURES};
 *         note this is an unmodifiable live view ({@code Sets.difference}), not a copy
 */
public static Set<String> unsupportedReaderFeatures(Set<String> features)
{
return Sets.difference(features, SUPPORTED_READER_FEATURES);
}

private static Type buildType(TypeManager typeManager, JsonNode typeNode, boolean usePhysicalName)
{
if (typeNode.isContainerNode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,39 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static java.lang.String.format;

public class ProtocolEntry
{
private static final int MIN_VERSION_SUPPORTS_READER_FEATURES = 3;
private static final int MIN_VERSION_SUPPORTS_WRITER_FEATURES = 7;

private final int minReaderVersion;
private final int minWriterVersion;
private final Optional<Set<String>> readerFeatures;
private final Optional<Set<String>> writerFeatures;

@JsonCreator
public ProtocolEntry(
@JsonProperty("minReaderVersion") int minReaderVersion,
@JsonProperty("minWriterVersion") int minWriterVersion)
@JsonProperty("minWriterVersion") int minWriterVersion,
// The delta protocol documentation mentions that readerFeatures & writerFeatures is Array[String], but their actual implementation is Set
@JsonProperty("readerFeatures") Optional<Set<String>> readerFeatures,
@JsonProperty("writerFeatures") Optional<Set<String>> writerFeatures)
{
this.minReaderVersion = minReaderVersion;
this.minWriterVersion = minWriterVersion;
if (minReaderVersion < MIN_VERSION_SUPPORTS_READER_FEATURES && readerFeatures.isPresent()) {
Comment thread
ebyhr marked this conversation as resolved.
Outdated
throw new IllegalArgumentException("readerFeatures must not exist when minReaderVersion is less than " + MIN_VERSION_SUPPORTS_READER_FEATURES);
}
if (minWriterVersion < MIN_VERSION_SUPPORTS_WRITER_FEATURES && writerFeatures.isPresent()) {
throw new IllegalArgumentException("writerFeatures must not exist when minWriterVersion is less than " + MIN_VERSION_SUPPORTS_WRITER_FEATURES);
}
this.readerFeatures = readerFeatures;
this.writerFeatures = writerFeatures;
}

@JsonProperty
Expand All @@ -46,6 +64,18 @@ public int getMinWriterVersion()
return minWriterVersion;
}

@JsonProperty
// Reader features from the protocol entry; present only when minReaderVersion >= 3
// (enforced in the constructor). Empty Optional means the table declares no reader features.
public Optional<Set<String>> getReaderFeatures()
{
return readerFeatures;
}

@JsonProperty
// Writer features from the protocol entry; present only when minWriterVersion >= 7
// (enforced in the constructor). Empty Optional means the table declares no writer features.
public Optional<Set<String>> getWriterFeatures()
{
return writerFeatures;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -57,18 +87,25 @@ public boolean equals(Object o)
}
ProtocolEntry that = (ProtocolEntry) o;
return minReaderVersion == that.minReaderVersion &&
minWriterVersion == that.minWriterVersion;
minWriterVersion == that.minWriterVersion &&
readerFeatures.equals(that.readerFeatures) &&
writerFeatures.equals(that.writerFeatures);
}

@Override
public int hashCode()
{
return Objects.hash(minReaderVersion, minWriterVersion);
return Objects.hash(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures);
}

@Override
public String toString()
{
return format("ProtocolEntry{minReaderVersion=%d, minWriterVersion=%d}", minReaderVersion, minWriterVersion);
return format(
"ProtocolEntry{minReaderVersion=%d, minWriterVersion=%d, readerFeatures=%s, writerFeatures=%s}",
minReaderVersion,
minWriterVersion,
readerFeatures,
writerFeatures);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA;
Expand Down Expand Up @@ -213,7 +214,7 @@ private DeltaLakeColumnHandle buildColumnHandle(EntryType entryType, CheckpointS
type = schemaManager.getMetadataEntryType();
break;
case PROTOCOL:
type = schemaManager.getProtocolEntryType();
type = schemaManager.getProtocolEntryType(true, true);
break;
case COMMIT:
type = schemaManager.getCommitInfoEntryType();
Expand Down Expand Up @@ -278,16 +279,21 @@ private DeltaLakeTransactionLogEntry buildProtocolEntry(ConnectorSession session
if (block.isNull(pagePosition)) {
return null;
}
int protocolFields = 2;
int minProtocolFields = 2;
int maxProtocolFields = 4;
Block protocolEntryBlock = block.getObject(pagePosition, Block.class);
log.debug("Block %s has %s fields", block, protocolEntryBlock.getPositionCount());
if (protocolEntryBlock.getPositionCount() != protocolFields) {
if (protocolEntryBlock.getPositionCount() < minProtocolFields || protocolEntryBlock.getPositionCount() > maxProtocolFields) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA,
format("Expected block %s to have %d children, but found %s", block, protocolFields, protocolEntryBlock.getPositionCount()));
format("Expected block %s to have between %d and %d children, but found %s", block, minProtocolFields, maxProtocolFields, protocolEntryBlock.getPositionCount()));
}
// The last entry should be writer feature when protocol entry size is 3 https://github.com/delta-io/delta/blob/master/PROTOCOL.md#disabled-features
int position = 0;
ProtocolEntry result = new ProtocolEntry(
getInt(protocolEntryBlock, 0),
getInt(protocolEntryBlock, 1));
getInt(protocolEntryBlock, position++),
getInt(protocolEntryBlock, position++),
protocolEntryBlock.getPositionCount() == 4 && protocolEntryBlock.isNull(position) ? Optional.empty() : Optional.of(getList(protocolEntryBlock, position++).stream().collect(toImmutableSet())),
protocolEntryBlock.isNull(position) ? Optional.empty() : Optional.of(getList(protocolEntryBlock, position++).stream().collect(toImmutableSet())));
log.debug("Result: %s", result);
return DeltaLakeTransactionLogEntry.protocolEntry(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,16 @@ public class CheckpointSchemaManager
RowType.field("deletionTimestamp", BigintType.BIGINT),
RowType.field("dataChange", BooleanType.BOOLEAN)));

private static final RowType PROTOCOL_ENTRY_TYPE = RowType.from(ImmutableList.of(
RowType.field("minReaderVersion", IntegerType.INTEGER),
RowType.field("minWriterVersion", IntegerType.INTEGER)));

private final RowType metadataEntryType;
private final RowType commitInfoEntryType;
private final ArrayType stringList;

@Inject
public CheckpointSchemaManager(TypeManager typeManager)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");

ArrayType stringList = (ArrayType) this.typeManager.getType(TypeSignature.arrayType(VarcharType.VARCHAR.getTypeSignature()));
stringList = (ArrayType) this.typeManager.getType(TypeSignature.arrayType(VarcharType.VARCHAR.getTypeSignature()));
MapType stringMap = (MapType) this.typeManager.getType(TypeSignature.mapType(VarcharType.VARCHAR.getTypeSignature(), VarcharType.VARCHAR.getTypeSignature()));

metadataEntryType = RowType.from(ImmutableList.of(
Expand Down Expand Up @@ -176,9 +173,18 @@ public RowType getTxnEntryType()
return TXN_ENTRY_TYPE;
}

public RowType getProtocolEntryType()
public RowType getProtocolEntryType(boolean requireReaderFeatures, boolean requireWriterFeatures)
{
return PROTOCOL_ENTRY_TYPE;
ImmutableList.Builder<RowType.Field> fields = ImmutableList.builder();
fields.add(RowType.field("minReaderVersion", IntegerType.INTEGER));
fields.add(RowType.field("minWriterVersion", IntegerType.INTEGER));
if (requireReaderFeatures) {
fields.add(RowType.field("readerFeatures", stringList));
}
if (requireWriterFeatures) {
fields.add(RowType.field("writerFeatures", stringList));
}
return RowType.from(fields.build());
}

public RowType getCommitInfoEntryType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonValueToTrinoValue;
Expand Down Expand Up @@ -96,8 +97,10 @@ public void write(CheckpointEntries entries, TrinoOutputFile outputFile)
// The default value is false in https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-format, but Databricks defaults to true
boolean writeStatsAsStruct = Boolean.parseBoolean(configuration.getOrDefault(DELTA_CHECKPOINT_WRITE_STATS_AS_STRUCT_PROPERTY, "true"));

ProtocolEntry protocolEntry = entries.getProtocolEntry();

RowType metadataEntryType = checkpointSchemaManager.getMetadataEntryType();
RowType protocolEntryType = checkpointSchemaManager.getProtocolEntryType();
RowType protocolEntryType = checkpointSchemaManager.getProtocolEntryType(protocolEntry.getReaderFeatures().isPresent(), protocolEntry.getWriterFeatures().isPresent());
RowType txnEntryType = checkpointSchemaManager.getTxnEntryType();
RowType addEntryType = checkpointSchemaManager.getAddEntryType(entries.getMetadataEntry(), writeStatsAsJson, writeStatsAsStruct);
RowType removeEntryType = checkpointSchemaManager.getRemoveEntryType();
Expand Down Expand Up @@ -177,8 +180,15 @@ private void writeProtocolEntry(PageBuilder pageBuilder, RowType entryType, Prot
pageBuilder.declarePosition();
BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(PROTOCOL_BLOCK_CHANNEL);
BlockBuilder entryBlockBuilder = blockBuilder.beginBlockEntry();
writeLong(entryBlockBuilder, entryType, 0, "minReaderVersion", (long) protocolEntry.getMinReaderVersion());
writeLong(entryBlockBuilder, entryType, 1, "minWriterVersion", (long) protocolEntry.getMinWriterVersion());
int fieldId = 0;
writeLong(entryBlockBuilder, entryType, fieldId++, "minReaderVersion", (long) protocolEntry.getMinReaderVersion());
writeLong(entryBlockBuilder, entryType, fieldId++, "minWriterVersion", (long) protocolEntry.getMinWriterVersion());
if (protocolEntry.getReaderFeatures().isPresent()) {
writeStringList(entryBlockBuilder, entryType, fieldId++, "readerFeatures", protocolEntry.getReaderFeatures().get().stream().collect(toImmutableList()));
}
if (protocolEntry.getWriterFeatures().isPresent()) {
writeStringList(entryBlockBuilder, entryType, fieldId++, "writerFeatures", protocolEntry.getWriterFeatures().get().stream().collect(toImmutableList()));
}
blockBuilder.closeEntry();

// null for others
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private static ConnectorPageSink createPageSink(String outputPath, DeltaLakeWrit
true,
Optional.empty(),
Optional.of(false),
new ProtocolEntry(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION));
new ProtocolEntry(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, Optional.empty(), Optional.empty()));

DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider(
new GroupByHashPageIndexerFactory(new JoinCompiler(new TypeOperators()), new BlockTypeOperators()),
Expand Down
Loading