From c459cf8de90e536ea8bda0ccdc7a8010be8cc32c Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 14 Aug 2024 11:50:55 +0900 Subject: [PATCH 1/6] Remove redundant else --- .../plugin/eventlistener/kafka/KafkaEventListener.java | 6 ++---- .../plugin/eventlistener/kafka/KafkaEventPublisher.java | 4 +--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java index 98133b2a92a2..320631d90796 100644 --- a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java +++ b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListener.java @@ -59,10 +59,8 @@ public KafkaEventListener(KafkaEventListenerConfig config, KafkaProducerFactory if (config.getTerminateOnInitializationFailure()) { throw e; } - else { - LOG.error(e, "Failed to initialize Kafka publisher."); - stats.kafkaPublisherFailedToInitialize(); - } + LOG.error(e, "Failed to initialize Kafka publisher."); + stats.kafkaPublisherFailedToInitialize(); } } diff --git a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java index 8678c184c659..7f94adfc6782 100644 --- a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java +++ b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.java @@ -78,9 +78,7 @@ private MetadataProvider metadataProvider(KafkaEventListenerConfig config) if (config.getEnvironmentVariablePrefix().isPresent()) { return new EnvMetadataProvider(config.getEnvironmentVariablePrefix().get()); } - else { - return new NoOpMetadataProvider(); - } + return new NoOpMetadataProvider(); } public void publishCompletedEvent(QueryCompletedEvent queryCompletedEvent) From 9b57e052711ff2d0ef91a89ab7fe4082183b9c29 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 14 Aug 2024 11:51:14 +0900 Subject: [PATCH 2/6] Use ImmutableMap.copyOf --- .../eventlistener/kafka/model/QueryCompletedEventWrapper.java | 3 ++- .../eventlistener/kafka/model/QueryCreatedEventWrapper.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/model/QueryCompletedEventWrapper.java b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/model/QueryCompletedEventWrapper.java index ecd23096a612..4c9c12f536c8 100644 --- a/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/model/QueryCompletedEventWrapper.java +++ b/plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/model/QueryCompletedEventWrapper.java @@ -14,6 +14,7 @@ package io.trino.plugin.eventlistener.kafka.model; +import com.google.common.collect.ImmutableMap; import io.trino.spi.eventlistener.QueryCompletedEvent; import java.util.Map; @@ -25,6 +26,6 @@ public record QueryCompletedEventWrapper(QueryCompletedEvent eventPayload, Map Date: Wed, 14 Aug 2024 11:51:56 +0900 Subject: [PATCH 3/6] Follow test guideline in Kafka event listener --- .../eventlistener/kafka/TestKafkaEventListenerConfig.java | 6 +++--- .../eventlistener/kafka/TestKafkaEventListenerPlugin.java | 2 +- .../eventlistener/kafka/TestKafkaRecordBuilder.java | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java index f9f1624e22bd..ace5f852f470 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java @@ -28,7 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -public class TestKafkaEventListenerConfig +final class TestKafkaEventListenerConfig { @Test void testDefaults() @@ -84,7 +84,7 @@ void testExplicitPropertyMappings() } @Test - public void testExcludedFields() + void testExcludedFields() { KafkaEventListenerConfig conf = new KafkaEventListenerConfig(); // check default @@ -112,7 +112,7 @@ public void testExcludedFields() } @Test - public void testKafkaClientOverrides() + void testKafkaClientOverrides() { KafkaEventListenerConfig conf = new KafkaEventListenerConfig(); // check default diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerPlugin.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerPlugin.java index 7bd55ecc705d..ec3e4551222f 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerPlugin.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerPlugin.java @@ -39,7 +39,7 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static org.assertj.core.api.Assertions.assertThat; -public class TestKafkaEventListenerPlugin +final class TestKafkaEventListenerPlugin { private static final String CREATED_TOPIC = "query_created"; private static final String COMPLETED_TOPIC = "query_completed"; diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java index 1d7d0e7eb19b..be0554372088 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java @@ -30,7 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat; -public class TestKafkaRecordBuilder +final class TestKafkaRecordBuilder { private static final ObjectMapper MAPPER = new ObjectMapper(); private static final Set EXCLUDED_FIELDS = ImmutableSet.of( @@ -44,7 +44,7 @@ public class TestKafkaRecordBuilder private static final MetadataProvider TEST_PROVIDER = new TestMetadataProvider("TRINO_INSIGHTS"); @Test - public void testBuildKafkaRecord() + void testBuildKafkaRecord() throws IOException { KafkaRecordBuilder builder = new KafkaRecordBuilder("TestQueryStartedEvent", "TestQueryCompletedEvent", EXCLUDED_FIELDS, TEST_PROVIDER); @@ -60,7 +60,7 @@ public void testBuildKafkaRecord() } @Test - public void testBuildKafkaRecordWithExclusions() + void testBuildKafkaRecordWithExclusions() throws IOException { Set exclude = Sets.union(EXCLUDED_FIELDS, Set.of("query", "principal", "analysisTime", "writtenBytes")); @@ -81,7 +81,7 @@ public void testBuildKafkaRecordWithExclusions() } @Test - public void testBuildKafkaRecordWithMetadata() + void testBuildKafkaRecordWithMetadata() throws IOException { Set exclude = Sets.union(EXCLUDED_FIELDS, Set.of("context", "payload", "analysisTime")); From fac0f2513ad3161b3bdb6e0e43526ac2e4e63065 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 14 Aug 2024 11:52:25 +0900 Subject: [PATCH 4/6] Simplify assertions in Kafka event listener --- .../kafka/TestKafkaEventListenerConfig.java | 27 ++++++++----------- .../kafka/TestKafkaRecordBuilder.java | 4 +-- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java index ace5f852f470..7b2ac5f4d851 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java @@ -25,6 +25,7 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static java.util.Map.entry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -94,21 +95,18 @@ void testExcludedFields() // check setting multiple conf.setExcludedFields(Set.of("payload", "plan", "user", "groups")); excludedFields = conf.getExcludedFields(); - assertThat(excludedFields.size()).isEqualTo(4); - assertThat(excludedFields.contains("payload")).isTrue(); - assertThat(excludedFields.contains("plan")).isTrue(); - assertThat(excludedFields.contains("user")).isTrue(); - assertThat(excludedFields.contains("groups")).isTrue(); + assertThat(excludedFields) + .containsOnly("payload", "plan", "user", "groups"); // setting to empty conf.setExcludedFields(Set.of("")); excludedFields = conf.getExcludedFields(); - assertThat(excludedFields.size()).isEqualTo(0); + assertThat(excludedFields).isEmpty(); // setting to empty with commas conf.setExcludedFields(Set.of(" ", "")); excludedFields = conf.getExcludedFields(); - assertThat(excludedFields.size()).isEqualTo(0); + assertThat(excludedFields).isEmpty(); } @Test @@ -117,30 +115,27 @@ void testKafkaClientOverrides() KafkaEventListenerConfig conf = new KafkaEventListenerConfig(); // check default Map overrides = conf.getKafkaClientOverrides(); - assertThat(overrides.size()).isEqualTo(0); + assertThat(overrides).isEmpty(); // check setting just one conf.setKafkaClientOverrides("buffer.memory=444555"); overrides = conf.getKafkaClientOverrides(); - assertThat(overrides.size()).isEqualTo(1); - assertThat(overrides.get("buffer.memory")).isEqualTo("444555"); + assertThat(overrides).containsExactly(entry("buffer.memory", "444555")); // check setting multiple conf.setKafkaClientOverrides("buffer.memory=444555, compression.type=zstd"); overrides = conf.getKafkaClientOverrides(); - assertThat(overrides.size()).isEqualTo(2); - assertThat(overrides.get("buffer.memory")).isEqualTo("444555"); - assertThat(overrides.get("compression.type")).isEqualTo("zstd"); + assertThat(overrides) + .containsExactly(entry("buffer.memory", "444555"), entry("compression.type", "zstd")); // check empty trailing param conf.setKafkaClientOverrides("buffer.memory=555777,"); overrides = conf.getKafkaClientOverrides(); - assertThat(overrides.size()).isEqualTo(1); - assertThat(overrides.get("buffer.memory")).isEqualTo("555777"); + assertThat(overrides).containsExactly(entry("buffer.memory", "555777")); conf.setKafkaClientOverrides(",, ,"); overrides = conf.getKafkaClientOverrides(); - assertThat(overrides.size()).isEqualTo(0); + assertThat(overrides).isEmpty(); // check missing = throws assertThatThrownBy(() -> conf.setKafkaClientOverrides("invalid,buffer.memory=555777")) diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java index be0554372088..b8ac412959db 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaRecordBuilder.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; +import static java.util.Map.entry; import static org.assertj.core.api.Assertions.assertThat; final class TestKafkaRecordBuilder @@ -94,8 +95,7 @@ void testBuildKafkaRecordWithMetadata() assertThat(record.key()).isNull(); Map metadata = MAPPER.readValue(MAPPER.readTree(record.value()).get("eventMetadata").toString(), Map.class); - assertThat(metadata.size()).isEqualTo(1); - assertThat(metadata.get("baz")).isEqualTo("yoo"); + assertThat(metadata).containsExactly(entry("baz", "yoo")); } static class TestMetadataProvider From 2da8be68803cd0a253968d24ee56c838317a1e9c Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 14 Aug 2024 11:52:37 +0900 Subject: [PATCH 5/6] Make TestUtils final --- .../java/io/trino/plugin/eventlistener/kafka/TestUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java index a0df8f1d51b6..7c1f56cb6bb8 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java @@ -46,7 +46,7 @@ import static io.trino.spi.type.TimeZoneKey.UTC_KEY; import static java.time.Duration.ofMillis; -public class TestUtils +public final class TestUtils { private static final QueryIOMetadata queryIOMetadata; private static final QueryContext queryContext; From aebcb853604b783e48073bd0bb85fa75bb22bbb5 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 14 Aug 2024 11:52:55 +0900 Subject: [PATCH 6/6] Enable air.compiler.fail-warnings in Kafka event listener --- plugin/trino-kafka-event-listener/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugin/trino-kafka-event-listener/pom.xml b/plugin/trino-kafka-event-listener/pom.xml index d322443e93f2..dcba886d4a6d 100644 --- a/plugin/trino-kafka-event-listener/pom.xml +++ b/plugin/trino-kafka-event-listener/pom.xml @@ -13,6 +13,10 @@ trino-plugin Trino - Kafka Event Listener + + true + + com.fasterxml.jackson.core