Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions presto-docs/src/main/sphinx/develop/event-listener.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,10 @@ Example configuration file:
event-listener.name=custom-event-listener
custom-property1=custom-value1
custom-property2=custom-value2

Multiple Event Listeners
------------------------

Multiple instances of the same, or different event listeners can be
installed and configured by setting ``event-listener.config-files``
to a comma separated list of config files.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.eventlistener;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;

import javax.validation.constraints.NotNull;

import java.io.File;
import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;

public class EventListenerConfig
{
private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
private List<File> eventListenerFiles = ImmutableList.of();

@NotNull
public List<File> getEventListenerFiles()
{
return eventListenerFiles;
}

@Config("event-listener.config-files")
public EventListenerConfig setEventListenerFiles(String eventListenerFiles)
{
this.eventListenerFiles = SPLITTER.splitToList(eventListenerFiles).stream()
.map(File::new)
.collect(toImmutableList());
return this;
}

public EventListenerConfig setEventListenerFiles(List<File> eventListenerFiles)
{
this.eventListenerFiles = ImmutableList.copyOf(eventListenerFiles);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,26 @@
*/
package io.prestosql.eventlistener;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.prestosql.spi.classloader.ThreadContextClassLoader;
import io.prestosql.spi.eventlistener.EventListener;
import io.prestosql.spi.eventlistener.EventListenerFactory;
import io.prestosql.spi.eventlistener.QueryCompletedEvent;
import io.prestosql.spi.eventlistener.QueryCreatedEvent;
import io.prestosql.spi.eventlistener.SplitCompletedEvent;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static io.prestosql.util.PropertiesUtil.loadProperties;
import static java.lang.String.format;
Expand All @@ -39,75 +41,97 @@
public class EventListenerManager
{
private static final Logger log = Logger.get(EventListenerManager.class);

private static final File CONFIG_FILE = new File("etc/event-listener.properties");
private static final String NAME_PROPERTY = "event-listener.name";

private static final String EVENT_LISTENER_NAME_PROPERTY = "event-listener.name";
private final List<File> configFiles;
private final Map<String, EventListenerFactory> eventListenerFactories = new ConcurrentHashMap<>();
private final AtomicReference<Optional<EventListener>> configuredEventListener = new AtomicReference<>(Optional.empty());
private final AtomicReference<List<EventListener>> configuredEventListeners =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wrapping shouldn't change

new AtomicReference<>(ImmutableList.of());

@Inject
public EventListenerManager(EventListenerConfig config)
{
this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles());
}

public void addEventListenerFactory(EventListenerFactory eventListenerFactory)
{
requireNonNull(eventListenerFactory, "eventListenerFactory is null");

if (eventListenerFactories.putIfAbsent(eventListenerFactory.getName(), eventListenerFactory) != null) {
throw new IllegalArgumentException(format("Event listener '%s' is already registered", eventListenerFactory.getName()));
throw new IllegalArgumentException(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

format("Event listener '%s' is already registered", eventListenerFactory.getName()));
}
}

public void loadConfiguredEventListener()
throws Exception
public void loadConfiguredEventListeners()
{
File configFile = CONFIG_FILE.getAbsoluteFile();
if (!configFile.exists()) {
return;
List<File> configFiles = this.configFiles;
if (configFiles.isEmpty()) {
if (!CONFIG_FILE.exists()) {
return;
}
configFiles = ImmutableList.of(CONFIG_FILE);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be safe, let's add an AtomicBoolean so that we can't accidentally load twice

checkState(loading.compareAndSet(false, true), "Event listeners already loaded");

Map<String, String> properties = new HashMap<>(loadProperties(configFile));

String name = properties.remove(NAME_PROPERTY);
checkState(!isNullOrEmpty(name), "Access control configuration %s does not contain '%s'", configFile, NAME_PROPERTY);

setConfiguredEventListener(name, properties);
List<EventListener> eventListeners =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting nit: don't wrap the assignment, but do wrap the stream operations. Also, prefer the immutable collectors.

List<EventListener> eventListeners = configFiles.stream()
        .map(this::createEventListener)
        .collect(toImmutableList());

configFiles.stream().map(this::createEventListener).collect(Collectors.toList());
this.configuredEventListeners.set(eventListeners);
}

@VisibleForTesting
protected void setConfiguredEventListener(String name, Map<String, String> properties)
private EventListener createEventListener(File configFile)
{
requireNonNull(name, "name is null");
requireNonNull(properties, "properties is null");

log.info("-- Loading event listener --");

EventListenerFactory eventListenerFactory = eventListenerFactories.get(name);
checkState(eventListenerFactory != null, "Event listener '%s' is not registered", name);

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(eventListenerFactory.getClass().getClassLoader())) {
EventListener eventListener = eventListenerFactory.create(ImmutableMap.copyOf(properties));
this.configuredEventListener.set(Optional.of(eventListener));
log.info("-- Loading event listener %s --", configFile);
configFile = configFile.getAbsoluteFile();
Map<String, String> properties;
try {
properties = new HashMap<>(loadProperties(configFile));
}
catch (IOException e) {
throw new UncheckedIOException("Failed to read configuration file: " + configFile, e);
}

log.info("-- Loaded event listener %s --", name);
String name = properties.remove(EVENT_LISTENER_NAME_PROPERTY);
checkArgument(!isNullOrEmpty(name), "EventListener plugin configuration for %s does not contain %s", configFile,
EVENT_LISTENER_NAME_PROPERTY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: don't wrap here

EventListenerFactory eventListenerFactory = eventListenerFactories.get(name);
EventListener eventListener = eventListenerFactory.create(properties);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep the ThreadContextClassLoader try-with-resources block around the create() call. This is needed to be friendly to event listeners that use libraries that make bad assumptions about the thread context class loader.

log.info("-- Loaded event listener %s --", configFile);
return eventListener;
}

public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
if (configuredEventListener.get().isPresent()) {
configuredEventListener.get().get().queryCompleted(queryCompletedEvent);
for (EventListener listener : configuredEventListeners.get()) {
try {
listener.queryCompleted(queryCompletedEvent);
}
catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be RuntimeException since the event listener doesn't throw any checked exceptions. However, I'm thinking we should make this Throwable. While we hope plugins don't throw Error, if they do, there's nothing we can do, so we might as well log it and continue.

Same for the other events.

log.warn("Failed to publish QueryCompletedEvent for query %s", queryCompletedEvent.getMetadata().getQueryId(), e);
}
}
}

public void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
if (configuredEventListener.get().isPresent()) {
configuredEventListener.get().get().queryCreated(queryCreatedEvent);
for (EventListener listener : configuredEventListeners.get()) {
try {
listener.queryCreated(queryCreatedEvent);
}
catch (Exception e) {
log.warn("Failed to publish QueryCreatedEvent for query %s", queryCreatedEvent.getMetadata().getQueryId(), e);
}
}
}

public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
if (configuredEventListener.get().isPresent()) {
configuredEventListener.get().get().splitCompleted(splitCompletedEvent);
for (EventListener listener : configuredEventListeners.get()) {
try {
listener.splitCompleted(splitCompletedEvent);
}
catch (Exception e) {
log.warn("Failed to publish SplitCompletedEvent for query %s", splitCompletedEvent.getQueryId(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
import com.google.inject.Module;
import com.google.inject.Scopes;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class EventListenerModule
implements Module
{
@Override
public void configure(Binder binder)
{
configBinder(binder).bindConfig(EventListenerConfig.class);
binder.bind(EventListenerManager.class).in(Scopes.SINGLETON);
newExporter(binder).export(EventListenerManager.class).withGeneratedName();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to add this since EventListenerManager doesn't have any @Managed annotations.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void run()
injector.getInstance(ResourceGroupManager.class).loadConfigurationManager();
injector.getInstance(AccessControlManager.class).loadSystemAccessControl();
injector.getInstance(PasswordAuthenticatorManager.class).loadPasswordAuthenticator();
injector.getInstance(EventListenerManager.class).loadConfiguredEventListener();
injector.getInstance(EventListenerManager.class).loadConfiguredEventListeners();

injector.getInstance(Announcer.class).start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.prestosql.connector.ConnectorManager;
import io.prestosql.cost.StatsCalculator;
import io.prestosql.dispatcher.DispatchManager;
import io.prestosql.eventlistener.EventListenerConfig;
import io.prestosql.eventlistener.EventListenerManager;
import io.prestosql.execution.QueryInfo;
import io.prestosql.execution.QueryManager;
Expand Down Expand Up @@ -227,6 +228,7 @@ public TestingPrestoServer(
.add(binder -> {
binder.bind(TestingAccessControlManager.class).in(Scopes.SINGLETON);
binder.bind(TestingEventListenerManager.class).in(Scopes.SINGLETON);
binder.bind(EventListenerConfig.class).in(Scopes.SINGLETON);
binder.bind(AccessControlManager.class).to(TestingAccessControlManager.class).in(Scopes.SINGLETON);
binder.bind(EventListenerManager.class).to(TestingEventListenerManager.class).in(Scopes.SINGLETON);
binder.bind(AccessControl.class).to(AccessControlManager.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.prestosql.cost.CostComparator;
import io.prestosql.cost.StatsCalculator;
import io.prestosql.cost.TaskCountEstimator;
import io.prestosql.eventlistener.EventListenerConfig;
import io.prestosql.eventlistener.EventListenerManager;
import io.prestosql.execution.CommentTask;
import io.prestosql.execution.CommitTask;
Expand Down Expand Up @@ -373,7 +374,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
new NoOpResourceGroupManager(),
accessControl,
new PasswordAuthenticatorManager(),
new EventListenerManager(),
new EventListenerManager(new EventListenerConfig()),
new SessionPropertyDefaults(nodeInfo));

connectorManager.addConnectorFactory(globalSystemConnectorFactory, globalSystemConnectorFactory.getClass()::getClassLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,58 @@
package io.prestosql.testing;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.prestosql.eventlistener.EventListenerConfig;
import io.prestosql.eventlistener.EventListenerManager;
import io.prestosql.spi.eventlistener.EventListener;
import io.prestosql.spi.eventlistener.EventListenerFactory;
import io.prestosql.spi.eventlistener.QueryCompletedEvent;
import io.prestosql.spi.eventlistener.QueryCreatedEvent;
import io.prestosql.spi.eventlistener.SplitCompletedEvent;

import java.util.Optional;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

public class TestingEventListenerManager
extends EventListenerManager
{
private final AtomicReference<Optional<EventListener>> configuredEventListener = new AtomicReference<>(Optional.empty());
private final AtomicReference<Set<EventListener>> configuredEventListeners = new AtomicReference(new HashSet());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use new HashSet<>() so that it's not a raw type.

We don't need to change this class at all, since it still only supports a single event listener. If we want to change it, we should make it add each new listener (instead of overwriting the set). However, I feel changing the test implementation is somewhat unrelated to the rest of the PR (which is allow multiple listeners for the main server).


@Inject
public TestingEventListenerManager(EventListenerConfig config)
{
super(config);
}

@Override
public void addEventListenerFactory(EventListenerFactory eventListenerFactory)
{
configuredEventListener.set(Optional.of(eventListenerFactory.create(ImmutableMap.of())));
configuredEventListeners.set(Collections.singleton(eventListenerFactory.create(ImmutableMap.of())));
}

@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
if (configuredEventListener.get().isPresent()) {
configuredEventListener.get().get().queryCompleted(queryCompletedEvent);
for (EventListener listener : configuredEventListeners.get()) {
listener.queryCompleted(queryCompletedEvent);
}
}

@Override
public void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
if (configuredEventListener.get().isPresent()) {
configuredEventListener.get().get().queryCreated(queryCreatedEvent);
for (EventListener listener : configuredEventListeners.get()) {
listener.queryCreated(queryCreatedEvent);
}
}

@Override
public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
if (configuredEventListener.get().isPresent()) {
configuredEventListener.get().get().splitCompleted(splitCompletedEvent);
for (EventListener listener : configuredEventListeners.get()) {
listener.splitCompleted(splitCompletedEvent);
}
}
}
Loading