diff --git a/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java b/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java index fbad249b5db7..3dc6b650d0bc 100644 --- a/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java +++ b/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java @@ -137,8 +137,8 @@ public void pruneWorkerCatalogs() List activeCatalogs = getActiveCatalogs(); pruneWorkerCatalogs(online, activeCatalogs); - // prune all inactive catalogs - we pass an empty set here because manager always retains active catalogs - connectorServicesProvider.pruneCatalogs(ImmutableSet.of()); + // prune inactive catalogs locally + connectorServicesProvider.pruneCatalogs(ImmutableSet.copyOf(activeCatalogs)); } void pruneWorkerCatalogs(Set online, List activeCatalogs) diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index 7ca221e0abab..bd6a743009b6 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -118,6 +118,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.UnaryOperator; @@ -143,7 +144,9 @@ public class MockConnector private static final String UPDATE_ROW_ID = "update_row_id"; private static final String MERGE_ROW_ID = "merge_row_id"; + private final List> sessionProperties; private final Function metadataWrapper; + private final Consumer cleanupQuery; private final Function> listSchemaNames; private final BiFunction> listTables; private final Optional>> streamTableColumns; @@ -188,7 +191,6 @@ public class MockConnector private final Supplier>> schemaProperties; private final Supplier>> tableProperties; private final Supplier>> columnProperties; - private final List> sessionProperties; private final Function tableFunctionSplitsSources; private final OptionalInt maxWriterTasks; private final BiFunction> getLayoutForTableExecute; @@ -197,8 +199,9 @@ public class MockConnector private final boolean allowSplittingReadIntoMultipleSubQueries; MockConnector( - Function metadataWrapper, List> sessionProperties, + Function metadataWrapper, + Consumer cleanupQuery, Function> listSchemaNames, BiFunction> listTables, Optional>> streamTableColumns, @@ -250,8 +253,9 @@ public class MockConnector Supplier> capabilities, boolean allowSplittingReadIntoMultipleSubQueries) { - this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); + this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); + this.cleanupQuery = requireNonNull(cleanupQuery, "cleanupQuery is null"); this.listSchemaNames = requireNonNull(listSchemaNames, "listSchemaNames is null"); this.listTables = requireNonNull(listTables, "listTables is null"); this.streamTableColumns = requireNonNull(streamTableColumns, "streamTableColumns is null"); @@ -1017,6 +1021,12 @@ private MockConnectorAccessControl getMockAccessControl() { return (MockConnectorAccessControl) getAccessControl(); } + + @Override + public void cleanupQuery(ConnectorSession session) + { + cleanupQuery.accept(session); + } } private static class MockPageSinkProvider diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java index 1fc09c5756c3..e5edbc1d059d 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java @@ -78,6 +78,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.UnaryOperator; @@ -94,8 +95,9 @@ public class MockConnectorFactory implements ConnectorFactory { private final String name; - private final List> sessionProperty; + private final List> sessionProperties; private final Function metadataWrapper; + private final Consumer cleanupQuery; private final Function> listSchemaNames; private final BiFunction> listTables; private final Optional>> streamTableColumns; @@ -152,8 +154,9 @@ public class MockConnectorFactory private MockConnectorFactory( String name, - List> sessionProperty, + List> sessionProperties, Function metadataWrapper, + Consumer cleanupQuery, Function> listSchemaNames, BiFunction> listTables, Optional>> streamTableColumns, @@ -206,8 +209,9 @@ private MockConnectorFactory( boolean allowSplittingReadIntoMultipleSubQueries) { this.name = requireNonNull(name, "name is null"); - this.sessionProperty = ImmutableList.copyOf(requireNonNull(sessionProperty, "sessionProperty is null")); + this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); + this.cleanupQuery = requireNonNull(cleanupQuery, "cleanupQuery is null"); this.listSchemaNames = requireNonNull(listSchemaNames, "listSchemaNames is null"); this.listTables = requireNonNull(listTables, "listTables is null"); this.streamTableColumns = requireNonNull(streamTableColumns, "streamTableColumns is null"); @@ -270,8 +274,9 @@ public String getName() public Connector create(String catalogName, Map config, ConnectorContext context) { return new MockConnector( + sessionProperties, metadataWrapper, - sessionProperty, + cleanupQuery, listSchemaNames, listTables, streamTableColumns, @@ -421,6 +426,7 @@ public static final class Builder { private String name = "mock"; private final List> sessionProperties = new ArrayList<>(); + private Consumer cleanupQuery = session -> {}; private Function metadataWrapper = identity(); private Function> listSchemaNames = defaultListSchemaNames(); private BiFunction> listTables = defaultListTables(); @@ -501,6 +507,12 @@ public Builder withSessionProperties(Iterable> sessionProper return this; } + public Builder withCleanupQuery(Consumer cleanupQuery) + { + this.cleanupQuery = cleanupQuery; + return this; + } + public Builder withMetadataWrapper(Function metadataWrapper) { this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); @@ -843,6 +855,7 @@ public MockConnectorFactory build() name, sessionProperties, metadataWrapper, + cleanupQuery, listSchemaNames, listTables, streamTableColumns, diff --git a/core/trino-main/src/test/java/io/trino/sql/query/TestDropCatalog.java b/core/trino-main/src/test/java/io/trino/sql/query/TestDropCatalog.java new file mode 100644 index 000000000000..b2a2dae3416c --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/sql/query/TestDropCatalog.java @@ -0,0 +1,160 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.sql.query; + +import io.airlift.units.Duration; +import io.trino.client.NodeVersion; +import io.trino.connector.MockConnectorFactory; +import io.trino.connector.MockConnectorPlugin; +import io.trino.connector.MockConnectorTableHandle; +import io.trino.execution.QueryStateMachine; +import io.trino.execution.warnings.WarningCollector; +import io.trino.spi.TrinoException; +import io.trino.spi.resourcegroups.ResourceGroupId; +import io.trino.testing.QueryRunner; +import io.trino.testing.StandaloneQueryRunner; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; + +import java.net.URI; +import java.util.Optional; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.trino.SessionTestUtils.TEST_SESSION; +import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector; +import static io.trino.spi.StandardErrorCode.NOT_FOUND; +import static io.trino.spi.session.PropertyMetadata.stringProperty; +import static io.trino.testing.TestingSession.testSession; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; +import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; + +@TestInstance(PER_CLASS) +@Execution(CONCURRENT) +public class TestDropCatalog +{ + private QueryAssertions queryAssertions; + + @BeforeAll + public void setUp() + { + Duration catalogPruneInterval = new Duration(1, SECONDS); // lowest allowed + QueryRunner queryRunner = new StandaloneQueryRunner(TEST_SESSION, + server -> server.addProperty("catalog.prune.update-interval", catalogPruneInterval.toString())); + queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder() + .withName("connector_with_cleanup_query") + .withSessionProperty(stringProperty( + "baz", + "test property", + null, + false)) + .withGetTableHandle((_, name) -> switch (name.toString()) { + case "default.existing" -> new MockConnectorTableHandle(name); + default -> { + throw new TrinoException(NOT_FOUND, "Table not found: " + name); + } + }) + .withCleanupQuery(session -> { + // Increase chances of a bad interleaving of threads for the test to be rather deterministic + try { + Thread.sleep(catalogPruneInterval.toMillis()); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + // simulate cleanupQuery that checks session state + session.getProperty("baz", String.class); + }) + .build())); + this.queryAssertions = new QueryAssertions(queryRunner); + } + + @AfterAll + public void tearDown() + { + if (queryAssertions != null) { + queryAssertions.close(); + } + queryAssertions = null; + } + + @Test + void testDropCatalogWithCleanupQuery() + { + QueryRunner queryRunner = queryAssertions.getQueryRunner(); + String catalogName = "catalog_with_cleanup_query"; + queryRunner.createCatalog(catalogName, "connector_with_cleanup_query"); + assertCatalogExists(catalogName); + + queryRunner.execute("DROP CATALOG " + catalogName); + + assertCatalogDoesNotExist(catalogName); + } + + private void assertCatalogExists(String catalogName) + { + QueryRunner queryRunner = queryAssertions.getQueryRunner(); + + QueryStateMachine fakeQuery = createNewQuery(); + assertThat(queryRunner.getPlannerContext().getMetadata().catalogExists(fakeQuery.getSession(), catalogName)).isTrue(); + + assertThat(queryAssertions.query("SHOW SCHEMAS FROM " + catalogName)) + .containsAll("VALUES VARCHAR 'information_schema'"); + + assertThat(queryAssertions.query("SELECT * FROM " + catalogName + ".default.existing")) + .returnsEmptyResult(); + } + + private void assertCatalogDoesNotExist(String catalogName) + { + QueryRunner queryRunner = queryAssertions.getQueryRunner(); + + QueryStateMachine fakeQuery = createNewQuery(); + assertThat(queryRunner.getPlannerContext().getMetadata().catalogExists(fakeQuery.getSession(), catalogName)).isFalse(); + + assertThat(queryAssertions.query("SHOW SCHEMAS FROM " + catalogName)) + .failure().hasMessage("Catalog '%s' not found".formatted(catalogName)); + + assertThat(queryAssertions.query("SELECT * FROM " + catalogName + ".default.existing")) + .failure().hasMessageContaining("Catalog '%s' not found".formatted(catalogName)); + } + + private QueryStateMachine createNewQuery() + { + QueryRunner queryRunner = queryAssertions.getQueryRunner(); + return QueryStateMachine.begin( + Optional.empty(), + "test", + Optional.empty(), + testSession(queryRunner.getDefaultSession()), + URI.create("fake://uri"), + new ResourceGroupId("test"), + false, + queryRunner.getTransactionManager(), + queryRunner.getAccessControl(), + directExecutor(), + queryRunner.getPlannerContext().getMetadata(), + WarningCollector.NOOP, + createPlanOptimizersStatsCollector(), + Optional.empty(), + true, + Optional.empty(), + new NodeVersion("test")); + } +}