diff --git a/plugin/trino-kafka-event-listener/pom.xml b/plugin/trino-kafka-event-listener/pom.xml index d322443e93f2..dcba886d4a6d 100644 --- a/plugin/trino-kafka-event-listener/pom.xml +++ b/plugin/trino-kafka-event-listener/pom.xml @@ -13,6 +13,10 @@ trino-plugin Trino - Kafka Event Listener + + true + + com.fasterxml.jackson.core diff --git a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java index 98133b2a92a2..320631d90796 100644 --- a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java +++ b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java @@ -59,10 +59,8 @@ public KafkaEventListener(KafkaEventListenerConfig config, KafkaProducerFactory if (config.getTerminateOnInitializationFailure()) { throw e; } - else { - LOG.error(e, "Failed to initialize Kafka publisher."); - stats.kafkaPublisherFailedToInitialize(); - } + LOG.error(e, "Failed to initialize Kafka publisher."); + stats.kafkaPublisherFailedToInitialize(); } } diff --git a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java index 8678c184c659..7f94adfc6782 100644 --- a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java +++ b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java @@ -78,9 +78,7 @@ private MetadataProvider metadataProvider(KafkaEventListenerConfig config) if (config.getEnvironmentVariablePrefix().isPresent()) { return new EnvMetadataProvider(config.getEnvironmentVariablePrefix().get()); } - else { - return new NoOpMetadataProvider(); - } + return new NoOpMetadataProvider(); } public void publishCompletedEvent(QueryCompletedEvent queryCompletedEvent) diff --git a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/model/QueryCompletedEventWrapper.java b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/model/QueryCompletedEventWrapper.java index ecd23096a612..4c9c12f536c8 100644 --- a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/model/QueryCompletedEventWrapper.java +++ b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/model/QueryCompletedEventWrapper.java @@ -14,6 +14,7 @@ package io.trino.plugin.eventlistener.kafka.model; +import com.google.common.collect.ImmutableMap; import io.trino.spi.eventlistener.QueryCompletedEvent; import java.util.Map; @@ -25,6 +26,6 @@ public record QueryCompletedEventWrapper(QueryCompletedEvent eventPayload, Map overrides = conf.getKafkaClientOverrides(); - assertThat(overrides.size()).isEqualTo(0); + assertThat(overrides).isEmpty(); // check setting just one conf.setKafkaClientOverrides("buffer.memory=444555"); overrides = conf.getKafkaClientOverrides(); - assertThat(overrides.size()).isEqualTo(1); - assertThat(overrides.get("buffer.memory")).isEqualTo("444555"); + assertThat(overrides).containsExactly(entry("buffer.memory", "444555")); // check setting multiple conf.setKafkaClientOverrides("buffer.memory=444555, compression.type=zstd"); overrides = conf.getKafkaClientOverrides(); - assertThat(overrides.size()).isEqualTo(2); - assertThat(overrides.get("buffer.memory")).isEqualTo("444555"); - assertThat(overrides.get("compression.type")).isEqualTo("zstd"); + assertThat(overrides) + .containsExactly(entry("buffer.memory", "444555"), entry("compression.type", "zstd")); // check empty trailing param conf.setKafkaClientOverrides("buffer.memory=555777,"); overrides = conf.getKafkaClientOverrides(); - assertThat(overrides.size()).isEqualTo(1); - assertThat(overrides.get("buffer.memory")).isEqualTo("555777"); + assertThat(overrides).containsExactly(entry("buffer.memory", "555777")); conf.setKafkaClientOverrides(",, ,"); overrides = conf.getKafkaClientOverrides(); - assertThat(overrides.size()).isEqualTo(0); + assertThat(overrides).isEmpty(); // check missing = throws assertThatThrownBy(() -> conf.setKafkaClientOverrides("invalid,buffer.memory=555777")) diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerPlugin.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerPlugin.java index 7bd55ecc705d..ec3e4551222f 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerPlugin.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerPlugin.java @@ -39,7 +39,7 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static org.assertj.core.api.Assertions.assertThat; -public class TestKafkaEventListenerPlugin +final class TestKafkaEventListenerPlugin { private static final String CREATED_TOPIC = "query_created"; private static final String COMPLETED_TOPIC = "query_completed"; diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java index 1d7d0e7eb19b..b8ac412959db 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java @@ -28,9 +28,10 @@ import java.util.Map; import java.util.Set; +import static java.util.Map.entry; import static org.assertj.core.api.Assertions.assertThat; -public class TestKafkaRecordBuilder +final class TestKafkaRecordBuilder { private static final ObjectMapper MAPPER = new ObjectMapper(); private static final Set EXCLUDED_FIELDS = ImmutableSet.of( @@ -44,7 +45,7 @@ public class TestKafkaRecordBuilder private static final MetadataProvider TEST_PROVIDER = new TestMetadataProvider("TRINO_INSIGHTS"); @Test - public void testBuildKafkaRecord() + void testBuildKafkaRecord() throws IOException { KafkaRecordBuilder builder = new KafkaRecordBuilder("TestQueryStartedEvent", "TestQueryCompletedEvent", EXCLUDED_FIELDS, TEST_PROVIDER); @@ -60,7 +61,7 @@ public void testBuildKafkaRecord() } @Test - public void testBuildKafkaRecordWithExclusions() + void testBuildKafkaRecordWithExclusions() throws IOException { Set exclude = Sets.union(EXCLUDED_FIELDS, Set.of("query", "principal", "analysisTime", "writtenBytes")); @@ -81,7 +82,7 @@ public void testBuildKafkaRecordWithExclusions() } @Test - public void testBuildKafkaRecordWithMetadata() + void testBuildKafkaRecordWithMetadata() throws IOException { Set exclude = Sets.union(EXCLUDED_FIELDS, Set.of("context", "payload", "analysisTime")); @@ -94,8 +95,7 @@ public void testBuildKafkaRecordWithMetadata() assertThat(record.key()).isNull(); Map metadata = MAPPER.readValue(MAPPER.readTree(record.value()).get("eventMetadata").toString(), Map.class); - assertThat(metadata.size()).isEqualTo(1); - assertThat(metadata.get("baz")).isEqualTo("yoo"); + assertThat(metadata).containsExactly(entry("baz", "yoo")); } static class TestMetadataProvider diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java index a0df8f1d51b6..7c1f56cb6bb8 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java @@ -46,7 +46,7 @@ import static io.trino.spi.type.TimeZoneKey.UTC_KEY; import static java.time.Duration.ofMillis; -public class TestUtils +public final class TestUtils { private static final QueryIOMetadata queryIOMetadata; private static final QueryContext queryContext;