From c25887a9cedfc5eb2e09cef6e0430358446955c7 Mon Sep 17 00:00:00 2001
From: Ashhar Hasan
Date: Mon, 12 Jun 2023 12:02:01 +0530
Subject: [PATCH 1/7] Remove ProtobufSchema from method signatures

Guice calls `Class#getDeclaredMethods` to discover the methods which
require injection of an argument. This causes every class that appears
in those method signatures to be loaded.

In 02cc332319a00e8fec5096921f96cb9e0aceacc1 a new method was added whose
signature includes `ProtobufSchema`, a class from a Confluent licensed
library which Trino doesn't ship with. Since that commit, Kafka
connector startup fails with the following error regardless of whether
the Protobuf functionality is being used:

2023-04-30T05:33:54.710Z ERROR main io.trino.server.Server io/confluent/kafka/schemaregistry/protobuf/ProtobufSchema
java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchema
	at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
	at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3402)
	at java.base/java.lang.Class.getDeclaredMethods(Class.java:2504)
	at com.google.inject.internal.DeclaredMembers.getDeclaredMethods(DeclaredMembers.java:48)
	at com.google.inject.spi.InjectionPoint.getDeclaredMethods(InjectionPoint.java:811)
	at com.google.inject.spi.InjectionPoint.getInjectionPoints(InjectionPoint.java:730)
	at com.google.inject.spi.InjectionPoint.forInstanceMethodsAndFields(InjectionPoint.java:430)
	at com.google.inject.internal.ConstructorBindingImpl.getInternalDependencies(ConstructorBindingImpl.java:177)
	at com.google.inject.internal.InjectorImpl.getInternalDependencies(InjectorImpl.java:670)
	at com.google.inject.internal.InjectorImpl.cleanup(InjectorImpl.java:627)
	at com.google.inject.internal.InjectorImpl.initializeJitBinding(InjectorImpl.java:613)
	at com.google.inject.internal.InjectorImpl.createJustInTimeBinding(InjectorImpl.java:943)
	at com.google.inject.internal.InjectorImpl.createJustInTimeBindingRecursive(InjectorImpl.java:863)
	at com.google.inject.internal.InjectorImpl.getJustInTimeBinding(InjectorImpl.java:300)
	at com.google.inject.internal.InjectorImpl.getBindingOrThrow(InjectorImpl.java:223)
	at com.google.inject.internal.InjectorImpl.getInternalFactory(InjectorImpl.java:949)
	at com.google.inject.internal.FactoryProxy.notify(FactoryProxy.java:48)
	at com.google.inject.internal.ProcessedBindingData.runCreationListeners(ProcessedBindingData.java:60)
	at com.google.inject.internal.InternalInjectorCreator.initializeStatically(InternalInjectorCreator.java:137)
	at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:110)
	at com.google.inject.Guice.createInjector(Guice.java:87)
	at io.airlift.bootstrap.Bootstrap.initialize(Bootstrap.java:268)
	at io.trino.plugin.kafka.KafkaConnectorFactory.create(KafkaConnectorFactory.java:71)
	at io.trino.connector.DefaultCatalogFactory.createConnector(DefaultCatalogFactory.java:221)
	at io.trino.connector.DefaultCatalogFactory.createCatalog(DefaultCatalogFactory.java:130)
	at io.trino.connector.LazyCatalogFactory.createCatalog(LazyCatalogFactory.java:45)
	at io.trino.connector.StaticCatalogManager.lambda$loadInitialCatalogs$1(StaticCatalogManager.java:158)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at java.base/java.util.concurrent.ExecutorCompletionService.submit(ExecutorCompletionService.java:184)
	at io.trino.util.Executors.executeUntilFailure(Executors.java:41)
	at io.trino.connector.StaticCatalogManager.loadInitialCatalogs(StaticCatalogManager.java:152)
	at io.trino.server.Server.doStart(Server.java:144)
	at io.trino.server.Server.lambda$start$0(Server.java:91)
	at io.trino.$gen.Trino_415____20230430_053321_1.run(Unknown Source)
	at io.trino.server.Server.start(Server.java:91)
	at io.trino.server.TrinoServer.main(TrinoServer.java:38)
Caused by: java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:587)
	at io.trino.server.PluginClassLoader.loadClass(PluginClassLoader.java:128)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
	... 39 more

This change resolves the issue by removing the class, which isn't always
on the classpath, from the method signature. The next commit introduces
safeguards to prevent other occurrences of this issue.
---
 .../plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java
index fd0ab3310623..f4eebf4695c7 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java
@@ -75,7 +75,7 @@ public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, ParsedSchema parsedSchema)
                 Optional.empty(),
                 Optional.of(subject),
                 Streams.concat(getFields(protobufSchema.toDescriptor()),
-                        getOneofs(protobufSchema))
+                        getOneofs(protobufSchema.toDescriptor()))
                         .collect(toImmutableList()));
     }

@@ -101,9 +101,9 @@ private Stream<KafkaTopicFieldDescription> getFields(Descriptor descriptor)
                         false));
     }

-    private Stream<KafkaTopicFieldDescription> getOneofs(ProtobufSchema protobufSchema)
+    private Stream<KafkaTopicFieldDescription> getOneofs(Descriptor descriptor)
     {
-        return protobufSchema.toDescriptor()
+        return descriptor
                 .getOneofs()
                 .stream()
                 .map(oneofDescriptor ->

From dae19c531c173be953229d32d0b753a2899acbef Mon Sep 17 00:00:00 2001
From: Ashhar Hasan
Date: Mon, 12 Jun 2023 12:03:27 +0530
Subject: [PATCH 2/7] Lazily load Confluent licensed ProtobufSchemaParser

This avoids loading the Confluent ProtobufSchemaParser unless the
Protobuf functionality is actually being used, which removes the need to
manually copy Confluent licensed libraries onto the Kafka connector
classpath.
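The failure mode fixed by the previous patch is easy to reproduce in isolation. Below is a minimal, self-contained sketch (all names are hypothetical, not from the Trino codebase) of why a type that merely appears in a method signature gets loaded as soon as something reflects over the declaring class:

    import java.lang.reflect.Method;

    public class SignatureLoadingDemo
    {
        // Imagine OptionalDependency normally lives in a jar that is not always on the
        // classpath. Guice never needs to *call* this method to trip over it: discovering
        // injection points via getDeclaredMethods() already resolves every parameter type.
        public void configure(OptionalDependency dependency) {}

        // Nested here only so the sketch compiles and runs on its own.
        public static class OptionalDependency {}

        public static void main(String[] args)
        {
            // If OptionalDependency were missing from the classpath, this call itself would
            // throw java.lang.NoClassDefFoundError, before any method is ever invoked,
            // matching the getDeclaredMethods0 frame at the top of the stack trace above.
            for (Method method : SignatureLoadingDemo.class.getDeclaredMethods()) {
                System.out.println(method);
            }
        }
    }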
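The lazy-loading approach this patch takes can be reduced to the following sketch (simplified, hypothetical names; Suppliers.memoize is Guava's, which the connector already uses). The optional class is referenced only inside a lambda body, so on a typical JVM it is not resolved until the first real call:

    import com.google.common.base.Suppliers;

    import java.util.function.Supplier;

    interface Parser
    {
        String parse(String input);
    }

    class LazyParser
            implements Parser
    {
        // Suppliers.memoize defers construction to the first get() and caches the result;
        // ExpensiveParser only appears in the method-reference body, not in any signature.
        private final Supplier<Parser> delegate = Suppliers.memoize(ExpensiveParser::new);

        @Override
        public String parse(String input)
        {
            return delegate.get().parse(input);
        }
    }

    class ExpensiveParser
            implements Parser
    {
        @Override
        public String parse(String input)
        {
            return input.trim();
        }
    }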
---
 .../plugin/kafka/KafkaConnectorFactory.java   |  2 +-
 .../plugin/kafka/KafkaConnectorModule.java    | 11 ++++++-
 .../schema/confluent/ConfluentModule.java     | 33 ++++++++++++++++++-
 .../confluent/ForwardingSchemaParser.java     | 30 +++++++++++++++++
 .../confluent/TestForwardingSchemaParser.java | 27 +++++++++++++++
 5 files changed, 100 insertions(+), 3 deletions(-)
 create mode 100644 plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ForwardingSchemaParser.java
 create mode 100644 plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestForwardingSchemaParser.java

diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorFactory.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorFactory.java
index ad5039e35a78..c5bb6bbc812f 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorFactory.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorFactory.java
@@ -57,7 +57,7 @@ public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
                 new CatalogNameModule(catalogName),
                 new JsonModule(),
                 new TypeDeserializerModule(context.getTypeManager()),
-                new KafkaConnectorModule(),
+                new KafkaConnectorModule(context.getTypeManager()),
                 extension,
                 binder -> {
                     binder.bind(ClassLoader.class).toInstance(KafkaConnectorFactory.class.getClassLoader());
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorModule.java
index 680b5716c703..57961b02c91f 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorModule.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConnectorModule.java
@@ -31,15 +31,24 @@
 import io.trino.spi.connector.ConnectorPageSinkProvider;
 import io.trino.spi.connector.ConnectorRecordSetProvider;
 import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.type.TypeManager;

 import static com.google.inject.multibindings.Multibinder.newSetBinder;
 import static io.airlift.configuration.ConditionalModule.conditionalModule;
 import static io.airlift.configuration.ConfigBinder.configBinder;
 import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
+import static java.util.Objects.requireNonNull;

 public class KafkaConnectorModule
         extends AbstractConfigurationAwareModule
 {
+    private final TypeManager typeManager;
+
+    public KafkaConnectorModule(TypeManager typeManager)
+    {
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
+    }
+
     @Override
     public void setup(Binder binder)
     {
@@ -58,7 +67,7 @@ public void setup(Binder binder)
         configBinder(binder).bindConfig(KafkaConfig.class);

         bindTopicSchemaProviderModule(FileTableDescriptionSupplier.NAME, new FileTableDescriptionSupplierModule());
-        bindTopicSchemaProviderModule(ConfluentSchemaRegistryTableDescriptionSupplier.NAME, new ConfluentModule());
+        bindTopicSchemaProviderModule(ConfluentSchemaRegistryTableDescriptionSupplier.NAME, new ConfluentModule(typeManager));
         newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(KafkaSessionProperties.class).in(Scopes.SINGLETON);
         jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class);
     }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
index a927bfb2b11e..b0866673f252 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Binder;
+import com.google.inject.Inject;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
@@ -52,6 +53,7 @@
 import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
 import io.trino.spi.HostAddress;
 import io.trino.spi.TrinoException;
+import io.trino.spi.type.TypeManager;

 import java.util.List;
 import java.util.Map;
@@ -74,9 +76,18 @@ public class ConfluentModule
         extends AbstractConfigurationAwareModule
 {
+    private final TypeManager typeManager;
+
+    public ConfluentModule(TypeManager typeManager)
+    {
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
+    }
+
     @Override
     protected void setup(Binder binder)
     {
+        binder.bind(TypeManager.class).toInstance(typeManager);
+
         configBinder(binder).bindConfig(ConfluentSchemaRegistryConfig.class);
         install(new ConfluentDecoderModule());
         install(new ConfluentEncoderModule());
@@ -89,7 +100,7 @@ protected void setup(Binder binder)
         newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(ConfluentSessionProperties.class).in(Scopes.SINGLETON);
         binder.bind(TableDescriptionSupplier.class).toProvider(ConfluentSchemaRegistryTableDescriptionSupplier.Factory.class).in(Scopes.SINGLETON);
         newMapBinder(binder, String.class, SchemaParser.class).addBinding("AVRO").to(AvroSchemaParser.class).in(Scopes.SINGLETON);
-        newMapBinder(binder, String.class, SchemaParser.class).addBinding("PROTOBUF").to(ProtobufSchemaParser.class).in(Scopes.SINGLETON);
+        newMapBinder(binder, String.class, SchemaParser.class).addBinding("PROTOBUF").to(LazyLoadedProtobufSchemaParser.class).in(Scopes.SINGLETON);
     }

     @Provides
@@ -195,4 +206,24 @@ private SchemaProvider create()
             return schemaProvider;
         }
     }
+
+    public static class LazyLoadedProtobufSchemaParser
+            extends ForwardingSchemaParser
+    {
+        // Make JVM to load lazily ProtobufSchemaParser, so Kafka connector can be used
+        // without protobuf dependency for non protobuf based topics
+        private final Supplier<SchemaParser> delegate;
+
+        @Inject
+        public LazyLoadedProtobufSchemaParser(TypeManager typeManager)
+        {
+            this.delegate = Suppliers.memoize(() -> new ProtobufSchemaParser(requireNonNull(typeManager, "typeManager is null")));
+        }
+
+        @Override
+        protected SchemaParser delegate()
+        {
+            return delegate.get();
+        }
+    }
 }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ForwardingSchemaParser.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ForwardingSchemaParser.java
new file mode 100644
index 000000000000..e297cf496891
--- /dev/null
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ForwardingSchemaParser.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema.confluent;
+
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.trino.plugin.kafka.KafkaTopicFieldGroup;
+import io.trino.spi.connector.ConnectorSession;
+
+public abstract class ForwardingSchemaParser
+        implements SchemaParser
+{
+    protected abstract SchemaParser delegate();
+
+    @Override
+    public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, ParsedSchema parsedSchema)
+    {
+        return delegate().parse(session, subject, parsedSchema);
+    }
+}
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestForwardingSchemaParser.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestForwardingSchemaParser.java
new file mode 100644
index 000000000000..806f81848726
--- /dev/null
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestForwardingSchemaParser.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema.confluent;
+
+import org.testng.annotations.Test;
+
+import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden;
+
+public class TestForwardingSchemaParser
+{
+    @Test
+    public void testAllMethodsOverridden()
+    {
+        assertAllMethodsOverridden(SchemaParser.class, ForwardingSchemaParser.class);
+    }
+}

From 0373446ff83e359cba1f722806e3962ecc3abf8a Mon Sep 17 00:00:00 2001
From: Ashhar Hasan
Date: Tue, 13 Jun 2023 11:45:42 +0530
Subject: [PATCH 3/7] Fix comment in ConfluentModule

---
 .../io/trino/plugin/kafka/schema/confluent/ConfluentModule.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
index b0866673f252..7eb1a48e3fb4 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
@@ -168,7 +168,7 @@ private static class LazyLoadedProtobufSchemaProvider
             implements SchemaProvider
     {
         // Make JVM to load lazily ProtobufSchemaProvider, so Kafka connector can be used
-        // with protobuf dependency for non protobuf based topics
+        // without protobuf dependency for non protobuf based topics
         private final Supplier<SchemaProvider> delegate = Suppliers.memoize(this::create);
         private final AtomicReference<Map<String, ?>> configuration = new AtomicReference<>();

From b36d60c03ff34f8470905ed39d46b816fa9cf9a0 Mon Sep 17 00:00:00 2001
From: Ashhar Hasan
Date: Mon, 12 Jun 2023 13:59:02 +0530
Subject: [PATCH 4/7] Rename Kafka Protobuf test classes to match Avro test classes

---
 ...rotobufReads.java => TestKafkaProtobufReadsSmokeTest.java} | 4 ++--
 ...fkaProtobuf.java => TestKafkaProtobufWritesSmokeTest.java} | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
 rename testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/{TestKafkaProtobufReads.java => TestKafkaProtobufReadsSmokeTest.java} (99%)
 rename testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/{TestKafkaProtobuf.java => TestKafkaProtobufWritesSmokeTest.java} (99%)

diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReads.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
similarity index 99%
rename from testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReads.java
rename to testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
index f46a27ee0db4..5db694e39a01 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReads.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
@@ -57,7 +57,7 @@
 import static java.util.concurrent.TimeUnit.SECONDS;

 @Test(singleThreaded = true)
-public class TestKafkaProtobufReads
+public class TestKafkaProtobufReadsSmokeTest
         extends ProductTest
 {
     private static final String KAFKA_SCHEMA = "product_tests";
@@ -179,7 +179,7 @@ public void testProtobufWithSchemaReferences()
         String timestampTopic = "timestamp";
         String timestampProtoFile = "google/protobuf/timestamp.proto";
         ProtobufSchema baseSchema = new ProtobufSchema(
-                Resources.toString(Resources.getResource(TestKafkaProtobufReads.class, "/" + timestampProtoFile), UTF_8),
+                Resources.toString(Resources.getResource(TestKafkaProtobufReadsSmokeTest.class, "/" + timestampProtoFile), UTF_8),
                 ImmutableList.of(),
                 ImmutableMap.of(),
                 null,
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobuf.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufWritesSmokeTest.java
similarity index 99%
rename from testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobuf.java
rename to testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufWritesSmokeTest.java
index 7d4720f166af..77f66dc7fb16 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobuf.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufWritesSmokeTest.java
@@ -34,7 +34,7 @@
 import static io.trino.tests.product.utils.QueryExecutors.onTrino;
 import static java.lang.String.format;

-public class TestKafkaProtobuf
+public class TestKafkaProtobufWritesSmokeTest
         extends ProductTest
 {
     private static final String KAFKA_CATALOG = "kafka";

From 417fa0b675373bb3c6cad18f42a35bd1cc6a1908 Mon Sep 17 00:00:00 2001
From: Ashhar Hasan
Date: Mon, 12 Jun 2023 17:35:42 +0530
Subject: [PATCH 5/7] Refactor KafkaCatalog in Kafka product tests

The message serializer is strictly coupled to the catalog being used, so
the serializer also belongs in KafkaCatalog. This lets an upcoming
commit simplify the DataProvider usages in some product tests, as shown
in the sketch below.
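A sketch of the payoff (simplified, hypothetical names; the real classes appear in the diffs below): once the serializer travels inside KafkaCatalog, each DataProvider row collapses to a single argument, and test methods no longer take a separate MessageSerializer parameter that must be kept in sync with the catalog:

    import org.testng.annotations.DataProvider;
    import org.testng.annotations.Test;

    public class CatalogSmokeTest
    {
        interface MessageSerializer {}

        // The serializer is part of the catalog description instead of a parallel argument
        record KafkaCatalog(String catalogName, String topicNameSuffix, MessageSerializer serializer) {}

        @DataProvider
        public static Object[][] catalogs()
        {
            return new Object[][] {
                    {new KafkaCatalog("kafka", "", new MessageSerializer() {})},
                    {new KafkaCatalog("kafka_schema_registry", "_schema_registry", new MessageSerializer() {})},
            };
        }

        @Test(dataProvider = "catalogs")
        public void testSomething(KafkaCatalog catalog)
        {
            // the serializer always matches the catalog it belongs to
            MessageSerializer serializer = catalog.serializer();
        }
    }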
---
 .../kafka/TestKafkaAvroReadsSmokeTest.java    | 25 ++++++++++++-------
 .../TestKafkaProtobufReadsSmokeTest.java      | 21 ++++++++++------
 2 files changed, 30 insertions(+), 16 deletions(-)

diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java
index b5c8b5016a6b..1294ce4798e0 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java
@@ -79,7 +79,7 @@ public class TestKafkaAvroReadsSmokeTest
     private static final String AVRO_SCHEMA_WITH_REFERENCES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/schema_with_references.avsc";

     @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
+    public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         ImmutableMap<String, Object> record = ImmutableMap.of(
@@ -88,7 +88,7 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
                 "a_double", 234.567,
                 "a_boolean", true);
         String topicName = ALL_DATATYPES_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_DATATYPES_AVRO_TOPIC_NAME, topicName, record, messageSerializer);
+        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_DATATYPES_AVRO_TOPIC_NAME, topicName, record, kafkaCatalog.getMessageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
@@ -102,11 +102,11 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
     }

     @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testNullType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
+    public void testNullType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         String topicName = ALL_NULL_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_NULL_AVRO_TOPIC_NAME, topicName, ImmutableMap.of(), messageSerializer);
+        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_NULL_AVRO_TOPIC_NAME, topicName, ImmutableMap.of(), kafkaCatalog.getMessageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
@@ -120,14 +120,14 @@ public void testNullType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
     }

     @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
+    public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         ImmutableMap<String, Object> record = ImmutableMap.of(
                 "a_array", ImmutableList.of(100L, 102L),
                 "a_map", ImmutableMap.of("key1", "value1"));
         String topicName = STRUCTURAL_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createAvroTable(STRUCTURAL_SCHEMA_PATH, STRUCTURAL_AVRO_TOPIC_NAME, topicName, record, messageSerializer);
+        createAvroTable(STRUCTURAL_SCHEMA_PATH, STRUCTURAL_AVRO_TOPIC_NAME, topicName, record, kafkaCatalog.getMessageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
@@ -146,10 +146,10 @@ public static Object[][] catalogs()
     {
         return new Object[][] {
                 {
-                        new KafkaCatalog("kafka", "", true), new AvroMessageSerializer(),
+                        new KafkaCatalog("kafka", "", true, new AvroMessageSerializer()),
                 },
                 {
-                        new KafkaCatalog("kafka_schema_registry", "_schema_registry", false), new SchemaRegistryAvroMessageSerializer(),
+                        new KafkaCatalog("kafka_schema_registry", "_schema_registry", false, new SchemaRegistryAvroMessageSerializer()),
                 },
         };
     }
@@ -159,12 +159,14 @@ private static final class KafkaCatalog
         private final String catalogName;
         private final String topicNameSuffix;
         private final boolean columnMappingSupported;
+        private final MessageSerializer messageSerializer;

-        private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported)
+        private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
         {
             this.catalogName = requireNonNull(catalogName, "catalogName is null");
             this.topicNameSuffix = requireNonNull(topicNameSuffix, "topicNameSuffix is null");
             this.columnMappingSupported = columnMappingSupported;
+            this.messageSerializer = requireNonNull(messageSerializer, "messageSerializer is null");
         }

         public String getCatalogName()
@@ -182,6 +184,11 @@ public boolean isColumnMappingSupported()
             return columnMappingSupported;
         }

+        public MessageSerializer getMessageSerializer()
+        {
+            return messageSerializer;
+        }
+
         @Override
         public String toString()
         {
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
index 5db694e39a01..2f8311e65e39 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
@@ -72,7 +72,7 @@ public class TestKafkaProtobufReadsSmokeTest
     private static final String ALL_DATATYPES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/all_datatypes.proto";

     @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
+    public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         Map<String, Object> record = ImmutableMap.<String, Object>builder()
@@ -84,7 +84,7 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
                 .put("f_boolean", true)
                 .buildOrThrow();
         String topicName = BASIC_DATATYPES_PROTOBUF_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createProtobufTable(BASIC_DATATYPES_SCHEMA_PATH, BASIC_DATATYPES_PROTOBUF_TOPIC_NAME, topicName, record, messageSerializer);
+        createProtobufTable(BASIC_DATATYPES_SCHEMA_PATH, BASIC_DATATYPES_PROTOBUF_TOPIC_NAME, topicName, record, kafkaCatalog.getMessageSerializer());

         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
@@ -101,7 +101,7 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog)
     }

     @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer)
+    public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         ImmutableMap<String, Object> record = ImmutableMap.of(
                 "a_array", ImmutableList.of(100L, 101L),
                 "a_map", ImmutableMap.of(
                         "key", "key1",
                         "value", 1234567890.123456789));
         String topicName = BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createProtobufTable(BASIC_STRUCTURAL_SCHEMA_PATH, BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME, topicName, record, messageSerializer);
+        createProtobufTable(BASIC_STRUCTURAL_SCHEMA_PATH, BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME, topicName, record, kafkaCatalog.getMessageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
@@ -129,10 +129,10 @@ public static Object[][] catalogs()
     {
         return new Object[][] {
                 {
-                        new KafkaCatalog("kafka", "", true), new ProtobufMessageSerializer(),
+                        new KafkaCatalog("kafka", "", true, new ProtobufMessageSerializer()),
                 },
                 {
-                        new KafkaCatalog("kafka_schema_registry", "_schema_registry", false), new SchemaRegistryProtobufMessageSerializer(),
+                        new KafkaCatalog("kafka_schema_registry", "_schema_registry", false, new SchemaRegistryProtobufMessageSerializer()),
                 },
         };
     }
@@ -142,12 +142,14 @@ private static final class KafkaCatalog
         private final String catalogName;
         private final String topicNameSuffix;
         private final boolean columnMappingSupported;
+        private final MessageSerializer messageSerializer;

-        private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported)
+        private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
        {
             this.catalogName = requireNonNull(catalogName, "catalogName is null");
             this.topicNameSuffix = requireNonNull(topicNameSuffix, "topicNameSuffix is null");
             this.columnMappingSupported = columnMappingSupported;
+            this.messageSerializer = requireNonNull(messageSerializer, "messageSerializer is null");
         }

         public String getCatalogName()
@@ -165,6 +167,11 @@ public boolean isColumnMappingSupported()
             return columnMappingSupported;
         }

+        public MessageSerializer getMessageSerializer()
+        {
+            return messageSerializer;
+        }
+
         @Override
         public String toString()
         {

From d952b2cff1a63f73b41a8b42be72374aa29ccbd1 Mon Sep 17 00:00:00 2001
From: Ashhar Hasan
Date: Mon, 12 Jun 2023 17:44:09 +0530
Subject: [PATCH 6/7] Convert KafkaCatalog to record

---
 .../kafka/TestKafkaAvroReadsSmokeTest.java    | 49 +++++--------------
 .../TestKafkaProtobufReadsSmokeTest.java      | 43 ++++------------
 2 files changed, 21 insertions(+), 71 deletions(-)

diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java
index 1294ce4798e0..c3680b9db5f2 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.java
@@ -87,12 +87,12 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog)
                 "a_bigint", 127L,
                 "a_double", 234.567,
                 "a_boolean", true);
-        String topicName = ALL_DATATYPES_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_DATATYPES_AVRO_TOPIC_NAME, topicName, record, kafkaCatalog.getMessageSerializer());
+        String topicName = ALL_DATATYPES_AVRO_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
+        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_DATATYPES_AVRO_TOPIC_NAME, topicName, record, kafkaCatalog.messageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
-                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.getCatalogName(), KAFKA_SCHEMA + "." + topicName));
+                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.catalogName(), KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(
                             "foobar",
                             127,
@@ -105,12 +105,12 @@ public void testNullType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
-        String topicName = ALL_NULL_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_NULL_AVRO_TOPIC_NAME, topicName, ImmutableMap.of(), kafkaCatalog.getMessageSerializer());
+        String topicName = ALL_NULL_AVRO_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
+        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_NULL_AVRO_TOPIC_NAME, topicName, ImmutableMap.of(), kafkaCatalog.messageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
-                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.getCatalogName(), KAFKA_SCHEMA + "." + topicName));
+                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.catalogName(), KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(
                             null,
                             null,
@@ -126,16 +126,16 @@ public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog)
         ImmutableMap<String, Object> record = ImmutableMap.of(
                 "a_array", ImmutableList.of(100L, 102L),
                 "a_map", ImmutableMap.of("key1", "value1"));
-        String topicName = STRUCTURAL_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createAvroTable(STRUCTURAL_SCHEMA_PATH, STRUCTURAL_AVRO_TOPIC_NAME, topicName, record, kafkaCatalog.getMessageSerializer());
+        String topicName = STRUCTURAL_AVRO_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
+        createAvroTable(STRUCTURAL_SCHEMA_PATH, STRUCTURAL_AVRO_TOPIC_NAME, topicName, record, kafkaCatalog.messageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
                     QueryResult queryResult = onTrino().executeQuery(format(
                             "SELECT a[1], a[2], m['key1'] FROM (SELECT %s as a, %s as m FROM %s.%s) t",
-                            kafkaCatalog.isColumnMappingSupported() ? "c_array" : "a_array",
-                            kafkaCatalog.isColumnMappingSupported() ? "c_map" : "a_map",
-                            kafkaCatalog.getCatalogName(),
+                            kafkaCatalog.columnMappingSupported() ? "c_array" : "a_array",
+                            kafkaCatalog.columnMappingSupported() ? "c_map" : "a_map",
+                            kafkaCatalog.catalogName(),
                             KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(100, 102, "value1"));
                 });
@@ -154,13 +154,8 @@ public static Object[][] catalogs()
         };
     }

-    private static final class KafkaCatalog
+    private record KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
     {
-        private final String catalogName;
-        private final String topicNameSuffix;
-        private final boolean columnMappingSupported;
-        private final MessageSerializer messageSerializer;
-
         private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
         {
             this.catalogName = requireNonNull(catalogName, "catalogName is null");
@@ -169,26 +164,6 @@ private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
             this.topicNameSuffix = requireNonNull(topicNameSuffix, "topicNameSuffix is null");
             this.columnMappingSupported = columnMappingSupported;
             this.messageSerializer = requireNonNull(messageSerializer, "messageSerializer is null");
         }

-        public String getCatalogName()
-        {
-            return catalogName;
-        }
-
-        public String getTopicNameSuffix()
-        {
-            return topicNameSuffix;
-        }
-
-        public boolean isColumnMappingSupported()
-        {
-            return columnMappingSupported;
-        }
-
-        public MessageSerializer getMessageSerializer()
-        {
-            return messageSerializer;
-        }
-
         @Override
         public String toString()
         {
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
index 2f8311e65e39..e6bfa15e68ae 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
@@ -83,13 +83,13 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog)
                 .put("e_float", 3.14f)
                 .put("f_boolean", true)
                 .buildOrThrow();
-        String topicName = BASIC_DATATYPES_PROTOBUF_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createProtobufTable(BASIC_DATATYPES_SCHEMA_PATH, BASIC_DATATYPES_PROTOBUF_TOPIC_NAME, topicName, record, kafkaCatalog.getMessageSerializer());
+        String topicName = BASIC_DATATYPES_PROTOBUF_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
+        createProtobufTable(BASIC_DATATYPES_SCHEMA_PATH, BASIC_DATATYPES_PROTOBUF_TOPIC_NAME, topicName, record, kafkaCatalog.messageSerializer());

         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
-                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.getCatalogName(), KAFKA_SCHEMA + "." + topicName));
+                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s", kafkaCatalog.catalogName(), KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(
                             "foobar",
                             314,
@@ -109,16 +109,16 @@ public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog)
                 "a_map", ImmutableMap.of(
                         "key", "key1",
                         "value", 1234567890.123456789));
-        String topicName = BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
-        createProtobufTable(BASIC_STRUCTURAL_SCHEMA_PATH, BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME, topicName, record, kafkaCatalog.getMessageSerializer());
+        String topicName = BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
+        createProtobufTable(BASIC_STRUCTURAL_SCHEMA_PATH, BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME, topicName, record, kafkaCatalog.messageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
                     QueryResult queryResult = onTrino().executeQuery(format(
                             "SELECT a[1], a[2], m['key1'] FROM (SELECT %s as a, %s as m FROM %s.%s) t",
-                            kafkaCatalog.isColumnMappingSupported() ? "c_array" : "a_array",
-                            kafkaCatalog.isColumnMappingSupported() ? "c_map" : "a_map",
-                            kafkaCatalog.getCatalogName(),
+                            kafkaCatalog.columnMappingSupported() ? "c_array" : "a_array",
+                            kafkaCatalog.columnMappingSupported() ? "c_map" : "a_map",
+                            kafkaCatalog.catalogName(),
                             KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(100L, 101L, 1234567890.123456789));
                 });
@@ -137,13 +137,8 @@ public static Object[][] catalogs()
         };
     }

-    private static final class KafkaCatalog
+    private record KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
     {
-        private final String catalogName;
-        private final String topicNameSuffix;
-        private final boolean columnMappingSupported;
-        private final MessageSerializer messageSerializer;
-
         private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
         {
             this.catalogName = requireNonNull(catalogName, "catalogName is null");
@@ -152,26 +147,6 @@ private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
             this.topicNameSuffix = requireNonNull(topicNameSuffix, "topicNameSuffix is null");
             this.columnMappingSupported = columnMappingSupported;
             this.messageSerializer = requireNonNull(messageSerializer, "messageSerializer is null");
         }

-        public String getCatalogName()
-        {
-            return catalogName;
-        }
-
-        public String getTopicNameSuffix()
-        {
-            return topicNameSuffix;
-        }
-
-        public boolean isColumnMappingSupported()
-        {
-            return columnMappingSupported;
-        }
-
-        public MessageSerializer getMessageSerializer()
-        {
-            return messageSerializer;
-        }
-
         @Override
         public String toString()
         {

From 12eee832d78575d4157889cd2d3007f1af9f9b67 Mon Sep 17 00:00:00 2001
From: Ashhar Hasan
Date: Mon, 12 Jun 2023 23:11:23 +0530
Subject: [PATCH 7/7] Add product tests for Kafka connector with Confluent licensed libraries

The older approach was to copy the Confluent licensed libraries to the
Kafka connector classpath for all product test environments extending
from `Kafka`. This meant that it was not possible to detect changes
which introduce a hard dependency on the Confluent licensed libraries
for Kafka connector startup.

02cc332319a00e8fec5096921f96cb9e0aceacc1 inadvertently made the
Confluent libraries required for the Kafka connector to start up; this
was not caught by tests because all the product test environments
already had the Confluent libraries on the Kafka connector classpath.

This commit instead introduces a new product test environment
`EnvMultinodeConfluentKafka`, which is the only product test environment
that copies the Confluent licensed libraries onto the Kafka connector
classpath.
There is also a new test group, `kafka_confluent_license`, which
includes all the tests for functionality that requires the Confluent
licensed libraries.
---
 .../product/launcher/env/common/Kafka.java    | 10 +--
 .../EnvMultinodeConfluentKafka.java           | 76 +++++++++++++++++++
 .../launcher/suite/suites/SuiteKafka.java     |  5 ++
 .../kafka.properties                          | 23 ++++++
 .../kafka_schema_registry.properties          |  5 ++
 .../tempto-configuration.yaml                 |  2 +
 .../io/trino/tests/product/TestGroups.java    |  1 +
 .../TestKafkaProtobufReadsSmokeTest.java      | 56 +++++++++-----
 8 files changed, 149 insertions(+), 29 deletions(-)
 create mode 100644 testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeConfluentKafka.java
 create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka.properties
 create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka_schema_registry.properties
 create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/tempto-configuration.yaml

diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/Kafka.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/Kafka.java
index 7acdbdb6156b..28a03cc726b3 100644
--- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/Kafka.java
+++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/Kafka.java
@@ -23,16 +23,13 @@
 import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;
 import org.testcontainers.utility.MountableFile;

-import java.io.File;
 import java.time.Duration;

-import static io.trino.testing.TestingProperties.getConfluentVersion;
 import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
 import static io.trino.tests.product.launcher.env.EnvironmentContainers.isTrinoContainer;
 import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC;
 import static java.util.Objects.requireNonNull;
 import static org.testcontainers.containers.wait.strategy.Wait.forLogMessage;
-import static org.testcontainers.utility.MountableFile.forClasspathResource;
 import static org.testcontainers.utility.MountableFile.forHostPath;

 public class Kafka
@@ -40,8 +37,6 @@ public class Kafka
 {
     private static final String CONFLUENT_VERSION = "7.3.1";
     private static final int SCHEMA_REGISTRY_PORT = 8081;
-    private static final File KAFKA_PROTOBUF_PROVIDER = new File("testing/trino-product-tests-launcher/target/kafka-protobuf-provider-" + getConfluentVersion() + ".jar");
-    private static final File KAFKA_PROTOBUF_TYPES = new File("testing/trino-product-tests-launcher/target/kafka-protobuf-types-" + getConfluentVersion() + ".jar");
     static final String KAFKA = "kafka";
     static final String SCHEMA_REGISTRY = "schema-registry";
     static final String ZOOKEEPER = "zookeeper";
@@ -69,10 +64,7 @@ public void extendEnvironment(Environment.Builder builder)
             if (isTrinoContainer(container.getLogicalName())) {
                 MountableFile logConfigFile = forHostPath(configDir.getPath("log.properties"));
                 container
-                        .withCopyFileToContainer(logConfigFile, CONTAINER_TRINO_ETC + "/log.properties")
-                        .withCopyFileToContainer(forHostPath(KAFKA_PROTOBUF_PROVIDER.getAbsolutePath()), "/docker/kafka-protobuf-provider/kafka-protobuf-provider.jar")
-                        .withCopyFileToContainer(forHostPath(KAFKA_PROTOBUF_TYPES.getAbsolutePath()), "/docker/kafka-protobuf-provider/kafka-protobuf-types.jar")
-                        .withCopyFileToContainer(forClasspathResource("install-kafka-protobuf-provider.sh", 0755), "/docker/presto-init.d/install-kafka-protobuf-provider.sh");
+                        .withCopyFileToContainer(logConfigFile, CONTAINER_TRINO_ETC + "/log.properties");
             }
         });

diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeConfluentKafka.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeConfluentKafka.java
new file mode 100644
index 000000000000..a999c26ca3d0
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeConfluentKafka.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.tests.product.launcher.env.environment;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import io.trino.tests.product.launcher.docker.DockerFiles;
+import io.trino.tests.product.launcher.docker.DockerFiles.ResourceProvider;
+import io.trino.tests.product.launcher.env.Environment;
+import io.trino.tests.product.launcher.env.EnvironmentProvider;
+import io.trino.tests.product.launcher.env.common.Kafka;
+import io.trino.tests.product.launcher.env.common.StandardMultinode;
+import io.trino.tests.product.launcher.env.common.TestsEnvironment;
+
+import java.io.File;
+
+import static io.trino.testing.TestingProperties.getConfluentVersion;
+import static io.trino.tests.product.launcher.env.EnvironmentContainers.configureTempto;
+import static io.trino.tests.product.launcher.env.EnvironmentContainers.isTrinoContainer;
+import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC;
+import static java.util.Objects.requireNonNull;
+import static org.testcontainers.utility.MountableFile.forClasspathResource;
+import static org.testcontainers.utility.MountableFile.forHostPath;
+
+/**
+ * {@link EnvMultinodeConfluentKafka} is intended to be the only Kafka product test environment which copies the non-free Confluent License libraries to the Kafka connector
+ * classpath to test functionality which requires those classes.
+ * The other {@link Kafka} environments MUST NOT copy these jars otherwise it's not possible to verify the out of box Trino setup which doesn't ship with the Confluent licensed
+ * libraries.
+ */
+@TestsEnvironment
+public final class EnvMultinodeConfluentKafka
+        extends EnvironmentProvider
+{
+    private static final File KAFKA_PROTOBUF_PROVIDER = new File("testing/trino-product-tests-launcher/target/kafka-protobuf-provider-" + getConfluentVersion() + ".jar");
+    private static final File KAFKA_PROTOBUF_TYPES = new File("testing/trino-product-tests-launcher/target/kafka-protobuf-types-" + getConfluentVersion() + ".jar");
+
+    private final ResourceProvider configDir;
+
+    @Inject
+    public EnvMultinodeConfluentKafka(Kafka kafka, StandardMultinode standardMultinode, DockerFiles dockerFiles)
+    {
+        super(ImmutableList.of(standardMultinode, kafka));
+        requireNonNull(dockerFiles, "dockerFiles is null");
+        configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/multinode-kafka-confluent-license/");
+    }
+
+    @Override
+    public void extendEnvironment(Environment.Builder builder)
+    {
+        builder.configureContainers(container -> {
+            if (isTrinoContainer(container.getLogicalName())) {
+                builder.addConnector("kafka", forHostPath(configDir.getPath("kafka.properties")), CONTAINER_TRINO_ETC + "/catalog/kafka.properties");
+                builder.addConnector("kafka", forHostPath(configDir.getPath("kafka_schema_registry.properties")), CONTAINER_TRINO_ETC + "/catalog/kafka_schema_registry.properties");
+                container
+                        .withCopyFileToContainer(forHostPath(KAFKA_PROTOBUF_PROVIDER.getAbsolutePath()), "/docker/kafka-protobuf-provider/kafka-protobuf-provider.jar")
+                        .withCopyFileToContainer(forHostPath(KAFKA_PROTOBUF_TYPES.getAbsolutePath()), "/docker/kafka-protobuf-provider/kafka-protobuf-types.jar")
+                        .withCopyFileToContainer(forClasspathResource("install-kafka-protobuf-provider.sh", 0755), "/docker/presto-init.d/install-kafka-protobuf-provider.sh");
+            }
+        });
+
+        configureTempto(builder, configDir);
+    }
+}
diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteKafka.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteKafka.java
index bd4acb69d7aa..f541f510ac08 100644
--- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteKafka.java
+++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteKafka.java
@@ -15,6 +15,7 @@
 import com.google.common.collect.ImmutableList;
 import io.trino.tests.product.launcher.env.EnvironmentConfig;
+import io.trino.tests.product.launcher.env.environment.EnvMultinodeConfluentKafka;
 import io.trino.tests.product.launcher.env.environment.EnvMultinodeKafka;
 import io.trino.tests.product.launcher.env.environment.EnvMultinodeKafkaSaslPlaintext;
 import io.trino.tests.product.launcher.env.environment.EnvMultinodeKafkaSsl;
@@ -35,6 +36,10 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
                 testOnEnvironment(EnvMultinodeKafka.class)
                         .withGroups("configured_features", "kafka")
                         .build(),
+                testOnEnvironment(EnvMultinodeConfluentKafka.class)
+                        // testing kafka group with this env is slightly redundant but helps verify that copying confluent libraries doesn't break non-confluent functionality
+                        .withGroups("configured_features", "kafka", "kafka_confluent_license")
+                        .build(),
                 testOnEnvironment(EnvMultinodeKafkaSsl.class)
                         .withGroups("configured_features", "kafka")
                         .build(),
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka.properties
new file mode 100644
index 000000000000..bd6631b76e9c
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka.properties
@@ -0,0 +1,23 @@
+connector.name=kafka
+kafka.table-names=product_tests.read_simple_key_and_value,\
+    product_tests.read_all_datatypes_raw,\
+    product_tests.read_all_datatypes_csv,\
+    product_tests.read_all_datatypes_json,\
+    product_tests.read_all_datatypes_avro,\
+    product_tests.read_all_null_avro,\
+    product_tests.read_structural_datatype_avro,\
+    product_tests.write_simple_key_and_value,\
+    product_tests.write_all_datatypes_raw,\
+    product_tests.write_all_datatypes_csv,\
+    product_tests.write_all_datatypes_json,\
+    product_tests.write_all_datatypes_avro,\
+    product_tests.write_structural_datatype_avro,\
+    product_tests.pushdown_partition,\
+    product_tests.pushdown_offset,\
+    product_tests.pushdown_create_time,\
+    product_tests.all_datatypes_protobuf,\
+    product_tests.structural_datatype_protobuf,\
+    product_tests.read_basic_datatypes_protobuf,\
+    product_tests.read_basic_structural_datatypes_protobuf
+kafka.nodes=kafka:9092
+kafka.table-description-dir=/docker/presto-product-tests/conf/presto/etc/catalog/kafka
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka_schema_registry.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka_schema_registry.properties
new file mode 100644
index 000000000000..2fe25b5c653b
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/kafka_schema_registry.properties
@@ -0,0 +1,5 @@
+connector.name=kafka
+kafka.nodes=kafka:9092
+kafka.table-description-supplier=confluent
+kafka.confluent-schema-registry-url=http://schema-registry:8081
+kafka.default-schema=product_tests
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/tempto-configuration.yaml b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/tempto-configuration.yaml
new file mode 100644
index 000000000000..8a101f06e8a1
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-kafka-confluent-license/tempto-configuration.yaml
@@ -0,0 +1,2 @@
+schema-registry:
+  url: http://schema-registry:8081
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java
index 524576ce1773..6a469d607bb7 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java
@@ -69,6 +69,7 @@ public final class TestGroups
     public static final String CANCEL_QUERY = "cancel_query";
     public static final String LARGE_QUERY = "large_query";
     public static final String KAFKA = "kafka";
+    public static final String KAFKA_CONFLUENT_LICENSE = "kafka_confluent_license";
     public static final String TWO_HIVES = "two_hives";
     public static final String ICEBERG = "iceberg";
     public static final String ICEBERG_FORMAT_VERSION_COMPATIBILITY = "iceberg_format_version_compatibility";
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
index e6bfa15e68ae..e35efa03f963 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
@@ -30,7 +30,6 @@
 import io.trino.tempto.fulfillment.table.kafka.KafkaTableManager;
 import io.trino.tempto.fulfillment.table.kafka.ListKafkaDataSource;
 import io.trino.tempto.query.QueryResult;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;

 import java.io.ByteArrayOutputStream;
@@ -47,6 +46,7 @@
 import static io.trino.tempto.fulfillment.table.TableHandle.tableHandle;
 import static io.trino.tempto.fulfillment.table.kafka.KafkaMessageContentsBuilder.contentsBuilder;
 import static io.trino.tests.product.TestGroups.KAFKA;
+import static io.trino.tests.product.TestGroups.KAFKA_CONFLUENT_LICENSE;
 import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
 import static io.trino.tests.product.utils.QueryAssertions.assertEventually;
 import static io.trino.tests.product.utils.QueryExecutors.onTrino;
@@ -71,8 +71,24 @@ public class TestKafkaProtobufReadsSmokeTest
     private static final String ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY = "all_datatypes_protobuf_schema_registry";
     private static final String ALL_DATATYPES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/all_datatypes.proto";

-    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog)
+    private static final KafkaCatalog KAFKA_CATALOG = new KafkaCatalog("kafka", "", true, new ProtobufMessageSerializer());
+    private static final KafkaCatalog KAFKA_SCHEMA_REGISTRY_CATALOG = new KafkaCatalog("kafka_schema_registry", "_schema_registry", false, new SchemaRegistryProtobufMessageSerializer());
+
+    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS})
+    public void testSelectPrimitiveDataType()
+            throws Exception
+    {
+        selectPrimitiveDataType(KAFKA_CATALOG);
+    }
+
+    @Test(groups = {KAFKA_CONFLUENT_LICENSE, PROFILE_SPECIFIC_TESTS})
+    public void testSelectPrimitiveDataTypeWithSchemaRegistry()
+            throws Exception
+    {
+        selectPrimitiveDataType(KAFKA_SCHEMA_REGISTRY_CATALOG);
+    }
+
+    private void selectPrimitiveDataType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         Map<String, Object> record = ImmutableMap.<String, Object>builder()
@@ -100,8 +116,21 @@ public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog)
         });
     }

-    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
-    public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog)
+    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS})
+    public void testSelectStructuralDataType()
+            throws Exception
+    {
+        selectStructuralDataType(KAFKA_CATALOG);
+    }
+
+    @Test(groups = {KAFKA_CONFLUENT_LICENSE, PROFILE_SPECIFIC_TESTS})
+    public void testSelectStructuralDataTypeWithSchemaRegistry()
+            throws Exception
+    {
+        selectStructuralDataType(KAFKA_SCHEMA_REGISTRY_CATALOG);
+    }
+
+    private void selectStructuralDataType(KafkaCatalog kafkaCatalog)
             throws Exception
     {
         ImmutableMap<String, Object> record = ImmutableMap.of(
                 "a_array", ImmutableList.of(100L, 101L),
                 "a_map", ImmutableMap.of(
                         "key", "key1",
                         "value", 1234567890.123456789));
         String topicName = BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
         createProtobufTable(BASIC_STRUCTURAL_SCHEMA_PATH, BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME, topicName, record, kafkaCatalog.messageSerializer());
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
                     QueryResult queryResult = onTrino().executeQuery(format(
                             "SELECT a[1], a[2], m['key1'] FROM (SELECT %s as a, %s as m FROM %s.%s) t",
                             kafkaCatalog.columnMappingSupported() ? "c_array" : "a_array",
                             kafkaCatalog.columnMappingSupported() ? "c_map" : "a_map",
                             kafkaCatalog.catalogName(),
                             KAFKA_SCHEMA + "." + topicName));
                     assertThat(queryResult).containsOnly(row(100L, 101L, 1234567890.123456789));
                 });
     }

-    @DataProvider
-    public static Object[][] catalogs()
-    {
-        return new Object[][] {
-                {
-                        new KafkaCatalog("kafka", "", true, new ProtobufMessageSerializer()),
-                },
-                {
-                        new KafkaCatalog("kafka_schema_registry", "_schema_registry", false, new SchemaRegistryProtobufMessageSerializer()),
-                },
-        };
-    }
-
     private record KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
     {
         private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer)
@@ -154,7 +170,7 @@ public String toString()
         }
     }

-    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS})
+    @Test(groups = {KAFKA_CONFLUENT_LICENSE, PROFILE_SPECIFIC_TESTS})
     public void testProtobufWithSchemaReferences()
             throws Exception
     {
@@ -198,7 +214,7 @@ public void testProtobufWithSchemaReferences()
         assertEventually(
                 new Duration(30, SECONDS),
                 () -> {
-                    QueryResult queryResult = onTrino().executeQuery(format("select * from kafka_schema_registry.%s.%s", KAFKA_SCHEMA, ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY));
+                    QueryResult queryResult = onTrino().executeQuery(format("select * from %s.%s.%s", KAFKA_SCHEMA_REGISTRY_CATALOG.catalogName(), KAFKA_SCHEMA, ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY));
                     assertThat(queryResult).containsOnly(row(
                             "foobar",
                             2,