diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java index f1a5ebf55ed8..0d17180ae1bb 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java @@ -13,8 +13,6 @@ */ package io.trino.plugin.kafka.schema.confluent; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import dev.failsafe.Failsafe; @@ -93,7 +91,7 @@ public void testBasicTopic() { String topic = "topic-basic-MixedCase-" + randomNameSuffix(); assertTopic( - testingKafka, topic, + topic, format("SELECT col_1, col_2 FROM %s", toDoubleQuoted(topic)), format("SELECT col_1, col_2, col_3 FROM %s", toDoubleQuoted(topic)), false, @@ -108,7 +106,7 @@ public void testTopicWithKeySubject() { String topic = "topic-Key-Subject-" + randomNameSuffix(); assertTopic( - testingKafka, topic, + topic, format("SELECT \"%s-key\", col_1, col_2 FROM %s", topic, toDoubleQuoted(topic)), format("SELECT \"%s-key\", col_1, col_2, col_3 FROM %s", topic, toDoubleQuoted(topic)), true, @@ -182,7 +180,7 @@ public void testTopicWithRecordNameStrategy() { String topic = "topic-Record-Name-Strategy-" + randomNameSuffix(); assertTopic( - testingKafka, topic, + topic, format("SELECT \"%1$s-key\", col_1, col_2 FROM \"%1$s&value-subject=%2$s\"", topic, RECORD_NAME), format("SELECT \"%1$s-key\", col_1, col_2, col_3 FROM \"%1$s&value-subject=%2$s\"", topic, RECORD_NAME), true, @@ -198,7 +196,7 @@ public void testTopicWithTopicRecordNameStrategy() { String topic = "topic-Topic-Record-Name-Strategy-" + randomNameSuffix(); assertTopic( - testingKafka, topic, + topic, format("SELECT \"%1$s-key\", col_1, col_2 FROM \"%1$s&value-subject=%1$s-%2$s\"", topic, RECORD_NAME), format("SELECT \"%1$s-key\", col_1, col_2, col_3 FROM \"%1$s&value-subject=%1$s-%2$s\"", topic, RECORD_NAME), true, @@ -263,7 +261,6 @@ private static ImmutableMap.Builder schemaRegistryAwareProducer( } private void assertTopic( - TestingKafka testingKafka, String topicName, String initialQuery, String evolvedQuery, @@ -278,10 +275,7 @@ private void assertTopic( waitUntilTableExists(topicName); assertCount(topicName, MESSAGE_COUNT); - QueryAssertions queryAssertions = new QueryAssertions(getQueryRunner()); - queryAssertions.query(initialQuery) - .assertThat() - .containsAll(getExpectedValues(messages, INITIAL_SCHEMA, isKeyIncluded)); + assertThat(query(initialQuery)).matches(getExpectedValues(messages, INITIAL_SCHEMA, isKeyIncluded)); List> newMessages = createMessages(topicName, MESSAGE_COUNT, false); testingKafka.sendMessages(newMessages.stream(), producerConfig); @@ -291,9 +285,8 @@ private void assertTopic( .addAll(newMessages) .build(); assertCount(topicName, allMessages.size()); - queryAssertions.query(evolvedQuery) - .assertThat() - .containsAll(getExpectedValues(messages, EVOLVED_SCHEMA, isKeyIncluded)); + + assertThat(query(evolvedQuery)).containsAll(getExpectedValues(messages, EVOLVED_SCHEMA, isKeyIncluded)); } private static String getExpectedValues(List> messages, Schema schema, boolean isKeyIncluded) @@ -365,25 +358,25 @@ private void assertNotExists(String tableName) private void waitUntilTableExists(String tableName) { Failsafe.with( - RetryPolicy.builder() - .withMaxAttempts(10) - .withDelay(Duration.ofMillis(100)) - .build()) + RetryPolicy.builder() + .withMaxAttempts(10) + .withDelay(Duration.ofMillis(100)) + .build()) .run(() -> assertThat(schemaExists()).isTrue()); Failsafe.with( - RetryPolicy.builder() - .withMaxAttempts(10) - .withDelay(Duration.ofMillis(100)) - .build()) + RetryPolicy.builder() + .withMaxAttempts(10) + .withDelay(Duration.ofMillis(100)) + .build()) .run(() -> assertThat(tableExists(tableName)).isTrue()); } private boolean schemaExists() { return getQueryRunner().execute(format( - "SHOW SCHEMAS FROM %s LIKE '%s'", - getSession().getCatalog().orElseThrow(), - getSession().getSchema().orElseThrow())) + "SHOW SCHEMAS FROM %s LIKE '%s'", + getSession().getCatalog().orElseThrow(), + getSession().getSchema().orElseThrow())) .getRowCount() == 1; } @@ -441,30 +434,11 @@ private static GenericRecord createRecordWithEvolvedSchema(long key) .build(); } - private static class JsonValue + private record JsonValue(int id, String value) { - private final int id; - private final String value; - - @JsonCreator - public JsonValue( - @JsonProperty("id") int id, - @JsonProperty("value") String value) - { - this.id = id; - this.value = requireNonNull(value, "value is null"); - } - - @JsonProperty("id") - public int getId() - { - return id; - } - - @JsonProperty("value") - public String getValue() + private JsonValue { - return value; + requireNonNull(value, "value is null"); } } }