Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugin/trino-kafka-event-listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
<packaging>trino-plugin</packaging>
<description>Trino - Kafka Event Listener</description>

<properties>
<air.compiler.fail-warnings>true</air.compiler.fail-warnings>
</properties>

<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@ public KafkaEventListener(KafkaEventListenerConfig config, KafkaProducerFactory
if (config.getTerminateOnInitializationFailure()) {
throw e;
}
else {
LOG.error(e, "Failed to initialize Kafka publisher.");
stats.kafkaPublisherFailedToInitialize();
}
LOG.error(e, "Failed to initialize Kafka publisher.");
stats.kafkaPublisherFailedToInitialize();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ private MetadataProvider metadataProvider(KafkaEventListenerConfig config)
if (config.getEnvironmentVariablePrefix().isPresent()) {
return new EnvMetadataProvider(config.getEnvironmentVariablePrefix().get());
}
else {
return new NoOpMetadataProvider();
}
return new NoOpMetadataProvider();
}

public void publishCompletedEvent(QueryCompletedEvent queryCompletedEvent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package io.trino.plugin.eventlistener.kafka.model;

import com.google.common.collect.ImmutableMap;
import io.trino.spi.eventlistener.QueryCompletedEvent;

import java.util.Map;
Expand All @@ -25,6 +26,6 @@ public record QueryCompletedEventWrapper(QueryCompletedEvent eventPayload, Map<S
public QueryCompletedEventWrapper
{
requireNonNull(eventPayload, "eventPayload is null");
requireNonNull(eventMetadata, "eventMetadata is null");
eventMetadata = ImmutableMap.copyOf(requireNonNull(eventMetadata, "eventMetadata is null"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package io.trino.plugin.eventlistener.kafka.model;

import com.google.common.collect.ImmutableMap;
import io.trino.spi.eventlistener.QueryCreatedEvent;

import java.util.Map;
Expand All @@ -25,6 +26,6 @@ public record QueryCreatedEventWrapper(QueryCreatedEvent eventPayload, Map<Strin
public QueryCreatedEventWrapper
{
requireNonNull(eventPayload, "eventPayload is null");
requireNonNull(eventMetadata, "eventMetadata is null");
eventMetadata = ImmutableMap.copyOf(requireNonNull(eventMetadata, "eventMetadata is null"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static java.util.Map.entry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestKafkaEventListenerConfig
final class TestKafkaEventListenerConfig
{
@Test
void testDefaults()
Expand Down Expand Up @@ -84,7 +85,7 @@ void testExplicitPropertyMappings()
}

@Test
public void testExcludedFields()
void testExcludedFields()
{
KafkaEventListenerConfig conf = new KafkaEventListenerConfig();
// check default
Expand All @@ -94,53 +95,47 @@ public void testExcludedFields()
// check setting multiple
conf.setExcludedFields(Set.of("payload", "plan", "user", "groups"));
excludedFields = conf.getExcludedFields();
assertThat(excludedFields.size()).isEqualTo(4);
assertThat(excludedFields.contains("payload")).isTrue();
assertThat(excludedFields.contains("plan")).isTrue();
assertThat(excludedFields.contains("user")).isTrue();
assertThat(excludedFields.contains("groups")).isTrue();
assertThat(excludedFields)
.containsOnly("payload", "plan", "user", "groups");

// setting to empty
conf.setExcludedFields(Set.of(""));
excludedFields = conf.getExcludedFields();
assertThat(excludedFields.size()).isEqualTo(0);
assertThat(excludedFields).isEmpty();

// setting to empty with commas
conf.setExcludedFields(Set.of(" ", ""));
excludedFields = conf.getExcludedFields();
assertThat(excludedFields.size()).isEqualTo(0);
assertThat(excludedFields).isEmpty();
}

@Test
public void testKafkaClientOverrides()
void testKafkaClientOverrides()
{
KafkaEventListenerConfig conf = new KafkaEventListenerConfig();
// check default
Map<String, String> overrides = conf.getKafkaClientOverrides();
assertThat(overrides.size()).isEqualTo(0);
assertThat(overrides).isEmpty();

// check setting just one
conf.setKafkaClientOverrides("buffer.memory=444555");
overrides = conf.getKafkaClientOverrides();
assertThat(overrides.size()).isEqualTo(1);
assertThat(overrides.get("buffer.memory")).isEqualTo("444555");
assertThat(overrides).containsExactly(entry("buffer.memory", "444555"));

// check setting multiple
conf.setKafkaClientOverrides("buffer.memory=444555, compression.type=zstd");
overrides = conf.getKafkaClientOverrides();
assertThat(overrides.size()).isEqualTo(2);
assertThat(overrides.get("buffer.memory")).isEqualTo("444555");
assertThat(overrides.get("compression.type")).isEqualTo("zstd");
assertThat(overrides)
.containsExactly(entry("buffer.memory", "444555"), entry("compression.type", "zstd"));

// check empty trailing param
conf.setKafkaClientOverrides("buffer.memory=555777,");
overrides = conf.getKafkaClientOverrides();
assertThat(overrides.size()).isEqualTo(1);
assertThat(overrides.get("buffer.memory")).isEqualTo("555777");
assertThat(overrides).containsExactly(entry("buffer.memory", "555777"));

conf.setKafkaClientOverrides(",, ,");
overrides = conf.getKafkaClientOverrides();
assertThat(overrides.size()).isEqualTo(0);
assertThat(overrides).isEmpty();

// check missing = throws
assertThatThrownBy(() -> conf.setKafkaClientOverrides("invalid,buffer.memory=555777"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static org.assertj.core.api.Assertions.assertThat;

public class TestKafkaEventListenerPlugin
final class TestKafkaEventListenerPlugin
{
private static final String CREATED_TOPIC = "query_created";
private static final String COMPLETED_TOPIC = "query_completed";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
import java.util.Map;
import java.util.Set;

import static java.util.Map.entry;
import static org.assertj.core.api.Assertions.assertThat;

public class TestKafkaRecordBuilder
final class TestKafkaRecordBuilder
{
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Set<String> EXCLUDED_FIELDS = ImmutableSet.of(
Expand All @@ -44,7 +45,7 @@ public class TestKafkaRecordBuilder
private static final MetadataProvider TEST_PROVIDER = new TestMetadataProvider("TRINO_INSIGHTS");

@Test
public void testBuildKafkaRecord()
void testBuildKafkaRecord()
throws IOException
{
KafkaRecordBuilder builder = new KafkaRecordBuilder("TestQueryStartedEvent", "TestQueryCompletedEvent", EXCLUDED_FIELDS, TEST_PROVIDER);
Expand All @@ -60,7 +61,7 @@ public void testBuildKafkaRecord()
}

@Test
public void testBuildKafkaRecordWithExclusions()
void testBuildKafkaRecordWithExclusions()
throws IOException
{
Set<String> exclude = Sets.union(EXCLUDED_FIELDS, Set.of("query", "principal", "analysisTime", "writtenBytes"));
Expand All @@ -81,7 +82,7 @@ public void testBuildKafkaRecordWithExclusions()
}

@Test
public void testBuildKafkaRecordWithMetadata()
void testBuildKafkaRecordWithMetadata()
throws IOException
{
Set<String> exclude = Sets.union(EXCLUDED_FIELDS, Set.of("context", "payload", "analysisTime"));
Expand All @@ -94,8 +95,7 @@ public void testBuildKafkaRecordWithMetadata()
assertThat(record.key()).isNull();
Map<String, String> metadata = MAPPER.readValue(MAPPER.readTree(record.value()).get("eventMetadata").toString(), Map.class);

assertThat(metadata.size()).isEqualTo(1);
assertThat(metadata.get("baz")).isEqualTo("yoo");
assertThat(metadata).containsExactly(entry("baz", "yoo"));
}

static class TestMetadataProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static java.time.Duration.ofMillis;

public class TestUtils
public final class TestUtils
{
private static final QueryIOMetadata queryIOMetadata;
private static final QueryContext queryContext;
Expand Down