Skip to content

Commit 4a63696

Browse files
committed
Load initial catalogs in DistributedQueryRunner
1 parent 07c32f5 commit 4a63696

File tree

3 files changed

+105
-35
lines changed

3 files changed

+105
-35
lines changed

testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.trino.Session.SessionBuilder;
3131
import io.trino.client.ClientSession;
3232
import io.trino.client.StatementClient;
33+
import io.trino.connector.ConnectorServicesProvider;
3334
import io.trino.connector.CoordinatorDynamicCatalogManager;
3435
import io.trino.cost.StatsCalculator;
3536
import io.trino.execution.FailureInjector.InjectedFailureType;
@@ -997,6 +998,7 @@ public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
997998
closeAllSuppress(e, queryRunner);
998999
throw e;
9991000
}
1001+
queryRunner.getCoordinator().getInstance(Key.get(ConnectorServicesProvider.class)).loadInitialCatalogs();
10001002

10011003
return queryRunner;
10021004
}

testing/trino-tests/src/test/java/io/trino/connector/TestDynamicCatalogs.java

Lines changed: 97 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,32 @@
1414
package io.trino.connector;
1515

1616
import com.google.common.collect.ImmutableMap;
17-
import com.google.inject.Key;
17+
import com.google.inject.Binder;
18+
import com.google.inject.Provides;
19+
import com.google.inject.Singleton;
20+
import io.airlift.configuration.AbstractConfigurationAwareModule;
1821
import io.trino.Session;
1922
import io.trino.plugin.memory.MemoryPlugin;
23+
import io.trino.server.ServerConfig;
2024
import io.trino.spi.catalog.CatalogName;
2125
import io.trino.spi.catalog.CatalogProperties;
2226
import io.trino.spi.catalog.CatalogStore;
27+
import io.trino.spi.catalog.CatalogStoreFactory;
2328
import io.trino.spi.connector.ConnectorName;
2429
import io.trino.testing.DistributedQueryRunner;
2530
import io.trino.testing.H2QueryRunner;
2631
import io.trino.testing.QueryRunner;
2732
import org.junit.jupiter.api.Test;
2833
import org.junit.jupiter.api.parallel.Execution;
2934

35+
import java.util.ArrayList;
36+
import java.util.Collection;
37+
import java.util.List;
38+
import java.util.Map;
3039
import java.util.Optional;
3140
import java.util.OptionalLong;
3241

42+
import static io.trino.connector.FileCatalogStore.computeCatalogVersion;
3343
import static io.trino.testing.QueryAssertions.assertQuery;
3444
import static io.trino.testing.QueryAssertions.assertQueryFails;
3545
import static io.trino.testing.QueryAssertions.assertUpdate;
@@ -40,6 +50,10 @@
4050
@Execution(SAME_THREAD)
4151
public class TestDynamicCatalogs
4252
{
53+
private static final String BROKEN_CATALOG = "broken_catalog";
54+
private static final CatalogName BROKEN_CATALOG_NAME = new CatalogName(BROKEN_CATALOG);
55+
private static final ConnectorName MEMORY_CONNECTOR_NAME = new ConnectorName("memory");
56+
4357
@Test
4458
public void testNewHealthyCatalog()
4559
throws Exception
@@ -51,8 +65,6 @@ public void testNewHealthyCatalog()
5165
.build();
5266
queryRunner.installPlugin(new MemoryPlugin());
5367
queryRunner.createCatalog("healthy_catalog", "memory", ImmutableMap.of("memory.max-data-per-node", "128MB"));
54-
ConnectorServicesProvider connectorServicesProvider = queryRunner.getCoordinator().getInstance(new Key<>() {});
55-
connectorServicesProvider.loadInitialCatalogs();
5668
H2QueryRunner h2QueryRunner = new H2QueryRunner();
5769

5870
assertQuery(queryRunner, session, "SHOW CATALOGS", h2QueryRunner, "VALUES 'healthy_catalog', 'system'", false, false);
@@ -72,27 +84,97 @@ public void testNewHealthyCatalog()
7284
public void testNewUnhealthyCatalog()
7385
throws Exception
7486
{
75-
String catalogName = "new_catalog" + randomNameSuffix();
76-
// simulate loading an unhealthy catalog during a startup
7787
Session session = testSession();
7888
QueryRunner queryRunner = DistributedQueryRunner.builder(session)
89+
.setAdditionalModule(new TestCatalogStoreModule())
90+
.setCoordinatorProperties(ImmutableMap.of("catalog.store", "prepopulated_memory"))
7991
.setWorkerCount(0)
8092
.build();
8193
queryRunner.installPlugin(new MemoryPlugin());
8294
queryRunner.createCatalog("healthy_catalog", "memory", ImmutableMap.of("memory.max-data-per-node", "128MB"));
8395
H2QueryRunner h2QueryRunner = new H2QueryRunner();
84-
CatalogStore catalogStore = queryRunner.getCoordinator().getInstance(new Key<>() {});
85-
ConnectorServicesProvider connectorServicesProvider = queryRunner.getCoordinator().getInstance(new Key<>() {});
86-
CatalogProperties catalogProperties = catalogStore.createCatalogProperties(new CatalogName(catalogName), new ConnectorName("memory"), ImmutableMap.of("invalid", "128MB"));
87-
catalogStore.addOrReplaceCatalog(catalogProperties);
88-
connectorServicesProvider.loadInitialCatalogs();
8996

90-
assertQuery(queryRunner, session, "SHOW CATALOGS", h2QueryRunner, "VALUES 'healthy_catalog', '" + catalogName + "', 'system'", false, false);
91-
assertQueryFails(queryRunner, session, "CREATE TABLE %s.default.test_table (age INT)".formatted(catalogName), ".*Catalog '%s' failed to initialize and is disabled.*".formatted(catalogName));
92-
assertQueryFails(queryRunner, session, "SELECT * FROM %s.default.test_table".formatted(catalogName), ".*Catalog '%s' failed to initialize and is disabled.*".formatted(catalogName));
93-
assertQueryFails(queryRunner, session, "CREATE CATALOG %s USING memory WITH (\"memory.max-data-per-node\" = '128MB')".formatted(catalogName), ".*Catalog '%s' already exists.*".formatted(catalogName));
97+
assertQuery(queryRunner, session, "SHOW CATALOGS", h2QueryRunner, "VALUES 'healthy_catalog', '" + BROKEN_CATALOG + "', 'system'", false, false);
98+
assertQueryFails(queryRunner, session, "CREATE TABLE %s.default.test_table (age INT)".formatted(BROKEN_CATALOG), ".*Catalog '%s' failed to initialize and is disabled.*".formatted(BROKEN_CATALOG));
99+
assertQueryFails(queryRunner, session, "SELECT * FROM %s.default.test_table".formatted(BROKEN_CATALOG), ".*Catalog '%s' failed to initialize and is disabled.*".formatted(BROKEN_CATALOG));
100+
assertQueryFails(queryRunner, session, "CREATE CATALOG %s USING memory WITH (\"memory.max-data-per-node\" = '128MB')".formatted(BROKEN_CATALOG), ".*Catalog '%s' already exists.*".formatted(BROKEN_CATALOG));
94101

95-
assertUpdate(queryRunner, session, "DROP CATALOG " + catalogName, OptionalLong.empty(), Optional.empty());
102+
assertUpdate(queryRunner, session, "DROP CATALOG " + BROKEN_CATALOG, OptionalLong.empty(), Optional.empty());
96103
assertQuery(queryRunner, session, "SHOW CATALOGS", h2QueryRunner, "VALUES 'healthy_catalog', 'system'", false, false);
97104
}
105+
106+
public static class TestCatalogStoreModule
107+
extends AbstractConfigurationAwareModule
108+
{
109+
@Override
110+
protected void setup(Binder binder)
111+
{
112+
if (buildConfigObject(ServerConfig.class).isCoordinator()) {
113+
install(new PrepopulatedInMemoryCatalogStoreModule());
114+
}
115+
}
116+
}
117+
118+
private static class PrepopulatedInMemoryCatalogStoreModule
119+
extends AbstractConfigurationAwareModule
120+
{
121+
@Override
122+
protected void setup(Binder binder) {}
123+
124+
@Provides
125+
@Singleton
126+
public PrepopulatedInMemoryCatalogStoreFactory createDbCatalogStoreFactory(CatalogStoreManager catalogStoreManager)
127+
{
128+
PrepopulatedInMemoryCatalogStoreFactory factory = new PrepopulatedInMemoryCatalogStoreFactory();
129+
catalogStoreManager.addCatalogStoreFactory(factory);
130+
return factory;
131+
}
132+
}
133+
134+
private static class PrepopulatedInMemoryCatalogStoreFactory
135+
implements CatalogStoreFactory
136+
{
137+
@Override
138+
public String getName()
139+
{
140+
return "prepopulated_memory";
141+
}
142+
143+
@Override
144+
public CatalogStore create(Map<String, String> config)
145+
{
146+
return new PrepopulatedInMemoryCatalogStore();
147+
}
148+
}
149+
150+
private static class PrepopulatedInMemoryCatalogStore
151+
extends InMemoryCatalogStore
152+
{
153+
@Override
154+
public Collection<StoredCatalog> getCatalogs()
155+
{
156+
Collection<StoredCatalog> catalogs = super.getCatalogs();
157+
List<StoredCatalog> catalogsCopy = new ArrayList<>(catalogs);
158+
catalogsCopy.add(new StoredCatalog()
159+
{
160+
@Override
161+
public CatalogName name()
162+
{
163+
return new CatalogName("broken_catalog");
164+
}
165+
166+
@Override
167+
public CatalogProperties loadProperties()
168+
{
169+
ImmutableMap<String, String> properties = ImmutableMap.of("non_existing", "false");
170+
return new CatalogProperties(
171+
BROKEN_CATALOG_NAME,
172+
computeCatalogVersion(BROKEN_CATALOG_NAME, MEMORY_CONNECTOR_NAME, properties),
173+
MEMORY_CONNECTOR_NAME,
174+
properties);
175+
}
176+
});
177+
return catalogsCopy;
178+
}
179+
}
98180
}

testing/trino-tests/src/test/java/io/trino/connector/system/TestSystemMetadataCatalogTable.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,9 @@
1414
package io.trino.connector.system;
1515

1616
import com.google.common.collect.ImmutableMap;
17-
import com.google.inject.Key;
1817
import io.trino.Session;
19-
import io.trino.connector.ConnectorServicesProvider;
20-
import io.trino.metadata.CatalogManager;
18+
import io.trino.connector.TestDynamicCatalogs;
2119
import io.trino.plugin.memory.MemoryPlugin;
22-
import io.trino.spi.catalog.CatalogName;
23-
import io.trino.spi.catalog.CatalogProperties;
24-
import io.trino.spi.catalog.CatalogStore;
25-
import io.trino.spi.connector.ConnectorName;
2620
import io.trino.testing.AbstractTestQueryFramework;
2721
import io.trino.testing.DistributedQueryRunner;
2822
import io.trino.testing.QueryRunner;
@@ -36,23 +30,18 @@
3630
public class TestSystemMetadataCatalogTable
3731
extends AbstractTestQueryFramework
3832
{
39-
private CatalogStore catalogStore;
40-
private ConnectorServicesProvider connectorServicesProvider;
41-
private CatalogManager catalogManager;
42-
4333
@Override
4434
protected QueryRunner createQueryRunner()
4535
throws Exception
4636
{
4737
Session session = testSessionBuilder().build();
4838
QueryRunner queryRunner = DistributedQueryRunner.builder(session)
39+
.setAdditionalModule(new TestDynamicCatalogs.TestCatalogStoreModule())
40+
.setCoordinatorProperties(ImmutableMap.of("catalog.store", "prepopulated_memory"))
4941
.setWorkerCount(0)
5042
.build();
5143
queryRunner.installPlugin(new MemoryPlugin());
5244
queryRunner.createCatalog("healthy_catalog", "memory", ImmutableMap.of("memory.max-data-per-node", "128MB"));
53-
catalogStore = queryRunner.getCoordinator().getInstance(new Key<>() {});
54-
connectorServicesProvider = queryRunner.getCoordinator().getInstance(new Key<>() {});
55-
catalogManager = queryRunner.getCoordinator().getInstance(new Key<>() {});
5645
return queryRunner;
5746
}
5847

@@ -61,11 +50,13 @@ public void testNewCatalogStatus()
6150
{
6251
assertQuery("SELECT * FROM system.metadata.catalogs", "VALUES" +
6352
"('healthy_catalog', 'healthy_catalog', 'memory', 'OPERATIONAL'), " +
53+
"('broken_catalog', 'broken_catalog', 'memory', 'FAILING'), " +
6454
"('system', 'system', 'system', 'OPERATIONAL')");
6555

6656
assertUpdate("CREATE CATALOG brain USING memory WITH (\"memory.max-data-per-node\" = '128MB')");
6757
assertQuery("SELECT * FROM system.metadata.catalogs", "VALUES" +
6858
"('healthy_catalog', 'healthy_catalog', 'memory', 'OPERATIONAL'), " +
59+
"('broken_catalog', 'broken_catalog', 'memory', 'FAILING'), " +
6960
"('brain', 'brain', 'memory', 'OPERATIONAL'), " +
7061
"('system', 'system', 'system', 'OPERATIONAL')");
7162

@@ -75,14 +66,9 @@ public void testNewCatalogStatus()
7566
@Test
7667
public void testCatalogNotLoadedCorrectly()
7768
{
78-
CatalogProperties catalogProperties = catalogStore.createCatalogProperties(new CatalogName("broken"), new ConnectorName("memory"), ImmutableMap.of("memory.max-data-per-n", "128MB"));
79-
80-
catalogStore.addOrReplaceCatalog(catalogProperties);
81-
connectorServicesProvider.loadInitialCatalogs();
8269
assertQuery("SELECT * FROM system.metadata.catalogs", "VALUES" +
8370
"('healthy_catalog', 'healthy_catalog', 'memory', 'OPERATIONAL'), " +
84-
"('broken', 'broken', 'memory', 'FAILING'), " +
71+
"('broken_catalog', 'broken_catalog', 'memory', 'FAILING'), " +
8572
"('system', 'system', 'system', 'OPERATIONAL')");
86-
catalogManager.dropCatalog(new CatalogName("broken"), false);
8773
}
8874
}

0 commit comments

Comments
 (0)