diff --git a/presto-docs/src/main/sphinx/develop/event-listener.rst b/presto-docs/src/main/sphinx/develop/event-listener.rst index 2f9349e448f3..d07620a41468 100644 --- a/presto-docs/src/main/sphinx/develop/event-listener.rst +++ b/presto-docs/src/main/sphinx/develop/event-listener.rst @@ -46,3 +46,10 @@ Example configuration file: event-listener.name=custom-event-listener custom-property1=custom-value1 custom-property2=custom-value2 + +Multiple Event Listeners +------------------------ + +Multiple instances of the same, or different event listeners can be +installed and configured by setting ``event-listener.config-files`` +to a comma separated list of config files. diff --git a/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerConfig.java b/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerConfig.java new file mode 100644 index 000000000000..04d8167455f9 --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.eventlistener; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import io.airlift.configuration.Config; + +import javax.validation.constraints.NotNull; + +import java.io.File; +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; + +public class EventListenerConfig +{ + private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings(); + private List eventListenerFiles = ImmutableList.of(); + + @NotNull + public List getEventListenerFiles() + { + return eventListenerFiles; + } + + @Config("event-listener.config-files") + public EventListenerConfig setEventListenerFiles(String eventListenerFiles) + { + this.eventListenerFiles = SPLITTER.splitToList(eventListenerFiles).stream() + .map(File::new) + .collect(toImmutableList()); + return this; + } + + public EventListenerConfig setEventListenerFiles(List eventListenerFiles) + { + this.eventListenerFiles = ImmutableList.copyOf(eventListenerFiles); + return this; + } +} diff --git a/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerManager.java b/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerManager.java index 476573fc8014..0981fefcd2b1 100644 --- a/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerManager.java +++ b/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerManager.java @@ -13,10 +13,9 @@ */ package io.prestosql.eventlistener; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; import io.airlift.log.Logger; -import io.prestosql.spi.classloader.ThreadContextClassLoader; import io.prestosql.spi.eventlistener.EventListener; import io.prestosql.spi.eventlistener.EventListenerFactory; import io.prestosql.spi.eventlistener.QueryCompletedEvent; @@ -24,14 +23,19 @@ import io.prestosql.spi.eventlistener.SplitCompletedEvent; import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.prestosql.util.PropertiesUtil.loadProperties; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -39,12 +43,18 @@ public class EventListenerManager { private static final Logger log = Logger.get(EventListenerManager.class); - private static final File CONFIG_FILE = new File("etc/event-listener.properties"); - private static final String NAME_PROPERTY = "event-listener.name"; - + private static final String EVENT_LISTENER_NAME_PROPERTY = "event-listener.name"; + private final List configFiles; private final Map eventListenerFactories = new ConcurrentHashMap<>(); - private final AtomicReference> configuredEventListener = new AtomicReference<>(Optional.empty()); + private final AtomicReference> configuredEventListeners = new AtomicReference<>(ImmutableList.of()); + private final AtomicBoolean loading = new AtomicBoolean(false); + + @Inject + public EventListenerManager(EventListenerConfig config) + { + this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles()); + } public void addEventListenerFactory(EventListenerFactory eventListenerFactory) { @@ -55,59 +65,79 @@ public void addEventListenerFactory(EventListenerFactory eventListenerFactory) } } - public void loadConfiguredEventListener() - throws Exception + public void loadConfiguredEventListeners() { - File configFile = CONFIG_FILE.getAbsoluteFile(); - if (!configFile.exists()) { - return; + checkState(loading.compareAndSet(false, true), "Event listeners already loaded"); + + List configFiles = this.configFiles; + if (configFiles.isEmpty()) { + if (!CONFIG_FILE.exists()) { + return; + } + configFiles = ImmutableList.of(CONFIG_FILE); } - - Map properties = new HashMap<>(loadProperties(configFile)); - - String name = properties.remove(NAME_PROPERTY); - checkState(!isNullOrEmpty(name), "Access control configuration %s does not contain '%s'", configFile, NAME_PROPERTY); - - setConfiguredEventListener(name, properties); + List eventListeners = configFiles.stream() + .map(this::createEventListener) + .collect(toImmutableList()); + this.configuredEventListeners.set(eventListeners); } - @VisibleForTesting - protected void setConfiguredEventListener(String name, Map properties) + private EventListener createEventListener(File configFile) { - requireNonNull(name, "name is null"); - requireNonNull(properties, "properties is null"); - - log.info("-- Loading event listener --"); - + log.info("-- Loading event listener %s --", configFile); + configFile = configFile.getAbsoluteFile(); + Map properties = loadEventListenerProperties(configFile); + String name = properties.remove(EVENT_LISTENER_NAME_PROPERTY); + checkArgument(!isNullOrEmpty(name), "EventListener plugin configuration for %s does not contain %s", configFile, EVENT_LISTENER_NAME_PROPERTY); EventListenerFactory eventListenerFactory = eventListenerFactories.get(name); - checkState(eventListenerFactory != null, "Event listener '%s' is not registered", name); + EventListener eventListener = eventListenerFactory.create(properties); + log.info("-- Loaded event listener %s --", configFile); + return eventListener; + } - try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(eventListenerFactory.getClass().getClassLoader())) { - EventListener eventListener = eventListenerFactory.create(ImmutableMap.copyOf(properties)); - this.configuredEventListener.set(Optional.of(eventListener)); + private Map loadEventListenerProperties(File configFile) + { + try { + return new HashMap<>(loadProperties(configFile)); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to read configuration file: " + configFile, e); } - - log.info("-- Loaded event listener %s --", name); } public void queryCompleted(QueryCompletedEvent queryCompletedEvent) { - if (configuredEventListener.get().isPresent()) { - configuredEventListener.get().get().queryCompleted(queryCompletedEvent); + for (EventListener listener : configuredEventListeners.get()) { + try { + listener.queryCompleted(queryCompletedEvent); + } + catch (Throwable e) { + log.warn("Failed to publish QueryCompletedEvent for query %s", queryCompletedEvent.getMetadata().getQueryId(), e); + } } } public void queryCreated(QueryCreatedEvent queryCreatedEvent) { - if (configuredEventListener.get().isPresent()) { - configuredEventListener.get().get().queryCreated(queryCreatedEvent); + for (EventListener listener : configuredEventListeners.get()) { + try { + listener.queryCreated(queryCreatedEvent); + } + catch (Throwable e) { + log.warn("Failed to publish QueryCreatedEvent for query %s", queryCreatedEvent.getMetadata().getQueryId(), e); + } } } public void splitCompleted(SplitCompletedEvent splitCompletedEvent) { - if (configuredEventListener.get().isPresent()) { - configuredEventListener.get().get().splitCompleted(splitCompletedEvent); + for (EventListener listener : configuredEventListeners.get()) { + try { + listener.splitCompleted(splitCompletedEvent); + } + catch (Throwable e) { + log.warn("Failed to publish SplitCompletedEvent for query %s", splitCompletedEvent.getQueryId(), e); + } } } } diff --git a/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerModule.java b/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerModule.java index 0979f0d8ac0e..47435e9072be 100644 --- a/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerModule.java +++ b/presto-main/src/main/java/io/prestosql/eventlistener/EventListenerModule.java @@ -17,12 +17,15 @@ import com.google.inject.Module; import com.google.inject.Scopes; +import static io.airlift.configuration.ConfigBinder.configBinder; + public class EventListenerModule implements Module { @Override public void configure(Binder binder) { + configBinder(binder).bindConfig(EventListenerConfig.class); binder.bind(EventListenerManager.class).in(Scopes.SINGLETON); } } diff --git a/presto-main/src/main/java/io/prestosql/server/PrestoServer.java b/presto-main/src/main/java/io/prestosql/server/PrestoServer.java index 3cfbcc84f99d..7bf6c4f7623e 100644 --- a/presto-main/src/main/java/io/prestosql/server/PrestoServer.java +++ b/presto-main/src/main/java/io/prestosql/server/PrestoServer.java @@ -140,7 +140,7 @@ public void run() injector.getInstance(ResourceGroupManager.class).loadConfigurationManager(); injector.getInstance(AccessControlManager.class).loadSystemAccessControl(); injector.getInstance(PasswordAuthenticatorManager.class).loadPasswordAuthenticator(); - injector.getInstance(EventListenerManager.class).loadConfiguredEventListener(); + injector.getInstance(EventListenerManager.class).loadConfiguredEventListeners(); injector.getInstance(GroupProviderManager.class).loadConfiguredGroupProvider(); injector.getInstance(Announcer.class).start(); diff --git a/presto-main/src/main/java/io/prestosql/server/testing/TestingPrestoServer.java b/presto-main/src/main/java/io/prestosql/server/testing/TestingPrestoServer.java index 8b3388931bbb..ad486e9f04d2 100644 --- a/presto-main/src/main/java/io/prestosql/server/testing/TestingPrestoServer.java +++ b/presto-main/src/main/java/io/prestosql/server/testing/TestingPrestoServer.java @@ -43,6 +43,7 @@ import io.prestosql.connector.ConnectorManager; import io.prestosql.cost.StatsCalculator; import io.prestosql.dispatcher.DispatchManager; +import io.prestosql.eventlistener.EventListenerConfig; import io.prestosql.eventlistener.EventListenerManager; import io.prestosql.execution.QueryInfo; import io.prestosql.execution.QueryManager; @@ -231,6 +232,7 @@ private TestingPrestoServer( binder.bind(new TypeLiteral>() {}) .annotatedWith(TestingAccessControlManager.ForSystemAccessControl.class) .toInstance(ImmutableMap.copyOf(systemAccessControlProperties)); + binder.bind(EventListenerConfig.class).in(Scopes.SINGLETON); binder.bind(TestingAccessControlManager.class).in(Scopes.SINGLETON); binder.bind(TestingEventListenerManager.class).in(Scopes.SINGLETON); binder.bind(AccessControlManager.class).to(TestingAccessControlManager.class).in(Scopes.SINGLETON); diff --git a/presto-main/src/main/java/io/prestosql/testing/LocalQueryRunner.java b/presto-main/src/main/java/io/prestosql/testing/LocalQueryRunner.java index aff4e422df82..f9bfd5915a4a 100644 --- a/presto-main/src/main/java/io/prestosql/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/io/prestosql/testing/LocalQueryRunner.java @@ -41,6 +41,7 @@ import io.prestosql.cost.CostComparator; import io.prestosql.cost.StatsCalculator; import io.prestosql.cost.TaskCountEstimator; +import io.prestosql.eventlistener.EventListenerConfig; import io.prestosql.eventlistener.EventListenerManager; import io.prestosql.execution.CommentTask; import io.prestosql.execution.CommitTask; @@ -365,7 +366,7 @@ private LocalQueryRunner( new NoOpResourceGroupManager(), accessControl, new PasswordAuthenticatorManager(), - new EventListenerManager(), + new EventListenerManager(new EventListenerConfig()), new GroupProviderManager(), new SessionPropertyDefaults(nodeInfo)); diff --git a/presto-main/src/main/java/io/prestosql/testing/TestingEventListenerManager.java b/presto-main/src/main/java/io/prestosql/testing/TestingEventListenerManager.java index 6b68711bb9ca..671233edc904 100644 --- a/presto-main/src/main/java/io/prestosql/testing/TestingEventListenerManager.java +++ b/presto-main/src/main/java/io/prestosql/testing/TestingEventListenerManager.java @@ -14,6 +14,8 @@ package io.prestosql.testing; import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import io.prestosql.eventlistener.EventListenerConfig; import io.prestosql.eventlistener.EventListenerManager; import io.prestosql.spi.eventlistener.EventListener; import io.prestosql.spi.eventlistener.EventListenerFactory; @@ -21,41 +23,48 @@ import io.prestosql.spi.eventlistener.QueryCreatedEvent; import io.prestosql.spi.eventlistener.SplitCompletedEvent; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; public class TestingEventListenerManager extends EventListenerManager { - private final AtomicReference> configuredEventListener = new AtomicReference<>(Optional.empty()); + private final Set configuredEventListeners = Collections.synchronizedSet(new HashSet<>()); + + @Inject + public TestingEventListenerManager(EventListenerConfig config) + { + super(config); + } @Override public void addEventListenerFactory(EventListenerFactory eventListenerFactory) { - configuredEventListener.set(Optional.of(eventListenerFactory.create(ImmutableMap.of()))); + configuredEventListeners.add(eventListenerFactory.create(ImmutableMap.of())); } @Override public void queryCompleted(QueryCompletedEvent queryCompletedEvent) { - if (configuredEventListener.get().isPresent()) { - configuredEventListener.get().get().queryCompleted(queryCompletedEvent); + for (EventListener listener : configuredEventListeners) { + listener.queryCompleted(queryCompletedEvent); } } @Override public void queryCreated(QueryCreatedEvent queryCreatedEvent) { - if (configuredEventListener.get().isPresent()) { - configuredEventListener.get().get().queryCreated(queryCreatedEvent); + for (EventListener listener : configuredEventListeners) { + listener.queryCreated(queryCreatedEvent); } } @Override public void splitCompleted(SplitCompletedEvent splitCompletedEvent) { - if (configuredEventListener.get().isPresent()) { - configuredEventListener.get().get().splitCompleted(splitCompletedEvent); + for (EventListener listener : configuredEventListeners) { + listener.splitCompleted(splitCompletedEvent); } } } diff --git a/presto-main/src/test/java/io/prestosql/eventlistener/TestEventListenerConfig.java b/presto-main/src/test/java/io/prestosql/eventlistener/TestEventListenerConfig.java new file mode 100644 index 000000000000..7f6af18d9cb1 --- /dev/null +++ b/presto-main/src/test/java/io/prestosql/eventlistener/TestEventListenerConfig.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.eventlistener; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.configuration.testing.ConfigAssertions; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; + +public class TestEventListenerConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(ConfigAssertions.recordDefaults(EventListenerConfig.class) + .setEventListenerFiles("")); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = new ImmutableMap.Builder() + .put("event-listener.config-files", "a,b,c") + .build(); + + EventListenerConfig expected = new EventListenerConfig() + .setEventListenerFiles(ImmutableList.of(new File("a"), new File("b"), new File("c"))); + + assertFullMapping(properties, expected); + } +} diff --git a/presto-main/src/test/java/io/prestosql/execution/TaskTestUtils.java b/presto-main/src/test/java/io/prestosql/execution/TaskTestUtils.java index 9b5c5a37d546..8ed59c37f7f6 100644 --- a/presto-main/src/test/java/io/prestosql/execution/TaskTestUtils.java +++ b/presto-main/src/test/java/io/prestosql/execution/TaskTestUtils.java @@ -19,6 +19,7 @@ import io.prestosql.connector.CatalogName; import io.prestosql.cost.StatsAndCosts; import io.prestosql.event.SplitMonitor; +import io.prestosql.eventlistener.EventListenerConfig; import io.prestosql.eventlistener.EventListenerManager; import io.prestosql.execution.TestSqlTaskManager.MockExchangeClientSupplier; import io.prestosql.execution.buffer.OutputBuffers; @@ -152,7 +153,7 @@ public static TaskInfo updateTask(SqlTask sqlTask, List taskSources, public static SplitMonitor createTestSplitMonitor() { return new SplitMonitor( - new EventListenerManager(), + new EventListenerManager(new EventListenerConfig()), new ObjectMapperProvider().get()); } }