diff --git a/presto-docs/src/main/sphinx/develop/event-listener.rst b/presto-docs/src/main/sphinx/develop/event-listener.rst index 2f9349e448f3a..61cd5254f24ff 100644 --- a/presto-docs/src/main/sphinx/develop/event-listener.rst +++ b/presto-docs/src/main/sphinx/develop/event-listener.rst @@ -46,3 +46,14 @@ Example configuration file: event-listener.name=custom-event-listener custom-property1=custom-value1 custom-property2=custom-value2 + +Multiple Event Listeners +------------------------ + +Multiple instances of the same, or different event listeners can be +installed and configured by setting ``event-listener.config-files`` +to a comma separated list of config files. + +.. code-block:: none + + event-listener.config-files=etc/event-listener.properties,etc/event-listener-second.properties diff --git a/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerConfig.java b/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerConfig.java new file mode 100644 index 0000000000000..1d248f56e59a0 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.eventlistener; + +import com.facebook.airlift.configuration.Config; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; + +import javax.validation.constraints.NotNull; + +import java.io.File; +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; + +public class EventListenerConfig +{ + private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings(); + private List eventListenerFiles = ImmutableList.of(); + + @NotNull + public List getEventListenerFiles() + { + return eventListenerFiles; + } + + @Config("event-listener.config-files") + public EventListenerConfig setEventListenerFiles(String eventListenerFiles) + { + this.eventListenerFiles = SPLITTER.splitToList(eventListenerFiles).stream() + .map(File::new) + .collect(toImmutableList()); + return this; + } + + public EventListenerConfig setEventListenerFiles(List eventListenerFiles) + { + this.eventListenerFiles = ImmutableList.copyOf(eventListenerFiles); + return this; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerManager.java b/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerManager.java index 24a1d04b54bb1..4118cb98ae34e 100644 --- a/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerManager.java +++ b/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerManager.java @@ -22,14 +22,18 @@ import com.facebook.presto.spi.eventlistener.QueryProgressEvent; import com.facebook.presto.spi.eventlistener.QueryUpdatedEvent; import com.facebook.presto.spi.eventlistener.SplitCompletedEvent; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static com.facebook.presto.util.PropertiesUtil.loadProperties; @@ -44,9 +48,17 @@ public class EventListenerManager private static final Logger log = Logger.get(EventListenerManager.class); private static final File EVENT_LISTENER_CONFIGURATION = new File("etc/event-listener.properties"); private static final String EVENT_LISTENER_PROPERTY_NAME = "event-listener.name"; - + private final List configFiles; private final Map eventListenerFactories = new ConcurrentHashMap<>(); - private final AtomicReference> configuredEventListener = new AtomicReference<>(Optional.empty()); + private final AtomicReference> configuredEventListeners = + new AtomicReference<>(ImmutableList.of()); + private final AtomicBoolean loading = new AtomicBoolean(false); + + @Inject + public EventListenerManager(EventListenerConfig config) + { + this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles()); + } public void addEventListenerFactory(EventListenerFactory eventListenerFactory) { @@ -57,30 +69,41 @@ public void addEventListenerFactory(EventListenerFactory eventListenerFactory) } } - public void loadConfiguredEventListener() - throws Exception + public void loadConfiguredEventListeners() { - if (EVENT_LISTENER_CONFIGURATION.exists()) { - Map properties = loadProperties(EVENT_LISTENER_CONFIGURATION); - checkArgument( - !isNullOrEmpty(properties.get(EVENT_LISTENER_PROPERTY_NAME)), - "Access control configuration %s does not contain %s", - EVENT_LISTENER_CONFIGURATION.getAbsoluteFile(), - EVENT_LISTENER_PROPERTY_NAME); - loadConfiguredEventListener(properties); + checkState(loading.compareAndSet(false, true), "Event listeners already loaded"); + List configFiles = this.configFiles; + if (configFiles.isEmpty()) { + if (!EVENT_LISTENER_CONFIGURATION.exists()) { + return; + } + configFiles = ImmutableList.of(EVENT_LISTENER_CONFIGURATION); } + configFiles.forEach(this::createEventListener); } - public void loadConfiguredEventListener(Map properties) + private void createEventListener(File configFile) { - properties = new HashMap<>(properties); - String eventListenerName = properties.remove(EVENT_LISTENER_PROPERTY_NAME); - checkArgument(!isNullOrEmpty(eventListenerName), "event-listener.name property must be present"); - setConfiguredEventListener(eventListenerName, properties); + log.info("-- Loading event listener configuration file %s --", configFile); + if (configFile.exists()) { + configFile = configFile.getAbsoluteFile(); + log.info("-- Loading event listener configuration file : %s --", configFile); + try { + Map properties = new HashMap<>(loadProperties(configFile)); + loadConfiguredEventListener(properties); + log.info("-- Loaded event listener configuration file %s --", configFile); + } + catch (IOException e) { + log.error(e, "IOException while loading configuration file: " + configFile); + throw new UncheckedIOException("Failed to read configuration file: " + configFile, e); + } + } + else { + log.info("Unable to locate configuration file %s --", configFile); + } } - @VisibleForTesting - protected void setConfiguredEventListener(String name, Map properties) + private void setConfiguredEventListener(String name, Map properties) { requireNonNull(name, "name is null"); requireNonNull(properties, "properties is null"); @@ -92,39 +115,51 @@ protected void setConfiguredEventListener(String name, Map prope try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(eventListenerFactory.getClass().getClassLoader())) { EventListener eventListener = eventListenerFactory.create(ImmutableMap.copyOf(properties)); - this.configuredEventListener.set(Optional.of(eventListener)); + ImmutableList eventListeners = ImmutableList.builder() + .addAll(this.configuredEventListeners.get()) + .add(eventListener) + .build(); + this.configuredEventListeners.set(eventListeners); } log.info("-- Loaded event listener %s --", name); } + public void loadConfiguredEventListener(Map properties) + { + properties = new HashMap<>(properties); + String eventListenerName = properties.remove(EVENT_LISTENER_PROPERTY_NAME); + checkArgument(!isNullOrEmpty(eventListenerName), "event-listener.name property must be present"); + setConfiguredEventListener(eventListenerName, properties); + } + public void queryCompleted(QueryCompletedEvent queryCompletedEvent) { - configuredEventListener.get() - .ifPresent(eventListener -> eventListener.queryCompleted(queryCompletedEvent)); + configuredEventListeners.get() + .forEach(eventListener -> eventListener.queryCompleted(queryCompletedEvent)); } public void queryCreated(QueryCreatedEvent queryCreatedEvent) { - configuredEventListener.get() - .ifPresent(eventListener -> eventListener.queryCreated(queryCreatedEvent)); + configuredEventListeners.get() + .forEach(eventListener -> eventListener.queryCreated(queryCreatedEvent)); } public void queryUpdated(QueryUpdatedEvent queryUpdatedEvent) { - configuredEventListener.get() - .ifPresent(eventListener -> eventListener.queryUpdated(queryUpdatedEvent)); + configuredEventListeners.get() + .forEach(eventListener -> eventListener.queryUpdated(queryUpdatedEvent)); } public void publishQueryProgress(QueryProgressEvent queryProgressEvent) { - configuredEventListener.get() - .ifPresent(eventListener -> eventListener.publishQueryProgress(queryProgressEvent)); + configuredEventListeners.get() + .forEach(eventListener -> eventListener.publishQueryProgress(queryProgressEvent)); } public void splitCompleted(SplitCompletedEvent splitCompletedEvent) { - configuredEventListener.get() - .ifPresent(eventListener -> eventListener.splitCompleted(splitCompletedEvent)); + configuredEventListeners.get() + .forEach(eventListener -> eventListener.splitCompleted(splitCompletedEvent)); } } diff --git a/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerModule.java b/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerModule.java index 5d511bca93c77..e8f579a2c0563 100644 --- a/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerModule.java +++ b/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerModule.java @@ -17,12 +17,17 @@ import com.google.inject.Module; import com.google.inject.Scopes; +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + public class EventListenerModule implements Module { @Override public void configure(Binder binder) { + configBinder(binder).bindConfig(EventListenerConfig.class); binder.bind(EventListenerManager.class).in(Scopes.SINGLETON); + newExporter(binder).export(EventListenerManager.class).withGeneratedName(); } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java index 19250dd963eb7..3b2e9fd11e322 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java @@ -181,7 +181,7 @@ public void run() } injector.getInstance(PasswordAuthenticatorManager.class).loadPasswordAuthenticator(); injector.getInstance(PrestoAuthenticatorManager.class).loadPrestoAuthenticator(); - injector.getInstance(EventListenerManager.class).loadConfiguredEventListener(); + injector.getInstance(EventListenerManager.class).loadConfiguredEventListeners(); injector.getInstance(TempStorageManager.class).loadTempStorages(); injector.getInstance(QueryPrerequisitesManager.class).loadQueryPrerequisites(); injector.getInstance(NodeTtlFetcherManager.class).loadNodeTtlFetcher(); diff --git a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java index a34b46003ad89..3a70d8515f550 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java @@ -38,6 +38,7 @@ import com.facebook.presto.cost.StatsCalculator; import com.facebook.presto.dispatcher.DispatchManager; import com.facebook.presto.dispatcher.QueryPrerequisitesManagerModule; +import com.facebook.presto.eventlistener.EventListenerConfig; import com.facebook.presto.eventlistener.EventListenerManager; import com.facebook.presto.execution.QueryInfo; import com.facebook.presto.execution.QueryManager; @@ -324,6 +325,7 @@ public TestingPrestoServer( binder.bind(TestingTempStorageManager.class).in(Scopes.SINGLETON); binder.bind(AccessControlManager.class).to(TestingAccessControlManager.class).in(Scopes.SINGLETON); binder.bind(EventListenerManager.class).to(TestingEventListenerManager.class).in(Scopes.SINGLETON); + binder.bind(EventListenerConfig.class).in(Scopes.SINGLETON); binder.bind(TempStorageManager.class).to(TestingTempStorageManager.class).in(Scopes.SINGLETON); binder.bind(AccessControl.class).to(AccessControlManager.class).in(Scopes.SINGLETON); binder.bind(ShutdownAction.class).to(TestShutdownAction.class).in(Scopes.SINGLETON); diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 814209514bf19..3ac355399945f 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -51,6 +51,7 @@ import com.facebook.presto.cost.TaskCountEstimator; import com.facebook.presto.dispatcher.NoOpQueryManager; import com.facebook.presto.dispatcher.QueryPrerequisitesManager; +import com.facebook.presto.eventlistener.EventListenerConfig; import com.facebook.presto.eventlistener.EventListenerManager; import com.facebook.presto.execution.AlterFunctionTask; import com.facebook.presto.execution.CommitTask; @@ -542,7 +543,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, accessControl, new PasswordAuthenticatorManager(), new PrestoAuthenticatorManager(new SecurityConfig()), - new EventListenerManager(), + new EventListenerManager(new EventListenerConfig()), blockEncodingManager, new TestingTempStorageManager(), new QueryPrerequisitesManager(), diff --git a/presto-main/src/main/java/com/facebook/presto/testing/TestingEventListenerManager.java b/presto-main/src/main/java/com/facebook/presto/testing/TestingEventListenerManager.java index d0bc6e235917e..49571690d0fff 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/TestingEventListenerManager.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/TestingEventListenerManager.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.testing; +import com.facebook.presto.eventlistener.EventListenerConfig; import com.facebook.presto.eventlistener.EventListenerManager; import com.facebook.presto.spi.eventlistener.EventListener; import com.facebook.presto.spi.eventlistener.EventListenerFactory; @@ -22,6 +23,7 @@ import com.facebook.presto.spi.eventlistener.QueryUpdatedEvent; import com.facebook.presto.spi.eventlistener.SplitCompletedEvent; import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; @@ -31,6 +33,12 @@ public class TestingEventListenerManager { private final AtomicReference> configuredEventListener = new AtomicReference<>(Optional.empty()); + @Inject + public TestingEventListenerManager(EventListenerConfig config) + { + super(config); + } + @Override public void addEventListenerFactory(EventListenerFactory eventListenerFactory) { diff --git a/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java b/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java index 8d850679f8e3e..1b54dc1dea4f9 100644 --- a/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java +++ b/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java @@ -20,6 +20,7 @@ import com.facebook.presto.cost.HistoryBasedPlanStatisticsManager; import com.facebook.presto.event.QueryMonitor; import com.facebook.presto.event.QueryMonitorConfig; +import com.facebook.presto.eventlistener.EventListenerConfig; import com.facebook.presto.eventlistener.EventListenerManager; import com.facebook.presto.execution.ClusterSizeMonitor; import com.facebook.presto.execution.ExecutionFailureInfo; @@ -464,7 +465,7 @@ private QueryMonitor createQueryMonitor(CountingEventListener eventListener) private EventListenerManager createEventListenerManager(CountingEventListener countingEventListener) { - EventListenerManager eventListenerManager = new EventListenerManager(); + EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig()); eventListenerManager.addEventListenerFactory(new TestEventListenerFactory(countingEventListener)); eventListenerManager.loadConfiguredEventListener(ImmutableMap.of("event-listener.name", TestEventListenerFactory.NAME)); return eventListenerManager; diff --git a/presto-main/src/test/java/com/facebook/presto/eventlistener/TestEventListenerConfig.java b/presto-main/src/test/java/com/facebook/presto/eventlistener/TestEventListenerConfig.java new file mode 100644 index 0000000000000..6127d7451c3aa --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/eventlistener/TestEventListenerConfig.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.eventlistener; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.Map; + +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults; +public class TestEventListenerConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(EventListenerConfig.class) + .setEventListenerFiles("")); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = new ImmutableMap.Builder() + .put("event-listener.config-files", "a,b,c") + .build(); + + EventListenerConfig expected = new EventListenerConfig() + .setEventListenerFiles("a,b,c"); + assertFullMapping(properties, expected); + + ImmutableList.Builder filesBuilder = ImmutableList.builder(); + filesBuilder.add(new File("a"), new File("b"), new File("c")); + //Test List version + expected = new EventListenerConfig() + .setEventListenerFiles(filesBuilder.build()); + + assertFullMapping(properties, expected); + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/eventlistener/TestEventListenerManager.java b/presto-main/src/test/java/com/facebook/presto/eventlistener/TestEventListenerManager.java new file mode 100644 index 0000000000000..ac7179172f08c --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/eventlistener/TestEventListenerManager.java @@ -0,0 +1,549 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.eventlistener; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.RuntimeStats; +import com.facebook.presto.common.plan.PlanCanonicalizationStrategy; +import com.facebook.presto.common.resourceGroups.QueryType; +import com.facebook.presto.spi.PrestoWarning; +import com.facebook.presto.spi.eventlistener.CTEInformation; +import com.facebook.presto.spi.eventlistener.EventListener; +import com.facebook.presto.spi.eventlistener.EventListenerFactory; +import com.facebook.presto.spi.eventlistener.OperatorStatistics; +import com.facebook.presto.spi.eventlistener.PlanOptimizerInformation; +import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; +import com.facebook.presto.spi.eventlistener.QueryContext; +import com.facebook.presto.spi.eventlistener.QueryCreatedEvent; +import com.facebook.presto.spi.eventlistener.QueryFailureInfo; +import com.facebook.presto.spi.eventlistener.QueryIOMetadata; +import com.facebook.presto.spi.eventlistener.QueryInputMetadata; +import com.facebook.presto.spi.eventlistener.QueryMetadata; +import com.facebook.presto.spi.eventlistener.QueryOutputMetadata; +import com.facebook.presto.spi.eventlistener.QueryStatistics; +import com.facebook.presto.spi.eventlistener.SplitCompletedEvent; +import com.facebook.presto.spi.eventlistener.SplitFailureInfo; +import com.facebook.presto.spi.eventlistener.SplitStatistics; +import com.facebook.presto.spi.eventlistener.StageStatistics; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeId; +import com.facebook.presto.spi.prestospark.PrestoSparkExecutionContext; +import com.facebook.presto.spi.resourceGroups.ResourceGroupId; +import com.facebook.presto.spi.session.ResourceEstimates; +import com.facebook.presto.spi.statistics.PlanStatisticsWithSourceInfo; +import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.expectThrows; + +@Test +public class TestEventListenerManager +{ + private static final Logger log = Logger.get(TestEventListenerManager.class); + private final EventsCapture generatedEvents = new EventsCapture(); + + @Test + public void testMultipleEventListeners() throws IOException + { + Path tempFile1 = Files.createTempFile("listener1_", ".properties"); + Path tempFile2 = Files.createTempFile("listener2_", ".properties"); + Path tempFile3 = Files.createTempFile("listener3_", ".properties"); + + writeProperties(tempFile1, "event-listener.name", "wxd-event-listener1"); + writeProperties(tempFile2, "event-listener.name", "wxd-event-listener2"); + writeProperties(tempFile3, "event-listener.name", "wxd-event-listener3"); + + EventListenerConfig config = new EventListenerConfig() + .setEventListenerFiles(tempFile1.toFile().getPath() + "," + tempFile2.toFile().getPath() + "," + tempFile3.toFile().getPath()); + EventListenerManager eventListenerManager = new EventListenerManager(config); + TestingEventListener testingEventListener = new TestingEventListener(generatedEvents); + eventListenerManager.addEventListenerFactory(new TestEventListenerFactory(testingEventListener, "wxd-event-listener1")); + eventListenerManager.addEventListenerFactory(new TestEventListenerFactory(testingEventListener, "wxd-event-listener2")); + eventListenerManager.addEventListenerFactory(new TestEventListenerFactory(testingEventListener, "wxd-event-listener3")); + eventListenerManager.loadConfiguredEventListeners(); + + QueryCreatedEvent queryCreatedEvent = createDummyQueryCreatedEvent(); + eventListenerManager.queryCreated(queryCreatedEvent); + QueryCompletedEvent queryCompletedEvent = createDummyQueryCompletedEvent(); + eventListenerManager.queryCompleted(queryCompletedEvent); + SplitCompletedEvent splitCompletedEvent = createDummySplitCompletedEvent(); + eventListenerManager.splitCompleted(splitCompletedEvent); + + assertEquals(generatedEvents.getQueryCreatedEvents().size(), 3); + assertEquals(generatedEvents.getQueryCompletedEvents().size(), 3); + assertEquals(generatedEvents.getSplitCompletedEvents().size(), 3); + generatedEvents.getQueryCreatedEvents().forEach(event -> assertEquals(event, queryCreatedEvent)); + generatedEvents.getQueryCompletedEvents().forEach(event -> assertEquals(event, queryCompletedEvent)); + generatedEvents.getSplitCompletedEvents().forEach(event -> assertEquals(event, splitCompletedEvent)); + + tryDeleteFile(tempFile1); + tryDeleteFile(tempFile2); + tryDeleteFile(tempFile3); + } + + @Test + public void testEventListenerNotRegistered() throws IOException + { + Path tempFile1 = Files.createTempFile("listener1_", ".properties"); + Path tempFile2 = Files.createTempFile("listener2_", ".properties"); + + writeProperties(tempFile1, "event-listener.name", "wxd-event-listener1"); + writeProperties(tempFile2, "event-listener.name", "wxd-event-listener2"); + EventListenerConfig config = new EventListenerConfig().setEventListenerFiles(tempFile1.toFile().getPath() + "," + tempFile2.toFile().getPath()); + + EventListenerManager eventListenerManager = new EventListenerManager(config); + TestingEventListener testingEventListener = new TestingEventListener(generatedEvents); + eventListenerManager.addEventListenerFactory(new TestEventListenerFactory(testingEventListener, "wxd-event-listener1")); + + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> { + eventListenerManager.loadConfiguredEventListeners(); + }); + + String expectedMessage = "Event listener wxd-event-listener2 is not registered"; + assertEquals(exception.getMessage(), expectedMessage); + } + + private void writeProperties(Path filePath, String key, String value) + throws IOException + { + Properties properties = new Properties(); + properties.setProperty(key, value); + + try (FileOutputStream outputStream = new FileOutputStream(filePath.toFile())) { + properties.store(outputStream, "Test Properties"); + } + } + + public static QueryCreatedEvent createDummyQueryCreatedEvent() + { + QueryMetadata metadata = createDummyQueryMetadata(); + QueryContext context = createDummyQueryContext(); + return new QueryCreatedEvent(Instant.now(), context, metadata); + } + + public static QueryCompletedEvent createDummyQueryCompletedEvent() + { + QueryMetadata metadata = createDummyQueryMetadata(); + QueryStatistics statistics = createDummyQueryStatistics(); + QueryContext context = createDummyQueryContext(); + QueryIOMetadata ioMetadata = createDummyQueryIoMetadata(); + Optional failureInfo = Optional.empty(); + List warnings = new ArrayList<>(); + Optional queryType = Optional.empty(); + List failedTasks = new ArrayList<>(); + Instant createTime = Instant.now(); + Instant executionStartTime = Instant.now().minusSeconds(10); + Instant endTime = Instant.now().plusSeconds(10); + List stageStatistics = new ArrayList<>(); + List operatorStatistics = new ArrayList<>(); + List planStatisticsRead = new ArrayList<>(); + List planStatisticsWritten = new ArrayList<>(); + Map> planNodeHash = new HashMap<>(); + Map canonicalPlan = new HashMap<>(); + Optional statsEquivalentPlan = Optional.empty(); + Optional expandedQuery = Optional.empty(); + List optimizerInformation = new ArrayList<>(); + List cteInformationList = new ArrayList<>(); + Set scalarFunctions = new HashSet<>(); + Set aggregateFunctions = new HashSet<>(); + Set windowFunctions = new HashSet<>(); + Optional prestoSparkExecutionContext = Optional.empty(); + Map hboPlanHash = new HashMap<>(); + Optional> planIdNodeMap = Optional.ofNullable(new HashMap<>()); + + return new QueryCompletedEvent( + metadata, + statistics, + context, + ioMetadata, + failureInfo, + warnings, + queryType, + failedTasks, + createTime, + executionStartTime, + endTime, + stageStatistics, + operatorStatistics, + planStatisticsRead, + planStatisticsWritten, + planNodeHash, + canonicalPlan, + statsEquivalentPlan, + expandedQuery, + optimizerInformation, + cteInformationList, + scalarFunctions, + aggregateFunctions, + windowFunctions, + prestoSparkExecutionContext, + hboPlanHash, + planIdNodeMap); + } + + public static QueryStatistics createDummyQueryStatistics() + { + Duration cpuTime = Duration.ofMillis(1000); + Duration retriedCpuTime = Duration.ofMillis(500); + Duration wallTime = Duration.ofMillis(2000); + Duration waitingForPrerequisitesTime = Duration.ofMillis(300); + Duration queuedTime = Duration.ofMillis(1500); + Duration waitingForResourcesTime = Duration.ofMillis(600); + Duration semanticAnalyzingTime = Duration.ofMillis(700); + Duration columnAccessPermissionCheckingTime = Duration.ofMillis(200); + Duration dispatchingTime = Duration.ofMillis(1200); + Duration planningTime = Duration.ofMillis(2500); + Optional analysisTime = Optional.of(Duration.ofMillis(1800)); + Duration executionTime = Duration.ofMillis(3500); + + int peakRunningTasks = 5; + long peakUserMemoryBytes = 500000000L; + long peakTotalNonRevocableMemoryBytes = 800000000L; + long peakTaskUserMemory = 100000000L; + long peakTaskTotalMemory = 200000000L; + long peakNodeTotalMemory = 120000000L; + long shuffledBytes = 10000000L; + long shuffledRows = 200000L; + long totalBytes = 30000000L; + long totalRows = 400000L; + long outputBytes = 5000000L; + long outputRows = 60000L; + long writtenOutputBytes = 7000000L; + long writtenOutputRows = 80000L; + long writtenIntermediateBytes = 9000000L; + long spilledBytes = 1000000L; + double cumulativeMemory = 150.5; + double cumulativeTotalMemory = 200.5; + int completedSplits = 100; + boolean complete = true; + RuntimeStats runtimeStats = new RuntimeStats(); + return new QueryStatistics( + cpuTime, + retriedCpuTime, + wallTime, + waitingForPrerequisitesTime, + queuedTime, + waitingForResourcesTime, + semanticAnalyzingTime, + columnAccessPermissionCheckingTime, + dispatchingTime, + planningTime, + analysisTime, + executionTime, + peakRunningTasks, + peakUserMemoryBytes, + peakTotalNonRevocableMemoryBytes, + peakTaskUserMemory, + peakTaskTotalMemory, + peakNodeTotalMemory, + shuffledBytes, + shuffledRows, + totalBytes, + totalRows, + outputBytes, + outputRows, + writtenOutputBytes, + writtenOutputRows, + writtenIntermediateBytes, + spilledBytes, + cumulativeMemory, + cumulativeTotalMemory, + completedSplits, + complete, + runtimeStats); + } + + private static QueryMetadata createDummyQueryMetadata() + { + String queryId = "20250216_173945_00000_9r4vt"; + Optional transactionId = Optional.of("dummy-transaction-id"); + String query = "SELECT * FROM dummy_table"; + String queryHash = "dummy-query-hash"; + Optional preparedQuery = Optional.of("PREPARE SELECT * FROM dummy_table"); + String queryState = "COMPLETED"; + URI uri = URI.create("http://localhost/query/dummy-query-id"); + Optional plan = Optional.of("dummy-plan"); + Optional jsonPlan = Optional.of("{\"plan\": \"dummy-plan\"}"); + Optional graphvizPlan = Optional.of("digraph {node1 -> node2}"); + Optional payload = Optional.of("dummy-payload"); + List runtimeOptimizedStages = new ArrayList<>(Arrays.asList("stage1", "stage2")); + Optional tracingId = Optional.of("dummy-tracing-id"); + + return new QueryMetadata( + queryId, + transactionId, + query, + queryHash, + preparedQuery, + queryState, + uri, + plan, + jsonPlan, + graphvizPlan, + payload, + runtimeOptimizedStages, + tracingId); + } + + private static QueryContext createDummyQueryContext() + { + String user = "dummyUser"; + String serverAddress = "127.0.0.1"; + String serverVersion = "testversion"; + String environment = "testing"; + String workerType = "worker-1"; + + Optional principal = Optional.of("dummyPrincipal"); + Optional remoteClientAddress = Optional.of("192.168.1.100"); + Optional userAgent = Optional.of("Mozilla/5.0"); + Optional clientInfo = Optional.of("Dummy Client Info"); + Optional source = Optional.empty(); + Optional catalog = Optional.of("dummyCatalog"); + Optional schema = Optional.of("dummySchema"); + Optional resourceGroupId = Optional.of(new ResourceGroupId("dummyGroupId")); + + Set clientTags = new HashSet<>(Arrays.asList("tag1", "tag2", "tag3")); + + Map sessionProperties = new HashMap<>(); + sessionProperties.put("property1", "value1"); + sessionProperties.put("property2", "value2"); + + ResourceEstimates resourceEstimates = new ResourceEstimates( + Optional.of(new io.airlift.units.Duration(1200, TimeUnit.SECONDS)), + Optional.of(new io.airlift.units.Duration(1200, TimeUnit.SECONDS)), + Optional.of(new io.airlift.units.DataSize(2, DataSize.Unit.GIGABYTE)), + Optional.of(new io.airlift.units.DataSize(2, DataSize.Unit.GIGABYTE))); + return new QueryContext( + user, + principal, + remoteClientAddress, + userAgent, + clientInfo, + clientTags, + source, + catalog, + schema, + resourceGroupId, + sessionProperties, + resourceEstimates, + serverAddress, + serverVersion, + environment, + workerType); + } + + private static QueryIOMetadata createDummyQueryIoMetadata() + { + List inputs = new ArrayList<>(); + QueryInputMetadata queryInputMetadata = getQueryInputMetadata(); + inputs.add(queryInputMetadata); + QueryOutputMetadata outputMetadata = new QueryOutputMetadata( + "dummyCatalog", + "dummySchema", + "dummyTable", + Optional.of("dummyConnectorMetadata"), + Optional.of(true), + "dummySerializedCommitOutput"); + return new QueryIOMetadata(inputs, Optional.of(outputMetadata)); + } + + private static QueryInputMetadata getQueryInputMetadata() + { + String catalogName = "dummyCatalog"; + String schema = "dummySchema"; + String table = "dummyTable"; + String serializedCommitOutput = "commitOutputDummy"; + List columns = new ArrayList<>(Arrays.asList("column1", "column2", "column3")); + Optional connectorInfo = Optional.of(new Object()); + return new QueryInputMetadata( + catalogName, + schema, + table, + columns, + connectorInfo, + Optional.empty(), + serializedCommitOutput); + } + + private static SplitCompletedEvent createDummySplitCompletedEvent() + { + Instant now = Instant.now(); + Instant startTimeDummy = now.minusSeconds(100); + Instant endTimeDummy = now.minusSeconds(50); + SplitStatistics stats = createDummySplitStatistics(); + SplitFailureInfo failureInfo = new SplitFailureInfo("Error", "Dummy failure message"); + return new SplitCompletedEvent( + "query123", + "stage456", + "stageExec789", + "task012", + now, + Optional.of(startTimeDummy), + Optional.of(endTimeDummy), + stats, + Optional.of(failureInfo), + "dummyPayload"); + } + + private static SplitStatistics createDummySplitStatistics() + { + Duration cpuTime = Duration.ofSeconds(500); + Duration wallTime = Duration.ofSeconds(1000); + Duration queuedTime = Duration.ofSeconds(120); + Duration completedReadTime = Duration.ofSeconds(800); + + long completedPositions = 1500; + long completedDataSizeBytes = 10000000L; + + Optional timeToFirstByte = Optional.of(Duration.ofSeconds(10)); + Optional timeToLastByte = Optional.empty(); + + return new SplitStatistics( + cpuTime, + wallTime, + queuedTime, + completedReadTime, + completedPositions, + completedDataSizeBytes, + timeToFirstByte, + timeToLastByte); + } + + private static void tryDeleteFile(Path path) + { + try { + File file = new File(path.toUri()); + if (file.exists()) { + Files.delete(file.toPath()); + } + } + catch (IOException e) { + log.error(e, "Could not delete file found at [%s]", path); + } + } + + private static class TestEventListenerFactory + implements EventListenerFactory + { + public static String name; + private final TestingEventListener testingEventListener; + + public TestEventListenerFactory(TestingEventListener testingEventListener, String name) + { + this.testingEventListener = requireNonNull(testingEventListener, "testingEventListener is null"); + this.name = name; + } + + @Override + public String getName() + { + return name; + } + + @Override + public EventListener create(Map config) + { + return testingEventListener; + } + } + + private static class TestingEventListener + implements EventListener + { + private final EventsCapture eventsCapture; + + public TestingEventListener(EventsCapture eventsCapture) + { + this.eventsCapture = eventsCapture; + } + + @Override + public void queryCreated(QueryCreatedEvent queryCreatedEvent) + { + eventsCapture.addQueryCreated(queryCreatedEvent); + } + + @Override + public void queryCompleted(QueryCompletedEvent queryCompletedEvent) + { + eventsCapture.addQueryCompleted(queryCompletedEvent); + } + + @Override + public void splitCompleted(SplitCompletedEvent splitCompletedEvent) + { + eventsCapture.addSplitCompleted(splitCompletedEvent); + } + } + + private static class EventsCapture + { + private final ImmutableList.Builder queryCreatedEvents = ImmutableList.builder(); + private final ImmutableList.Builder queryCompletedEvents = ImmutableList.builder(); + private final ImmutableList.Builder splitCompletedEvents = ImmutableList.builder(); + + public synchronized void addQueryCreated(QueryCreatedEvent event) + { + queryCreatedEvents.add(event); + } + + public synchronized void addQueryCompleted(QueryCompletedEvent event) + { + queryCompletedEvents.add(event); + } + + public synchronized void addSplitCompleted(SplitCompletedEvent event) + { + splitCompletedEvents.add(event); + } + + public List getQueryCreatedEvents() + { + return queryCreatedEvents.build(); + } + + public List getQueryCompletedEvents() + { + return queryCompletedEvents.build(); + } + + public List getSplitCompletedEvents() + { + return splitCompletedEvents.build(); + } + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java b/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java index cdee04d9bdc83..92d32cd9cecf8 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java @@ -20,6 +20,7 @@ import com.facebook.presto.cost.StatsAndCosts; import com.facebook.presto.dispatcher.NoOpQueryManager; import com.facebook.presto.event.SplitMonitor; +import com.facebook.presto.eventlistener.EventListenerConfig; import com.facebook.presto.eventlistener.EventListenerManager; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.scheduler.LegacyNetworkTopology; @@ -201,7 +202,7 @@ public static TaskInfo updateTask(SqlTask sqlTask, List taskSources, public static SplitMonitor createTestSplitMonitor() { return new SplitMonitor( - new EventListenerManager(), + new EventListenerManager(new EventListenerConfig()), new JsonObjectMapperProvider().get()); }