Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ public void pruneWorkerCatalogs()
List<CatalogHandle> activeCatalogs = getActiveCatalogs();
pruneWorkerCatalogs(online, activeCatalogs);

// prune all inactive catalogs - we pass an empty set here because manager always retains active catalogs
connectorServicesProvider.pruneCatalogs(ImmutableSet.of());
// prune inactive catalogs locally
connectorServicesProvider.pruneCatalogs(ImmutableSet.copyOf(activeCatalogs));
}

void pruneWorkerCatalogs(Set<URI> online, List<CatalogHandle> activeCatalogs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -145,6 +146,7 @@ public class MockConnector

private final List<PropertyMetadata<?>> sessionProperties;
private final Function<ConnectorMetadata, ConnectorMetadata> metadataWrapper;
private final Consumer<ConnectorSession> cleanupQuery;
private final Function<ConnectorSession, List<String>> listSchemaNames;
private final BiFunction<ConnectorSession, String, List<String>> listTables;
private final Optional<BiFunction<ConnectorSession, SchemaTablePrefix, Iterator<TableColumnsMetadata>>> streamTableColumns;
Expand Down Expand Up @@ -199,6 +201,7 @@ public class MockConnector
MockConnector(
List<PropertyMetadata<?>> sessionProperties,
Function<ConnectorMetadata, ConnectorMetadata> metadataWrapper,
Consumer<ConnectorSession> cleanupQuery,
Function<ConnectorSession, List<String>> listSchemaNames,
BiFunction<ConnectorSession, String, List<String>> listTables,
Optional<BiFunction<ConnectorSession, SchemaTablePrefix, Iterator<TableColumnsMetadata>>> streamTableColumns,
Expand Down Expand Up @@ -252,6 +255,7 @@ public class MockConnector
{
this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null"));
this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null");
this.cleanupQuery = requireNonNull(cleanupQuery, "cleanupQuery is null");
this.listSchemaNames = requireNonNull(listSchemaNames, "listSchemaNames is null");
this.listTables = requireNonNull(listTables, "listTables is null");
this.streamTableColumns = requireNonNull(streamTableColumns, "streamTableColumns is null");
Expand Down Expand Up @@ -1017,6 +1021,12 @@ private MockConnectorAccessControl getMockAccessControl()
{
return (MockConnectorAccessControl) getAccessControl();
}

@Override
public void cleanupQuery(ConnectorSession session)
{
// Delegates per-query cleanup to the hook configured on this mock connector
// (see the cleanupQuery field injected through the constructor), so tests can
// observe or intercept the cleanup callback with the finishing query's session.
cleanupQuery.accept(session);
}
}

private static class MockPageSinkProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
Expand All @@ -96,6 +97,7 @@ public class MockConnectorFactory
private final String name;
private final List<PropertyMetadata<?>> sessionProperties;
private final Function<ConnectorMetadata, ConnectorMetadata> metadataWrapper;
private final Consumer<ConnectorSession> cleanupQuery;
private final Function<ConnectorSession, List<String>> listSchemaNames;
private final BiFunction<ConnectorSession, String, List<String>> listTables;
private final Optional<BiFunction<ConnectorSession, SchemaTablePrefix, Iterator<TableColumnsMetadata>>> streamTableColumns;
Expand Down Expand Up @@ -154,6 +156,7 @@ private MockConnectorFactory(
String name,
List<PropertyMetadata<?>> sessionProperties,
Function<ConnectorMetadata, ConnectorMetadata> metadataWrapper,
Consumer<ConnectorSession> cleanupQuery,
Function<ConnectorSession, List<String>> listSchemaNames,
BiFunction<ConnectorSession, String, List<String>> listTables,
Optional<BiFunction<ConnectorSession, SchemaTablePrefix, Iterator<TableColumnsMetadata>>> streamTableColumns,
Expand Down Expand Up @@ -208,6 +211,7 @@ private MockConnectorFactory(
this.name = requireNonNull(name, "name is null");
this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null"));
this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null");
this.cleanupQuery = requireNonNull(cleanupQuery, "cleanupQuery is null");
this.listSchemaNames = requireNonNull(listSchemaNames, "listSchemaNames is null");
this.listTables = requireNonNull(listTables, "listTables is null");
this.streamTableColumns = requireNonNull(streamTableColumns, "streamTableColumns is null");
Expand Down Expand Up @@ -272,6 +276,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
return new MockConnector(
sessionProperties,
metadataWrapper,
cleanupQuery,
listSchemaNames,
listTables,
streamTableColumns,
Expand Down Expand Up @@ -421,6 +426,7 @@ public static final class Builder
{
private String name = "mock";
private final List<PropertyMetadata<?>> sessionProperties = new ArrayList<>();
private Consumer<ConnectorSession> cleanupQuery = session -> {};
private Function<ConnectorMetadata, ConnectorMetadata> metadataWrapper = identity();
private Function<ConnectorSession, List<String>> listSchemaNames = defaultListSchemaNames();
private BiFunction<ConnectorSession, String, List<String>> listTables = defaultListTables();
Expand Down Expand Up @@ -501,6 +507,12 @@ public Builder withSessionProperties(Iterable<PropertyMetadata<?>> sessionProper
return this;
}

/**
 * Sets the hook invoked when a query using this connector is cleaned up.
 *
 * @param cleanupQuery callback receiving the {@link ConnectorSession} of the finishing query; must not be null
 * @return this builder, for chaining
 */
public Builder withCleanupQuery(Consumer<ConnectorSession> cleanupQuery)
{
// Null-check for consistency with the other builder setters (e.g. withMetadataWrapper),
// so a null hook fails fast here rather than in the MockConnector constructor.
this.cleanupQuery = requireNonNull(cleanupQuery, "cleanupQuery is null");
return this;
}

public Builder withMetadataWrapper(Function<ConnectorMetadata, ConnectorMetadata> metadataWrapper)
{
this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null");
Expand Down Expand Up @@ -843,6 +855,7 @@ public MockConnectorFactory build()
name,
sessionProperties,
metadataWrapper,
cleanupQuery,
listSchemaNames,
listTables,
streamTableColumns,
Expand Down
160 changes: 160 additions & 0 deletions core/trino-main/src/test/java/io/trino/sql/query/TestDropCatalog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.sql.query;

import io.airlift.units.Duration;
import io.trino.client.NodeVersion;
import io.trino.connector.MockConnectorFactory;
import io.trino.connector.MockConnectorPlugin;
import io.trino.connector.MockConnectorTableHandle;
import io.trino.execution.QueryStateMachine;
import io.trino.execution.warnings.WarningCollector;
import io.trino.spi.TrinoException;
import io.trino.spi.resourcegroups.ResourceGroupId;
import io.trino.testing.QueryRunner;
import io.trino.testing.StandaloneQueryRunner;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;

import java.net.URI;
import java.util.Optional;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.trino.SessionTestUtils.TEST_SESSION;
import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector;
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.testing.TestingSession.testSession;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;

/**
 * Verifies that DROP CATALOG completes correctly when the connector performs work in
 * {@code cleanupQuery}, even while the background catalog pruner is running at its
 * minimum interval. The mock connector's cleanup hook sleeps for one prune interval to
 * increase the chance of interleaving catalog pruning with query cleanup, then reads a
 * session property to ensure the session is still usable during cleanup.
 */
@TestInstance(PER_CLASS)
@Execution(CONCURRENT)
public class TestDropCatalog
{
// Shared across tests; created in setUp, closed in tearDown.
private QueryAssertions queryAssertions;

@BeforeAll
public void setUp()
{
Duration catalogPruneInterval = new Duration(1, SECONDS); // lowest allowed
QueryRunner queryRunner = new StandaloneQueryRunner(TEST_SESSION,
server -> server.addProperty("catalog.prune.update-interval", catalogPruneInterval.toString()));
queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder()
.withName("connector_with_cleanup_query")
// Session property read back inside the cleanupQuery hook below.
.withSessionProperty(stringProperty(
"baz",
"test property",
null,
false))
// Only "default.existing" resolves; everything else throws NOT_FOUND,
// which the negative assertions below rely on.
.withGetTableHandle((_, name) -> switch (name.toString()) {
case "default.existing" -> new MockConnectorTableHandle(name);
default -> {
throw new TrinoException(NOT_FOUND, "Table not found: " + name);
}
})
.withCleanupQuery(session -> {
// Increase chances of a bad interleaving of threads for the test to be rather deterministic
try {
Thread.sleep(catalogPruneInterval.toMillis());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
// simulate cleanupQuery that checks session state
session.getProperty("baz", String.class);
})
.build()));
this.queryAssertions = new QueryAssertions(queryRunner);
}

@AfterAll
public void tearDown()
{
// Guard against setUp having failed before queryAssertions was assigned.
if (queryAssertions != null) {
queryAssertions.close();
}
queryAssertions = null;
}

@Test
void testDropCatalogWithCleanupQuery()
{
QueryRunner queryRunner = queryAssertions.getQueryRunner();
String catalogName = "catalog_with_cleanup_query";
queryRunner.createCatalog(catalogName, "connector_with_cleanup_query");
assertCatalogExists(catalogName);

// The connector's cleanupQuery hook sleeps through one prune interval here,
// exercising the race between catalog pruning and query cleanup.
queryRunner.execute("DROP CATALOG " + catalogName);

assertCatalogDoesNotExist(catalogName);
}

// Asserts the catalog is visible via metadata, SHOW SCHEMAS, and a table scan.
private void assertCatalogExists(String catalogName)
{
QueryRunner queryRunner = queryAssertions.getQueryRunner();

// Use a fresh query's session so the metadata check reflects current catalog state.
QueryStateMachine fakeQuery = createNewQuery();
assertThat(queryRunner.getPlannerContext().getMetadata().catalogExists(fakeQuery.getSession(), catalogName)).isTrue();

assertThat(queryAssertions.query("SHOW SCHEMAS FROM " + catalogName))
.containsAll("VALUES VARCHAR 'information_schema'");

assertThat(queryAssertions.query("SELECT * FROM " + catalogName + ".default.existing"))
.returnsEmptyResult();
}

// Asserts the catalog is gone from metadata and that queries against it fail with "not found".
private void assertCatalogDoesNotExist(String catalogName)
{
QueryRunner queryRunner = queryAssertions.getQueryRunner();

QueryStateMachine fakeQuery = createNewQuery();
assertThat(queryRunner.getPlannerContext().getMetadata().catalogExists(fakeQuery.getSession(), catalogName)).isFalse();

assertThat(queryAssertions.query("SHOW SCHEMAS FROM " + catalogName))
.failure().hasMessage("Catalog '%s' not found".formatted(catalogName));

assertThat(queryAssertions.query("SELECT * FROM " + catalogName + ".default.existing"))
.failure().hasMessageContaining("Catalog '%s' not found".formatted(catalogName));
}

// Builds a minimal QueryStateMachine solely to obtain a fresh Session for metadata checks.
private QueryStateMachine createNewQuery()
{
QueryRunner queryRunner = queryAssertions.getQueryRunner();
return QueryStateMachine.begin(
Optional.empty(),
"test",
Optional.empty(),
testSession(queryRunner.getDefaultSession()),
URI.create("fake://uri"),
new ResourceGroupId("test"),
false,
queryRunner.getTransactionManager(),
queryRunner.getAccessControl(),
directExecutor(),
queryRunner.getPlannerContext().getMetadata(),
WarningCollector.NOOP,
createPlanOptimizersStatsCollector(),
Optional.empty(),
true,
Optional.empty(),
new NodeVersion("test"));
}
}