diff --git a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java index b267f9015c55..5af6f7a1c933 100644 --- a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java +++ b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java @@ -49,20 +49,11 @@ public KafkaEventListener(KafkaEventListenerConfig config, KafkaProducerFactory publishSplitCompletedEvent = config.getPublishSplitCompletedEvent(); isAnonymizationEnabled = config.isAnonymizationEnabled(); - try { - if (publishCreatedEvent || publishCompletedEvent) { - kafkaPublisher = new KafkaEventPublisher(config, producerFactory, stats); - } - else { - LOG.warn("Event listener will be no-op, as neither created events nor completed events are published."); - } + if (publishCreatedEvent || publishCompletedEvent) { + kafkaPublisher = new KafkaEventPublisher(config, producerFactory, stats); } - catch (Exception e) { - if (config.getTerminateOnInitializationFailure()) { - throw e; - } - LOG.error(e, "Failed to initialize Kafka publisher."); - stats.kafkaPublisherFailedToInitialize(); + else { + LOG.warn("Event listener will be no-op, as neither created events nor completed events are published."); } } diff --git a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListenerConfig.java b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListenerConfig.java index bf7389280181..27656d7e452e 100644 --- a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListenerConfig.java +++ b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListenerConfig.java @@ -47,7 +47,7 @@ public class 
KafkaEventListenerConfig private Set excludedFields = Collections.emptySet(); private Map kafkaClientOverrides = Collections.emptyMap(); private Duration requestTimeout = new Duration(10, SECONDS); - private boolean terminateOnInitializationFailure = true; + private boolean terminateOnInitializationFailure; private Optional environmentVariablePrefix = Optional.empty(); public boolean isAnonymizationEnabled() diff --git a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java index 2e340efd751d..82bdbfbb2916 100644 --- a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java +++ b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java @@ -14,6 +14,8 @@ package io.trino.plugin.eventlistener.kafka; +import com.google.common.base.Supplier; +import com.google.errorprone.annotations.concurrent.GuardedBy; import io.airlift.log.Logger; import io.trino.plugin.eventlistener.kafka.metadata.EnvMetadataProvider; import io.trino.plugin.eventlistener.kafka.metadata.MetadataProvider; @@ -25,28 +27,38 @@ import io.trino.spi.eventlistener.QueryCreatedEvent; import io.trino.spi.eventlistener.SplitCompletedEvent; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; +import java.time.Duration; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import static 
com.google.common.base.Preconditions.checkArgument; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static java.time.temporal.ChronoUnit.SECONDS; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; public class KafkaEventPublisher { private static final Logger LOG = Logger.get(KafkaEventPublisher.class); - private final KafkaProducer kafkaProducer; private final KafkaRecordBuilder kafkaRecordBuilder; private final KafkaEventListenerJmxStats stats; + private final Supplier> kafkaProducerSupplier; + @GuardedBy("this") + private KafkaProducer kafkaProducer; + @GuardedBy("this") + private RuntimeException lastKafkaProducerFailure = new RuntimeException(); + private final ScheduledExecutorService connectExecutor = Executors.newSingleThreadScheduledExecutor(daemonThreadsNamed("kafka-connect-%s")); public KafkaEventPublisher(KafkaEventListenerConfig config, KafkaProducerFactory producerFactory, KafkaEventListenerJmxStats stats) - throws Exception { this.stats = requireNonNull(stats, "stats cannot be null"); requireNonNull(config, "config cannot be null"); @@ -56,22 +68,86 @@ public KafkaEventPublisher(KafkaEventListenerConfig config, KafkaProducerFactory String createdTopic = config.getCreatedTopicName().orElse(""); String completedTopic = config.getCompletedTopicName().orElse(""); String splitCompletedTopic = config.getSplitCompletedTopicName().orElse(""); + kafkaProducerSupplier = () -> { + Map configOverrides = config.getKafkaClientOverrides(); + LOG.info("Creating Kafka publisher (SSL=%s) for topics: %s/%s with excluded fields: %s and kafka config overrides: %s", + producerFactory instanceof SSLKafkaProducerFactory, createdTopic, completedTopic, config.getExcludedFields(), configOverrides); + KafkaProducer kafkaProducer = producerFactory.producer(configOverrides); + try { + checkConnectivityToBrokers(config.getPublishCreatedEvent() ? 
createdTopic : completedTopic, config.getRequestTimeout().toMillis(), kafkaProducer); + } + catch (Throwable e) { + try { + kafkaProducer.close(); + } + catch (Throwable closeException) { + e.addSuppressed(closeException); + } + LOG.error(e, "Failed to initialize Kafka publisher."); + stats.kafkaPublisherFailedToInitialize(); + throw new RuntimeException("Cannot connect to kafka broker", e); + } + return kafkaProducer; + }; + + try { + kafkaProducer = kafkaProducerSupplier.get(); + } + catch (Exception e) { + if (config.getTerminateOnInitializationFailure()) { + throw e; + } + } + + // schedule reconnecting + if (kafkaProducer == null) { + connectExecutor.schedule(new Runnable() + { + @Override + public void run() + { + try { + KafkaProducer kafkaProducer = kafkaProducerSupplier.get(); + synchronized (KafkaEventPublisher.this) { + KafkaEventPublisher.this.kafkaProducer = kafkaProducer; + } + } + catch (RuntimeException e) { + LOG.error(e, "Could not create Kafka producer"); + synchronized (KafkaEventPublisher.this) { + KafkaEventPublisher.this.lastKafkaProducerFailure = e; + } + // retry again after the connect interval + connectExecutor.schedule(this, KAFKA_CONNECT_INTERVAL.toMillis(), MILLISECONDS); + } + } + }, KAFKA_CONNECT_INTERVAL.toMillis(), MILLISECONDS); + } - Map configOverrides = config.getKafkaClientOverrides(); - LOG.info("Creating Kafka publisher (SSL=%s) for topics: %s/%s with excluded fields: %s and kafka config overrides: %s", - producerFactory instanceof SSLKafkaProducerFactory, createdTopic, completedTopic, config.getExcludedFields(), configOverrides); - kafkaProducer = producerFactory.producer(configOverrides); - checkConnectivityToBrokers(config.getPublishCreatedEvent() ? 
createdTopic : completedTopic, config.getRequestTimeout().toMillis()); kafkaRecordBuilder = new KafkaRecordBuilder(createdTopic, completedTopic, splitCompletedTopic, config.getExcludedFields(), metadataProvider(config)); LOG.info("Successfully created Kafka publisher."); } + private static final java.time.Duration KAFKA_CONNECT_INTERVAL = Duration.of(10, SECONDS); + + private synchronized Producer getProducer() + { + if (kafkaProducer != null) { + return kafkaProducer; + } + throw new RuntimeException("Could not initialize Kafka producer; waiting for retry", lastKafkaProducerFailure); + } + - private void checkConnectivityToBrokers(String topic, long requestTimeout) + private void checkConnectivityToBrokers(String topic, long requestTimeout, Producer kafkaProducer) throws Exception { LOG.info("checking connectivity to brokers (fetching partitions for topic=%s).", topic); CompletableFuture future = CompletableFuture.runAsync(() -> kafkaProducer.partitionsFor(topic)); - future.get(requestTimeout, TimeUnit.MILLISECONDS); + future.get(requestTimeout, MILLISECONDS); LOG.info("connectivity check succeeded."); } @@ -97,12 +173,12 @@ record = kafkaRecordBuilder.buildCompletedRecord(queryCompletedEvent); LOG.warn(e, "unable to build QueryCompletedEvent for query id: %s", queryId); } if (record != null) { - kafkaProducer.send(record, (metadata, exception) -> { + getProducer().send(record, (metadata, exception) -> { if (exception != null) { switch (exception) { - case TimeoutException e -> stats.completedEventSendFailureTimeout(); - case RecordTooLargeException e -> stats.completedEventSendFailureTooLarge(); - case InvalidRecordException e -> stats.completedEventSendFailureInvalidRecord(); + case TimeoutException _ -> stats.completedEventSendFailureTimeout(); + case RecordTooLargeException _ -> stats.completedEventSendFailureTooLarge(); + case InvalidRecordException _ -> stats.completedEventSendFailureInvalidRecord(); default -> stats.completedEventSendFailureOther(); } 
LOG.warn(exception, "failed to send QueryCompletedEvent for query id: %s. Uncompressed message size: %s. Partition: %s", @@ -130,12 +206,12 @@ record = kafkaRecordBuilder.buildStartedRecord(queryCreatedEvent); LOG.warn(e, "unable to build QueryCreatedEvent for query id: %s", queryId); } if (record != null) { - kafkaProducer.send(record, (metadata, exception) -> { + getProducer().send(record, (metadata, exception) -> { if (exception != null) { switch (exception) { - case TimeoutException e -> stats.createdEventSendFailureTimeout(); - case RecordTooLargeException e -> stats.createdEventSendFailureTooLarge(); - case InvalidRecordException e -> stats.createdEventSendFailureInvalidRecord(); + case TimeoutException _ -> stats.createdEventSendFailureTimeout(); + case RecordTooLargeException _ -> stats.createdEventSendFailureTooLarge(); + case InvalidRecordException _ -> stats.createdEventSendFailureInvalidRecord(); default -> stats.createdEventSendFailureOther(); } LOG.warn(exception, "failed to send QueryCreatedEvent for query id: %s. Uncompressed message size: %s. 
Partition: %s", @@ -163,12 +239,12 @@ record = kafkaRecordBuilder.buildSplitCompletedRecord(splitCompletedEvent); LOG.warn(e, "unable to build SplitCompletedEvent for query id: %s", queryId); } if (record != null) { - kafkaProducer.send(record, (metadata, exception) -> { + getProducer().send(record, (metadata, exception) -> { if (exception != null) { switch (exception) { - case TimeoutException e -> stats.splitCompletedEventSendFailureTimeout(); - case RecordTooLargeException e -> stats.splitCompletedEventSendFailureTooLarge(); - case InvalidRecordException e -> stats.splitCompletedEventSendFailureInvalidRecord(); + case TimeoutException _ -> stats.splitCompletedEventSendFailureTimeout(); + case RecordTooLargeException _ -> stats.splitCompletedEventSendFailureTooLarge(); + case InvalidRecordException _ -> stats.splitCompletedEventSendFailureInvalidRecord(); default -> stats.splitCompletedEventSendFailureOther(); } LOG.warn(exception, "failed to send SplitCompletedEvent for query id: %s. Uncompressed message size: %s. 
Partition: %s", @@ -184,6 +260,11 @@ record = kafkaRecordBuilder.buildSplitCompletedRecord(splitCompletedEvent); public void shutdown() { - kafkaProducer.close(); + connectExecutor.shutdownNow(); + synchronized (this) { + if (kafkaProducer != null) { + kafkaProducer.close(); + } + } } } diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java index 62213686b633..4120d491a03e 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java @@ -47,7 +47,7 @@ void testDefaults() .setExcludedFields(Set.of()) .setKafkaClientOverrides("") .setRequestTimeout(new Duration(10, TimeUnit.SECONDS)) - .setTerminateOnInitializationFailure(true) + .setTerminateOnInitializationFailure(false) .setEnvironmentVariablePrefix(null)); } @@ -68,7 +68,7 @@ void testExplicitPropertyMappings() .put("kafka-event-listener.request-timeout", "3s") .put("kafka-event-listener.env-var-prefix", "INSIGHTS_") .put("kafka-event-listener.anonymization.enabled", "true") - .put("kafka-event-listener.terminate-on-initialization-failure", "false") + .put("kafka-event-listener.terminate-on-initialization-failure", "true") .buildOrThrow(); KafkaEventListenerConfig expected = new KafkaEventListenerConfig() @@ -85,7 +85,7 @@ void testExplicitPropertyMappings() .setKafkaClientOverrides("foo=bar,baz=yoo") .setRequestTimeout(new Duration(3, TimeUnit.SECONDS)) .setEnvironmentVariablePrefix("INSIGHTS_") - .setTerminateOnInitializationFailure(false); + .setTerminateOnInitializationFailure(true); assertFullMapping(properties, expected); }