From 5462a0097f32c49cf7350823d41b212d9bc3d89b Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 21 Oct 2025 10:04:00 +0200 Subject: [PATCH 1/2] Clean MockConnector properties setup - consistent field name (plural `sessionProperties`) - make field order more consistent --- .../src/test/java/io/trino/connector/MockConnector.java | 6 +++--- .../java/io/trino/connector/MockConnectorFactory.java | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index 7ca221e0abab..33c9653fb12f 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -143,6 +143,7 @@ public class MockConnector private static final String UPDATE_ROW_ID = "update_row_id"; private static final String MERGE_ROW_ID = "merge_row_id"; + private final List> sessionProperties; private final Function metadataWrapper; private final Function> listSchemaNames; private final BiFunction> listTables; @@ -188,7 +189,6 @@ public class MockConnector private final Supplier>> schemaProperties; private final Supplier>> tableProperties; private final Supplier>> columnProperties; - private final List> sessionProperties; private final Function tableFunctionSplitsSources; private final OptionalInt maxWriterTasks; private final BiFunction> getLayoutForTableExecute; @@ -197,8 +197,8 @@ public class MockConnector private final boolean allowSplittingReadIntoMultipleSubQueries; MockConnector( - Function metadataWrapper, List> sessionProperties, + Function metadataWrapper, Function> listSchemaNames, BiFunction> listTables, Optional>> streamTableColumns, @@ -250,8 +250,8 @@ public class MockConnector Supplier> capabilities, boolean allowSplittingReadIntoMultipleSubQueries) { - this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); + this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); this.listSchemaNames = requireNonNull(listSchemaNames, "listSchemaNames is null"); this.listTables = requireNonNull(listTables, "listTables is null"); this.streamTableColumns = requireNonNull(streamTableColumns, "streamTableColumns is null"); diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java index 1fc09c5756c3..38c238aca880 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java @@ -94,7 +94,7 @@ public class MockConnectorFactory implements ConnectorFactory { private final String name; - private final List> sessionProperty; + private final List> sessionProperties; private final Function metadataWrapper; private final Function> listSchemaNames; private final BiFunction> listTables; @@ -152,7 +152,7 @@ public class MockConnectorFactory private MockConnectorFactory( String name, - List> sessionProperty, + List> sessionProperties, Function metadataWrapper, Function> listSchemaNames, BiFunction> listTables, @@ -206,7 +206,7 @@ private MockConnectorFactory( boolean allowSplittingReadIntoMultipleSubQueries) { this.name = requireNonNull(name, "name is null"); - this.sessionProperty = ImmutableList.copyOf(requireNonNull(sessionProperty, "sessionProperty is null")); + this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); this.listSchemaNames = requireNonNull(listSchemaNames, "listSchemaNames is null"); this.listTables = requireNonNull(listTables, "listTables is null"); @@ -270,8 +270,8 @@ public String getName() public Connector create(String catalogName, Map config, ConnectorContext context) { return new MockConnector( + sessionProperties, metadataWrapper, - sessionProperty, listSchemaNames, listTables, streamTableColumns, From 3d9a0ab12fd310d37980660bc7d7e68628f6db8d Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 21 Oct 2025 10:01:56 +0200 Subject: [PATCH 2/2] Fix DROP CATALOG spurious failure if cleanupQuery implemented Before the change, DROP CATALOG could fail if the metadata of the catalog being dropped implements `cleanupQuery` and the `cleanupQuery` checks catalog session properties. This was because inspecting session properties requires catalog to exist and that could have been pruned away. This fixes coordinator catalog pruning to retain catalogs used in active transactions. --- .../io/trino/connector/CatalogPruneTask.java | 4 +- .../io/trino/connector/MockConnector.java | 10 ++ .../trino/connector/MockConnectorFactory.java | 13 ++ .../io/trino/sql/query/TestDropCatalog.java | 160 ++++++++++++++++++ 4 files changed, 185 insertions(+), 2 deletions(-) create mode 100644 core/trino-main/src/test/java/io/trino/sql/query/TestDropCatalog.java diff --git a/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java b/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java index fbad249b5db7..3dc6b650d0bc 100644 --- a/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java +++ b/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java @@ -137,8 +137,8 @@ public void pruneWorkerCatalogs() List activeCatalogs = getActiveCatalogs(); pruneWorkerCatalogs(online, activeCatalogs); - // prune all inactive catalogs - we pass an empty set here because manager always retains active catalogs - connectorServicesProvider.pruneCatalogs(ImmutableSet.of()); + // prune inactive catalogs locally + connectorServicesProvider.pruneCatalogs(ImmutableSet.copyOf(activeCatalogs)); } void pruneWorkerCatalogs(Set online, List activeCatalogs) diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index 33c9653fb12f..bd6a743009b6 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -118,6 +118,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.UnaryOperator; @@ -145,6 +146,7 @@ public class MockConnector private final List> sessionProperties; private final Function metadataWrapper; + private final Consumer cleanupQuery; private final Function> listSchemaNames; private final BiFunction> listTables; private final Optional>> streamTableColumns; @@ -199,6 +201,7 @@ public class MockConnector MockConnector( List> sessionProperties, Function metadataWrapper, + Consumer cleanupQuery, Function> listSchemaNames, BiFunction> listTables, Optional>> streamTableColumns, @@ -252,6 +255,7 @@ public class MockConnector { this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); + this.cleanupQuery = requireNonNull(cleanupQuery, "cleanupQuery is null"); this.listSchemaNames = requireNonNull(listSchemaNames, "listSchemaNames is null"); this.listTables = requireNonNull(listTables, "listTables is null"); this.streamTableColumns = requireNonNull(streamTableColumns, "streamTableColumns is null"); @@ -1017,6 +1021,12 @@ private MockConnectorAccessControl getMockAccessControl() { return (MockConnectorAccessControl) getAccessControl(); } + + @Override + public void cleanupQuery(ConnectorSession session) + { + cleanupQuery.accept(session); + } } private static class MockPageSinkProvider diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java index 38c238aca880..e5edbc1d059d 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java @@ -78,6 +78,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.UnaryOperator; @@ -96,6 +97,7 @@ public class MockConnectorFactory private final String name; private final List> sessionProperties; private final Function metadataWrapper; + private final Consumer cleanupQuery; private final Function> listSchemaNames; private final BiFunction> listTables; private final Optional>> streamTableColumns; @@ -154,6 +156,7 @@ private MockConnectorFactory( String name, List> sessionProperties, Function metadataWrapper, + Consumer cleanupQuery, Function> listSchemaNames, BiFunction> listTables, Optional>> streamTableColumns, @@ -208,6 +211,7 @@ private MockConnectorFactory( this.name = requireNonNull(name, "name is null"); this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); + this.cleanupQuery = requireNonNull(cleanupQuery, "cleanupQuery is null"); this.listSchemaNames = requireNonNull(listSchemaNames, "listSchemaNames is null"); this.listTables = requireNonNull(listTables, "listTables is null"); this.streamTableColumns = requireNonNull(streamTableColumns, "streamTableColumns is null"); @@ -272,6 +276,7 @@ public Connector create(String catalogName, Map config, Connecto return new MockConnector( sessionProperties, metadataWrapper, + cleanupQuery, listSchemaNames, listTables, streamTableColumns, @@ -421,6 +426,7 @@ public static final class Builder { private String name = "mock"; private final List> sessionProperties = new ArrayList<>(); + private Consumer cleanupQuery = session -> {}; private Function metadataWrapper = identity(); private Function> listSchemaNames = defaultListSchemaNames(); private BiFunction> listTables = defaultListTables(); @@ -501,6 +507,12 @@ public Builder withSessionProperties(Iterable> sessionProper return this; } + public Builder withCleanupQuery(Consumer cleanupQuery) + { + this.cleanupQuery = cleanupQuery; + return this; + } + public Builder withMetadataWrapper(Function metadataWrapper) { this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); @@ -843,6 +855,7 @@ public MockConnectorFactory build() name, sessionProperties, metadataWrapper, + cleanupQuery, listSchemaNames, listTables, streamTableColumns, diff --git a/core/trino-main/src/test/java/io/trino/sql/query/TestDropCatalog.java b/core/trino-main/src/test/java/io/trino/sql/query/TestDropCatalog.java new file mode 100644 index 000000000000..b2a2dae3416c --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/sql/query/TestDropCatalog.java @@ -0,0 +1,160 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.sql.query; + +import io.airlift.units.Duration; +import io.trino.client.NodeVersion; +import io.trino.connector.MockConnectorFactory; +import io.trino.connector.MockConnectorPlugin; +import io.trino.connector.MockConnectorTableHandle; +import io.trino.execution.QueryStateMachine; +import io.trino.execution.warnings.WarningCollector; +import io.trino.spi.TrinoException; +import io.trino.spi.resourcegroups.ResourceGroupId; +import io.trino.testing.QueryRunner; +import io.trino.testing.StandaloneQueryRunner; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; + +import java.net.URI; +import java.util.Optional; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.trino.SessionTestUtils.TEST_SESSION; +import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector; +import static io.trino.spi.StandardErrorCode.NOT_FOUND; +import static io.trino.spi.session.PropertyMetadata.stringProperty; +import static io.trino.testing.TestingSession.testSession; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; +import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; + +@TestInstance(PER_CLASS) +@Execution(CONCURRENT) +public class TestDropCatalog +{ + private QueryAssertions queryAssertions; + + @BeforeAll + public void setUp() + { + Duration catalogPruneInterval = new Duration(1, SECONDS); // lowest allowed + QueryRunner queryRunner = new StandaloneQueryRunner(TEST_SESSION, + server -> server.addProperty("catalog.prune.update-interval", catalogPruneInterval.toString())); + queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder() + .withName("connector_with_cleanup_query") + .withSessionProperty(stringProperty( + "baz", + "test property", + null, + false)) + .withGetTableHandle((_, name) -> switch (name.toString()) { + case "default.existing" -> new MockConnectorTableHandle(name); + default -> { + throw new TrinoException(NOT_FOUND, "Table not found: " + name); + } + }) + .withCleanupQuery(session -> { + // Increase chances of a bad interleaving of threads for the test to be rather deterministic + try { + Thread.sleep(catalogPruneInterval.toMillis()); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + // simulate cleanupQuery that checks session state + session.getProperty("baz", String.class); + }) + .build())); + this.queryAssertions = new QueryAssertions(queryRunner); + } + + @AfterAll + public void tearDown() + { + if (queryAssertions != null) { + queryAssertions.close(); + } + queryAssertions = null; + } + + @Test + void testDropCatalogWithCleanupQuery() + { + QueryRunner queryRunner = queryAssertions.getQueryRunner(); + String catalogName = "catalog_with_cleanup_query"; + queryRunner.createCatalog(catalogName, "connector_with_cleanup_query"); + assertCatalogExists(catalogName); + + queryRunner.execute("DROP CATALOG " + catalogName); + + assertCatalogDoesNotExist(catalogName); + } + + private void assertCatalogExists(String catalogName) + { + QueryRunner queryRunner = queryAssertions.getQueryRunner(); + + QueryStateMachine fakeQuery = createNewQuery(); + assertThat(queryRunner.getPlannerContext().getMetadata().catalogExists(fakeQuery.getSession(), catalogName)).isTrue(); + + assertThat(queryAssertions.query("SHOW SCHEMAS FROM " + catalogName)) + .containsAll("VALUES VARCHAR 'information_schema'"); + + assertThat(queryAssertions.query("SELECT * FROM " + catalogName + ".default.existing")) + .returnsEmptyResult(); + } + + private void assertCatalogDoesNotExist(String catalogName) + { + QueryRunner queryRunner = queryAssertions.getQueryRunner(); + + QueryStateMachine fakeQuery = createNewQuery(); + assertThat(queryRunner.getPlannerContext().getMetadata().catalogExists(fakeQuery.getSession(), catalogName)).isFalse(); + + assertThat(queryAssertions.query("SHOW SCHEMAS FROM " + catalogName)) + .failure().hasMessage("Catalog '%s' not found".formatted(catalogName)); + + assertThat(queryAssertions.query("SELECT * FROM " + catalogName + ".default.existing")) + .failure().hasMessageContaining("Catalog '%s' not found".formatted(catalogName)); + } + + private QueryStateMachine createNewQuery() + { + QueryRunner queryRunner = queryAssertions.getQueryRunner(); + return QueryStateMachine.begin( + Optional.empty(), + "test", + Optional.empty(), + testSession(queryRunner.getDefaultSession()), + URI.create("fake://uri"), + new ResourceGroupId("test"), + false, + queryRunner.getTransactionManager(), + queryRunner.getAccessControl(), + directExecutor(), + queryRunner.getPlannerContext().getMetadata(), + WarningCollector.NOOP, + createPlanOptimizersStatsCollector(), + Optional.empty(), + true, + Optional.empty(), + new NodeVersion("test")); + } +}