diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index ee28e067314c..43526a7dfa78 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -28,6 +28,7 @@ import io.trino.plugin.deltalake.DeltaLakeTableHandle; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; @@ -58,6 +59,7 @@ import static io.trino.plugin.deltalake.procedure.Procedures.checkProcedureArgument; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; import static java.lang.invoke.MethodHandles.lookup; @@ -67,6 +69,8 @@ public class VacuumProcedure implements Provider { + private static final int MAX_SUPPORTED_WRITER_VERSION = 3; + private static final int MAX_SUPPORTED_READER_VERSION = 2; private static final Logger log = Logger.get(VacuumProcedure.class); private static final MethodHandle VACUUM; @@ -174,6 +178,18 @@ private void doVacuum( String commonPathPrefix = tableLocation + "/"; String queryId = session.getQueryId(); + ProtocolEntry protocolEntry = metadata.getMetastore().getProtocol(session, tableSnapshot); + if (protocolEntry.getMinWriterVersion() > 
MAX_SUPPORTED_WRITER_VERSION) { + throw new TrinoException( + NOT_SUPPORTED, + format("Table %s.%s requires Delta Lake writer version %d which is not supported", schema, table, protocolEntry.getMinWriterVersion())); + } + if (protocolEntry.getMinReaderVersion() > MAX_SUPPORTED_READER_VERSION) { + throw new TrinoException( + NOT_SUPPORTED, + format("Table %s.%s requires Delta Lake reader version %d which is not supported", schema, table, protocolEntry.getMinReaderVersion())); + } + // Retain all active files and every file removed by a "recent" transaction (except for the oldest "recent"). // Any remaining file are not live, and not needed to read any "recent" snapshot. List recentVersions = transactionLogAccess.getPastTableVersions(fileSystem, transactionLogDir, threshold, tableSnapshot.getVersion()); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeVacuumCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeVacuumCompatibility.java new file mode 100644 index 000000000000..56f596453f89 --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeVacuumCompatibility.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.tests.product.deltalake; + +import org.testng.annotations.Test; + +import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix; +import static io.trino.tests.product.utils.QueryExecutors.onDelta; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestDeltaLakeVacuumCompatibility + extends BaseTestDeltaLakeS3Storage +{ + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + public void testVacuumOnUnsupportedTableVersion() + { + String tableName = "test_dl_vacuum_compat_" + randomTableSuffix(); + String tableDirectory = "databricks-compatibility-test-" + tableName; + + onDelta().executeQuery(format("" + + "CREATE TABLE default.%s (col int) " + + "USING DELTA LOCATION 's3://%s/%s' " + + "TBLPROPERTIES ('delta.minWriterVersion'='4')", + tableName, + bucketName, + tableDirectory)); + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES 1, 2, 3, 4"); + + assertThatThrownBy(() -> onTrino().executeQuery("CALL delta.system.vacuum('default', '" + tableName + "', '7d')")) + .hasMessageContaining("Table default." + tableName + " requires Delta Lake writer version 4 which is not supported"); + } +}