Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,15 @@ private ProtobufUtils()

/**
 * Builds a {@link FileDescriptor} from protobuf schema source text when no
 * file name is known for it.
 *
 * @param protoFile the protobuf schema source text
 * @throws DescriptorValidationException if the schema is not a valid descriptor
 */
public static FileDescriptor getFileDescriptor(String protoFile)
        throws DescriptorValidationException
{
    Optional<String> unknownFileName = Optional.empty();
    return getFileDescriptor(unknownFileName, protoFile);
}

/**
 * Parses protobuf schema source text and builds a {@link FileDescriptor} for it.
 *
 * @param fileName name to associate with the resulting descriptor, if known
 * @param protoFile the protobuf schema source text
 * @throws DescriptorValidationException if the schema is not a valid descriptor
 */
public static FileDescriptor getFileDescriptor(Optional<String> fileName, String protoFile)
        throws DescriptorValidationException
{
    // Empty Location: the schema comes from in-memory source text, not a file on disk
    ProtoFileElement protoFileElement = ProtoParser.Companion.parse(Location.get(""), protoFile);
    return getFileDescriptor(fileName, protoFileElement);
}

public static FileDescriptor getFileDescriptor(Optional<String> fileName, ProtoFileElement protoFileElement)
Expand All @@ -84,7 +90,7 @@ public static FileDescriptor getFileDescriptor(Optional<String> fileName, ProtoF
int index = 0;
for (String importStatement : protoFileElement.getImports()) {
try {
FileDescriptor fileDescriptor = getFileDescriptor(getProtoFile(importStatement));
FileDescriptor fileDescriptor = getFileDescriptor(Optional.of(importStatement), getProtoFile(importStatement));
fileDescriptor.getMessageTypes().stream()
.map(Descriptor::getFullName)
.forEach(definedMessages::add);
Expand Down
40 changes: 40 additions & 0 deletions plugin/trino-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,46 @@
</ignoredResourcePatterns>
</configuration>
</plugin>
<!-- TODO: Instead of using this plugin to generate classes we should invoke the parser directly (https://github.com/trinodb/trino/issues/16039) -->
<plugin>
    <groupId>com.github.os72</groupId>
    <artifactId>protoc-jar-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>generate-test-sources</id>
            <phase>generate-test-sources</phase>
            <goals>
                <goal>run</goal>
            </goals>
            <configuration>
                <!-- Keep protoc in sync with the protobuf runtime used by the project -->
                <protocVersion>${dep.protobuf.version}</protocVersion>
                <!-- Generated classes are test-only; they are attached via build-helper-maven-plugin instead -->
                <addSources>none</addSources>
                <inputDirectories>
                    <include>src/test/resources/protobuf-sources</include>
                </inputDirectories>
                <outputDirectory>target/generated-test-sources/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
<!-- Registers target/generated-test-sources (populated during generate-test-sources)
     as an additional test-source root so the generated classes are compiled with the tests -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/target/generated-test-sources</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.plugin.kafka.encoder.protobuf;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import io.confluent.kafka.schemaregistry.ParsedSchema;
Expand All @@ -21,6 +23,7 @@
import io.trino.plugin.kafka.KafkaTopicFieldDescription;
import io.trino.plugin.kafka.KafkaTopicFieldGroup;
import io.trino.plugin.kafka.schema.confluent.SchemaParser;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
Expand All @@ -30,9 +33,14 @@

import javax.inject.Inject;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DoubleType.DOUBLE;
Expand All @@ -42,6 +50,7 @@
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

public class ProtobufSchemaParser
implements SchemaParser
Expand All @@ -66,7 +75,7 @@ public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, Pars
protobufSchema.toDescriptor().getFields().stream()
.map(field -> new KafkaTopicFieldDescription(
field.getName(),
getType(field),
getType(field, ImmutableList.of()),
field.getName(),
null,
null,
Expand All @@ -75,7 +84,7 @@ public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, Pars
.collect(toImmutableList()));
}

private Type getType(FieldDescriptor fieldDescriptor)
private Type getType(FieldDescriptor fieldDescriptor, List<FieldAndType> processedMessages)
{
Type baseType = switch (fieldDescriptor.getJavaType()) {
case BOOLEAN -> BOOLEAN;
Expand All @@ -85,31 +94,61 @@ private Type getType(FieldDescriptor fieldDescriptor)
case DOUBLE -> DOUBLE;
case BYTE_STRING -> VARBINARY;
case STRING, ENUM -> createUnboundedVarcharType();
case MESSAGE -> getTypeForMessage(fieldDescriptor);
case MESSAGE -> getTypeForMessage(fieldDescriptor, processedMessages);
};

// Protobuf does not support adding repeated label for map type but schema registry incorrecty adds it
// Protobuf does not support adding repeated label for map type but schema registry incorrectly adds it
if (fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) {
return new ArrayType(baseType);
}
return baseType;
}

private Type getTypeForMessage(FieldDescriptor fieldDescriptor)
private Type getTypeForMessage(FieldDescriptor fieldDescriptor, List<FieldAndType> processedMessages)
{
Descriptor descriptor = fieldDescriptor.getMessageType();
if (fieldDescriptor.getMessageType().getFullName().equals(TIMESTAMP_TYPE_NAME)) {
if (descriptor.getFullName().equals(TIMESTAMP_TYPE_NAME)) {
return createTimestampType(6);
}

// We MUST check just the type names since same type can be present with different field names which is also recursive
Set<String> processedMessagesFullTypeNames = processedMessages.stream()
.map(FieldAndType::fullTypeName)
.collect(toImmutableSet());
if (processedMessagesFullTypeNames.contains(descriptor.getFullName())) {
throw new TrinoException(NOT_SUPPORTED, "Protobuf schema containing fields with self-reference are not supported because they cannot be mapped to a Trino type: %s"
.formatted(Streams.concat(processedMessages.stream(), Stream.of(new FieldAndType(fieldDescriptor)))
.map(FieldAndType::toString)
.collect(joining(" > "))));
}
List<FieldAndType> newProcessedMessages = ImmutableList.<FieldAndType>builderWithExpectedSize(processedMessages.size() + 1)
.addAll(processedMessages)
.add(new FieldAndType(fieldDescriptor))
.build();

if (fieldDescriptor.isMapField()) {
return new MapType(
getType(descriptor.findFieldByNumber(1)),
getType(descriptor.findFieldByNumber(2)),
getType(descriptor.findFieldByNumber(1), newProcessedMessages),
getType(descriptor.findFieldByNumber(2), newProcessedMessages),
typeManager.getTypeOperators());
}
return RowType.from(
fieldDescriptor.getMessageType().getFields().stream()
.map(field -> RowType.field(field.getName(), getType(field)))
descriptor.getFields().stream()
.map(field -> RowType.field(field.getName(), getType(field, newProcessedMessages)))
.collect(toImmutableList()));
}

/**
 * Pairs a protobuf field's fully qualified name with the fully qualified name
 * of its message type; used to detect and report recursive schema definitions.
 */
public record FieldAndType(String fullFieldName, String fullTypeName)
{
    public FieldAndType(FieldDescriptor fieldDescriptor)
    {
        this(fieldDescriptor.getFullName(), fieldDescriptor.getMessageType().getFullName());
    }

    @Override
    public String toString()
    {
        // Rendered as one link of the "a > b > c" recursion chain in error messages
        return "%s: %s".formatted(fullFieldName, fullTypeName);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Timestamp;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import io.trino.plugin.kafka.schema.confluent.KafkaWithConfluentSchemaRegistryQueryRunner;
import io.trino.spi.type.SqlTimestamp;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.kafka.TestingKafka;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.testng.annotations.Test;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

Expand All @@ -41,12 +44,19 @@
import static io.trino.decoder.protobuf.ProtobufRowDecoderFactory.DEFAULT_MESSAGE;
import static io.trino.decoder.protobuf.ProtobufUtils.getFileDescriptor;
import static io.trino.decoder.protobuf.ProtobufUtils.getProtoFile;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.testing.DateTimeTestingUtils.sqlTimestampOf;
import static java.lang.Math.floorDiv;
import static java.lang.Math.multiplyExact;
import static java.lang.StrictMath.floorMod;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
Expand Down Expand Up @@ -140,6 +150,113 @@ public void testBasicTopicForInsert()
"Insert is not supported for schema registry based tables");
}

@Test
public void testUnsupportedRecursiveDataTypes()
throws Exception
{
String topic = "topic-unsupported-recursive";
assertNotExists(topic);

UnsupportedRecursiveTypes.schema message = UnsupportedRecursiveTypes.schema.newBuilder()
.setRecursiveValueOne(UnsupportedRecursiveTypes.RecursiveValue.newBuilder().setStringValue("Value1").build())
.build();

ImmutableList.Builder<ProducerRecord<DynamicMessage, UnsupportedRecursiveTypes.schema>> producerRecordBuilder = ImmutableList.builder();
producerRecordBuilder.add(new ProducerRecord<>(topic, createKeySchema(0, getKeySchema()), message));
List<ProducerRecord<DynamicMessage, UnsupportedRecursiveTypes.schema>> messages = producerRecordBuilder.build();
testingKafka.sendMessages(
messages.stream(),
producerProperties());

waitUntilTableExists(topic);
assertQueryFails("SELECT * FROM " + toDoubleQuoted(topic),
"Protobuf schema containing fields with self-reference are not supported because they cannot be mapped to a Trino type: " +
"io.trino.protobuf.schema.recursive_value_one: io.trino.protobuf.RecursiveValue > " +
"io.trino.protobuf.RecursiveValue.struct_value: io.trino.protobuf.RecursiveStruct > " +
"io.trino.protobuf.RecursiveStruct.fields: io.trino.protobuf.RecursiveStruct.FieldsEntry > " +
"io.trino.protobuf.RecursiveStruct.FieldsEntry.value: io.trino.protobuf.RecursiveValue");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error message goes to user, they are not much aware of trino java packages. Should use field names from protobuf schema?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

@Test
public void testSchemaWithImportDataTypes()
throws Exception
{
String topic = "topic-schema-with-import";
assertNotExists(topic);

// Schema imports structural types (list/map/row and the well-known Timestamp)
Descriptor descriptor = getDescriptor("structural_datatypes.proto");

// 2020-12-12T15:35:45.923 converted to a protobuf Timestamp (seconds + nanos)
Timestamp timestamp = getTimestamp(sqlTimestampOf(3, LocalDateTime.parse("2020-12-12T15:35:45.923")));
DynamicMessage message = buildDynamicMessage(
descriptor,
ImmutableMap.<String, Object>builder()
.put("list", ImmutableList.of("Search"))
// Map fields are repeated key/value entry messages on the wire
.put("map", ImmutableList.of(buildDynamicMessage(
descriptor.findFieldByName("map").getMessageType(),
ImmutableMap.of("key", "Key1", "value", "Value1"))))
.put("row", ImmutableMap.<String, Object>builder()
.put("string_column", "Trino")
.put("integer_column", 1)
.put("long_column", 493857959588286460L)
.put("double_column", 3.14159265358979323846)
.put("float_column", 3.14f)
.put("boolean_column", true)
.put("number_column", descriptor.findEnumTypeByName("Number").findValueByName("ONE"))
.put("timestamp_column", timestamp)
.put("bytes_column", "Trino".getBytes(UTF_8))
.buildOrThrow())
.buildOrThrow());

ImmutableList.Builder<ProducerRecord<DynamicMessage, DynamicMessage>> producerRecordBuilder = ImmutableList.builder();
producerRecordBuilder.add(new ProducerRecord<>(topic, createKeySchema(0, getKeySchema()), message));
List<ProducerRecord<DynamicMessage, DynamicMessage>> messages = producerRecordBuilder.build();
testingKafka.sendMessages(
messages.stream(),
producerProperties());
waitUntilTableExists(topic);

// Each protobuf structural type must surface as the corresponding Trino type:
// repeated -> ARRAY, map -> MAP, message -> ROW, Timestamp -> TIMESTAMP(6), enum -> VARCHAR
assertThat(query(format("SELECT list, map, row FROM %s", toDoubleQuoted(topic))))
.matches("""
VALUES (
ARRAY[CAST('Search' AS VARCHAR)],
MAP(CAST(ARRAY['Key1'] AS ARRAY(VARCHAR)), CAST(ARRAY['Value1'] AS ARRAY(VARCHAR))),
CAST(ROW('Trino', 1, 493857959588286460, 3.14159265358979323846, 3.14, True, 'ONE', TIMESTAMP '2020-12-12 15:35:45.923', to_utf8('Trino'))
AS ROW(
string_column VARCHAR,
integer_column INTEGER,
long_column BIGINT,
double_column DOUBLE,
float_column REAL,
boolean_column BOOLEAN,
number_column VARCHAR,
timestamp_column TIMESTAMP(6),
bytes_column VARBINARY)))""");
}

/**
 * Builds a {@link DynamicMessage} for the given descriptor from a map of field
 * name to value, recursing into nested maps for message-typed fields.
 *
 * @param descriptor descriptor of the message to build
 * @param data field name -> value; a {@code Map} value is built recursively as a nested message
 * @throws NullPointerException if a key in {@code data} is not a field of {@code descriptor}
 */
private DynamicMessage buildDynamicMessage(Descriptor descriptor, Map<String, Object> data)
{
    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
    for (Map.Entry<String, Object> entry : data.entrySet()) {
        // Fail fast with the offending field name instead of an opaque NPE inside setField
        FieldDescriptor fieldDescriptor = requireNonNull(
                descriptor.findFieldByName(entry.getKey()),
                () -> "Unknown field '%s' in message %s".formatted(entry.getKey(), descriptor.getFullName()));
        if (entry.getValue() instanceof Map<?, ?> nestedData) {
            // Nested map means a message-typed field; build it recursively
            builder.setField(fieldDescriptor, buildDynamicMessage(fieldDescriptor.getMessageType(), (Map<String, Object>) nestedData));
        }
        else {
            builder.setField(fieldDescriptor, entry.getValue());
        }
    }

    return builder.build();
}

/**
 * Converts a Trino {@link SqlTimestamp} to a protobuf {@link Timestamp} by
 * splitting its epoch micros into whole seconds and the nanosecond remainder.
 */
protected static Timestamp getTimestamp(SqlTimestamp sqlTimestamp)
{
    var epochMicros = sqlTimestamp.getEpochMicros();
    var seconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
    // floorMod keeps the remainder non-negative for timestamps before the epoch
    var nanos = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
    return Timestamp.newBuilder()
            .setSeconds(seconds)
            .setNanos(nanos)
            .build();
}

private Map<String, String> producerProperties()
{
return ImmutableMap.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Test schema with mutually recursive message types
// (RecursiveValue -> RecursiveStruct / RecursiveListValue -> RecursiveValue),
// used to verify that recursive schemas are rejected with a clear error.
syntax = "proto3";

package io.trino.protobuf;

option java_package = "io.trino.plugin.kafka.protobuf";
option java_outer_classname = "UnsupportedRecursiveTypes";

// Top-level message; its single field starts the recursion chain
message schema {
  RecursiveValue recursive_value_one = 1;
}

// Refers back to RecursiveValue through its map values
message RecursiveStruct {
  map<string, RecursiveValue> fields = 1;
}

// Recursive through both struct_value and list_value
message RecursiveValue {
  string string_value = 1;
  RecursiveStruct struct_value = 2;
  RecursiveListValue list_value = 3;
}

// Refers back to RecursiveValue through its repeated element
message RecursiveListValue {
  repeated RecursiveValue values = 1;
}
Loading