From 31ac84ae7ce1673eca4c3b7ec5d0623d712a4c39 Mon Sep 17 00:00:00 2001 From: Maxim Lukyanenko Date: Mon, 5 Dec 2022 14:18:51 +0200 Subject: [PATCH 1/5] Test Kafka connector with recursive protobuf schemas Kafka protobuf schema cannot be translated to a Trino type when the protobuf schema uses recursive reference to some types. Co-authored-by: Maxim Lukyanenko Co-authored-by: Ashhar Hasan --- plugin/trino-kafka/pom.xml | 40 +++++++++++++++++++ ...ithSchemaRegistryMinimalFunctionality.java | 26 ++++++++++++ .../unsupported_recursive.proto | 24 +++++++++++ 3 files changed, 90 insertions(+) create mode 100644 plugin/trino-kafka/src/test/resources/protobuf-sources/unsupported_recursive.proto diff --git a/plugin/trino-kafka/pom.xml b/plugin/trino-kafka/pom.xml index 6522d028607f..c2b0367e4c39 100644 --- a/plugin/trino-kafka/pom.xml +++ b/plugin/trino-kafka/pom.xml @@ -339,6 +339,46 @@ + + + com.github.os72 + protoc-jar-maven-plugin + + + generate-test-sources + generate-test-sources + + run + + + ${dep.protobuf.version} + none + + src/test/resources/protobuf-sources + + target/generated-test-sources/ + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-test-sources + generate-test-sources + + add-test-source + + + + ${basedir}/target/generated-test-sources + + + + + diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java index 02d6815122ca..432080a27def 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java @@ -36,6 +36,7 @@ import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType.ENUM; import static 
com.google.protobuf.Descriptors.FieldDescriptor.JavaType.STRING; +import static io.airlift.units.Duration.succinctDuration; import static io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; import static io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY; import static io.trino.decoder.protobuf.ProtobufRowDecoderFactory.DEFAULT_MESSAGE; @@ -45,6 +46,7 @@ import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import static org.testng.Assert.assertTrue; @@ -140,6 +142,30 @@ public void testBasicTopicForInsert() "Insert is not supported for schema registry based tables"); } + @Test + public void testUnsupportedRecursiveDataTypes() + throws Exception + { + String topic = "topic-unsupported-recursive"; + assertNotExists(topic); + + UnsupportedRecursiveTypes.schema message = UnsupportedRecursiveTypes.schema.newBuilder() + .setRecursiveValueOne(UnsupportedRecursiveTypes.RecursiveValue.newBuilder().setStringValue("Value1").build()) + .build(); + + ImmutableList.Builder> producerRecordBuilder = ImmutableList.builder(); + producerRecordBuilder.add(new ProducerRecord<>(topic, createKeySchema(0, getKeySchema()), message)); + List> messages = producerRecordBuilder.build(); + testingKafka.sendMessages( + messages.stream(), + producerProperties()); + + // Any operation which performs schema parsing leads to failure so we cannot use #waitUntilTableExists + assertQueryFailsEventually("SELECT * FROM " + toDoubleQuoted(topic), + "\\Qstatement is too large (stack overflow during analysis)\\E", + succinctDuration(2, SECONDS)); + } + private Map producerProperties() { return ImmutableMap.of( diff --git 
a/plugin/trino-kafka/src/test/resources/protobuf-sources/unsupported_recursive.proto b/plugin/trino-kafka/src/test/resources/protobuf-sources/unsupported_recursive.proto new file mode 100644 index 000000000000..f1bae41ccbe1 --- /dev/null +++ b/plugin/trino-kafka/src/test/resources/protobuf-sources/unsupported_recursive.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +package io.trino.protobuf; + +option java_package = "io.trino.plugin.kafka.protobuf"; +option java_outer_classname = "UnsupportedRecursiveTypes"; + +message schema { + RecursiveValue recursive_value_one = 1; +} + +message RecursiveStruct { + map fields = 1; +} + +message RecursiveValue { + string string_value = 1; + RecursiveStruct struct_value = 2; + RecursiveListValue list_value = 3; +} + +message RecursiveListValue { + repeated RecursiveValue values = 1; +} From 311010473ad12b648bfa0b9d5e425babdfca315c Mon Sep 17 00:00:00 2001 From: Maxim Lukyanenko Date: Wed, 7 Dec 2022 18:41:22 +0200 Subject: [PATCH 2/5] Test writing protobuf messages containing imports using TestingKafka Writing protobuf messages using TestingKafka throws NullPointerException if the schema uses an `import` directive. 
Co-authored-by: Maxim Lukyanenko Co-authored-by: Ashhar Hasan --- ...ithSchemaRegistryMinimalFunctionality.java | 78 +++++++++++++++++++ .../protobuf/structural_datatypes.proto | 31 ++++++++ 2 files changed, 109 insertions(+) create mode 100644 plugin/trino-kafka/src/test/resources/protobuf/structural_datatypes.proto diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java index 432080a27def..0cb5d71269e9 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java @@ -18,19 +18,23 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Timestamp; import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; import io.confluent.kafka.serializers.subject.RecordNameStrategy; import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; import io.trino.plugin.kafka.schema.confluent.KafkaWithConfluentSchemaRegistryQueryRunner; +import io.trino.spi.type.SqlTimestamp; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; import io.trino.testing.kafka.TestingKafka; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.SerializationException; import org.testng.annotations.Test; import java.time.Duration; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; @@ -42,13 +46,20 @@ import static io.trino.decoder.protobuf.ProtobufRowDecoderFactory.DEFAULT_MESSAGE; 
import static io.trino.decoder.protobuf.ProtobufUtils.getFileDescriptor; import static io.trino.decoder.protobuf.ProtobufUtils.getProtoFile; +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND; +import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND; +import static io.trino.testing.DateTimeTestingUtils.sqlTimestampOf; +import static java.lang.Math.floorDiv; import static java.lang.Math.multiplyExact; +import static java.lang.StrictMath.floorMod; import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.testng.Assert.assertTrue; @Test(singleThreaded = true) @@ -166,6 +177,73 @@ public void testUnsupportedRecursiveDataTypes() succinctDuration(2, SECONDS)); } + @Test + public void testSchemaWithImportDataTypes() + throws Exception + { + String topic = "topic-schema-with-import"; + assertNotExists(topic); + + Descriptor descriptor = getDescriptor("structural_datatypes.proto"); + + Timestamp timestamp = getTimestamp(sqlTimestampOf(3, LocalDateTime.parse("2020-12-12T15:35:45.923"))); + DynamicMessage message = buildDynamicMessage( + descriptor, + ImmutableMap.builder() + .put("list", ImmutableList.of("Search")) + .put("map", ImmutableList.of(buildDynamicMessage( + descriptor.findFieldByName("map").getMessageType(), + ImmutableMap.of("key", "Key1", "value", "Value1")))) + .put("row", ImmutableMap.builder() + .put("string_column", "Trino") + .put("integer_column", 1) + .put("long_column", 493857959588286460L) + .put("double_column", 3.14159265358979323846) + .put("float_column", 3.14f) + 
.put("boolean_column", true) + .put("number_column", descriptor.findEnumTypeByName("Number").findValueByName("ONE")) + .put("timestamp_column", timestamp) + .put("bytes_column", "Trino".getBytes(UTF_8)) + .buildOrThrow()) + .buildOrThrow()); + + ImmutableList.Builder> producerRecordBuilder = ImmutableList.builder(); + producerRecordBuilder.add(new ProducerRecord<>(topic, createKeySchema(0, getKeySchema()), message)); + List> messages = producerRecordBuilder.build(); + assertThatThrownBy(() -> testingKafka.sendMessages( + messages.stream(), + producerProperties())) + .isInstanceOf(SerializationException.class) + .hasMessage("Error serializing Protobuf message") + .getCause() + .isInstanceOf(NullPointerException.class) + .hasMessage("Cannot invoke \"com.squareup.wire.schema.internal.parser.ProtoFileElement.getImports()\" because \"protoFileElement\" is null"); + } + + private DynamicMessage buildDynamicMessage(Descriptor descriptor, Map data) + { + DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); + for (Map.Entry entry : data.entrySet()) { + FieldDescriptor fieldDescriptor = descriptor.findFieldByName(entry.getKey()); + if (entry.getValue() instanceof Map) { + builder.setField(fieldDescriptor, buildDynamicMessage(fieldDescriptor.getMessageType(), (Map) entry.getValue())); + } + else { + builder.setField(fieldDescriptor, entry.getValue()); + } + } + + return builder.build(); + } + + protected static Timestamp getTimestamp(SqlTimestamp sqlTimestamp) + { + return Timestamp.newBuilder() + .setSeconds(floorDiv(sqlTimestamp.getEpochMicros(), MICROSECONDS_PER_SECOND)) + .setNanos(floorMod(sqlTimestamp.getEpochMicros(), MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND) + .build(); + } + private Map producerProperties() { return ImmutableMap.of( diff --git a/plugin/trino-kafka/src/test/resources/protobuf/structural_datatypes.proto b/plugin/trino-kafka/src/test/resources/protobuf/structural_datatypes.proto new file mode 100644 index 
000000000000..854bd674dec5 --- /dev/null +++ b/plugin/trino-kafka/src/test/resources/protobuf/structural_datatypes.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +message schema { + repeated string list = 1; + map map = 2; + enum Number { + ZERO = 0; + ONE = 1; + TWO = 2; + }; + message Row { + string string_column = 1; + uint32 integer_column = 2; + uint64 long_column = 3; + double double_column = 4; + float float_column = 5; + bool boolean_column = 6; + Number number_column = 7; + google.protobuf.Timestamp timestamp_column = 8; + bytes bytes_column = 9; + }; + Row row = 3; + message NestedRow { + repeated Row nested_list = 1; + map nested_map = 2; + Row row = 3; + }; + NestedRow nested_row = 4; +} From ba41e56ce1afbfa63503685b05e845ad0b638b46 Mon Sep 17 00:00:00 2001 From: Ashhar Hasan Date: Tue, 17 Jan 2023 14:51:02 +0530 Subject: [PATCH 3/5] Fix writing protobuf messages containing `import` using TestingKafka Co-authored-by: Maxim Lukyanenko Co-authored-by: Ashhar Hasan --- .../trino/decoder/protobuf/ProtobufUtils.java | 10 +++++-- ...ithSchemaRegistryMinimalFunctionality.java | 30 +++++++++++++------ 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufUtils.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufUtils.java index 02082c37c612..bfdaaff59aee 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufUtils.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufUtils.java @@ -71,9 +71,15 @@ private ProtobufUtils() public static FileDescriptor getFileDescriptor(String protoFile) throws DescriptorValidationException + { + return getFileDescriptor(Optional.empty(), protoFile); + } + + public static FileDescriptor getFileDescriptor(Optional fileName, String protoFile) + throws DescriptorValidationException { ProtoFileElement protoFileElement = 
ProtoParser.Companion.parse(Location.get(""), protoFile); - return getFileDescriptor(Optional.empty(), protoFileElement); + return getFileDescriptor(fileName, protoFileElement); } public static FileDescriptor getFileDescriptor(Optional fileName, ProtoFileElement protoFileElement) @@ -84,7 +90,7 @@ public static FileDescriptor getFileDescriptor(Optional fileName, ProtoF int index = 0; for (String importStatement : protoFileElement.getImports()) { try { - FileDescriptor fileDescriptor = getFileDescriptor(getProtoFile(importStatement)); + FileDescriptor fileDescriptor = getFileDescriptor(Optional.of(importStatement), getProtoFile(importStatement)); fileDescriptor.getMessageTypes().stream() .map(Descriptor::getFullName) .forEach(definedMessages::add); diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java index 0cb5d71269e9..f90bbaa07095 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java @@ -30,7 +30,6 @@ import io.trino.testing.QueryRunner; import io.trino.testing.kafka.TestingKafka; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.testng.annotations.Test; import java.time.Duration; @@ -59,7 +58,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static 
org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertTrue; @Test(singleThreaded = true) @@ -210,14 +209,27 @@ public void testSchemaWithImportDataTypes() ImmutableList.Builder> producerRecordBuilder = ImmutableList.builder(); producerRecordBuilder.add(new ProducerRecord<>(topic, createKeySchema(0, getKeySchema()), message)); List> messages = producerRecordBuilder.build(); - assertThatThrownBy(() -> testingKafka.sendMessages( + testingKafka.sendMessages( messages.stream(), - producerProperties())) - .isInstanceOf(SerializationException.class) - .hasMessage("Error serializing Protobuf message") - .getCause() - .isInstanceOf(NullPointerException.class) - .hasMessage("Cannot invoke \"com.squareup.wire.schema.internal.parser.ProtoFileElement.getImports()\" because \"protoFileElement\" is null"); + producerProperties()); + waitUntilTableExists(topic); + + assertThat(query(format("SELECT list, map, row FROM %s", toDoubleQuoted(topic)))) + .matches(""" + VALUES ( + ARRAY[CAST('Search' AS VARCHAR)], + MAP(CAST(ARRAY['Key1'] AS ARRAY(VARCHAR)), CAST(ARRAY['Value1'] AS ARRAY(VARCHAR))), + CAST(ROW('Trino', 1, 493857959588286460, 3.14159265358979323846, 3.14, True, 'ONE', TIMESTAMP '2020-12-12 15:35:45.923', to_utf8('Trino')) + AS ROW( + string_column VARCHAR, + integer_column INTEGER, + long_column BIGINT, + double_column DOUBLE, + float_column REAL, + boolean_column BOOLEAN, + number_column VARCHAR, + timestamp_column TIMESTAMP(6), + bytes_column VARBINARY)))"""); } private DynamicMessage buildDynamicMessage(Descriptor descriptor, Map data) From 97e6ee3ee2900d1f1cefb0938917e5e1181aea6e Mon Sep 17 00:00:00 2001 From: Maxim Lukyanenko Date: Wed, 7 Dec 2022 18:44:30 +0200 Subject: [PATCH 4/5] Use declared variable instead of a long method chain --- .../plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java index d9b3e808c5a9..6187962ef51e 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java @@ -98,7 +98,7 @@ private Type getType(FieldDescriptor fieldDescriptor) private Type getTypeForMessage(FieldDescriptor fieldDescriptor) { Descriptor descriptor = fieldDescriptor.getMessageType(); - if (fieldDescriptor.getMessageType().getFullName().equals(TIMESTAMP_TYPE_NAME)) { + if (descriptor.getFullName().equals(TIMESTAMP_TYPE_NAME)) { return createTimestampType(6); } if (fieldDescriptor.isMapField()) { @@ -108,7 +108,7 @@ private Type getTypeForMessage(FieldDescriptor fieldDescriptor) typeManager.getTypeOperators()); } return RowType.from( - fieldDescriptor.getMessageType().getFields().stream() + descriptor.getFields().stream() .map(field -> RowType.field(field.getName(), getType(field))) .collect(toImmutableList())); } From 24d3033cc5cac07e2391ed6450986170d7876927 Mon Sep 17 00:00:00 2001 From: Maxim Lukyanenko Date: Mon, 5 Dec 2022 23:09:05 +0200 Subject: [PATCH 5/5] Improve error message in Kafka connector with recursive protobuf schemas The Kafka connector throws a StackOverflowError when the protobuf schema includes self-referencing object types. To improve the user experience, schema parsing now throws a TrinoException saying that the type is unsupported. 
Co-authored-by: Maxim Lukyanenko Co-authored-by: Ashhar Hasan --- .../protobuf/ProtobufSchemaParser.java | 55 ++++++++++++++++--- ...ithSchemaRegistryMinimalFunctionality.java | 13 +++-- 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java index 6187962ef51e..6360f82b1a94 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java @@ -13,6 +13,8 @@ */ package io.trino.plugin.kafka.encoder.protobuf; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Streams; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import io.confluent.kafka.schemaregistry.ParsedSchema; @@ -21,6 +23,7 @@ import io.trino.plugin.kafka.KafkaTopicFieldDescription; import io.trino.plugin.kafka.KafkaTopicFieldGroup; import io.trino.plugin.kafka.schema.confluent.SchemaParser; +import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.type.ArrayType; import io.trino.spi.type.MapType; @@ -30,9 +33,14 @@ import javax.inject.Inject; +import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DoubleType.DOUBLE; @@ -42,6 +50,7 @@ import static io.trino.spi.type.VarbinaryType.VARBINARY; import static 
io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; public class ProtobufSchemaParser implements SchemaParser @@ -66,7 +75,7 @@ public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, Pars protobufSchema.toDescriptor().getFields().stream() .map(field -> new KafkaTopicFieldDescription( field.getName(), - getType(field), + getType(field, ImmutableList.of()), field.getName(), null, null, @@ -75,7 +84,7 @@ public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, Pars .collect(toImmutableList())); } - private Type getType(FieldDescriptor fieldDescriptor) + private Type getType(FieldDescriptor fieldDescriptor, List processedMessages) { Type baseType = switch (fieldDescriptor.getJavaType()) { case BOOLEAN -> BOOLEAN; @@ -85,31 +94,61 @@ private Type getType(FieldDescriptor fieldDescriptor) case DOUBLE -> DOUBLE; case BYTE_STRING -> VARBINARY; case STRING, ENUM -> createUnboundedVarcharType(); - case MESSAGE -> getTypeForMessage(fieldDescriptor); + case MESSAGE -> getTypeForMessage(fieldDescriptor, processedMessages); }; - // Protobuf does not support adding repeated label for map type but schema registry incorrecty adds it + // Protobuf does not support adding repeated label for map type but schema registry incorrectly adds it if (fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) { return new ArrayType(baseType); } return baseType; } - private Type getTypeForMessage(FieldDescriptor fieldDescriptor) + private Type getTypeForMessage(FieldDescriptor fieldDescriptor, List processedMessages) { Descriptor descriptor = fieldDescriptor.getMessageType(); if (descriptor.getFullName().equals(TIMESTAMP_TYPE_NAME)) { return createTimestampType(6); } + + // We MUST check just the type names since same type can be present with different field names which is also recursive + Set processedMessagesFullTypeNames = 
processedMessages.stream() + .map(FieldAndType::fullTypeName) + .collect(toImmutableSet()); + if (processedMessagesFullTypeNames.contains(descriptor.getFullName())) { + throw new TrinoException(NOT_SUPPORTED, "Protobuf schema containing fields with self-reference are not supported because they cannot be mapped to a Trino type: %s" + .formatted(Streams.concat(processedMessages.stream(), Stream.of(new FieldAndType(fieldDescriptor))) + .map(FieldAndType::toString) + .collect(joining(" > ")))); + } + List newProcessedMessages = ImmutableList.builderWithExpectedSize(processedMessages.size() + 1) + .addAll(processedMessages) + .add(new FieldAndType(fieldDescriptor)) + .build(); + if (fieldDescriptor.isMapField()) { return new MapType( - getType(descriptor.findFieldByNumber(1)), - getType(descriptor.findFieldByNumber(2)), + getType(descriptor.findFieldByNumber(1), newProcessedMessages), + getType(descriptor.findFieldByNumber(2), newProcessedMessages), typeManager.getTypeOperators()); } return RowType.from( descriptor.getFields().stream() - .map(field -> RowType.field(field.getName(), getType(field))) + .map(field -> RowType.field(field.getName(), getType(field, newProcessedMessages))) .collect(toImmutableList())); } + + public record FieldAndType(String fullFieldName, String fullTypeName) + { + public FieldAndType(FieldDescriptor fieldDescriptor) + { + this(fieldDescriptor.getFullName(), fieldDescriptor.getMessageType().getFullName()); + } + + @Override + public String toString() + { + return fullFieldName + ": " + fullTypeName; + } + } } diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java index f90bbaa07095..c23b34942aa9 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java +++ 
b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java @@ -39,7 +39,6 @@ import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType.ENUM; import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType.STRING; -import static io.airlift.units.Duration.succinctDuration; import static io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; import static io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY; import static io.trino.decoder.protobuf.ProtobufRowDecoderFactory.DEFAULT_MESSAGE; @@ -55,7 +54,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; @@ -170,10 +168,13 @@ public void testUnsupportedRecursiveDataTypes() messages.stream(), producerProperties()); - // Any operation which performs schema parsing leads to failure so we cannot use #waitUntilTableExists - assertQueryFailsEventually("SELECT * FROM " + toDoubleQuoted(topic), - "\\Qstatement is too large (stack overflow during analysis)\\E", - succinctDuration(2, SECONDS)); + waitUntilTableExists(topic); + assertQueryFails("SELECT * FROM " + toDoubleQuoted(topic), + "Protobuf schema containing fields with self-reference are not supported because they cannot be mapped to a Trino type: " + + "io.trino.protobuf.schema.recursive_value_one: io.trino.protobuf.RecursiveValue > " + + "io.trino.protobuf.RecursiveValue.struct_value: io.trino.protobuf.RecursiveStruct > " + + "io.trino.protobuf.RecursiveStruct.fields: io.trino.protobuf.RecursiveStruct.FieldsEntry > " + + 
"io.trino.protobuf.RecursiveStruct.FieldsEntry.value: io.trino.protobuf.RecursiveValue"); } @Test