diff --git a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java index 8e5d603f5874..c2d4e5059a77 100644 --- a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java +++ b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java @@ -438,6 +438,10 @@ private TestingTrinoServer( spoolingConfiguration.ifPresent(config -> spoolingManagerRegistry.loadSpoolingManager(config.factoryName(), config.configuration())); + catalogStoreManager.ifPresent(CatalogStoreManager::loadConfiguredCatalogStore); + ConnectorServicesProvider connectorServicesProvider = injector.getInstance(ConnectorServicesProvider.class); + connectorServicesProvider.loadInitialCatalogs(); + EventListenerManager eventListenerManager = injector.getInstance(EventListenerManager.class); eventListeners.forEach(eventListenerManager::addEventListener); diff --git a/testing/trino-tests/src/test/java/io/trino/connector/TestDynamicCatalogs.java b/testing/trino-tests/src/test/java/io/trino/connector/TestDynamicCatalogs.java index 08d35e3d60f4..dc22052b3d32 100644 --- a/testing/trino-tests/src/test/java/io/trino/connector/TestDynamicCatalogs.java +++ b/testing/trino-tests/src/test/java/io/trino/connector/TestDynamicCatalogs.java @@ -14,12 +14,17 @@ package io.trino.connector; import com.google.common.collect.ImmutableMap; -import com.google.inject.Key; +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.Session; import io.trino.plugin.memory.MemoryPlugin; +import io.trino.server.ServerConfig; import io.trino.spi.catalog.CatalogName; import io.trino.spi.catalog.CatalogProperties; import io.trino.spi.catalog.CatalogStore; +import io.trino.spi.catalog.CatalogStoreFactory; import io.trino.spi.connector.ConnectorName; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.H2QueryRunner; @@ -27,9 +32,14 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import static io.trino.connector.FileCatalogStore.computeCatalogVersion; import static io.trino.testing.QueryAssertions.assertQuery; import static io.trino.testing.QueryAssertions.assertQueryFails; import static io.trino.testing.QueryAssertions.assertUpdate; @@ -40,6 +50,10 @@ @Execution(SAME_THREAD) public class TestDynamicCatalogs { + private static final String BROKEN_CATALOG = "broken_catalog"; + private static final CatalogName BROKEN_CATALOG_NAME = new CatalogName(BROKEN_CATALOG); + private static final ConnectorName MEMORY_CONNECTOR_NAME = new ConnectorName("memory"); + @Test public void testNewHealthyCatalog() throws Exception @@ -51,8 +65,6 @@ public void testNewHealthyCatalog() .build(); queryRunner.installPlugin(new MemoryPlugin()); queryRunner.createCatalog("healthy_catalog", "memory", ImmutableMap.of("memory.max-data-per-node", "128MB")); - ConnectorServicesProvider connectorServicesProvider = queryRunner.getCoordinator().getInstance(new Key<>() {}); - connectorServicesProvider.loadInitialCatalogs(); H2QueryRunner h2QueryRunner = new H2QueryRunner(); assertQuery(queryRunner, session, "SHOW CATALOGS", h2QueryRunner, "VALUES 'healthy_catalog', 'system'", false, false); @@ -72,27 +84,97 @@ public void testNewHealthyCatalog() public void testNewUnhealthyCatalog() throws Exception { - String catalogName = "new_catalog" + randomNameSuffix(); - // simulate loading an unhealthy catalog during a startup Session session = testSession(); QueryRunner queryRunner = DistributedQueryRunner.builder(session) + .setAdditionalModule(new TestCatalogStoreModule()) + .setCoordinatorProperties(ImmutableMap.of("catalog.store", "prepopulated_memory")) .setWorkerCount(0) .build(); queryRunner.installPlugin(new MemoryPlugin()); queryRunner.createCatalog("healthy_catalog", "memory", ImmutableMap.of("memory.max-data-per-node", "128MB")); H2QueryRunner h2QueryRunner = new H2QueryRunner(); - CatalogStore catalogStore = queryRunner.getCoordinator().getInstance(new Key<>() {}); - ConnectorServicesProvider connectorServicesProvider = queryRunner.getCoordinator().getInstance(new Key<>() {}); - CatalogProperties catalogProperties = catalogStore.createCatalogProperties(new CatalogName(catalogName), new ConnectorName("memory"), ImmutableMap.of("invalid", "128MB")); - catalogStore.addOrReplaceCatalog(catalogProperties); - connectorServicesProvider.loadInitialCatalogs(); - assertQuery(queryRunner, session, "SHOW CATALOGS", h2QueryRunner, "VALUES 'healthy_catalog', '" + catalogName + "', 'system'", false, false); - assertQueryFails(queryRunner, session, "CREATE TABLE %s.default.test_table (age INT)".formatted(catalogName), ".*Catalog '%s' failed to initialize and is disabled.*".formatted(catalogName)); - assertQueryFails(queryRunner, session, "SELECT * FROM %s.default.test_table".formatted(catalogName), ".*Catalog '%s' failed to initialize and is disabled.*".formatted(catalogName)); - assertQueryFails(queryRunner, session, "CREATE CATALOG %s USING memory WITH (\"memory.max-data-per-node\" = '128MB')".formatted(catalogName), ".*Catalog '%s' already exists.*".formatted(catalogName)); + assertQuery(queryRunner, session, "SHOW CATALOGS", h2QueryRunner, "VALUES 'healthy_catalog', '" + BROKEN_CATALOG + "', 'system'", false, false); + assertQueryFails(queryRunner, session, "CREATE TABLE %s.default.test_table (age INT)".formatted(BROKEN_CATALOG), ".*Catalog '%s' failed to initialize and is disabled.*".formatted(BROKEN_CATALOG)); + assertQueryFails(queryRunner, session, "SELECT * FROM %s.default.test_table".formatted(BROKEN_CATALOG), ".*Catalog '%s' failed to initialize and is disabled.*".formatted(BROKEN_CATALOG)); + assertQueryFails(queryRunner, session, "CREATE CATALOG %s USING memory WITH (\"memory.max-data-per-node\" = '128MB')".formatted(BROKEN_CATALOG), ".*Catalog '%s' already exists.*".formatted(BROKEN_CATALOG)); - assertUpdate(queryRunner, session, "DROP CATALOG " + catalogName, OptionalLong.empty(), Optional.empty()); + assertUpdate(queryRunner, session, "DROP CATALOG " + BROKEN_CATALOG, OptionalLong.empty(), Optional.empty()); assertQuery(queryRunner, session, "SHOW CATALOGS", h2QueryRunner, "VALUES 'healthy_catalog', 'system'", false, false); } + + public static class TestCatalogStoreModule + extends AbstractConfigurationAwareModule + { + @Override + protected void setup(Binder binder) + { + if (buildConfigObject(ServerConfig.class).isCoordinator()) { + install(new PrepopulatedInMemoryCatalogStoreModule()); + } + } + } + + private static class PrepopulatedInMemoryCatalogStoreModule + extends AbstractConfigurationAwareModule + { + @Override + protected void setup(Binder binder) {} + + @Provides + @Singleton + public PrepopulatedInMemoryCatalogStoreFactory createDbCatalogStoreFactory(CatalogStoreManager catalogStoreManager) + { + PrepopulatedInMemoryCatalogStoreFactory factory = new PrepopulatedInMemoryCatalogStoreFactory(); + catalogStoreManager.addCatalogStoreFactory(factory); + return factory; + } + } + + private static class PrepopulatedInMemoryCatalogStoreFactory + implements CatalogStoreFactory + { + @Override + public String getName() + { + return "prepopulated_memory"; + } + + @Override + public CatalogStore create(Map config) + { + return new PrepopulatedInMemoryCatalogStore(); + } + } + + private static class PrepopulatedInMemoryCatalogStore + extends InMemoryCatalogStore + { + @Override + public Collection getCatalogs() + { + Collection catalogs = super.getCatalogs(); + List catalogsCopy = new ArrayList<>(catalogs); + catalogsCopy.add(new StoredCatalog() + { + @Override + public CatalogName name() + { + return new CatalogName("broken_catalog"); + } + + @Override + public CatalogProperties loadProperties() + { + ImmutableMap properties = ImmutableMap.of("non_existing", "false"); + return new CatalogProperties( + BROKEN_CATALOG_NAME, + computeCatalogVersion(BROKEN_CATALOG_NAME, MEMORY_CONNECTOR_NAME, properties), + MEMORY_CONNECTOR_NAME, + properties); + } + }); + return catalogsCopy; + } + } } diff --git a/testing/trino-tests/src/test/java/io/trino/connector/system/TestSystemMetadataCatalogTable.java b/testing/trino-tests/src/test/java/io/trino/connector/system/TestSystemMetadataCatalogTable.java index 0682972230ac..f1be349ce794 100644 --- a/testing/trino-tests/src/test/java/io/trino/connector/system/TestSystemMetadataCatalogTable.java +++ b/testing/trino-tests/src/test/java/io/trino/connector/system/TestSystemMetadataCatalogTable.java @@ -14,15 +14,9 @@ package io.trino.connector.system; import com.google.common.collect.ImmutableMap; -import com.google.inject.Key; import io.trino.Session; -import io.trino.connector.ConnectorServicesProvider; -import io.trino.metadata.CatalogManager; +import io.trino.connector.TestDynamicCatalogs; import io.trino.plugin.memory.MemoryPlugin; -import io.trino.spi.catalog.CatalogName; -import io.trino.spi.catalog.CatalogProperties; -import io.trino.spi.catalog.CatalogStore; -import io.trino.spi.connector.ConnectorName; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; @@ -36,23 +30,18 @@ public class TestSystemMetadataCatalogTable extends AbstractTestQueryFramework { - private CatalogStore catalogStore; - private ConnectorServicesProvider connectorServicesProvider; - private CatalogManager catalogManager; - @Override protected QueryRunner createQueryRunner() throws Exception { Session session = testSessionBuilder().build(); QueryRunner queryRunner = DistributedQueryRunner.builder(session) + .setAdditionalModule(new TestDynamicCatalogs.TestCatalogStoreModule()) + .setCoordinatorProperties(ImmutableMap.of("catalog.store", "prepopulated_memory")) .setWorkerCount(0) .build(); queryRunner.installPlugin(new MemoryPlugin()); queryRunner.createCatalog("healthy_catalog", "memory", ImmutableMap.of("memory.max-data-per-node", "128MB")); - catalogStore = queryRunner.getCoordinator().getInstance(new Key<>() {}); - connectorServicesProvider = queryRunner.getCoordinator().getInstance(new Key<>() {}); - catalogManager = queryRunner.getCoordinator().getInstance(new Key<>() {}); return queryRunner; } @@ -61,11 +50,13 @@ public void testNewCatalogStatus() { assertQuery("SELECT * FROM system.metadata.catalogs", "VALUES" + "('healthy_catalog', 'healthy_catalog', 'memory', 'OPERATIONAL'), " + + "('broken_catalog', 'broken_catalog', 'memory', 'FAILING'), " + "('system', 'system', 'system', 'OPERATIONAL')"); assertUpdate("CREATE CATALOG brain USING memory WITH (\"memory.max-data-per-node\" = '128MB')"); assertQuery("SELECT * FROM system.metadata.catalogs", "VALUES" + "('healthy_catalog', 'healthy_catalog', 'memory', 'OPERATIONAL'), " + + "('broken_catalog', 'broken_catalog', 'memory', 'FAILING'), " + "('brain', 'brain', 'memory', 'OPERATIONAL'), " + "('system', 'system', 'system', 'OPERATIONAL')"); @@ -75,14 +66,9 @@ public void testNewCatalogStatus() @Test public void testCatalogNotLoadedCorrectly() { - CatalogProperties catalogProperties = catalogStore.createCatalogProperties(new CatalogName("broken"), new ConnectorName("memory"), ImmutableMap.of("memory.max-data-per-n", "128MB")); - - catalogStore.addOrReplaceCatalog(catalogProperties); - connectorServicesProvider.loadInitialCatalogs(); assertQuery("SELECT * FROM system.metadata.catalogs", "VALUES" + "('healthy_catalog', 'healthy_catalog', 'memory', 'OPERATIONAL'), " + - "('broken', 'broken', 'memory', 'FAILING'), " + + "('broken_catalog', 'broken_catalog', 'memory', 'FAILING'), " + "('system', 'system', 'system', 'OPERATIONAL')"); - catalogManager.dropCatalog(new CatalogName("broken"), false); } }