diff --git a/docs/src/main/sphinx/connector/kafka.rst b/docs/src/main/sphinx/connector/kafka.rst index 17d283ca0215..030efeec79cb 100644 --- a/docs/src/main/sphinx/connector/kafka.rst +++ b/docs/src/main/sphinx/connector/kafka.rst @@ -106,6 +106,8 @@ Property name Description ``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not. ``kafka.internal-column-prefix`` Prefix for internal columns, defaults to ``_`` ``kafka.messages-per-split`` Number of messages that are processed by each Trino split; defaults to ``100000``. +``kafka.protobuf-any-support-enabled`` Enable support for encoding Protobuf ``any`` types to ``JSON`` by setting the property to ``true``; + defaults to ``false``. ``kafka.timestamp-upper-bound-force-push-down-enabled`` Controls if upper bound timestamp pushdown is enabled for topics using ``CreateTime`` mode. ``kafka.security-protocol`` Security protocol for connection to Kafka cluster; defaults to ``PLAINTEXT``. ``kafka.ssl.keystore.location`` Location of the keystore file. @@ -438,7 +440,7 @@ table description supplier are: * New tables can be defined without a cluster restart. * Schema updates are detected automatically. * There is no need to define tables manually. -* Some Protobuf specific types like ``oneof`` are supported and mapped to JSON. +* Some Protobuf specific types like ``oneof`` and ``any`` are supported and mapped to JSON. When using Protobuf decoder with the Confluent table description supplier, some additional steps are necessary. For details, refer to :ref:`kafka-requirements`. @@ -1453,8 +1455,61 @@ Trino data type Allowed Protobuf data type ``ARRAY`` Protobuf type with ``repeated`` field ``MAP`` ``Map`` ``TIMESTAMP`` ``Timestamp``, predefined in ``timestamp.proto`` +``JSON`` ``oneof`` (Confluent table supplier only), ``Any`` ===================================== ======================================= +any ++++ + +Message types with an `Any `_ +field contain an arbitrary serialized message as bytes and a type URL to resolve +that message's type with a scheme of ``file://``, ``http://``, or ``https://``. +The connector reads the contents of the URL to create the type descriptor +for the ``Any`` message and convert the message to JSON. This behavior is enabled +by setting ``kafka.protobuf-any-support-enabled`` to ``true``. + +The descriptors for each distinct URL are cached for performance reasons, and +any modifications made to the type returned by the URL require a restart of +Trino. + +For example, given the following Protobuf schema which defines ``MyMessage`` +with three columns: + +.. code-block:: text + + syntax = "proto3"; + + message MyMessage { + string stringColumn = 1; + uint32 integerColumn = 2; + uint64 longColumn = 3; + } + +And a separate schema with an ``Any`` field that holds a packed message +of the above type and a valid URL: + +.. code-block:: text + + syntax = "proto3"; + + import "google/protobuf/any.proto"; + + message schema { + google.protobuf.Any any_message = 1; + } + +The corresponding Trino column is named ``any_message`` of type ``JSON`` +containing a JSON-serialized representation of the Protobuf message: + +
+.. code-block:: text + + { + "@type":"file:///path/to/schemas/MyMessage", + "integerColumn":1, + "longColumn":"493857959588286460", + "stringColumn":"Trino" + } + Protobuf schema evolution +++++++++++++++++++++++++ @@ -1481,7 +1536,6 @@ The schema evolution behavior is as follows: Protobuf limitations ++++++++++++++++++++ -* Protobuf specific types like ``any``, ``oneof`` are not supported. * Protobuf Timestamp has a nanosecond precision but Trino supports decoding/encoding at microsecond precision. diff --git a/lib/trino-record-decoder/pom.xml b/lib/trino-record-decoder/pom.xml index b6e21f57ef04..fdb77c52e68e 100644 --- a/lib/trino-record-decoder/pom.xml +++ b/lib/trino-record-decoder/pom.xml @@ -17,6 +17,12 @@ + + + com.fasterxml.jackson.core + jackson-core + + com.fasterxml.jackson.core jackson-databind @@ -63,6 +69,12 @@ slice + + io.trino + trino-cache + + + io.trino trino-spi @@ -116,6 +128,7 @@ io.confluent kafka-protobuf-provider + test diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/DescriptorProvider.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/DescriptorProvider.java new file mode 100644 index 000000000000..f0d063d6c9aa --- /dev/null +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/DescriptorProvider.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.decoder.protobuf; + +import com.google.protobuf.Descriptors.Descriptor; +import io.trino.spi.TrinoException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Optional; + +import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +public interface DescriptorProvider +{ + Optional<Descriptor> getDescriptorFromTypeUrl(String url); + + default String getContents(String url) + { + requireNonNull(url, "url is null"); + ByteArrayOutputStream typeBytes = new ByteArrayOutputStream(); + try (InputStream stream = new URL(url).openStream()) { + stream.transferTo(typeBytes); + } + catch (IOException e) { + throw new TrinoException(GENERIC_USER_ERROR, "Failed to read schema from URL", e); + } + return typeBytes.toString(UTF_8); + } +} diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/DummyDescriptorProvider.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/DummyDescriptorProvider.java new file mode 100644 index 000000000000..2bdcf1519810 --- /dev/null +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/DummyDescriptorProvider.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.decoder.protobuf; + +import com.google.protobuf.Descriptors.Descriptor; + +import java.util.Optional; + +public class DummyDescriptorProvider + implements DescriptorProvider +{ + @Override + public Optional getDescriptorFromTypeUrl(String url) + { + return Optional.empty(); + } +} diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/FileDescriptorProvider.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/FileDescriptorProvider.java new file mode 100644 index 000000000000..28009bedf929 --- /dev/null +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/FileDescriptorProvider.java @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.decoder.protobuf; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.Descriptor; +import io.trino.spi.TrinoException; + +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import static com.google.common.base.Preconditions.checkState; +import static io.trino.cache.SafeCaches.buildNonEvictableCache; +import static io.trino.decoder.protobuf.ProtobufErrorCode.INVALID_PROTO_FILE; +import static io.trino.decoder.protobuf.ProtobufRowDecoderFactory.DEFAULT_MESSAGE; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class FileDescriptorProvider + implements DescriptorProvider +{ + private final LoadingCache protobufTypeUrlCache; + + public FileDescriptorProvider() + { + protobufTypeUrlCache = buildNonEvictableCache( + CacheBuilder.newBuilder().maximumSize(1000), + CacheLoader.from(this::loadDescriptorFromType)); + } + + @Override + public Optional getDescriptorFromTypeUrl(String url) + { + try { + requireNonNull(url, "url is null"); + return Optional.of(protobufTypeUrlCache.get(url)); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + private Descriptor loadDescriptorFromType(String url) + { + try { + Descriptor descriptor = ProtobufUtils.getFileDescriptor(getContents(url)).findMessageTypeByName(DEFAULT_MESSAGE); + checkState(descriptor != null, format("Message %s not found", DEFAULT_MESSAGE)); + return descriptor; + } + catch (Descriptors.DescriptorValidationException e) { + throw new TrinoException(INVALID_PROTO_FILE, "Unable to parse protobuf schema", e); + } + } +} diff --git 
a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufColumnDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufColumnDecoder.java index 1e53759307dc..609fd5cb00df 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufColumnDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufColumnDecoder.java @@ -13,13 +13,19 @@ */ package io.trino.decoder.protobuf; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableSet; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.Descriptors.OneofDescriptor; import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; import com.google.protobuf.util.JsonFormat; +import com.google.protobuf.util.JsonFormat.TypeRegistry; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.trino.decoder.DecoderColumnHandle; @@ -45,10 +51,12 @@ import javax.annotation.Nullable; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; @@ -60,6 +68,9 @@ public class ProtobufColumnDecoder { + // Trino JSON types are expected to be sorted by key + private static final ObjectMapper mapper = JsonMapper.builder().configure(ORDER_MAP_ENTRIES_BY_KEYS, true).build(); + private static final String ANY_TYPE_NAME = "google.protobuf.Any"; private static final Slice EMPTY_JSON = Slices.utf8Slice("{}"); private static final Set SUPPORTED_PRIMITIVE_TYPES = ImmutableSet.of( @@ -76,13 +87,15 @@ public class ProtobufColumnDecoder private final String columnMapping; private final String columnName; private final TypeManager typeManager; + private final DescriptorProvider descriptorProvider; private final Type jsonType; - public ProtobufColumnDecoder(DecoderColumnHandle columnHandle, TypeManager typeManager) + public ProtobufColumnDecoder(DecoderColumnHandle columnHandle, TypeManager typeManager, DescriptorProvider descriptorProvider) { try { requireNonNull(columnHandle, "columnHandle is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.descriptorProvider = requireNonNull(descriptorProvider, "descriptorProvider is null"); this.jsonType = typeManager.getType(new TypeSignature(JSON)); this.columnType = columnHandle.getType(); this.columnMapping = columnHandle.getMapping(); @@ -141,7 +154,7 @@ public FieldValueProvider decodeField(DynamicMessage dynamicMessage) } @Nullable - private static Object locateField(DynamicMessage message, String columnMapping) + private Object locateField(DynamicMessage message, String columnMapping) { Object value = message; Optional valueDescriptor = Optional.of(message.getDescriptorForType()); @@ -160,6 +173,11 @@ private static Object locateField(DynamicMessage message, String columnMapping) value = ((DynamicMessage) value).getField(fieldDescriptor); valueDescriptor = 
getDescriptor(fieldDescriptor); } + + if (valueDescriptor.isPresent() && valueDescriptor.get().getFullName().equals(ANY_TYPE_NAME)) { + return createAnyJson((Message) value, valueDescriptor.get()); + } + return value; } @@ -206,4 +224,34 @@ private static Object createOneofJson(DynamicMessage message, OneofDescriptor de } return EMPTY_JSON; } + + private Object createAnyJson(Message value, Descriptor valueDescriptor) + { + try { + String typeUrl = (String) value.getField(valueDescriptor.findFieldByName("type_url")); + Optional descriptor = descriptorProvider.getDescriptorFromTypeUrl(typeUrl); + if (descriptor.isPresent()) { + return Slices.utf8Slice(sorted(JsonFormat.printer() + .usingTypeRegistry(TypeRegistry.newBuilder().add(descriptor.get()).build()) + .omittingInsignificantWhitespace() + .print(value))); + } + return null; + } + catch (InvalidProtocolBufferException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to print JSON from 'any' message type", e); + } + } + + private static String sorted(String json) + { + try { + // Trino JSON types are expected to be sorted by key + // This routine takes an input JSON string and sorts the entire tree by key, including nested maps + return mapper.writeValueAsString(mapper.treeToValue(mapper.readTree(json), Map.class)); + } + catch (JsonProcessingException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to process JSON", e); + } + } } diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufDecoderModule.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufDecoderModule.java index e924905128aa..471c36156a9a 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufDecoderModule.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufDecoderModule.java @@ -27,6 +27,7 @@ public class ProtobufDecoderModule public void configure(Binder binder) { binder.bind(DynamicMessageProvider.Factory.class).to(FixedSchemaDynamicMessageProvider.Factory.class).in(SINGLETON); + binder.bind(DescriptorProvider.class).to(DummyDescriptorProvider.class).in(SINGLETON); newMapBinder(binder, String.class, RowDecoderFactory.class).addBinding(ProtobufRowDecoder.NAME).to(ProtobufRowDecoderFactory.class).in(SINGLETON); } } diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoder.java index c68950339d02..4aa9ba4bca90 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoder.java @@ -35,13 +35,13 @@ public class ProtobufRowDecoder private final DynamicMessageProvider dynamicMessageProvider; private final Map columnDecoders; - public ProtobufRowDecoder(DynamicMessageProvider dynamicMessageProvider, Set columns, TypeManager typeManager) + public ProtobufRowDecoder(DynamicMessageProvider dynamicMessageProvider, Set columns, TypeManager typeManager, DescriptorProvider descriptorProvider) { this.dynamicMessageProvider = requireNonNull(dynamicMessageProvider, "dynamicMessageSupplier is null"); this.columnDecoders = columns.stream() .collect(toImmutableMap( identity(), - column -> new ProtobufColumnDecoder(column, typeManager))); + column -> new ProtobufColumnDecoder(column, typeManager, descriptorProvider))); } @Override diff --git 
a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoderFactory.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoderFactory.java index 4edc07f6ba0b..3870412c0b6f 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoderFactory.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoderFactory.java @@ -33,12 +33,14 @@ public class ProtobufRowDecoderFactory private final Factory dynamicMessageProviderFactory; private final TypeManager typeManager; + private final DescriptorProvider descriptorProvider; @Inject - public ProtobufRowDecoderFactory(Factory dynamicMessageProviderFactory, TypeManager typeManager) + public ProtobufRowDecoderFactory(Factory dynamicMessageProviderFactory, TypeManager typeManager, DescriptorProvider descriptorProvider) { this.dynamicMessageProviderFactory = requireNonNull(dynamicMessageProviderFactory, "dynamicMessageProviderFactory is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.descriptorProvider = requireNonNull(descriptorProvider, "descriptorProvider is null"); } @Override @@ -47,6 +49,7 @@ public RowDecoder create(Map decoderParams, Set decodedRow = decoder .decodeRow(message.toByteArray()) @@ -282,6 +287,87 @@ private void assertOneof(DynamicMessage.Builder messageBuilder, assertEquals(decodedRow.get(testOneofColumn).getSlice().toStringUtf8(), expected); } + @Test + public void testAnyTypeWithDummyDescriptor() + throws Exception + { + String stringData = "Trino"; + + Descriptor allDataTypesDescriptor = getDescriptor("all_datatypes.proto"); + DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(allDataTypesDescriptor); + messageBuilder.setField(allDataTypesDescriptor.findFieldByName("stringColumn"), stringData); + + Descriptor anyTypeDescriptor = getDescriptor("test_any.proto"); + DynamicMessage.Builder testAnyBuilder = DynamicMessage.newBuilder(anyTypeDescriptor); + testAnyBuilder.setField(anyTypeDescriptor.findFieldByName("id"), 1); + testAnyBuilder.setField(anyTypeDescriptor.findFieldByName("anyMessage"), Any.pack(messageBuilder.build())); + DynamicMessage testAny = testAnyBuilder.build(); + + DecoderTestColumnHandle testAnyColumn = new DecoderTestColumnHandle(0, "anyMessage", JsonType.JSON, "anyMessage", null, null, false, false, false); + ProtobufRowDecoder decoder = new ProtobufRowDecoder(new FixedSchemaDynamicMessageProvider(anyTypeDescriptor), ImmutableSet.of(testAnyColumn), TESTING_TYPE_MANAGER, new DummyDescriptorProvider()); + + Map decodedRow = decoder + .decodeRow(testAny.toByteArray()) + .orElseThrow(AssertionError::new); + + assertTrue(decodedRow.get(testAnyColumn).isNull()); + } + + @Test + public void testAnyTypeWithFileDescriptor() + throws Exception + { + String stringData = "Trino"; + int integerData = 1; + long longData = 493857959588286460L; + double doubleData = PI; + float floatData = 3.14f; + boolean booleanData = true; + String enumData = "ONE"; + SqlTimestamp sqlTimestamp = sqlTimestampOf(3, LocalDateTime.parse("2020-12-12T15:35:45.923")); + byte[] bytesData = "X'65683F'".getBytes(UTF_8); + + Descriptor allDataTypesDescriptor = getDescriptor("all_datatypes.proto"); + DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(allDataTypesDescriptor); + messageBuilder.setField(allDataTypesDescriptor.findFieldByName("stringColumn"), stringData); + messageBuilder.setField(allDataTypesDescriptor.findFieldByName("integerColumn"), integerData); + 
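+ // Fill in the remaining all_datatypes.proto fields so the Any-packed payload covers each supported field type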
messageBuilder.setField(allDataTypesDescriptor.findFieldByName("longColumn"), longData); + messageBuilder.setField(allDataTypesDescriptor.findFieldByName("doubleColumn"), doubleData); + messageBuilder.setField(allDataTypesDescriptor.findFieldByName("floatColumn"), floatData); + messageBuilder.setField(allDataTypesDescriptor.findFieldByName("booleanColumn"), booleanData); + messageBuilder.setField(allDataTypesDescriptor.findFieldByName("numberColumn"), allDataTypesDescriptor.findEnumTypeByName("Number").findValueByName(enumData)); + messageBuilder.setField(allDataTypesDescriptor.findFieldByName("timestampColumn"), getTimestamp(sqlTimestamp)); + messageBuilder.setField(allDataTypesDescriptor.findFieldByName("bytesColumn"), bytesData); + + // Get URI of parent directory of the descriptor file + // Any.pack concatenates the message type's full name to the given prefix + URI anySchemaTypeUrl = new File(Resources.getResource("decoder/protobuf/any/all_datatypes/schema").getFile()).getParentFile().toURI(); + Descriptor descriptor = getDescriptor("test_any.proto"); + DynamicMessage.Builder testAnyBuilder = DynamicMessage.newBuilder(descriptor); + testAnyBuilder.setField(descriptor.findFieldByName("id"), 1); + testAnyBuilder.setField(descriptor.findFieldByName("anyMessage"), Any.pack(messageBuilder.build(), anySchemaTypeUrl.toString())); + DynamicMessage testAny = testAnyBuilder.build(); + + DecoderTestColumnHandle testOneOfColumn = new DecoderTestColumnHandle(0, "anyMessage", JsonType.JSON, "anyMessage", null, null, false, false, false); + ProtobufRowDecoder decoder = new ProtobufRowDecoder(new FixedSchemaDynamicMessageProvider(descriptor), ImmutableSet.of(testOneOfColumn), TESTING_TYPE_MANAGER, new FileDescriptorProvider()); + + Map decodedRow = decoder + .decodeRow(testAny.toByteArray()) + .orElseThrow(AssertionError::new); + + JsonNode actual = new ObjectMapper().readTree(decodedRow.get(testOneOfColumn).getSlice().toStringUtf8()); + assertTrue(actual.get("@type").textValue().contains("schema")); + assertEquals(actual.get("stringColumn").textValue(), stringData); + assertEquals(actual.get("integerColumn").intValue(), integerData); + assertEquals(actual.get("longColumn").textValue(), Long.toString(longData)); + assertEquals(actual.get("doubleColumn").doubleValue(), doubleData); + assertEquals(actual.get("floatColumn").floatValue(), floatData); + assertEquals(actual.get("booleanColumn").booleanValue(), booleanData); + assertEquals(actual.get("numberColumn").textValue(), enumData); + assertEquals(actual.get("timestampColumn").textValue(), "2020-12-12T15:35:45.923Z"); + assertEquals(actual.get("bytesColumn").binaryValue(), bytesData); + } + @Test(dataProvider = "allTypesDataProvider", dataProviderClass = ProtobufDataProviders.class) public void testStructuralDataTypes(String stringData, Integer integerData, Long longData, Double doubleData, Float floatData, Boolean booleanData, String enumData, SqlTimestamp sqlTimestamp, byte[] bytesData) throws Exception diff --git a/lib/trino-record-decoder/src/test/resources/decoder/protobuf/any/all_datatypes/schema b/lib/trino-record-decoder/src/test/resources/decoder/protobuf/any/all_datatypes/schema new file mode 100644 index 000000000000..f74aa197f167 --- /dev/null +++ b/lib/trino-record-decoder/src/test/resources/decoder/protobuf/any/all_datatypes/schema @@ -0,0 +1,21 @@ +// Copy of all_datatypes.proto which is a resolvable URL when packed into a google.protobuf.Any type + +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +message schema { + 
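+ // Field names and numbers must match all_datatypes.proto so the descriptor resolved from this URL decodes the packed bytes correctly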
string stringColumn = 1 ; + uint32 integerColumn = 2; + uint64 longColumn = 3; + double doubleColumn = 4; + float floatColumn = 5; + bool booleanColumn = 6; + enum Number { + ZERO = 0; + ONE = 1; + }; + Number numberColumn = 7; + google.protobuf.Timestamp timestampColumn = 8; + bytes bytesColumn = 9; +} diff --git a/lib/trino-record-decoder/src/test/resources/decoder/protobuf/test_any.proto b/lib/trino-record-decoder/src/test/resources/decoder/protobuf/test_any.proto new file mode 100644 index 000000000000..748764be4f43 --- /dev/null +++ b/lib/trino-record-decoder/src/test/resources/decoder/protobuf/test_any.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +import "google/protobuf/any.proto"; + +message schema { + int32 id = 1; + google.protobuf.Any anyMessage = 2; +} diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java index f4eebf4695c7..e228515a57c1 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java @@ -24,6 +24,7 @@ import io.trino.decoder.protobuf.ProtobufRowDecoder; import io.trino.plugin.kafka.KafkaTopicFieldDescription; import io.trino.plugin.kafka.KafkaTopicFieldGroup; +import io.trino.plugin.kafka.schema.ProtobufAnySupportConfig; import io.trino.plugin.kafka.schema.confluent.SchemaParser; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; @@ -57,13 +58,16 @@ public class ProtobufSchemaParser implements SchemaParser { + private static final String ANY_TYPE_NAME = "google.protobuf.Any"; private static final String TIMESTAMP_TYPE_NAME = "google.protobuf.Timestamp"; private final TypeManager typeManager; + private final boolean isProtobufAnySupportEnabled; @Inject - public ProtobufSchemaParser(TypeManager typeManager) + public ProtobufSchemaParser(TypeManager typeManager, ProtobufAnySupportConfig config) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.isProtobufAnySupportEnabled = requireNonNull(config, "config is null").isProtobufAnySupportEnabled(); } @Override @@ -143,6 +147,9 @@ private Type getTypeForMessage(FieldDescriptor fieldDescriptor, List processedMessagesFullTypeNames = processedMessages.stream() diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/ProtobufAnySupportConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/ProtobufAnySupportConfig.java new file mode 100644 index 000000000000..659bf995c24b --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/ProtobufAnySupportConfig.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.kafka.schema; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; + +public class ProtobufAnySupportConfig +{ + private boolean protobufAnySupportEnabled; + + public boolean isProtobufAnySupportEnabled() + { + return protobufAnySupportEnabled; + } + + @Config("kafka.protobuf-any-support-enabled") + @ConfigDescription("True to enable supporting encoding google.protobuf.Any types as JSON") + public ProtobufAnySupportConfig setProtobufAnySupportEnabled(boolean protobufAnySupportEnabled) + { + this.protobufAnySupportEnabled = protobufAnySupportEnabled; + return this; + } +} diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentDescriptorProvider.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentDescriptorProvider.java new file mode 100644 index 000000000000..501ac73cd613 --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentDescriptorProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.schema.confluent; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.protobuf.Descriptors.Descriptor; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; +import io.trino.decoder.protobuf.DescriptorProvider; +import io.trino.spi.TrinoException; + +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import static io.trino.cache.SafeCaches.buildNonEvictableCache; +import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; +import static java.util.Objects.requireNonNull; + +public class ConfluentDescriptorProvider + implements DescriptorProvider +{ + private final LoadingCache protobufTypeUrlCache; + + public ConfluentDescriptorProvider() + { + protobufTypeUrlCache = buildNonEvictableCache( + CacheBuilder.newBuilder().maximumSize(1000), + CacheLoader.from(this::loadDescriptorFromType)); + } + + @Override + public Optional getDescriptorFromTypeUrl(String url) + { + try { + requireNonNull(url, "url is null"); + return Optional.of(protobufTypeUrlCache.get(url)); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + private Descriptor loadDescriptorFromType(String url) + { + try { + return ((ProtobufSchema) new ProtobufSchemaProvider() + .parseSchema(getContents(url), List.of(), true) + .orElseThrow()) + .toDescriptor(); + } + catch (NoSuchElementException e) { + throw new TrinoException(GENERIC_USER_ERROR, "Failed to parse protobuf schema"); + } + } +} diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java index 7eb1a48e3fb4..b3981feba164 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java @@ -40,6 +40,8 @@ import io.trino.decoder.avro.AvroRowDecoderFactory; import io.trino.decoder.dummy.DummyRowDecoder; import io.trino.decoder.dummy.DummyRowDecoderFactory; +import io.trino.decoder.protobuf.DescriptorProvider; +import io.trino.decoder.protobuf.DummyDescriptorProvider; import io.trino.decoder.protobuf.DynamicMessageProvider; import io.trino.decoder.protobuf.ProtobufRowDecoder; import io.trino.decoder.protobuf.ProtobufRowDecoderFactory; @@ -50,6 +52,7 @@ import io.trino.plugin.kafka.encoder.protobuf.ProtobufRowEncoder; import io.trino.plugin.kafka.encoder.protobuf.ProtobufSchemaParser; import io.trino.plugin.kafka.schema.ContentSchemaReader; +import io.trino.plugin.kafka.schema.ProtobufAnySupportConfig; import io.trino.plugin.kafka.schema.TableDescriptionSupplier; import io.trino.spi.HostAddress; import io.trino.spi.TrinoException; @@ -68,6 +71,7 @@ import static com.google.inject.Scopes.SINGLETON; import static com.google.inject.multibindings.MapBinder.newMapBinder; import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.kafka.encoder.EncoderModule.encoderFactory; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -132,7 +136,7 @@ public static SchemaRegistryClient createSchemaRegistryClient( classLoader); } - private static class ConfluentDecoderModule + private class ConfluentDecoderModule implements Module { @Override @@ -144,6 +148,12 @@ public void configure(Binder binder) newMapBinder(binder, String.class, RowDecoderFactory.class).addBinding(ProtobufRowDecoder.NAME).to(ProtobufRowDecoderFactory.class).in(Scopes.SINGLETON); newMapBinder(binder, String.class, RowDecoderFactory.class).addBinding(DummyRowDecoder.NAME).to(DummyRowDecoderFactory.class).in(SINGLETON); binder.bind(DispatchingRowDecoderFactory.class).in(SINGLETON); + + configBinder(binder).bindConfig(ProtobufAnySupportConfig.class); + install(conditionalModule(ProtobufAnySupportConfig.class, + ProtobufAnySupportConfig::isProtobufAnySupportEnabled, + new ConfluentDesciptorProviderModule(), + new DummyDescriptorProviderModule())); } } @@ -215,9 +225,9 @@ public static class LazyLoadedProtobufSchemaParser private final Supplier delegate; @Inject - public LazyLoadedProtobufSchemaParser(TypeManager typeManager) + public LazyLoadedProtobufSchemaParser(TypeManager typeManager, ProtobufAnySupportConfig config) { - this.delegate = Suppliers.memoize(() -> new ProtobufSchemaParser(requireNonNull(typeManager, "typeManager is null"))); + this.delegate = Suppliers.memoize(() -> new ProtobufSchemaParser(requireNonNull(typeManager, "typeManager is null"), config)); } @Override @@ -226,4 +236,24 @@ protected SchemaParser delegate() return delegate.get(); } } + + private static class ConfluentDesciptorProviderModule + implements Module + { + @Override + public void configure(Binder binder) + { + binder.bind(DescriptorProvider.class).to(ConfluentDescriptorProvider.class).in(SINGLETON); + } + } + + private static class DummyDescriptorProviderModule + implements Module + { + @Override + public void 
configure(Binder binder) + { + binder.bind(DescriptorProvider.class).to(DummyDescriptorProvider.class).in(SINGLETON); + } + } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/file/FileTableDescriptionSupplierModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/file/FileTableDescriptionSupplierModule.java index cd34f9843856..4509db5cd767 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/file/FileTableDescriptionSupplierModule.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/file/FileTableDescriptionSupplierModule.java @@ -14,13 +14,19 @@ package io.trino.plugin.kafka.schema.file; import com.google.inject.Binder; +import com.google.inject.Module; import com.google.inject.Scopes; import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.decoder.DecoderModule; +import io.trino.decoder.protobuf.DescriptorProvider; +import io.trino.decoder.protobuf.FileDescriptorProvider; import io.trino.plugin.kafka.encoder.EncoderModule; import io.trino.plugin.kafka.schema.ContentSchemaReader; +import io.trino.plugin.kafka.schema.ProtobufAnySupportConfig; import io.trino.plugin.kafka.schema.TableDescriptionSupplier; +import static com.google.inject.Scopes.SINGLETON; +import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.airlift.configuration.ConfigBinder.configBinder; public class FileTableDescriptionSupplierModule @@ -34,5 +40,20 @@ protected void setup(Binder binder) install(new DecoderModule()); install(new EncoderModule()); binder.bind(ContentSchemaReader.class).to(FileContentSchemaReader.class).in(Scopes.SINGLETON); + + configBinder(binder).bindConfig(ProtobufAnySupportConfig.class); + install(conditionalModule(ProtobufAnySupportConfig.class, + ProtobufAnySupportConfig::isProtobufAnySupportEnabled, + new FileDescriptorProviderModule())); + } + + private static class FileDescriptorProviderModule + implements Module + { + @Override + public void configure(Binder binder) + { + binder.bind(DescriptorProvider.class).to(FileDescriptorProvider.class).in(SINGLETON); + } } } diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java index e8a104b2d3dc..6b9f6d8d1024 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.Resources; +import com.google.protobuf.Any; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; @@ -35,6 +36,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.testng.annotations.Test; +import java.io.File; +import java.net.URI; import java.time.Duration; import java.time.LocalDateTime; import java.util.List; @@ -265,6 +268,60 @@ public void testOneof() """.formatted(stringData)); } + @Test + public void testAny() + throws Exception + { + String topic = "topic-schema-with-any"; + assertNotExists(topic); + + Descriptor structuralDataTypesDescriptor = 
getDescriptor("structural_datatypes.proto"); + + Timestamp timestamp = getTimestamp(sqlTimestampOf(3, LocalDateTime.parse("2020-12-12T15:35:45.923"))); + DynamicMessage structuralDataTypeMessage = buildDynamicMessage( + structuralDataTypesDescriptor, + ImmutableMap.builder() + .put("list", ImmutableList.of("Search")) + .put("map", ImmutableList.of(buildDynamicMessage( + structuralDataTypesDescriptor.findFieldByName("map").getMessageType(), + ImmutableMap.of("key", "Key1", "value", "Value1")))) + .put("row", ImmutableMap.builder() + .put("string_column", "Trino") + .put("integer_column", 1) + .put("long_column", 493857959588286460L) + .put("double_column", 3.14159265358979323846) + .put("float_column", 3.14f) + .put("boolean_column", true) + .put("number_column", structuralDataTypesDescriptor.findEnumTypeByName("Number").findValueByName("ONE")) + .put("timestamp_column", timestamp) + .put("bytes_column", "Trino".getBytes(UTF_8)) + .buildOrThrow()) + .buildOrThrow()); + + ProtobufSchema schema = (ProtobufSchema) new ProtobufSchemaProvider().parseSchema(Resources.toString(getResource("protobuf/test_any.proto"), UTF_8), List.of(), true).get(); + + // Get URI of parent directory of the descriptor file + // Any.pack concatenates the message type's full name to the given prefix + URI anySchemaTypeUrl = new File(Resources.getResource("protobuf/any/structural_datatypes/schema").getFile()).getParentFile().toURI(); + Descriptor descriptor = schema.toDescriptor(); + DynamicMessage message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("id"), 1) + .setField(descriptor.findFieldByName("anyMessage"), Any.pack(structuralDataTypeMessage, anySchemaTypeUrl.toString())) + .build(); + + ImmutableList.Builder> producerRecordBuilder = ImmutableList.builder(); + producerRecordBuilder.add(new ProducerRecord<>(topic, createKeySchema(0, getKeySchema()), message)); + List> messages = producerRecordBuilder.build(); + testingKafka.sendMessages(messages.stream(), producerProperties()); + waitUntilTableExists(topic); + + URI anySchemaFile = new File(Resources.getResource("protobuf/any/structural_datatypes/schema").getFile()).toURI(); + assertThat(query(format("SELECT id, anyMessage FROM %s", toDoubleQuoted(topic)))) + .matches(""" + VALUES (1, JSON '{"@type":"%s","list":["Search"],"map":{"Key1":"Value1"},"row":{"booleanColumn":true,"bytesColumn":"VHJpbm8=","doubleColumn":3.141592653589793,"floatColumn":3.14,"integerColumn":1,"longColumn":"493857959588286460","numberColumn":"ONE","stringColumn":"Trino","timestampColumn":"2020-12-12T15:35:45.923Z"}}') + """.formatted(anySchemaFile)); + } + private DynamicMessage buildDynamicMessage(Descriptor descriptor, Map data) { DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java index 5225bde3a538..629510a3254e 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java @@ -47,6 +47,7 @@ public void preInit(DistributedQueryRunner queryRunner) Map properties = new HashMap<>(extraKafkaProperties); properties.putIfAbsent("kafka.table-description-supplier", "confluent"); 
properties.putIfAbsent("kafka.confluent-schema-registry-url", testingKafka.getSchemaRegistryConnectString()); + properties.putIfAbsent("kafka.protobuf-any-support-enabled", "true"); setExtraKafkaProperties(properties); } } diff --git a/plugin/trino-kafka/src/test/resources/protobuf/any/structural_datatypes/schema b/plugin/trino-kafka/src/test/resources/protobuf/any/structural_datatypes/schema new file mode 100644 index 000000000000..854bd674dec5 --- /dev/null +++ b/plugin/trino-kafka/src/test/resources/protobuf/any/structural_datatypes/schema @@ -0,0 +1,31 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +message schema { + repeated string list = 1; + map map = 2; + enum Number { + ZERO = 0; + ONE = 1; + TWO = 2; + }; + message Row { + string string_column = 1; + uint32 integer_column = 2; + uint64 long_column = 3; + double double_column = 4; + float float_column = 5; + bool boolean_column = 6; + Number number_column = 7; + google.protobuf.Timestamp timestamp_column = 8; + bytes bytes_column = 9; + }; + Row row = 3; + message NestedRow { + repeated Row nested_list = 1; + map nested_map = 2; + Row row = 3; + }; + NestedRow nested_row = 4; +} diff --git a/plugin/trino-kafka/src/test/resources/protobuf/test_any.proto b/plugin/trino-kafka/src/test/resources/protobuf/test_any.proto new file mode 100644 index 000000000000..748764be4f43 --- /dev/null +++ b/plugin/trino-kafka/src/test/resources/protobuf/test_any.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +import "google/protobuf/any.proto"; + +message schema { + int32 id = 1; + google.protobuf.Any anyMessage = 2; +}
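
The change relies on ``JsonFormat`` only being able to print a ``google.protobuf.Any`` field when the packed type is registered in a ``TypeRegistry``, which is why ``ProtobufColumnDecoder`` first resolves a ``Descriptor`` through ``DescriptorProvider``. The following stand-alone sketch illustrates that behavior; it is not part of the change, uses the well-known ``StringValue`` wrapper instead of a generated message, and the ``file://`` prefix is a placeholder:

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.StringValue;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.TypeRegistry;

public class AnyToJsonSketch
{
    public static void main(String[] args)
            throws InvalidProtocolBufferException
    {
        // Any.pack appends the message's full name to the prefix, producing
        // type_url = file:///path/to/schemas/google.protobuf.StringValue
        Any any = Any.pack(StringValue.newBuilder().setValue("Trino").build(), "file:///path/to/schemas/");

        // Without a registry entry for the packed type, print(any) throws InvalidProtocolBufferException
        TypeRegistry registry = TypeRegistry.newBuilder()
                .add(StringValue.getDescriptor())
                .build();

        String json = JsonFormat.printer()
                .usingTypeRegistry(registry)
                .omittingInsignificantWhitespace()
                .print(any);
        System.out.println(json);
        // {"@type":"file:///path/to/schemas/google.protobuf.StringValue","value":"Trino"}
    }
}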
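
``ProtobufColumnDecoder.sorted`` produces key-ordered JSON by materializing the parsed tree as ``java.util.Map`` values before re-serializing, so ``ORDER_MAP_ENTRIES_BY_KEYS`` also reorders nested objects. A minimal stand-alone sketch of that round trip (class name and input are illustrative only):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

import java.util.Map;

import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;

public class SortedJsonSketch
{
    private static final ObjectMapper MAPPER =
            JsonMapper.builder().configure(ORDER_MAP_ENTRIES_BY_KEYS, true).build();

    public static void main(String[] args)
            throws JsonProcessingException
    {
        String json = "{\"stringColumn\":\"Trino\",\"row\":{\"z\":1,\"a\":2},\"longColumn\":\"42\"}";
        // Convert the tree to Maps so the serialization feature can reorder nested entries as well
        Map<?, ?> asMap = MAPPER.treeToValue(MAPPER.readTree(json), Map.class);
        System.out.println(MAPPER.writeValueAsString(asMap));
        // prints {"longColumn":"42","row":{"a":2,"z":1},"stringColumn":"Trino"}
    }
}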