Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ values. Typical usage does not require you to configure them.
* - ``delta.register-table-procedure.enabled``
- Enable to allow users to call the ``register_table`` procedure
- ``false``
* - ``delta.default-reader-version``
- The default reader version used by new tables.
The value must be between ``1`` and ``2``, and can be overridden for a
specific table with the ``reader_version`` table property.
- ``1``
* - ``delta.default-writer-version``
- The default writer version used by new tables.
The value must be between ``2`` and ``4``, and can be overridden for a
specific table with the ``writer_version`` table property.
- ``2``

The following table describes performance tuning catalog properties for the
connector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

Expand All @@ -48,6 +49,12 @@ public class DeltaLakeConfig
@VisibleForTesting
static final DataSize DEFAULT_DATA_FILE_CACHE_SIZE = DataSize.succinctBytes(Math.floorDiv(Runtime.getRuntime().maxMemory(), 10L));

// The lowest reader and writer versions accepted for the default-version settings;
// they also serve as the initial values of those settings in this config
public static final int MIN_READER_VERSION = 1;
public static final int MIN_WRITER_VERSION = 2;
// The highest reader and writer versions Trino supports writing to;
// they are the upper bounds accepted for the default-version settings
public static final int MAX_READER_VERSION = 2;
public static final int MAX_WRITER_VERSION = 4;

private Duration metadataCacheTtl = new Duration(5, TimeUnit.MINUTES);
private long metadataCacheMaxSize = 1000;
private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE;
Expand Down Expand Up @@ -77,6 +84,8 @@ public class DeltaLakeConfig
private boolean uniqueTableLocation = true;
private boolean legacyCreateTableWithExistingLocationEnabled;
private boolean registerTableProcedureEnabled;
private int defaultReaderVersion = MIN_READER_VERSION;
private int defaultWriterVersion = MIN_WRITER_VERSION;

public Duration getMetadataCacheTtl()
{
Expand Down Expand Up @@ -475,4 +484,34 @@ public DeltaLakeConfig setRegisterTableProcedureEnabled(boolean registerTablePro
this.registerTableProcedureEnabled = registerTableProcedureEnabled;
return this;
}

// Returns the reader version applied to new tables.
// Bean validation rejects values outside [MIN_READER_VERSION, MAX_READER_VERSION];
// both bounds share the same message so the reported range is identical whichever bound is violated.
@Min(value = MIN_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION)
@Max(value = MAX_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION)
public int getDefaultReaderVersion()
{
return defaultReaderVersion;
}

// Sets the reader version applied to new tables; bound to the
// "delta.default-reader-version" catalog property.
// No range check happens here: the bounds are declared as annotations on the getter.
// Returns this config instance so calls can be chained.
@Config("delta.default-reader-version")
@ConfigDescription("The default reader version used by new tables")
public DeltaLakeConfig setDefaultReaderVersion(int defaultReaderVersion)
{
this.defaultReaderVersion = defaultReaderVersion;
return this;
}

// Returns the writer version applied to new tables.
// Bean validation rejects values outside [MIN_WRITER_VERSION, MAX_WRITER_VERSION];
// both bounds share the same message so the reported range is identical whichever bound is violated.
@Min(value = MIN_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION)
@Max(value = MAX_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION)
public int getDefaultWriterVersion()
{
return defaultWriterVersion;
}

// Sets the writer version applied to new tables; bound to the
// "delta.default-writer-version" catalog property.
// No range check happens here: the bounds are declared as annotations on the getter.
// Returns this config instance so calls can be chained.
@Config("delta.default-writer-version")
@ConfigDescription("The default writer version used by new tables")
public DeltaLakeConfig setDefaultWriterVersion(int defaultWriterVersion)
{
this.defaultWriterVersion = defaultWriterVersion;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName;
Expand Down Expand Up @@ -286,13 +287,6 @@ public class DeltaLakeMetadata
public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN";
public static final String ISOLATION_LEVEL = "WriteSerializable";

// The required reader and writer versions used by tables created by Trino
public static final int MIN_READER_VERSION = 1;
public static final int MIN_WRITER_VERSION = 2;
// The highest reader and writer versions Trino supports writing to
public static final int MAX_READER_VERSION = 2;
public static final int MAX_WRITER_VERSION = 4;

private static final int CDF_SUPPORTED_WRITER_VERSION = 4;

// Matches the dummy column Databricks stores in the metastore
Expand Down Expand Up @@ -326,6 +320,8 @@ public class DeltaLakeMetadata
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;
private final boolean allowManagedTableRename;
private final int defaultReaderVersion;
private final int defaultWriterVersion;

public DeltaLakeMetadata(
DeltaLakeMetastore metastore,
Expand All @@ -346,7 +342,9 @@ public DeltaLakeMetadata(
DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
ExtendedStatisticsAccess statisticsAccess,
boolean useUniqueTableLocation,
boolean allowManagedTableRename)
boolean allowManagedTableRename,
int defaultReaderVersion,
int defaultWriterVersion)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
Expand All @@ -368,6 +366,8 @@ public DeltaLakeMetadata(
this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
this.useUniqueTableLocation = useUniqueTableLocation;
this.allowManagedTableRename = allowManagedTableRename;
this.defaultReaderVersion = defaultReaderVersion;
this.defaultWriterVersion = defaultWriterVersion;
}

@Override
Expand Down Expand Up @@ -1776,8 +1776,8 @@ private ProtocolEntry getProtocolEntry(Map<String, Object> properties)
}
}
return new ProtocolEntry(
readerVersion.orElse(MIN_READER_VERSION),
writerVersion.orElse(MIN_WRITER_VERSION));
readerVersion.orElse(defaultReaderVersion),
writerVersion.orElse(defaultWriterVersion));
}

private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, Optional<Long> checkpointInterval, long newVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class DeltaLakeMetadataFactory
private final long perTransactionMetastoreCacheMaximumSize;
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;
private final int defaultReaderVersion;
private final int defaultWriterVersion;

private final boolean allowManagedTableRename;
private final String trinoVersion;
Expand Down Expand Up @@ -100,6 +102,8 @@ public DeltaLakeMetadataFactory(
this.perTransactionMetastoreCacheMaximumSize = deltaLakeConfig.getPerTransactionMetastoreCacheMaximumSize();
this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback();
this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation();
this.defaultReaderVersion = deltaLakeConfig.getDefaultReaderVersion();
this.defaultWriterVersion = deltaLakeConfig.getDefaultWriterVersion();
this.allowManagedTableRename = allowManagedTableRename;
this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
}
Expand Down Expand Up @@ -141,6 +145,8 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
deltaLakeRedirectionsProvider,
statisticsAccess,
useUniqueTableLocation,
allowManagedTableRename);
allowManagedTableRename,
defaultReaderVersion,
defaultWriterVersion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import java.util.Optional;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_WRITER_VERSION;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import org.testng.annotations.Test;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import java.lang.annotation.Annotation;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.testing.ValidationAssertions.assertFailsValidation;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.trino.plugin.hive.util.TestHiveUtil.nonDefaultTimeZone;
import static java.util.concurrent.TimeUnit.DAYS;
Expand Down Expand Up @@ -67,7 +72,9 @@ public void testDefaults()
.setTargetMaxFileSize(DataSize.of(1, GIGABYTE))
.setUniqueTableLocation(true)
.setLegacyCreateTableWithExistingLocationEnabled(false)
.setRegisterTableProcedureEnabled(false));
.setRegisterTableProcedureEnabled(false)
.setDefaultReaderVersion(1)
.setDefaultWriterVersion(2));
}

@Test
Expand Down Expand Up @@ -103,6 +110,8 @@ public void testExplicitPropertyMappings()
.put("delta.unique-table-location", "false")
.put("delta.legacy-create-table-with-existing-location.enabled", "true")
.put("delta.register-table-procedure.enabled", "true")
.put("delta.default-reader-version", "2")
.put("delta.default-writer-version", "3")
.buildOrThrow();

DeltaLakeConfig expected = new DeltaLakeConfig()
Expand Down Expand Up @@ -134,8 +143,29 @@ public void testExplicitPropertyMappings()
.setTargetMaxFileSize(DataSize.of(2, GIGABYTE))
.setUniqueTableLocation(false)
.setLegacyCreateTableWithExistingLocationEnabled(true)
.setRegisterTableProcedureEnabled(true);
.setRegisterTableProcedureEnabled(true)
.setDefaultReaderVersion(2)
.setDefaultWriterVersion(3);

assertFullMapping(properties, expected);
}

@Test
public void testValidation()
{
    // Reader version: one step below the minimum and one step above the maximum
    DeltaLakeConfig readerBelowMinimum = new DeltaLakeConfig().setDefaultReaderVersion(0);
    assertFailsReaderVersionValidation(readerBelowMinimum, Min.class);

    DeltaLakeConfig readerAboveMaximum = new DeltaLakeConfig().setDefaultReaderVersion(3);
    assertFailsReaderVersionValidation(readerAboveMaximum, Max.class);

    // Writer version: one step below the minimum and one step above the maximum
    DeltaLakeConfig writerBelowMinimum = new DeltaLakeConfig().setDefaultWriterVersion(1);
    assertFailsWriterVersionValidation(writerBelowMinimum, Min.class);

    DeltaLakeConfig writerAboveMaximum = new DeltaLakeConfig().setDefaultWriterVersion(5);
    assertFailsWriterVersionValidation(writerAboveMaximum, Max.class);
}

// Asserts that the given config fails validation on the defaultReaderVersion property
// with the expected range message, triggered by the given constraint annotation.
private void assertFailsReaderVersionValidation(DeltaLakeConfig config, Class<? extends Annotation> annotation)
{
    String property = "defaultReaderVersion";
    String expectedMessage = "Must be in between 1 and 2";
    assertFailsValidation(config, property, expectedMessage, annotation);
}

// Asserts that the given config fails validation on the defaultWriterVersion property
// with the expected range message, triggered by the given constraint annotation.
private void assertFailsWriterVersionValidation(DeltaLakeConfig config, Class<? extends Annotation> annotation)
{
    String property = "defaultWriterVersion";
    String expectedMessage = "Must be in between 2 and 4";
    assertFailsValidation(config, property, expectedMessage, annotation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -163,7 +161,7 @@ private static ConnectorPageSink createPageSink(Path outputPath, DeltaLakeWriter
true,
Optional.empty(),
Optional.of(false),
new ProtocolEntry(MIN_READER_VERSION, MIN_WRITER_VERSION));
new ProtocolEntry(deltaLakeConfig.getDefaultReaderVersion(), deltaLakeConfig.getDefaultWriterVersion()));

DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider(
new GroupByHashPageIndexerFactory(new JoinCompiler(new TypeOperators()), new BlockTypeOperators()),
Expand Down