Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,6 @@ values. Typical usage does not require you to configure them.
* - ``delta.register-table-procedure.enabled``
- Enable to allow users to call the ``register_table`` procedure
- ``false``
* - ``delta.default-reader-version``
- The default reader version used by new tables.
The value can be overridden for a specific table with the
``reader_version`` table property.
- ``1``
* - ``delta.default-writer-version``
- The default writer version used by new tables.
The value can be overridden for a specific table with the
``writer_version`` table property.
- ``2``

The following table describes performance tuning catalog properties for the
connector.
Expand Down Expand Up @@ -573,21 +563,15 @@ The following properties are available for use:
- Set the checkpoint interval in seconds.
* - ``change_data_feed_enabled``
- Enables storing change data feed entries.
* - ``reader_version``
- Set reader version.
* - ``writer_version``
- Set writer version.

The following example uses all six table properties::
The following example uses multiple table properties::

CREATE TABLE example.default.example_partitioned_table
WITH (
location = 's3://my-bucket/a/path',
partitioned_by = ARRAY['regionkey'],
checkpoint_interval = 5,
change_data_feed_enabled = true,
reader_version = 2,
writer_version = 4
change_data_feed_enabled = true
)
AS SELECT name, comment, regionkey FROM tpch.tiny.nation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

Expand All @@ -49,12 +48,6 @@ public class DeltaLakeConfig
@VisibleForTesting
static final DataSize DEFAULT_DATA_FILE_CACHE_SIZE = DataSize.succinctBytes(Math.floorDiv(Runtime.getRuntime().maxMemory(), 10L));

public static final int MIN_READER_VERSION = 1;
public static final int MIN_WRITER_VERSION = 2;
// The highest reader and writer versions Trino supports writing to
public static final int MAX_READER_VERSION = 2;
public static final int MAX_WRITER_VERSION = 4;

private Duration metadataCacheTtl = new Duration(5, TimeUnit.MINUTES);
private long metadataCacheMaxSize = 1000;
private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE;
Expand Down Expand Up @@ -84,8 +77,6 @@ public class DeltaLakeConfig
private boolean uniqueTableLocation = true;
private boolean legacyCreateTableWithExistingLocationEnabled;
private boolean registerTableProcedureEnabled;
private int defaultReaderVersion = MIN_READER_VERSION;
private int defaultWriterVersion = MIN_WRITER_VERSION;

public Duration getMetadataCacheTtl()
{
Expand Down Expand Up @@ -484,34 +475,4 @@ public DeltaLakeConfig setRegisterTableProcedureEnabled(boolean registerTablePro
this.registerTableProcedureEnabled = registerTableProcedureEnabled;
return this;
}

/**
 * Returns the default Delta Lake reader protocol version applied to newly created tables.
 * Bean Validation rejects configured values outside [MIN_READER_VERSION, MAX_READER_VERSION].
 */
@Min(value = MIN_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION)
@Max(value = MAX_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION)
public int getDefaultReaderVersion()
{
return defaultReaderVersion;
}

/**
 * Binds the {@code delta.default-reader-version} catalog property to this config bean.
 * Returns {@code this} for the fluent-setter convention used by the config framework.
 */
@Config("delta.default-reader-version")
@ConfigDescription("The default reader version used by new tables")
public DeltaLakeConfig setDefaultReaderVersion(int defaultReaderVersion)
{
this.defaultReaderVersion = defaultReaderVersion;
return this;
}

/**
 * Returns the default Delta Lake writer protocol version applied to newly created tables.
 * Bean Validation rejects configured values outside [MIN_WRITER_VERSION, MAX_WRITER_VERSION].
 */
@Min(value = MIN_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION)
@Max(value = MAX_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION)
public int getDefaultWriterVersion()
{
return defaultWriterVersion;
}

/**
 * Binds the {@code delta.default-writer-version} catalog property to this config bean.
 * Returns {@code this} for the fluent-setter convention used by the config framework.
 */
@Config("delta.default-writer-version")
@ConfigDescription("The default writer version used by new tables")
public DeltaLakeConfig setDefaultWriterVersion(int defaultWriterVersion)
{
this.defaultWriterVersion = defaultWriterVersion;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.MoreCollectors.toOptional;
import static com.google.common.collect.Sets.difference;
import static io.trino.plugin.deltalake.DataFileInfo.DataFileType.DATA;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getColumnNames;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getFilesModifiedAfterProperty;
Expand All @@ -171,7 +170,6 @@
import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName;
Expand All @@ -183,14 +181,10 @@
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.PARTITIONED_BY_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.READER_VERSION_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.WRITER_VERSION_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getChangeDataFeedEnabled;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getCheckpointInterval;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getLocation;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getPartitionedBy;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getReaderVersion;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getWriterVersion;
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_PROPERTY;
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE;
import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE;
Expand Down Expand Up @@ -287,6 +281,11 @@ public class DeltaLakeMetadata
public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN";
public static final String ISOLATION_LEVEL = "WriteSerializable";

public static final int DEFAULT_READER_VERSION = 1;
public static final int DEFAULT_WRITER_VERSION = 2;
// The highest reader and writer versions Trino supports writing to
public static final int MAX_WRITER_VERSION = 4;

private static final int CDF_SUPPORTED_WRITER_VERSION = 4;

// Matches the dummy column Databricks stores in the metastore
Expand All @@ -297,7 +296,6 @@ public class DeltaLakeMetadata
.add(NUMBER_OF_DISTINCT_VALUES_SUMMARY)
.build();
private static final String ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY = "delta.enable-non-concurrent-writes";
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(READER_VERSION_PROPERTY, WRITER_VERSION_PROPERTY);

private final DeltaLakeMetastore metastore;
private final TrinoFileSystemFactory fileSystemFactory;
Expand All @@ -320,8 +318,6 @@ public class DeltaLakeMetadata
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;
private final boolean allowManagedTableRename;
private final int defaultReaderVersion;
private final int defaultWriterVersion;

public DeltaLakeMetadata(
DeltaLakeMetastore metastore,
Expand All @@ -342,9 +338,7 @@ public DeltaLakeMetadata(
DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
ExtendedStatisticsAccess statisticsAccess,
boolean useUniqueTableLocation,
boolean allowManagedTableRename,
int defaultReaderVersion,
int defaultWriterVersion)
boolean allowManagedTableRename)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
Expand All @@ -366,8 +360,6 @@ public DeltaLakeMetadata(
this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
this.useUniqueTableLocation = useUniqueTableLocation;
this.allowManagedTableRename = allowManagedTableRename;
this.defaultReaderVersion = defaultReaderVersion;
this.defaultWriterVersion = defaultWriterVersion;
}

@Override
Expand Down Expand Up @@ -488,10 +480,6 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
Optional<Boolean> changeDataFeedEnabled = tableHandle.getMetadataEntry().isChangeDataFeedEnabled();
changeDataFeedEnabled.ifPresent(value -> properties.put(CHANGE_DATA_FEED_ENABLED_PROPERTY, value));

ProtocolEntry protocolEntry = getProtocolEntry(session, tableHandle.getSchemaTableName());
properties.put(READER_VERSION_PROPERTY, protocolEntry.getMinReaderVersion());
properties.put(WRITER_VERSION_PROPERTY, protocolEntry.getMinWriterVersion());

return new ConnectorTableMetadata(
tableHandle.getSchemaTableName(),
columns,
Expand Down Expand Up @@ -1758,26 +1746,13 @@ private ProtocolEntry getProtocolEntry(ConnectorSession session, SchemaTableName

private ProtocolEntry getProtocolEntry(Map<String, Object> properties)
{
Optional<Integer> readerVersion = getReaderVersion(properties);
Optional<Integer> writerVersion = getWriterVersion(properties);
Optional<Boolean> changeDataFeedEnabled = getChangeDataFeedEnabled(properties);

if (changeDataFeedEnabled.isPresent() && changeDataFeedEnabled.get()) {
if (writerVersion.isPresent()) {
if (writerVersion.get() < CDF_SUPPORTED_WRITER_VERSION) {
throw new TrinoException(
INVALID_TABLE_PROPERTY,
WRITER_VERSION_PROPERTY + " cannot be set less than " + CDF_SUPPORTED_WRITER_VERSION + " when cdf is enabled");
}
}
else {
// Enabling cdf (change data feed) requires setting the writer version to 4
writerVersion = Optional.of(CDF_SUPPORTED_WRITER_VERSION);
}
int readerVersion = DEFAULT_READER_VERSION;
int writerVersion = DEFAULT_WRITER_VERSION;
if (getChangeDataFeedEnabled(properties).orElse(false)) {
writerVersion = CDF_SUPPORTED_WRITER_VERSION;
}
return new ProtocolEntry(
readerVersion.orElse(defaultReaderVersion),
writerVersion.orElse(defaultWriterVersion));

return new ProtocolEntry(readerVersion, writerVersion);
}

private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, Optional<Long> checkpointInterval, long newVersion)
Expand Down Expand Up @@ -1878,58 +1853,6 @@ private CommitInfoEntry getCommitInfoEntry(
Optional.of(true));
}

/**
 * Applies an ALTER TABLE ... SET PROPERTIES request by committing a new {@link ProtocolEntry}
 * to the table's Delta transaction log.
 * <p>
 * Only the properties named in {@code UPDATABLE_TABLE_PROPERTIES} (reader_version and
 * writer_version) may be updated; any other key is rejected with {@code NOT_SUPPORTED}.
 * Protocol versions may only be raised — a requested value below the table's current
 * minReader/minWriter version fails with {@code INVALID_TABLE_PROPERTY}. Unspecified
 * versions keep their current value.
 *
 * @throws TrinoException NOT_SUPPORTED for unknown property keys,
 *         INVALID_TABLE_PROPERTY on a version downgrade,
 *         DELTA_LAKE_BAD_WRITE if the transaction log append fails
 */
@Override
public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Optional<Object>> properties)
{
// Reject any property key outside the updatable set up front, naming the offenders.
Set<String> unsupportedProperties = difference(properties.keySet(), UPDATABLE_TABLE_PROPERTIES);
if (!unsupportedProperties.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties));
}

DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
// Current protocol versions serve as the floor for any requested change.
ProtocolEntry currentProtocolEntry = getProtocolEntry(session, handle.getSchemaTableName());

Optional<Integer> readerVersion = Optional.empty();
if (properties.containsKey(READER_VERSION_PROPERTY)) {
// The map values are Optional<Object>; an empty value (SET ... = DEFAULT) is not allowed here.
readerVersion = Optional.of((int) properties.get(READER_VERSION_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The " + READER_VERSION_PROPERTY + " property cannot be empty")));
if (readerVersion.get() < currentProtocolEntry.getMinReaderVersion()) {
throw new TrinoException(INVALID_TABLE_PROPERTY, format(
"%s cannot be downgraded from %d to %d", READER_VERSION_PROPERTY, currentProtocolEntry.getMinReaderVersion(), readerVersion.get()));
}
}

Optional<Integer> writerVersion = Optional.empty();
if (properties.containsKey(WRITER_VERSION_PROPERTY)) {
writerVersion = Optional.of((int) properties.get(WRITER_VERSION_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The " + WRITER_VERSION_PROPERTY + " property cannot be empty")));
if (writerVersion.get() < currentProtocolEntry.getMinWriterVersion()) {
throw new TrinoException(INVALID_TABLE_PROPERTY, format(
"%s cannot be downgraded from %d to %d", WRITER_VERSION_PROPERTY, currentProtocolEntry.getMinWriterVersion(), writerVersion.get()));
}
}

// The new commit is appended directly after the version this handle last read.
// NOTE(review): presumably a concurrent commit at the same version makes the log append
// fail rather than silently overwrite — confirm with the TransactionLogWriter contract.
long readVersion = handle.getReadVersion();
long commitVersion = readVersion + 1;

// Properties left unset fall back to the table's current protocol values.
ProtocolEntry protocolEntry = new ProtocolEntry(
readerVersion.orElse(currentProtocolEntry.getMinReaderVersion()),
writerVersion.orElse(currentProtocolEntry.getMinWriterVersion()));

try {
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation());

// A SET TBLPROPERTIES commit consists of a commit-info entry plus the new protocol entry.
long createdTime = Instant.now().toEpochMilli();
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, createdTime, SET_TBLPROPERTIES_OPERATION, readVersion));
transactionLogWriter.appendProtocolEntry(protocolEntry);

transactionLogWriter.flush();
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e);
}
}

@Override
public Map<String, Object> getSchemaProperties(ConnectorSession session, CatalogSchemaName schemaName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ public class DeltaLakeMetadataFactory
private final long perTransactionMetastoreCacheMaximumSize;
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;
private final int defaultReaderVersion;
private final int defaultWriterVersion;

private final boolean allowManagedTableRename;
private final String trinoVersion;
Expand Down Expand Up @@ -102,8 +100,6 @@ public DeltaLakeMetadataFactory(
this.perTransactionMetastoreCacheMaximumSize = deltaLakeConfig.getPerTransactionMetastoreCacheMaximumSize();
this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback();
this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation();
this.defaultReaderVersion = deltaLakeConfig.getDefaultReaderVersion();
this.defaultWriterVersion = deltaLakeConfig.getDefaultWriterVersion();
this.allowManagedTableRename = allowManagedTableRename;
this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
}
Expand Down Expand Up @@ -145,8 +141,6 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
deltaLakeRedirectionsProvider,
statisticsAccess,
useUniqueTableLocation,
allowManagedTableRename,
defaultReaderVersion,
defaultWriterVersion);
allowManagedTableRename);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,8 @@
import java.util.Optional;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_WRITER_VERSION;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static io.trino.spi.session.PropertyMetadata.longProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.type.VarcharType.VARCHAR;
Expand All @@ -45,8 +40,6 @@ public class DeltaLakeTableProperties
public static final String PARTITIONED_BY_PROPERTY = "partitioned_by";
public static final String CHECKPOINT_INTERVAL_PROPERTY = "checkpoint_interval";
public static final String CHANGE_DATA_FEED_ENABLED_PROPERTY = "change_data_feed_enabled";
public static final String READER_VERSION_PROPERTY = "reader_version";
public static final String WRITER_VERSION_PROPERTY = "writer_version";

private final List<PropertyMetadata<?>> tableProperties;

Expand Down Expand Up @@ -81,18 +74,6 @@ public DeltaLakeTableProperties()
"Enables storing change data feed entries",
null,
false))
.add(integerProperty(
READER_VERSION_PROPERTY,
"Reader version",
null,
value -> validateVersion(READER_VERSION_PROPERTY, value, MIN_READER_VERSION, MAX_READER_VERSION),
false))
.add(integerProperty(
WRITER_VERSION_PROPERTY,
"Writer version",
null,
value -> validateVersion(WRITER_VERSION_PROPERTY, value, MIN_WRITER_VERSION, MAX_WRITER_VERSION),
false))
.build();
}

Expand Down Expand Up @@ -128,23 +109,4 @@ public static Optional<Boolean> getChangeDataFeedEnabled(Map<String, Object> tab
{
return Optional.ofNullable((Boolean) tableProperties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY));
}

/**
 * Extracts the {@code reader_version} table property, or empty when it was not set.
 */
public static Optional<Integer> getReaderVersion(Map<String, Object> tableProperties)
{
return Optional.ofNullable((Integer) tableProperties.get(READER_VERSION_PROPERTY));
}

/**
 * Extracts the {@code writer_version} table property, or empty when it was not set.
 */
public static Optional<Integer> getWriterVersion(Map<String, Object> tableProperties)
{
return Optional.ofNullable((Integer) tableProperties.get(WRITER_VERSION_PROPERTY));
}

/**
 * Property-metadata validator: rejects a reader/writer version outside the supported
 * inclusive range with {@code INVALID_TABLE_PROPERTY}.
 * The value is supplied by the property framework as Object and is cast to int here.
 */
private static void validateVersion(String propertyName, Object value, int minSupportedVersion, int maxSupportedVersion)
{
int version = (int) value;
if (version < minSupportedVersion || version > maxSupportedVersion) {
throw new TrinoException(INVALID_TABLE_PROPERTY, format(
"%s must be between %d and %d", propertyName, minSupportedVersion, maxSupportedVersion));
}
}
}
Loading