Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.validation.FileExists;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

import java.io.File;
Expand All @@ -26,6 +27,7 @@
public class EventListenerConfig
{
private List<File> eventListenerFiles = ImmutableList.of();
private int maxConcurrentQueryCompletedEvents = 100;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 100? IMO, the default should be unlimited. The specific number would depend on which event listeners you're using and whether they can handle a given number of concurrent calls.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


@NotNull
public List<@FileExists File> getEventListenerFiles()
Expand All @@ -41,4 +43,17 @@ public EventListenerConfig setEventListenerFiles(List<String> eventListenerFiles
.collect(toImmutableList());
return this;
}

@Min(1)
public int getMaxConcurrentQueryCompletedEvents()
{
return maxConcurrentQueryCompletedEvents;
}

@Config("event-listener.max-concurrent-query-completed-events")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think "concurrent" is a good name for this. It doesn't convey the right concept or suggest a good mental model. An event is a point in time concept, so concurrent over what period? I would suggest using the term "pending" instead.

Copy link
Copy Markdown
Member Author

@sopel39 sopel39 Aug 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending might suggest there is limit on queue size of "query completed events", but there isn't

"concurrent completed events" metric is only bumped when there are multiple (multithreaded) calls to event listener at the same time. Even if there could be 1000 query completed events scheduled to be run by single threaded executor, "concurrent completed events" metric will be at most 1. The backpressure in this case comes not from executor queue, but from max number of concurrent queries. If event listener is slow to consume events, then excessive events are simply ignored preventing system from blowing up.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. In that case, I'm not sure I understand what problem this PR is trying to solve. If an event listener cannot handle a certain concurrent load, it should limit (or discard) events internally.

Copy link
Copy Markdown
Member Author

@sopel39 sopel39 Aug 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem are badly written custom event listeners. Usually they are not written by core Trino experts or concurrency experts which leads to deployment issues and poor experience

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems like a problem for whoever is deploying those poorly written event listeners. I'm not sure I agree with Trino having to work around plugins that are not designed to handle how Trino works.

Also, as I described elsewhere, the way this is implemented in the PR, a poorly written listener will affect the ability to collect events in listeners that don't have those issues.

public EventListenerConfig setMaxConcurrentQueryCompletedEvents(int maxConcurrentQueryCompletedEvents)
{
this.maxConcurrentQueryCompletedEvents = maxConcurrentQueryCompletedEvents;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.stats.TimeStat;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.eventlistener.EventListener;
Expand All @@ -38,6 +39,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -55,10 +57,13 @@ public class EventListenerManager
private static final File CONFIG_FILE = new File("etc/event-listener.properties");
private static final String EVENT_LISTENER_NAME_PROPERTY = "event-listener.name";
private final List<File> configFiles;
private final int maxConcurrentQueryCompletedEvents;
private final Map<String, EventListenerFactory> eventListenerFactories = new ConcurrentHashMap<>();
private final List<EventListener> providedEventListeners = Collections.synchronizedList(new ArrayList<>());
private final AtomicReference<List<EventListener>> configuredEventListeners = new AtomicReference<>(ImmutableList.of());
private final AtomicBoolean loading = new AtomicBoolean(false);
private final AtomicInteger concurrentQueryCompletedEvents = new AtomicInteger();
private final CounterStat skippedQueryCompletedEvents = new CounterStat();

private final TimeStat queryCreatedTime = new TimeStat(MILLISECONDS);
private final TimeStat queryCompletedTime = new TimeStat(MILLISECONDS);
Expand All @@ -68,6 +73,7 @@ public class EventListenerManager
public EventListenerManager(EventListenerConfig config)
{
this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles());
this.maxConcurrentQueryCompletedEvents = config.getMaxConcurrentQueryCompletedEvents();
}

public void addEventListenerFactory(EventListenerFactory eventListenerFactory)
Expand Down Expand Up @@ -144,7 +150,13 @@ private static Map<String, String> loadEventListenerProperties(File configFile)
public void queryCompleted(Function<Boolean, QueryCompletedEvent> queryCompletedEventProvider)
{
try (TimeStat.BlockTimer _ = queryCompletedTime.time()) {
if (concurrentQueryCompletedEvents.incrementAndGet() > maxConcurrentQueryCompletedEvents) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit is being applied globally, so if you have multiple listeners and only one of the is slow, it will cause events to be missed for the other listeners.

concurrentQueryCompletedEvents.decrementAndGet();
skippedQueryCompletedEvents.update(1);
return;
}
doQueryCompleted(queryCompletedEventProvider);
concurrentQueryCompletedEvents.decrementAndGet();
}
}

Expand Down Expand Up @@ -220,6 +232,19 @@ public TimeStat getSplitCompletedTime()
return splitCompletedTime;
}

@Managed
public int getConcurrentQueryCompletedEvents()
{
return concurrentQueryCompletedEvents.get();
}

@Managed
@Nested
public CounterStat getSkippedQueryCompletedEvents()
{
return skippedQueryCompletedEvents;
}

@PreDestroy
public void shutdown()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class TestEventListenerConfig
public void testDefaults()
{
assertRecordedDefaults(ConfigAssertions.recordDefaults(EventListenerConfig.class)
.setMaxConcurrentQueryCompletedEvents(100)
.setEventListenerFiles(ImmutableList.of()));
}

Expand All @@ -42,10 +43,13 @@ public void testExplicitPropertyMappings()
Path config1 = Files.createTempFile(null, null);
Path config2 = Files.createTempFile(null, null);

Map<String, String> properties = ImmutableMap.of("event-listener.config-files", config1.toString() + "," + config2.toString());
Map<String, String> properties = ImmutableMap.of(
"event-listener.config-files", config1.toString() + "," + config2.toString(),
"event-listener.max-concurrent-query-completed-events", "1");

EventListenerConfig expected = new EventListenerConfig()
.setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath()));
.setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath()))
.setMaxConcurrentQueryCompletedEvents(1);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,139 @@
package io.trino.eventlistener;

import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryContext;
import io.trino.spi.eventlistener.QueryIOMetadata;
import io.trino.spi.eventlistener.QueryMetadata;
import io.trino.spi.eventlistener.QueryStatistics;
import io.trino.spi.session.ResourceEstimates;
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static java.time.Duration.ofMillis;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.assertj.core.api.Assertions.assertThat;

class TestEventListenerManager
{
private static final QueryMetadata QUERY_METADATA = new QueryMetadata(
"minimal_query",
Optional.empty(),
"query",
Optional.empty(),
Optional.empty(),
"queryState",
// not stored
List.of(),
// not stored
List.of(),
URI.create("http://localhost"),
Optional.empty(),
Optional.empty(),
Optional.empty());

private static final QueryStatistics QUERY_STATISTICS = new QueryStatistics(
ofMillis(101),
ofMillis(102),
ofMillis(103),
ofMillis(104),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
115L,
116L,
117L,
118L,
119L,
1191L,
1192L,
120L,
121L,
122L,
123L,
124L,
125L,
126L,
127L,
1271L,
128.0,
129.0,
// not stored
Collections.emptyList(),
130,
false,
// not stored
Collections.emptyList(),
// not stored
Collections.emptyList(),
// not stored
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
// not stored
Optional.empty());

private static final QueryContext QUERY_CONTEXT = new QueryContext(
"user",
"originalUser",
Optional.empty(),
Set.of(),
Set.of(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Set.of(),
// not stored
Set.of(),
Optional.empty(),
UTC_KEY.getId(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Map.of(),
// not stored
new ResourceEstimates(Optional.empty(), Optional.empty(), Optional.empty()),
"serverAddress",
"serverVersion",
"environment",
Optional.empty(),
"NONE");

private static final QueryIOMetadata QUERY_IO_METADATA = new QueryIOMetadata(List.of(), Optional.empty());

private static final QueryCompletedEvent QUERY_COMPLETED_EVENT = new QueryCompletedEvent(
QUERY_METADATA,
QUERY_STATISTICS,
QUERY_CONTEXT,
QUERY_IO_METADATA,
Optional.empty(),
List.of(),
Instant.now(),
Instant.now(),
Instant.now());

@Test
public void testShutdownIsForwardedToListeners()
{
Expand All @@ -42,4 +167,47 @@ public void shutdown()

assertThat(wasCalled.get()).isTrue();
}

@Test
public void testMaxConcurrentQueryCompletedEvents()
throws InterruptedException
{
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig().setMaxConcurrentQueryCompletedEvents(1));
eventListenerManager.addEventListener(new BlockingEventListener());
eventListenerManager.loadEventListeners();
ExecutorService executor = newFixedThreadPool(2);
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
Runnable queryCompletedEvent = () -> {
eventListenerManager.queryCompleted(_ -> QUERY_COMPLETED_EVENT);
countDownLatch.countDown();
};
executor.submit(queryCompletedEvent);
executor.submit(queryCompletedEvent);

countDownLatch.await();
assertThat(eventListenerManager.getSkippedQueryCompletedEvents().getTotalCount()).isEqualTo(1);
assertThat(eventListenerManager.getConcurrentQueryCompletedEvents()).isEqualTo(1);
}
finally {
executor.shutdownNow();
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
}
}

private static final class BlockingEventListener
implements EventListener
{
@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
try {
// sleep forever
Thread.sleep(100_000);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
4 changes: 4 additions & 0 deletions docs/src/main/sphinx/develop/event-listener.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ custom-property1=custom-value1
custom-property2=custom-value2
```

Maximum number of concurrent query completed events
can be configured using `event-listener.max-concurrent-query-completed-events` property
(`100` by default). Excessive events are dropped.

(multiple-listeners)=
## Multiple event listeners

Expand Down