diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorFactory.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorFactory.java
index ad5039e35a78..c5bb6bbc812f 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorFactory.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorFactory.java
@@ -57,7 +57,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
                 new CatalogNameModule(catalogName),
                 new JsonModule(),
                 new TypeDeserializerModule(context.getTypeManager()),
-                new KafkaConnectorModule(),
+                new KafkaConnectorModule(context.getTypeManager()),
                 extension,
                 binder -> {
                     binder.bind(ClassLoader.class).toInstance(KafkaConnectorFactory.class.getClassLoader());
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorModule.java
index 680b5716c703..57961b02c91f 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorModule.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorModule.java
@@ -31,15 +31,24 @@
 import io.trino.spi.connector.ConnectorPageSinkProvider;
 import io.trino.spi.connector.ConnectorRecordSetProvider;
 import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.type.TypeManager;
 
 import static com.google.inject.multibindings.Multibinder.newSetBinder;
 import static io.airlift.configuration.ConditionalModule.conditionalModule;
 import static io.airlift.configuration.ConfigBinder.configBinder;
 import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
+import static java.util.Objects.requireNonNull;
 
 public class KafkaConnectorModule
         extends AbstractConfigurationAwareModule
 {
+    private final TypeManager typeManager;
+
+    public KafkaConnectorModule(TypeManager typeManager)
+    {
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
+    }
+
     @Override
     public void setup(Binder binder)
     {
@@ -58,7 +67,7 @@ public void setup(Binder binder)
         configBinder(binder).bindConfig(KafkaConfig.class);
 
         bindTopicSchemaProviderModule(FileTableDescriptionSupplier.NAME, new FileTableDescriptionSupplierModule());
-        bindTopicSchemaProviderModule(ConfluentSchemaRegistryTableDescriptionSupplier.NAME, new ConfluentModule());
+        bindTopicSchemaProviderModule(ConfluentSchemaRegistryTableDescriptionSupplier.NAME, new ConfluentModule(typeManager));
         newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(KafkaSessionProperties.class).in(Scopes.SINGLETON);
         jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class);
     }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java
index fd0ab3310623..f4eebf4695c7 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java
@@ -75,7 +75,7 @@ public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, Pars
                 Optional.empty(),
                 Optional.of(subject),
                 Streams.concat(getFields(protobufSchema.toDescriptor()),
-                        getOneofs(protobufSchema))
+                        getOneofs(protobufSchema.toDescriptor()))
                         .collect(toImmutableList()));
     }
 
@@ -101,9 +101,9 @@ private Stream<KafkaTopicFieldDescription> getFields(Descriptor descriptor)
                         false));
     }
 
-    private Stream<KafkaTopicFieldDescription> getOneofs(ProtobufSchema protobufSchema)
+    private Stream<KafkaTopicFieldDescription> getOneofs(Descriptor descriptor)
     {
-        return protobufSchema.toDescriptor()
+        return descriptor
                 .getOneofs()
                 .stream()
                 .map(oneofDescriptor ->
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
index a927bfb2b11e..7eb1a48e3fb4 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Binder;
+import com.google.inject.Inject;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
@@ -52,6 +53,7 @@
 import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
 import io.trino.spi.HostAddress;
 import io.trino.spi.TrinoException;
+import io.trino.spi.type.TypeManager;
 
 import java.util.List;
 import java.util.Map;
@@ -74,9 +76,18 @@
 public class ConfluentModule
         extends AbstractConfigurationAwareModule
 {
+    private final TypeManager typeManager;
+
+    public ConfluentModule(TypeManager typeManager)
+    {
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
+    }
+
     @Override
     protected void setup(Binder binder)
     {
+        binder.bind(TypeManager.class).toInstance(typeManager);
+
         configBinder(binder).bindConfig(ConfluentSchemaRegistryConfig.class);
         install(new ConfluentDecoderModule());
         install(new ConfluentEncoderModule());
@@ -89,7 +100,7 @@ protected void setup(Binder binder)
         newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(ConfluentSessionProperties.class).in(Scopes.SINGLETON);
         binder.bind(TableDescriptionSupplier.class).toProvider(ConfluentSchemaRegistryTableDescriptionSupplier.Factory.class).in(Scopes.SINGLETON);
         newMapBinder(binder, String.class, SchemaParser.class).addBinding("AVRO").to(AvroSchemaParser.class).in(Scopes.SINGLETON);
-        newMapBinder(binder, String.class, SchemaParser.class).addBinding("PROTOBUF").to(ProtobufSchemaParser.class).in(Scopes.SINGLETON);
+        newMapBinder(binder, String.class, SchemaParser.class).addBinding("PROTOBUF").to(LazyLoadedProtobufSchemaParser.class).in(Scopes.SINGLETON);
     }
 
     @Provides
@@ -157,7 +168,7 @@
     private static class LazyLoadedProtobufSchemaProvider
             implements SchemaProvider
     {
         // Make JVM to load lazily ProtobufSchemaProvider, so Kafka connector can be used
-        // with protobuf dependency for non protobuf based topics
+        // without protobuf dependency for non protobuf based topics
         private final Supplier<SchemaProvider> delegate = Suppliers.memoize(this::create);
         private final AtomicReference<Map<String, ?>> configuration = new AtomicReference<>();
@@ -195,4 +206,24 @@ private SchemaProvider create()
             return schemaProvider;
         }
     }
+
+    public static class LazyLoadedProtobufSchemaParser
+            extends ForwardingSchemaParser
+    {
+        // Make JVM to load lazily ProtobufSchemaParser, so Kafka connector can be used
+        // without protobuf dependency for non protobuf based topics
+        private final Supplier<SchemaParser> delegate;
+
+        @Inject
+        public LazyLoadedProtobufSchemaParser(TypeManager typeManager)
+        {
+            this.delegate = Suppliers.memoize(() -> new ProtobufSchemaParser(requireNonNull(typeManager, "typeManager is null")));
+        }
+
+        @Override
+        protected SchemaParser delegate()
+        {
+            return delegate.get();
+        }
+    }
 }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ForwardingSchemaParser.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ForwardingSchemaParser.java
new file mode 100644
index 000000000000..e297cf496891
--- /dev/null
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ForwardingSchemaParser.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema.confluent;
+
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.trino.plugin.kafka.KafkaTopicFieldGroup;
+import io.trino.spi.connector.ConnectorSession;
+
+public abstract class ForwardingSchemaParser
+        implements SchemaParser
+{
+    protected abstract SchemaParser delegate();
+
+    @Override
+    public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, ParsedSchema parsedSchema)
+    {
+        return delegate().parse(session, subject, parsedSchema);
+    }
+}
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestForwardingSchemaParser.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestForwardingSchemaParser.java
new file mode 100644
index 000000000000..806f81848726
--- /dev/null
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestForwardingSchemaParser.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema.confluent;
+
+import org.testng.annotations.Test;
+
+import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden;
+
+public class TestForwardingSchemaParser
+{
+    @Test
+    public void testAllMethodsOverridden()
+    {
+        assertAllMethodsOverridden(SchemaParser.class, ForwardingSchemaParser.class);
+    }
+}
diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/Kafka.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/Kafka.java
index 7acdbdb6156b..28a03cc726b3 100644
--- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/Kafka.java
+++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/Kafka.java
@@ -23,16 +23,13 @@
 import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;
 import org.testcontainers.utility.MountableFile;
 
-import java.io.File;
 import java.time.Duration;
 
-import static io.trino.testing.TestingProperties.getConfluentVersion;
 import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
 import static io.trino.tests.product.launcher.env.EnvironmentContainers.isTrinoContainer;
 import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC;
 import static java.util.Objects.requireNonNull;
 import static org.testcontainers.containers.wait.strategy.Wait.forLogMessage;
-import static org.testcontainers.utility.MountableFile.forClasspathResource;
 import static org.testcontainers.utility.MountableFile.forHostPath;
 
 public class Kafka
@@ -40,8 +37,6 @@ public class Kafka
 {
     private static final String CONFLUENT_VERSION = "7.3.1";
     private static final int SCHEMA_REGISTRY_PORT = 8081;
-    private static final File KAFKA_PROTOBUF_PROVIDER = new File("testing/trino-product-tests-launcher/target/kafka-protobuf-provider-" + getConfluentVersion() + ".jar");
-    private static final File KAFKA_PROTOBUF_TYPES = new File("testing/trino-product-tests-launcher/target/kafka-protobuf-types-" + getConfluentVersion() + ".jar");
     static final String KAFKA = "kafka";
     static final String SCHEMA_REGISTRY = "schema-registry";
     static final String ZOOKEEPER = "zookeeper";
@@ -69,10 +64,7 @@ public void extendEnvironment(Environment.Builder builder)
             if (isTrinoContainer(container.getLogicalName())) {
                 MountableFile logConfigFile = forHostPath(configDir.getPath("log.properties"));
                 container
-                        .withCopyFileToContainer(logConfigFile, CONTAINER_TRINO_ETC + "/log.properties")
-                        .withCopyFileToContainer(forHostPath(KAFKA_PROTOBUF_PROVIDER.getAbsolutePath()), "/docker/kafka-protobuf-provider/kafka-protobuf-provider.jar")
-                        .withCopyFileToContainer(forHostPath(KAFKA_PROTOBUF_TYPES.getAbsolutePath()), "/docker/kafka-protobuf-provider/kafka-protobuf-types.jar")
-                        .withCopyFileToContainer(forClasspathResource("install-kafka-protobuf-provider.sh", 0755), "/docker/presto-init.d/install-kafka-protobuf-provider.sh");
+                        .withCopyFileToContainer(logConfigFile, CONTAINER_TRINO_ETC + "/log.properties");
             }
         });
 
diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeConfluentKafka.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeConfluentKafka.java
new file mode 100644
index 000000000000..a999c26ca3d0
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeConfluentKafka.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.tests.product.launcher.env.environment;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import io.trino.tests.product.launcher.docker.DockerFiles;
+import io.trino.tests.product.launcher.docker.DockerFiles.ResourceProvider;
+import io.trino.tests.product.launcher.env.Environment;
+import io.trino.tests.product.launcher.env.EnvironmentProvider;
+import io.trino.tests.product.launcher.env.common.Kafka;
+import io.trino.tests.product.launcher.env.common.StandardMultinode;
+import io.trino.tests.product.launcher.env.common.TestsEnvironment;
+
+import java.io.File;
+
+import static io.trino.testing.TestingProperties.getConfluentVersion;
+import static io.trino.tests.product.launcher.env.EnvironmentContainers.configureTempto;
+import static io.trino.tests.product.launcher.env.EnvironmentContainers.isTrinoContainer;
+import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC;
+import static java.util.Objects.requireNonNull;
+import static org.testcontainers.utility.MountableFile.forClasspathResource;
+import static org.testcontainers.utility.MountableFile.forHostPath;
+
+/**
+ * {@link EnvMultinodeConfluentKafka} is intended to be the only Kafka product test environment which copies the non-free Confluent License libraries to the Kafka connector
+ * classpath to test functionality which requires those classes.
+ * The other {@link Kafka} environments MUST NOT copy these jars otherwise it's not possible to verify the out of box Trino setup which doesn't ship with the Confluent licensed
+ * libraries.
+ */
+@TestsEnvironment
+public final class EnvMultinodeConfluentKafka
+        extends EnvironmentProvider
+{
+    private static final File KAFKA_PROTOBUF_PROVIDER = new File("testing/trino-product-tests-launcher/target/kafka-protobuf-provider-" + getConfluentVersion() + ".jar");
+    private static final File KAFKA_PROTOBUF_TYPES = new File("testing/trino-product-tests-launcher/target/kafka-protobuf-types-" + getConfluentVersion() + ".jar");
+
+    private final ResourceProvider configDir;
+
+    @Inject
+    public EnvMultinodeConfluentKafka(Kafka kafka, StandardMultinode standardMultinode, DockerFiles dockerFiles)
+    {
+        super(ImmutableList.of(standardMultinode, kafka));
+        requireNonNull(dockerFiles, "dockerFiles is null");
+        configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/multinode-kafka-confluent-license/");
+    }
+
+    @Override
+    public void extendEnvironment(Environment.Builder builder)
+    {
+        builder.configureContainers(container -> {
+            if (isTrinoContainer(container.getLogicalName())) {
+                builder.addConnector("kafka", forHostPath(configDir.getPath("kafka.properties")), CONTAINER_TRINO_ETC + "/catalog/kafka.properties");
+                builder.addConnector("kafka", forHostPath(configDir.getPath("kafka_schema_registry.properties")), CONTAINER_TRINO_ETC + "/catalog/kafka_schema_registry.properties");
+                container
+                        .withCopyFileToContainer(forHostPath(KAFKA_PROTOBUF_PROVIDER.getAbsolutePath()), "/docker/kafka-protobuf-provider/kafka-protobuf-provider.jar")
+                        .withCopyFileToContainer(forHostPath(KAFKA_PROTOBUF_TYPES.getAbsolutePath()), "/docker/kafka-protobuf-provider/kafka-protobuf-types.jar")
+                        .withCopyFileToContainer(forClasspathResource("install-kafka-protobuf-provider.sh", 0755), "/docker/presto-init.d/install-kafka-protobuf-provider.sh");
+            }
+        });
+
+        configureTempto(builder, configDir);
+    }
+}
diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteKafka.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteKafka.java
index bd4acb69d7aa..f541f510ac08 100644
--- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteKafka.java
+++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteKafka.java
@@ -15,6 +15,7 @@
 
 import com.google.common.collect.ImmutableList;
 import io.trino.tests.product.launcher.env.EnvironmentConfig;
+import io.trino.tests.product.launcher.env.environment.EnvMultinodeConfluentKafka;
 import io.trino.tests.product.launcher.env.environment.EnvMultinodeKafka;
 import io.trino.tests.product.launcher.env.environment.EnvMultinodeKafkaSaslPlaintext;
 import io.trino.tests.product.launcher.env.environment.EnvMultinodeKafkaSsl;
@@ -35,6 +36,10 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
                 testOnEnvironment(EnvMultinodeKafka.class)
                         .withGroups("configured_features", "kafka")
                         .build(),
+                testOnEnvironment(EnvMultinodeConfluentKafka.class)
+                        // testing kafka group with this env is slightly redundant but helps verify that copying confluent libraries doesn't break non-confluent functionality
+                        .withGroups("configured_features", "kafka", "kafka_confluent_license")
+                        .build(),
                 testOnEnvironment(EnvMultinodeKafkaSsl.class)
                         .withGroups("configured_features", "kafka")
                         .build(),
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka.properties
new file mode 100644
index 000000000000..bd6631b76e9c
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka.properties
@@ -0,0 +1,23 @@
+connector.name=kafka
+kafka.table-names=product_tests.read_simple_key_and_value,\
+  product_tests.read_all_datatypes_raw,\
+  product_tests.read_all_datatypes_csv,\
+  product_tests.read_all_datatypes_json,\
+  product_tests.read_all_datatypes_avro,\
+  product_tests.read_all_null_avro,\
+  product_tests.read_structural_datatype_avro,\
+  product_tests.write_simple_key_and_value,\
+  product_tests.write_all_datatypes_raw,\
+  product_tests.write_all_datatypes_csv,\
+  product_tests.write_all_datatypes_json,\
+  product_tests.write_all_datatypes_avro,\
+  product_tests.write_structural_datatype_avro,\
+  product_tests.pushdown_partition,\
+  product_tests.pushdown_offset,\
+  product_tests.pushdown_create_time,\
+  product_tests.all_datatypes_protobuf,\
+  product_tests.structural_datatype_protobuf,\
+  product_tests.read_basic_datatypes_protobuf,\
+  product_tests.read_basic_structural_datatypes_protobuf
+kafka.nodes=kafka:9092
+kafka.table-description-dir=/docker/presto-product-tests/conf/presto/etc/catalog/kafka
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka_schema_registry.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka_schema_registry.properties
new file mode 100644
index 000000000000..2fe25b5c653b
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka_schema_registry.properties
@@ -0,0 +1,5 @@
+connector.name=kafka
+kafka.nodes=kafka:9092
+kafka.table-description-supplier=confluent
+kafka.confluent-schema-registry-url=http://schema-registry:8081
+kafka.default-schema=product_tests
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/tempto-configuration.yaml b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/tempto-configuration.yaml
new file mode 100644
index 000000000000..8a101f06e8a1
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/tempto-configuration.yaml
@@ -0,0 +1,2 @@
+schema-registry:
+  url: http://schema-registry:8081
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java
index 524576ce1773..6a469d607bb7 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java
@@ -69,6 +69,7 @@ public final class TestGroups
     public static final String CANCEL_QUERY = "cancel_query";
     public static final String LARGE_QUERY = "large_query";
     public static final String KAFKA = "kafka";
+    public static final String KAFKA_CONFLUENT_LICENSE = "kafka_confluent_license";
     public static final String TWO_HIVES = "two_hives";
     public static final String ICEBERG = "iceberg";
     public static final String ICEBERG_FORMAT_VERSION_COMPATIBILITY = "iceberg_format_version_compatibility";
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java
index b5c8b5016a6b..c3680b9db5f2 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java
@@ -79,7 +79,7 @@ public class TestKafkaAvroReadsSmokeTest
     private static final String AVRO_SCHEMA_WITH_REFERENCES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/schema_with_references.avsc";
 
     @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
+    public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         ImmutableMap<String, Object> record = ImmutableMap.of(
@@ -87,12 +87,12 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerial
                 "a_bigint", 127L,
                 "a_double", 234.567,
                 "a_boolean", true);
-        String topicName = ALL_DATATYPES_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_DATATYPES_AVRO_TOPIC_NAME, topicName, record, messageSerializer);
+        String topicName = ALL_DATATYPES_AVRO_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
+        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_DATATYPES_AVRO_TOPIC_NAME, topicName, record, kafkaCatalog.messageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
-                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.getCatalogName(), KAFKA_SCHEMA + "." + topicName));
+                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.catalogName(), KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(
                             "foobar",
                             127,
@@ -102,15 +102,15 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerial
     }
 
     @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testNullType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
+    public void testNullType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
-        String topicName = ALL_NULL_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_NULL_AVRO_TOPIC_NAME, topicName, ImmutableMap.of(), messageSerializer);
+        String topicName = ALL_NULL_AVRO_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
+        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_NULL_AVRO_TOPIC_NAME, topicName, ImmutableMap.of(), kafkaCatalog.messageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
-                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.getCatalogName(), KAFKA_SCHEMA + "." + topicName));
+                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.catalogName(), KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(
                             null,
                             null,
@@ -120,22 +120,22 @@ public void testNullType(KafkaCatalog kafkaCatalog, MessageSerializer messageSer
     }
 
     @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
+    public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         ImmutableMap<String, Object> record = ImmutableMap.of(
                 "a_array", ImmutableList.of(100L, 102L),
                 "a_map", ImmutableMap.of("key1", "value1"));
-        String topicName = STRUCTURAL_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createAvroTable(STRUCTURAL_SCHEMA_PATH, STRUCTURAL_AVRO_TOPIC_NAME, topicName, record, messageSerializer);
+        String topicName = STRUCTURAL_AVRO_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
+        createAvroTable(STRUCTURAL_SCHEMA_PATH, STRUCTURAL_AVRO_TOPIC_NAME, topicName, record, kafkaCatalog.messageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
                     QueryResult queryResult = onTrino().executeQuery(format(
                             "SELECT a[1], a[2], m['key1'] FROM (SELECT %s as a, %s as m FROM %s.%s) t",
-                            kafkaCatalog.isColumnMappingSupported() ? "c_array" : "a_array",
-                            kafkaCatalog.isColumnMappingSupported() ? "c_map" : "a_map",
-                            kafkaCatalog.getCatalogName(),
+                            kafkaCatalog.columnMappingSupported() ? "c_array" : "a_array",
+                            kafkaCatalog.columnMappingSupported() ? "c_map" : "a_map",
+                            kafkaCatalog.catalogName(),
                             KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(100, 102, "value1"));
                 });
@@ -146,40 +146,22 @@ public static Object[][] catalogs()
     {
         return new Object[][] {
                 {
-                        new KafkaCatalog("kafka", "", true), new AvroMessageSerializer(),
+                        new KafkaCatalog("kafka", "", true, new AvroMessageSerializer()),
                 },
                 {
-                        new KafkaCatalog("kafka_schema_registry", "_schema_registry", false), new SchemaRegistryAvroMessageSerializer(),
+                        new KafkaCatalog("kafka_schema_registry", "_schema_registry", false, new SchemaRegistryAvroMessageSerializer()),
                 },
         };
     }
 
-    private static final class KafkaCatalog
+    private record KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
     {
-        private final String catalogName;
-        private final String topicNameSuffix;
-        private final boolean columnMappingSupported;
-
-        private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported)
+        private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
         {
             this.catalogName = requireNonNull(catalogName, "catalogName is null");
             this.topicNameSuffix = requireNonNull(topicNameSuffix, "topicNameSuffix is null");
             this.columnMappingSupported = columnMappingSupported;
-        }
-
-        public String getCatalogName()
-        {
-            return catalogName;
-        }
-
-        public String getTopicNameSuffix()
-        {
-            return topicNameSuffix;
-        }
-
-        public boolean isColumnMappingSupported()
-        {
-            return columnMappingSupported;
+            this.messageSerializer = requireNonNull(messageSerializer, "messageSerializer is null");
         }
 
         @Override
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReads.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
similarity index 82%
rename from testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReads.java
rename to testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
index f46a27ee0db4..e35efa03f963 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReads.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
@@ -30,7 +30,6 @@
 import io.trino.tempto.fulfillment.table.kafka.KafkaTableManager;
 import io.trino.tempto.fulfillment.table.kafka.ListKafkaDataSource;
 import io.trino.tempto.query.QueryResult;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -47,6 +46,7 @@
 import static io.trino.tempto.fulfillment.table.TableHandle.tableHandle;
 import static io.trino.tempto.fulfillment.table.kafka.KafkaMessageContentsBuilder.contentsBuilder;
 import static io.trino.tests.product.TestGroups.KAFKA;
+import static io.trino.tests.product.TestGroups.KAFKA_CONFLUENT_LICENSE;
 import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
 import static io.trino.tests.product.utils.QueryAssertions.assertEventually;
 import static io.trino.tests.product.utils.QueryExecutors.onTrino;
@@ -57,7 +57,7 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 @Test(singleThreaded = true)
-public class TestKafkaProtobufReads
+public class TestKafkaProtobufReadsSmokeTest
         extends ProductTest
 {
     private static final String KAFKA_SCHEMA = "product_tests";
@@ -71,8 +71,24 @@ public class TestKafkaProtobufReadsSmokeTest
     private static final String ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY = "all_datatypes_protobuf_schema_registry";
     private static final String ALL_DATATYPES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/all_datatypes.proto";
 
-    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
+    private static final KafkaCatalog KAFKA_CATALOG = new KafkaCatalog("kafka", "", true, new ProtobufMessageSerializer());
+    private static final KafkaCatalog KAFKA_SCHEMA_REGISTRY_CATALOG = new KafkaCatalog("kafka_schema_registry", "_schema_registry", false, new SchemaRegistryProtobufMessageSerializer());
+
+    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS})
+    public void testSelectPrimitiveDataType()
+            throws Exception
+    {
+        selectPrimitiveDataType(KAFKA_CATALOG);
+    }
+
+    @Test(groups = {KAFKA_CONFLUENT_LICENSE, PROFILE_SPECIFIC_TESTS})
+    public void testSelectPrimitiveDataTypeWithSchemaRegistry()
+            throws Exception
+    {
+        selectPrimitiveDataType(KAFKA_SCHEMA_REGISTRY_CATALOG);
+    }
+
+    private void selectPrimitiveDataType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         Map<String, Object> record = ImmutableMap.<String, Object>builder()
@@ -83,13 +99,13 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerial
                 .put("e_float", 3.14f)
                 .put("f_boolean", true)
                 .buildOrThrow();
-        String topicName = BASIC_DATATYPES_PROTOBUF_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createProtobufTable(BASIC_DATATYPES_SCHEMA_PATH, BASIC_DATATYPES_PROTOBUF_TOPIC_NAME, topicName, record, messageSerializer);
+        String topicName = BASIC_DATATYPES_PROTOBUF_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
+        createProtobufTable(BASIC_DATATYPES_SCHEMA_PATH, BASIC_DATATYPES_PROTOBUF_TOPIC_NAME, topicName, record, kafkaCatalog.messageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
-                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.getCatalogName(), KAFKA_SCHEMA + "." + topicName));
+                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.catalogName(), KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(
                             "foobar",
                             314,
@@ -100,8 +116,21 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerial
                 });
     }
 
-    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
+    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS})
+    public void testSelectStructuralDataType()
+            throws Exception
+    {
+        selectStructuralDataType(KAFKA_CATALOG);
+    }
+
+    @Test(groups = {KAFKA_CONFLUENT_LICENSE, PROFILE_SPECIFIC_TESTS})
+    public void testSelectStructuralDataTypeWithSchemaRegistry()
+            throws Exception
+    {
+        selectStructuralDataType(KAFKA_SCHEMA_REGISTRY_CATALOG);
+    }
+
+    private void selectStructuralDataType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         ImmutableMap<String, Object> record = ImmutableMap.of(
@@ -109,60 +138,29 @@ public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog, MessageSeria
                 "a_map", ImmutableMap.of(
                         "key", "key1",
                         "value", 1234567890.123456789));
-        String topicName = BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createProtobufTable(BASIC_STRUCTURAL_SCHEMA_PATH, BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME, topicName, record, messageSerializer);
+        String topicName = BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
+        createProtobufTable(BASIC_STRUCTURAL_SCHEMA_PATH, BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME, topicName, record, kafkaCatalog.messageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
                     QueryResult queryResult = onTrino().executeQuery(format(
                             "SELECT a[1], a[2], m['key1'] FROM (SELECT %s as a, %s as m FROM %s.%s) t",
-                            kafkaCatalog.isColumnMappingSupported() ? "c_array" : "a_array",
-                            kafkaCatalog.isColumnMappingSupported() ? "c_map" : "a_map",
-                            kafkaCatalog.getCatalogName(),
+                            kafkaCatalog.columnMappingSupported() ? "c_array" : "a_array",
+                            kafkaCatalog.columnMappingSupported() ? "c_map" : "a_map",
+                            kafkaCatalog.catalogName(),
                             KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(100L, 101L, 1234567890.123456789));
                 });
     }
 
-    @DataProvider
-    public static Object[][] catalogs()
-    {
-        return new Object[][] {
-                {
-                        new KafkaCatalog("kafka", "", true), new ProtobufMessageSerializer(),
-                },
-                {
-                        new KafkaCatalog("kafka_schema_registry", "_schema_registry", false), new SchemaRegistryProtobufMessageSerializer(),
-                },
-        };
-    }
-
-    private static final class KafkaCatalog
+    private record KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
     {
-        private final String catalogName;
-        private final String topicNameSuffix;
-        private final boolean columnMappingSupported;
-
-        private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported)
+        private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
         {
             this.catalogName = requireNonNull(catalogName, "catalogName is null");
             this.topicNameSuffix = requireNonNull(topicNameSuffix, "topicNameSuffix is null");
             this.columnMappingSupported = columnMappingSupported;
-        }
-
-        public String getCatalogName()
-        {
-            return catalogName;
-        }
-
-        public String getTopicNameSuffix()
-        {
-            return topicNameSuffix;
-        }
-
-        public boolean isColumnMappingSupported()
-        {
-            return columnMappingSupported;
+            this.messageSerializer = requireNonNull(messageSerializer, "messageSerializer is null");
         }
 
         @Override
@@ -172,14 +170,14 @@ public String toString()
         {
         }
     }
-    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS})
+    @Test(groups = {KAFKA_CONFLUENT_LICENSE, PROFILE_SPECIFIC_TESTS})
     public void testProtobufWithSchemaReferences()
             throws Exception
     {
         String timestampTopic = "timestamp";
         String timestampProtoFile = "google/protobuf/timestamp.proto";
         ProtobufSchema baseSchema = new ProtobufSchema(
-                Resources.toString(Resources.getResource(TestKafkaProtobufReads.class, "/" + timestampProtoFile), UTF_8),
+                Resources.toString(Resources.getResource(TestKafkaProtobufReadsSmokeTest.class, "/" + timestampProtoFile), UTF_8),
                 ImmutableList.of(),
                 ImmutableMap.of(),
                 null,
@@ -216,7 +214,7 @@ public void testProtobufWithSchemaReferences()
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
-                    QueryResult queryResult = onTrino().executeQuery(format("select * from kafka_schema_registry.%s.%s", KAFKA_SCHEMA, ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY));
+                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s.%s", KAFKA_SCHEMA_REGISTRY_CATALOG.catalogName(), KAFKA_SCHEMA, ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY));
                     assertThat(queryResult).containsOnly(row(
                             "foobar",
                             2,
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobuf.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufWritesSmokeTest.java
similarity index 99%
rename from testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobuf.java
rename to testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufWritesSmokeTest.java
index 7d4720f166af..77f66dc7fb16 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobuf.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufWritesSmokeTest.java
@@ -34,7 +34,7 @@
 import static io.trino.tests.product.utils.QueryExecutors.onTrino;
 import static java.lang.String.format;
 
-public class TestKafkaProtobuf
+public class TestKafkaProtobufWritesSmokeTest
         extends ProductTest
 {
     private static final String KAFKA_CATALOG = "kafka";