From 18759f32adb792ea3c33550b613f9e12a71cede9 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Tue, 26 Jul 2022 20:01:38 -0700 Subject: [PATCH] Fix failures in TestConnectorEventListener Connector event listeners must be explicitly loaded after the catalog is created. --- .../src/main/java/io/trino/server/Server.java | 32 ++++++++++++------- .../server/testing/TestingTrinoServer.java | 17 ++++++++++ .../execution/TestConnectorEventListener.java | 5 ++- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/server/Server.java b/core/trino-main/src/main/java/io/trino/server/Server.java index 8027c9f9f89e..1656afb7d7c8 100644 --- a/core/trino-main/src/main/java/io/trino/server/Server.java +++ b/core/trino-main/src/main/java/io/trino/server/Server.java @@ -13,6 +13,7 @@ */ package io.trino.server; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.StandardSystemProperty; import com.google.common.collect.ImmutableList; import com.google.inject.Injector; @@ -137,17 +138,10 @@ private void doStart(String trinoVersion) // Connector event listeners are only supported for statically loaded catalogs // TODO: remove connector event listeners or add support for dynamic loading from connector - CatalogManager catalogManager = injector.getInstance(CatalogManager.class); - ConnectorServicesProvider connectorServicesProvider = injector.getInstance(ConnectorServicesProvider.class); - EventListenerManager eventListenerManager = injector.getInstance(EventListenerManager.class); - catalogManager.getCatalogNames().stream() - .map(catalogManager::getCatalog) - .flatMap(Optional::stream) - .map(Catalog::getCatalogHandle) - .map(connectorServicesProvider::getConnectorServices) - .map(ConnectorServices::getEventListeners) - .flatMap(Collection::stream) - .forEach(eventListenerManager::addEventListener); + addConnectorEventListeners( + injector.getInstance(CatalogManager.class), + injector.getInstance(ConnectorServicesProvider.class), + injector.getInstance(EventListenerManager.class)); // TODO: remove this huge hack updateConnectorIds(injector.getInstance(Announcer.class), injector.getInstance(CatalogManager.class)); @@ -189,6 +183,22 @@ private void doStart(String trinoVersion) } } + @VisibleForTesting + public static void addConnectorEventListeners( + CatalogManager catalogManager, + ConnectorServicesProvider connectorServicesProvider, + EventListenerManager eventListenerManager) + { + catalogManager.getCatalogNames().stream() + .map(catalogManager::getCatalog) + .flatMap(Optional::stream) + .map(Catalog::getCatalogHandle) + .map(connectorServicesProvider::getConnectorServices) + .map(ConnectorServices::getEventListeners) + .flatMap(Collection::stream) + .forEach(eventListenerManager::addEventListener); + } + @SuppressWarnings("unchecked") private static Key> optionalKey(Class type) { diff --git a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java index 06d489907be7..934b57104d50 100644 --- a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java +++ b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java @@ -42,6 +42,7 @@ import io.trino.connector.CatalogHandle; import io.trino.connector.CatalogManagerModule; import io.trino.connector.ConnectorManager; +import io.trino.connector.ConnectorServicesProvider; import io.trino.cost.StatsCalculator; import io.trino.dispatcher.DispatchManager; import io.trino.eventlistener.EventListenerConfig; @@ -58,6 +59,7 @@ import io.trino.memory.ClusterMemoryManager; import io.trino.memory.LocalMemoryManager; import io.trino.metadata.AllNodes; +import io.trino.metadata.CatalogManager; import io.trino.metadata.FunctionBundle; import io.trino.metadata.FunctionManager; import io.trino.metadata.GlobalFunctionCatalog; @@ -72,6 +74,7 @@ import io.trino.security.GroupProviderManager; import io.trino.server.GracefulShutdownHandler; import io.trino.server.PluginManager; +import io.trino.server.Server; import io.trino.server.ServerMainModule; import io.trino.server.SessionPropertyDefaults; import io.trino.server.ShutdownAction; @@ -447,6 +450,20 @@ public void loadExchangeManager(String name, Map properties) exchangeManagerRegistry.loadExchangeManager(name, properties); } + /** + * Add the event listeners from connectors. Connector event listeners are + * only supported for statically loaded catalogs, and this doesn't match up + * with the model of the testing Trino server. This method should only be + * called once after all catalogs are added. + */ + public void addConnectorEventListeners() + { + Server.addConnectorEventListeners( + injector.getInstance(CatalogManager.class), + injector.getInstance(ConnectorServicesProvider.class), + injector.getInstance(EventListenerManager.class)); + } + public Path getBaseDataDir() { return baseDataDir; diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java b/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java index 3dfcc0de154d..d15e8c6fc439 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java @@ -41,6 +41,8 @@ public void setUp() { closer = Closer.create(); DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(TEST_SESSION).setNodeCount(1).build(); + closer.register(queryRunner); + queryRunner.installPlugin(new Plugin() { @Override @@ -51,8 +53,9 @@ public Iterable getConnectorFactories() .build()); } }); - closer.register(queryRunner); queryRunner.createCatalog("mock-catalog", "mock"); + + queryRunner.getCoordinator().addConnectorEventListeners(); queries = new EventsAwaitingQueries(generatedEvents, queryRunner); }