diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index 5e45c736f040..3fbec2c1171a 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -580,6 +580,17 @@ table in the metastore, using its existing transaction logs and data files:: To prevent unauthorized users from accessing data, this procedure is disabled by default. The procedure is enabled only when ``delta.register-table-procedure.enabled`` is set to ``true``. +.. _delta-lake-unregister-table: + +Unregister table +^^^^^^^^^^^^^^^^ +The connector can unregister existing Delta Lake tables from the metastore. + +The procedure ``system.unregister_table`` allows the caller to unregister an +existing Delta Lake table from the metastores without deleting the data:: + + CALL example.system.unregister_table(schema_name => 'testdb', table_name => 'customer_orders') + .. _delta-lake-write-support: Updating data diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java index 4f0ad8a596fa..bbd0f7bace91 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java @@ -27,6 +27,7 @@ import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure; import io.trino.plugin.deltalake.procedure.OptimizeTableProcedure; import io.trino.plugin.deltalake.procedure.RegisterTableProcedure; +import io.trino.plugin.deltalake.procedure.UnregisterTableProcedure; import io.trino.plugin.deltalake.procedure.VacuumProcedure; import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess; import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.ForCachingExtendedStatisticsAccess; @@ -145,6 +146,7 @@ public void setup(Binder binder) procedures.addBinding().toProvider(DropExtendedStatsProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(VacuumProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON); + procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON); Multibinder tableProcedures = newSetBinder(binder, TableProcedureMetadata.class); tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/UnregisterTableProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/UnregisterTableProcedure.java new file mode 100644 index 000000000000..238c00b4f097 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/UnregisterTableProcedure.java @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.procedure; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.deltalake.DeltaLakeMetadataFactory; +import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore; +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.connector.ConnectorAccessControl; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.procedure.Procedure; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.lang.invoke.MethodHandle; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.invoke.MethodHandles.lookup; +import static java.util.Objects.requireNonNull; + +public class UnregisterTableProcedure + implements Provider +{ + private static final MethodHandle UNREGISTER_TABLE; + + private static final String PROCEDURE_NAME = "unregister_table"; + private static final String SYSTEM_SCHEMA = "system"; + + private static final String SCHEMA_NAME = "SCHEMA_NAME"; + private static final String TABLE_NAME = "TABLE_NAME"; + + static { + try { + UNREGISTER_TABLE = lookup().unreflect(UnregisterTableProcedure.class.getMethod("unregisterTable", ConnectorAccessControl.class, ConnectorSession.class, String.class, String.class)); + } + catch (ReflectiveOperationException e) { + throw new AssertionError(e); + } + } + + private final DeltaLakeMetadataFactory metadataFactory; + + @Inject + public UnregisterTableProcedure(DeltaLakeMetadataFactory metadataFactory) + { + this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null"); + } + + @Override + public Procedure get() + { + return new Procedure( + SYSTEM_SCHEMA, + PROCEDURE_NAME, + ImmutableList.of( + new Procedure.Argument(SCHEMA_NAME, VARCHAR), + new Procedure.Argument(TABLE_NAME, VARCHAR)), + UNREGISTER_TABLE.bindTo(this)); + } + + public void unregisterTable(ConnectorAccessControl accessControl, ConnectorSession session, String schemaName, String tableName) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + doUnregisterTable(accessControl, session, schemaName, tableName); + } + } + + private void doUnregisterTable(ConnectorAccessControl accessControl, ConnectorSession session, String schemaName, String tableName) + { + checkProcedureArgument(!isNullOrEmpty(schemaName), "schema_name cannot be null or empty"); + checkProcedureArgument(!isNullOrEmpty(tableName), "table_name cannot be null or empty"); + SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); + + accessControl.checkCanDropTable(null, schemaTableName); + DeltaLakeMetastore metastore = metadataFactory.create(session.getIdentity()).getMetastore(); + + if (metastore.getDatabase(schemaName).isEmpty()) { + throw new SchemaNotFoundException(schemaTableName.getSchemaName()); + } + + metastore.dropTable(session, schemaName, tableName, false); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index a948114485e7..fa73ec1564ee 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -37,6 +37,7 @@ import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; import io.trino.testing.TestingConnectorBehavior; +import io.trino.testing.minio.MinioClient; import io.trino.tpch.TpchTable; import org.intellij.lang.annotations.Language; import org.testng.SkipException; @@ -62,6 +63,7 @@ import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DELETE_TABLE; +import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DROP_TABLE; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.INSERT_TABLE; import static io.trino.testing.TestingAccessControlManager.privilege; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_DELETE; @@ -1819,6 +1821,98 @@ public void testRegisterTable() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testUnregisterTable() + { + String tableName = "test_unregister_table_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 a", 1); + String tableLocation = getTableLocation(tableName); + + assertUpdate("CALL system.unregister_table(CURRENT_SCHEMA, '" + tableName + "')"); + assertQueryFails("SELECT * FROM " + tableName, ".* Table .* does not exist"); + + assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')"); + assertQuery("SELECT * FROM " + tableName, "VALUES 1"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testUnregisterBrokenTable() + { + String tableName = "test_unregister_broken_table_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 a", 1); + String tableLocation = getTableLocation(tableName); + + // Break the table by deleting files from the storage + String key = tableLocation.substring(bucketUrl().length()); + MinioClient minio = hiveMinioDataLake.getMinioClient(); + for (String file : minio.listObjects(bucketName, key)) { + minio.removeObject(bucketName, file); + } + assertThat(minio.listObjects(bucketName, key)).isEmpty(); + + // Verify unregister_table successfully deletes the table from metastore + assertUpdate("CALL system.unregister_table(CURRENT_SCHEMA, '" + tableName + "')"); + assertQueryFails("SELECT * FROM " + tableName, ".* Table .* does not exist"); + } + + @Test + public void testUnregisterTableNotExistingSchema() + { + String schemaName = "test_unregister_table_not_existing_schema_" + randomNameSuffix(); + assertQueryFails( + "CALL system.unregister_table('" + schemaName + "', 'non_existent_table')", + "Schema " + schemaName + " not found"); + } + + @Test + public void testUnregisterTableNotExistingTable() + { + String tableName = "test_unregister_table_not_existing_table_" + randomNameSuffix(); + assertQueryFails( + "CALL system.unregister_table(CURRENT_SCHEMA, '" + tableName + "')", + "Table .* not found"); + } + + @Test + public void testRepeatUnregisterTable() + { + String tableName = "test_repeat_unregister_table_not_" + randomNameSuffix(); + assertQueryFails( + "CALL system.unregister_table(CURRENT_SCHEMA, '" + tableName + "')", + "Table .* not found"); + + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 a", 1); + String tableLocation = getTableLocation(tableName); + + assertUpdate("CALL system.unregister_table(CURRENT_SCHEMA, '" + tableName + "')"); + + // Verify failure the procedure can't unregister the tables more than once + assertQueryFails("CALL system.unregister_table(CURRENT_SCHEMA, '" + tableName + "')", "Table .* not found"); + + assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')"); + assertQuery("SELECT * FROM " + tableName, "VALUES 1"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testUnregisterTableAccessControl() + { + String tableName = "test_unregister_table_access_control_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 a", 1); + + assertAccessDenied( + "CALL system.unregister_table(CURRENT_SCHEMA, '" + tableName + "')", + "Cannot drop table .*", + privilege(tableName, DROP_TABLE)); + + assertQuery("SELECT * FROM " + tableName, "VALUES 1"); + assertUpdate("DROP TABLE " + tableName); + } + private Set getActiveFiles(String tableName) { return getActiveFiles(tableName, getQueryRunner().getDefaultSession()); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeRegisterTableProcedureTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeRegisterTableProcedureTest.java index 585f7f3a304b..3aa852fb6743 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeRegisterTableProcedureTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeRegisterTableProcedureTest.java @@ -307,6 +307,23 @@ public void testRegisterTableWithInvalidParameter() ".*table_location cannot be null or empty.*"); } + @Test + public void testRegisterUnregisteredTable() + { + // Verify register_table procedure can register the unregistered table + String tableName = "test_unregister_table_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 a", 1); + String tableLocation = getTableLocation(tableName); + + assertUpdate("CALL system.unregister_table(CURRENT_SCHEMA, '" + tableName + "')"); + assertQueryFails("SELECT * FROM " + tableName, ".* Table .* does not exist"); + + assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')"); + assertQuery("SELECT * FROM " + tableName, "VALUES 1"); + + assertUpdate("DROP TABLE " + tableName); + } + protected String getTableLocation(String tableName) { Pattern locationPattern = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeProceduresCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeProceduresCompatibility.java new file mode 100644 index 000000000000..a5896ddafaee --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeProceduresCompatibility.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.deltalake; + +import io.trino.testng.services.Flaky; +import org.testng.annotations.Test; + +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; +import static io.trino.tempto.assertions.QueryAssert.assertThat; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.utils.QueryExecutors.onDelta; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.lang.String.format; + +public class TestDeltaLakeProceduresCompatibility + extends BaseTestDeltaLakeS3Storage +{ + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testUnregisterTable() + { + String tableName = "test_dl_unregister_table" + randomNameSuffix(); + String tableDirectory = "databricks-compatibility-test-" + tableName; + + onTrino().executeQuery(format("CREATE TABLE delta.default.%s WITH (location = 's3://%s/%s') AS SELECT 123 AS col", + tableName, + bucketName, + tableDirectory)); + try { + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(row(123)); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).containsOnly(row(123)); + + onTrino().executeQuery("CALL delta.system.unregister_table('default', '" + tableName + "')"); + + assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .hasMessageMatching(".* Table '.*' does not exist"); + assertQueryFailure(() -> onDelta().executeQuery("SELECT * FROM default." + tableName)) + .hasMessageMatching("(?s).* Table or view not found: .*"); + } + finally { + onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + tableName); + } + } +} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeCompatibility.java index 85614f96b486..05f6a7580f72 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeCompatibility.java @@ -27,6 +27,7 @@ import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; public class TestHiveAndDeltaLakeCompatibility extends ProductTest @@ -53,4 +54,18 @@ public void testInformationSchemaColumnsOnPresenceOfHiveView() onTrino().executeQuery("DROP SCHEMA " + schemaName); } } + + @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + public void testUnregisterNotDeltaLakeTable() + { + String baseTableName = "test_unregister_not_delta_table_" + randomNameSuffix(); + String hiveTableName = "hive.default." + baseTableName; + + onTrino().executeQuery("CREATE TABLE " + hiveTableName + " AS SELECT 1 a"); + + assertThatThrownBy(() -> onTrino().executeQuery("CALL delta.system.unregister_table('default', '" + baseTableName + "')")) + .hasMessageContaining("not a Delta Lake table"); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } }