diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index 7281b690fa5c..b490f2323d8c 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -157,6 +157,16 @@ values. Typical usage does not require you to configure them. * - ``delta.register-table-procedure.enabled`` - Enable to allow users to call the ``register_table`` procedure - ``false`` + * - ``delta.default-reader-version`` + - The default reader version used by new tables. + The value can be overridden for a specific table with the + ``reader_version`` table property. + - ``1`` + * - ``delta.default-writer-version`` + - The default writer version used by new tables. + The value can be overridden for a specific table with the + ``writer_version`` table property. + - ``2`` The following table describes performance tuning catalog properties for the connector. diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 352b18f6c684..2ebecc975eb6 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -25,6 +25,7 @@ import javax.validation.constraints.DecimalMax; import javax.validation.constraints.DecimalMin; +import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -48,6 +49,12 @@ public class DeltaLakeConfig @VisibleForTesting static final DataSize DEFAULT_DATA_FILE_CACHE_SIZE = DataSize.succinctBytes(Math.floorDiv(Runtime.getRuntime().maxMemory(), 10L)); + public static final int MIN_READER_VERSION = 1; + public static final int MIN_WRITER_VERSION = 2; + // The highest reader and writer versions Trino supports writing to + public static final int MAX_READER_VERSION = 2; + public static final int MAX_WRITER_VERSION = 4; + private Duration metadataCacheTtl = new Duration(5, TimeUnit.MINUTES); private long metadataCacheMaxSize = 1000; private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE; @@ -77,6 +84,8 @@ public class DeltaLakeConfig private boolean uniqueTableLocation = true; private boolean legacyCreateTableWithExistingLocationEnabled; private boolean registerTableProcedureEnabled; + private int defaultReaderVersion = MIN_READER_VERSION; + private int defaultWriterVersion = MIN_WRITER_VERSION; public Duration getMetadataCacheTtl() { @@ -475,4 +484,34 @@ public DeltaLakeConfig setRegisterTableProcedureEnabled(boolean registerTablePro this.registerTableProcedureEnabled = registerTableProcedureEnabled; return this; } + + @Min(value = MIN_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION) + @Max(value = MAX_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION) + public int getDefaultReaderVersion() + { + return defaultReaderVersion; + } + + @Config("delta.default-reader-version") + @ConfigDescription("The default reader version used by new tables") + public DeltaLakeConfig setDefaultReaderVersion(int defaultReaderVersion) + { + this.defaultReaderVersion = defaultReaderVersion; + return this; + } + + @Min(value = MIN_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION) + @Max(value = MAX_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION) + public int getDefaultWriterVersion() + { + return defaultWriterVersion; + } + + @Config("delta.default-writer-version") + @ConfigDescription("The default writer version used by new tables") + public DeltaLakeConfig setDefaultWriterVersion(int defaultWriterVersion) + { + this.defaultWriterVersion = defaultWriterVersion; + return this; + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 10e910e3ccef..65922784e667 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -171,6 +171,7 @@ import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED; +import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName; @@ -286,13 +287,6 @@ public class DeltaLakeMetadata public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN"; public static final String ISOLATION_LEVEL = "WriteSerializable"; - // The required reader and writer versions used by tables created by Trino - public static final int MIN_READER_VERSION = 1; - public static final int MIN_WRITER_VERSION = 2; - // The highest reader and writer versions Trino supports writing to - public static final int MAX_READER_VERSION = 2; - public static final int MAX_WRITER_VERSION = 4; - private static final int CDF_SUPPORTED_WRITER_VERSION = 4; // Matches the dummy column Databricks stores in the metastore @@ -326,6 +320,8 @@ public class DeltaLakeMetadata private final boolean deleteSchemaLocationsFallback; private final boolean useUniqueTableLocation; private final boolean allowManagedTableRename; + private final int defaultReaderVersion; + private final int defaultWriterVersion; public DeltaLakeMetadata( DeltaLakeMetastore metastore, @@ -346,7 +342,9 @@ public DeltaLakeMetadata( DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider, ExtendedStatisticsAccess statisticsAccess, boolean useUniqueTableLocation, - boolean allowManagedTableRename) + boolean allowManagedTableRename, + int defaultReaderVersion, + int defaultWriterVersion) { this.metastore = requireNonNull(metastore, "metastore is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); @@ -368,6 +366,8 @@ public DeltaLakeMetadata( this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback; this.useUniqueTableLocation = useUniqueTableLocation; this.allowManagedTableRename = allowManagedTableRename; + this.defaultReaderVersion = defaultReaderVersion; + this.defaultWriterVersion = defaultWriterVersion; } @Override @@ -1776,8 +1776,8 @@ private ProtocolEntry getProtocolEntry(Map properties) } } return new ProtocolEntry( - readerVersion.orElse(MIN_READER_VERSION), - writerVersion.orElse(MIN_WRITER_VERSION)); + readerVersion.orElse(defaultReaderVersion), + writerVersion.orElse(defaultWriterVersion)); } private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, Optional checkpointInterval, long newVersion) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java index 9f9bec41701c..3f942574f80b 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java @@ -58,6 +58,8 @@ public class DeltaLakeMetadataFactory private final long perTransactionMetastoreCacheMaximumSize; private final boolean deleteSchemaLocationsFallback; private final boolean useUniqueTableLocation; + private final int defaultReaderVersion; + private final int defaultWriterVersion; private final boolean allowManagedTableRename; private final String trinoVersion; @@ -100,6 +102,8 @@ public DeltaLakeMetadataFactory( this.perTransactionMetastoreCacheMaximumSize = deltaLakeConfig.getPerTransactionMetastoreCacheMaximumSize(); this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback(); this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation(); + this.defaultReaderVersion = deltaLakeConfig.getDefaultReaderVersion(); + this.defaultWriterVersion = deltaLakeConfig.getDefaultWriterVersion(); this.allowManagedTableRename = allowManagedTableRename; this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); } @@ -141,6 +145,8 @@ public DeltaLakeMetadata create(ConnectorIdentity identity) deltaLakeRedirectionsProvider, statisticsAccess, useUniqueTableLocation, - allowManagedTableRename); + allowManagedTableRename, + defaultReaderVersion, + defaultWriterVersion); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java index aa1c900b5016..a77f7fcb4ea1 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java @@ -26,10 +26,10 @@ import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_READER_VERSION; -import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION; -import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_READER_VERSION; -import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_WRITER_VERSION; +import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_READER_VERSION; +import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION; +import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_READER_VERSION; +import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_WRITER_VERSION; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; import static io.trino.spi.session.PropertyMetadata.booleanProperty; import static io.trino.spi.session.PropertyMetadata.integerProperty; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index 93ee7fade662..fe4f8c84a6c6 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -19,6 +19,10 @@ import io.trino.plugin.hive.HiveCompressionCodec; import org.testng.annotations.Test; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + +import java.lang.annotation.Annotation; import java.util.Map; import java.util.TimeZone; import java.util.concurrent.TimeUnit; @@ -26,6 +30,7 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.airlift.testing.ValidationAssertions.assertFailsValidation; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.trino.plugin.hive.util.TestHiveUtil.nonDefaultTimeZone; import static java.util.concurrent.TimeUnit.DAYS; @@ -67,7 +72,9 @@ public void testDefaults() .setTargetMaxFileSize(DataSize.of(1, GIGABYTE)) .setUniqueTableLocation(true) .setLegacyCreateTableWithExistingLocationEnabled(false) - .setRegisterTableProcedureEnabled(false)); + .setRegisterTableProcedureEnabled(false) + .setDefaultReaderVersion(1) + .setDefaultWriterVersion(2)); } @Test @@ -103,6 +110,8 @@ public void testExplicitPropertyMappings() .put("delta.unique-table-location", "false") .put("delta.legacy-create-table-with-existing-location.enabled", "true") .put("delta.register-table-procedure.enabled", "true") + .put("delta.default-reader-version", "2") + .put("delta.default-writer-version", "3") .buildOrThrow(); DeltaLakeConfig expected = new DeltaLakeConfig() @@ -134,8 +143,29 @@ public void testExplicitPropertyMappings() .setTargetMaxFileSize(DataSize.of(2, GIGABYTE)) .setUniqueTableLocation(false) .setLegacyCreateTableWithExistingLocationEnabled(true) - .setRegisterTableProcedureEnabled(true); + .setRegisterTableProcedureEnabled(true) + .setDefaultReaderVersion(2) + .setDefaultWriterVersion(3); assertFullMapping(properties, expected); } + + @Test + public void testValidation() + { + assertFailsReaderVersionValidation(new DeltaLakeConfig().setDefaultReaderVersion(0), Min.class); + assertFailsReaderVersionValidation(new DeltaLakeConfig().setDefaultReaderVersion(3), Max.class); + assertFailsWriterVersionValidation(new DeltaLakeConfig().setDefaultWriterVersion(1), Min.class); + assertFailsWriterVersionValidation(new DeltaLakeConfig().setDefaultWriterVersion(5), Max.class); + } + + private void assertFailsReaderVersionValidation(DeltaLakeConfig config, Class annotation) + { + assertFailsValidation(config, "defaultReaderVersion", "Must be in between 1 and 2", annotation); + } + + private void assertFailsWriterVersionValidation(DeltaLakeConfig config, Class annotation) + { + assertFailsValidation(config, "defaultWriterVersion", "Must be in between 2 and 4", annotation); + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java index 5efc02fa2a34..6f9730ff768b 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java @@ -53,8 +53,6 @@ import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; -import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_READER_VERSION; -import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_WRITER_VERSION; import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.spi.type.BigintType.BIGINT; @@ -163,7 +161,7 @@ private static ConnectorPageSink createPageSink(Path outputPath, DeltaLakeWriter true, Optional.empty(), Optional.of(false), - new ProtocolEntry(MIN_READER_VERSION, MIN_WRITER_VERSION)); + new ProtocolEntry(deltaLakeConfig.getDefaultReaderVersion(), deltaLakeConfig.getDefaultWriterVersion())); DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider( new GroupByHashPageIndexerFactory(new JoinCompiler(new TypeOperators()), new BlockTypeOperators()),