Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,11 @@ public KafkaEventListener(KafkaEventListenerConfig config, KafkaProducerFactory
publishSplitCompletedEvent = config.getPublishSplitCompletedEvent();
isAnonymizationEnabled = config.isAnonymizationEnabled();

try {
if (publishCreatedEvent || publishCompletedEvent) {
kafkaPublisher = new KafkaEventPublisher(config, producerFactory, stats);
}
else {
LOG.warn("Event listener will be no-op, as neither created events nor completed events are published.");
}
if (publishCreatedEvent || publishCompletedEvent) {
kafkaPublisher = new KafkaEventPublisher(config, producerFactory, stats);
}
catch (Exception e) {
if (config.getTerminateOnInitializationFailure()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #23606 (comment). I don't really think we need to complicate it further

throw e;
}
LOG.error(e, "Failed to initialize Kafka publisher.");
stats.kafkaPublisherFailedToInitialize();
else {
LOG.warn("Event listener will be no-op, as neither created events nor completed events are published.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class KafkaEventListenerConfig
private Set<String> excludedFields = Collections.emptySet();
private Map<String, String> kafkaClientOverrides = Collections.emptyMap();
private Duration requestTimeout = new Duration(10, SECONDS);
private boolean terminateOnInitializationFailure = true;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably just keep this commit, but I still don't understand the rationale. Does it fail in tests, or is it something else?

Copy link
Copy Markdown
Contributor

@marton-bod marton-bod Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure this is a good change, but I can be convinced. I think that if a user explicitly configures a kafka-event-listener, they would want to know if the brokers are unreachable instead of the plugin silently "failing". They can always set this flag to false explicitly if they are fine with ignoring this problem.

private boolean terminateOnInitializationFailure;
private Optional<String> environmentVariablePrefix = Optional.empty();

public boolean isAnonymizationEnabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package io.trino.plugin.eventlistener.kafka;

import com.google.common.base.Supplier;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.log.Logger;
import io.trino.plugin.eventlistener.kafka.metadata.EnvMetadataProvider;
import io.trino.plugin.eventlistener.kafka.metadata.MetadataProvider;
Expand All @@ -25,28 +27,38 @@
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.SplitCompletedEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class KafkaEventPublisher
{
private static final Logger LOG = Logger.get(KafkaEventPublisher.class);

private final KafkaProducer<String, String> kafkaProducer;
private final KafkaRecordBuilder kafkaRecordBuilder;
private final KafkaEventListenerJmxStats stats;
private final Supplier<KafkaProducer<String, String>> kafkaProducerSupplier;
@GuardedBy("this")
private KafkaProducer<String, String> kafkaProducer;
@GuardedBy("this")
private RuntimeException lastKafkaProducerFailure = new RuntimeException();
private final ScheduledExecutorService connectExecutor = Executors.newSingleThreadScheduledExecutor(daemonThreadsNamed("kafka-connect-%s"));

public KafkaEventPublisher(KafkaEventListenerConfig config, KafkaProducerFactory producerFactory, KafkaEventListenerJmxStats stats)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrapped in a try/catch in io.trino.plugin.eventlistener.kafka.KafkaEventListener#KafkaEventListener. Why is that not sufficient?

throws Exception
{
this.stats = requireNonNull(stats, "stats cannot be null");
requireNonNull(config, "config cannot be null");
Expand All @@ -56,22 +68,86 @@ public KafkaEventPublisher(KafkaEventListenerConfig config, KafkaProducerFactory
String createdTopic = config.getCreatedTopicName().orElse("");
String completedTopic = config.getCompletedTopicName().orElse("");
String splitCompletedTopic = config.getSplitCompletedTopicName().orElse("");
kafkaProducerSupplier = () -> {
Map<String, String> configOverrides = config.getKafkaClientOverrides();
LOG.info("Creating Kafka publisher (SSL=%s) for topics: %s/%s with excluded fields: %s and kafka config overrides: %s",
producerFactory instanceof SSLKafkaProducerFactory, createdTopic, completedTopic, config.getExcludedFields(), configOverrides);
KafkaProducer<String, String> kafkaProducer = producerFactory.producer(configOverrides);
try {
checkConnectivityToBrokers(config.getPublishCreatedEvent() ? createdTopic : completedTopic, config.getRequestTimeout().toMillis());
}
catch (Throwable e) {
try {
kafkaProducer.close();
}
catch (Throwable closeException) {
e.addSuppressed(closeException);
}
LOG.error(e, "Failed to initialize Kafka publisher.");
stats.kafkaPublisherFailedToInitialize();
throw new RuntimeException("Cannot connect to kafka broker", e);
}
return kafkaProducer;
};

try {
kafkaProducer = kafkaProducerSupplier.get();
}
catch (Exception e) {
if (config.getTerminateOnInitializationFailure()) {
throw e;
}
}

// schedule reconnecting
if (kafkaProducer == null) {
connectExecutor.schedule(new Runnable()
{
@Override
public void run()
{
while (true) {
try {
// success
KafkaProducer<String, String> kafkaProducer = kafkaProducerSupplier.get();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to recreate it? Will kafkaProducer stop working permanently in case of some intermittent issues?

synchronized (KafkaEventPublisher.this) {
KafkaEventPublisher.this.kafkaProducer = kafkaProducer;
}
return;
}
catch (RuntimeException e) {
LOG.error("Could not create Kafka producer", e);
synchronized (KafkaEventPublisher.this) {
KafkaEventPublisher.this.lastKafkaProducerFailure = e;
}
// reschedule
connectExecutor.schedule(this, KAFKA_CONNECT_INTERVAL.toMillis(), MILLISECONDS);
}
}
}
}, KAFKA_CONNECT_INTERVAL.toMillis(), MILLISECONDS);
}

Map<String, String> configOverrides = config.getKafkaClientOverrides();
LOG.info("Creating Kafka publisher (SSL=%s) for topics: %s/%s with excluded fields: %s and kafka config overrides: %s",
producerFactory instanceof SSLKafkaProducerFactory, createdTopic, completedTopic, config.getExcludedFields(), configOverrides);
kafkaProducer = producerFactory.producer(configOverrides);
checkConnectivityToBrokers(config.getPublishCreatedEvent() ? createdTopic : completedTopic, config.getRequestTimeout().toMillis());
kafkaRecordBuilder = new KafkaRecordBuilder(createdTopic, completedTopic, splitCompletedTopic, config.getExcludedFields(), metadataProvider(config));
LOG.info("Successfully created Kafka publisher.");
}

// Delay used both before the first reconnect attempt and between subsequent attempts.
private static final Duration KAFKA_CONNECT_INTERVAL = Duration.ofSeconds(10);

/**
 * Returns the Kafka producer currently held by this publisher.
 *
 * @return the producer stored in {@code kafkaProducer}
 * @throws RuntimeException if no producer has been stored yet; the most recently recorded
 *         creation failure is attached as the cause
 */
private synchronized Producer<String, String> getProducer()
{
    if (kafkaProducer == null) {
        // No producer available: surface the last recorded failure to the caller.
        throw new RuntimeException("Could not initialize Kafka producer; waiting for retry", lastKafkaProducerFailure);
    }
    return kafkaProducer;
}

/**
 * Checks that the brokers are reachable by fetching partition metadata for {@code topic}
 * on an asynchronous task and waiting at most {@code requestTimeout} milliseconds for it.
 *
 * @param topic topic whose partitions are fetched
 * @param requestTimeout maximum time to wait for the fetch, in milliseconds
 * @throws Exception if waiting on the asynchronous fetch fails or times out
 */
// NOTE(review): this span comes from a diff view; the two runAsync/get pairs below look like the
// removed and the added variant of the same statements rather than consecutive statements — confirm
// against the real file.
// NOTE(review): the getProducer() variant reads the kafkaProducer field; if this check runs before
// that field has been assigned, getProducer() throws instead of probing the brokers — verify.
private void checkConnectivityToBrokers(String topic, long requestTimeout)
throws Exception
{
LOG.info("checking connectivity to brokers (fetching partitions for topic=%s).", topic);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> kafkaProducer.partitionsFor(topic));
future.get(requestTimeout, TimeUnit.MILLISECONDS);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> getProducer().partitionsFor(topic));
future.get(requestTimeout, MILLISECONDS);
LOG.info("connectivity check succeeded.");
}

Expand All @@ -97,12 +173,12 @@ record = kafkaRecordBuilder.buildCompletedRecord(queryCompletedEvent);
LOG.warn(e, "unable to build QueryCompletedEvent for query id: %s", queryId);
}
if (record != null) {
kafkaProducer.send(record, (metadata, exception) -> {
getProducer().send(record, (metadata, exception) -> {
if (exception != null) {
switch (exception) {
case TimeoutException e -> stats.completedEventSendFailureTimeout();
case RecordTooLargeException e -> stats.completedEventSendFailureTooLarge();
case InvalidRecordException e -> stats.completedEventSendFailureInvalidRecord();
case TimeoutException _ -> stats.completedEventSendFailureTimeout();
case RecordTooLargeException _ -> stats.completedEventSendFailureTooLarge();
case InvalidRecordException _ -> stats.completedEventSendFailureInvalidRecord();
default -> stats.completedEventSendFailureOther();
}
LOG.warn(exception, "failed to send QueryCompletedEvent for query id: %s. Uncompressed message size: %s. Partition: %s",
Expand Down Expand Up @@ -130,12 +206,12 @@ record = kafkaRecordBuilder.buildStartedRecord(queryCreatedEvent);
LOG.warn(e, "unable to build QueryCreatedEvent for query id: %s", queryId);
}
if (record != null) {
kafkaProducer.send(record, (metadata, exception) -> {
getProducer().send(record, (metadata, exception) -> {
if (exception != null) {
switch (exception) {
case TimeoutException e -> stats.createdEventSendFailureTimeout();
case RecordTooLargeException e -> stats.createdEventSendFailureTooLarge();
case InvalidRecordException e -> stats.createdEventSendFailureInvalidRecord();
case TimeoutException _ -> stats.createdEventSendFailureTimeout();
case RecordTooLargeException _ -> stats.createdEventSendFailureTooLarge();
case InvalidRecordException _ -> stats.createdEventSendFailureInvalidRecord();
default -> stats.createdEventSendFailureOther();
}
LOG.warn(exception, "failed to send QueryCreatedEvent for query id: %s. Uncompressed message size: %s. Partition: %s",
Expand Down Expand Up @@ -163,12 +239,12 @@ record = kafkaRecordBuilder.buildSplitCompletedRecord(splitCompletedEvent);
LOG.warn(e, "unable to build SplitCompletedEvent for query id: %s", queryId);
}
if (record != null) {
kafkaProducer.send(record, (metadata, exception) -> {
getProducer().send(record, (metadata, exception) -> {
if (exception != null) {
switch (exception) {
case TimeoutException e -> stats.splitCompletedEventSendFailureTimeout();
case RecordTooLargeException e -> stats.splitCompletedEventSendFailureTooLarge();
case InvalidRecordException e -> stats.splitCompletedEventSendFailureInvalidRecord();
case TimeoutException _ -> stats.splitCompletedEventSendFailureTimeout();
case RecordTooLargeException _ -> stats.splitCompletedEventSendFailureTooLarge();
case InvalidRecordException _ -> stats.splitCompletedEventSendFailureInvalidRecord();
default -> stats.splitCompletedEventSendFailureOther();
}
LOG.warn(exception, "failed to send SplitCompletedEvent for query id: %s. Uncompressed message size: %s. Partition: %s",
Expand All @@ -184,6 +260,6 @@ record = kafkaRecordBuilder.buildSplitCompletedRecord(splitCompletedEvent);

/**
 * Closes the Kafka producer held by this publisher.
 */
// NOTE(review): this span comes from a diff view; the two close() calls look like the removed and
// the added variant of the same statement — confirm against the real file.
// NOTE(review): getProducer() throws a RuntimeException when no producer has been stored yet, so
// shutdown would fail in that state; connectExecutor is also not stopped here — confirm intended.
public void shutdown()
{
kafkaProducer.close();
getProducer().close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void testDefaults()
.setExcludedFields(Set.of())
.setKafkaClientOverrides("")
.setRequestTimeout(new Duration(10, TimeUnit.SECONDS))
.setTerminateOnInitializationFailure(true)
.setTerminateOnInitializationFailure(false)
.setEnvironmentVariablePrefix(null));
}

Expand All @@ -68,7 +68,7 @@ void testExplicitPropertyMappings()
.put("kafka-event-listener.request-timeout", "3s")
.put("kafka-event-listener.env-var-prefix", "INSIGHTS_")
.put("kafka-event-listener.anonymization.enabled", "true")
.put("kafka-event-listener.terminate-on-initialization-failure", "false")
.put("kafka-event-listener.terminate-on-initialization-failure", "true")
.buildOrThrow();

KafkaEventListenerConfig expected = new KafkaEventListenerConfig()
Expand All @@ -85,7 +85,7 @@ void testExplicitPropertyMappings()
.setKafkaClientOverrides("foo=bar,baz=yoo")
.setRequestTimeout(new Duration(3, TimeUnit.SECONDS))
.setEnvironmentVariablePrefix("INSIGHTS_")
.setTerminateOnInitializationFailure(false);
.setTerminateOnInitializationFailure(true);

assertFullMapping(properties, expected);
}
Expand Down