58 changes: 56 additions & 2 deletions docs/src/main/sphinx/connector/kafka.rst
@@ -106,6 +106,8 @@ Property name Description
``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not.
``kafka.internal-column-prefix`` Prefix for internal columns, defaults to ``_``
``kafka.messages-per-split`` Number of messages that are processed by each Trino split; defaults to ``100000``.
``kafka.protobuf-any-support-enabled`` Enable support for encoding Protobuf ``any`` types to ``JSON`` by setting the property to ``true``;
defaults to ``false``.
``kafka.timestamp-upper-bound-force-push-down-enabled`` Controls if upper bound timestamp pushdown is enabled for topics using ``CreateTime`` mode.
``kafka.security-protocol`` Security protocol for connection to Kafka cluster; defaults to ``PLAINTEXT``.
``kafka.ssl.keystore.location`` Location of the keystore file.
@@ -438,7 +440,7 @@ table description supplier are:
* New tables can be defined without a cluster restart.
* Schema updates are detected automatically.
* There is no need to define tables manually.
* Some Protobuf specific types like ``oneof`` are supported and mapped to JSON.
* Some Protobuf specific types like ``oneof`` and ``any`` are supported and mapped to JSON.

When using the Protobuf decoder with the Confluent table description supplier, some
additional steps are necessary. For details, refer to :ref:`kafka-requirements`.
@@ -1453,8 +1455,61 @@ Trino data type Allowed Protobuf data type
``ARRAY`` Protobuf type with ``repeated`` field
``MAP`` ``Map``
``TIMESTAMP`` ``Timestamp``, predefined in ``timestamp.proto``
``JSON`` ``oneof`` (Confluent table supplier only), ``Any``
===================================== =======================================

any
+++

Message types with an `Any <https://protobuf.dev/programming-guides/proto3/#any>`_
field contain an arbitrary serialized message as bytes, together with a type URL
used to resolve that message's type, with a scheme of ``file://``, ``http://``,
or ``https://``. The connector reads the contents of the URL to create the type
descriptor for the ``Any`` message and converts the message to JSON. This behavior
is enabled by setting ``kafka.protobuf-any-support-enabled`` to ``true``.
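
For example, the property can be set in a Kafka catalog properties file, such as
``etc/catalog/kafka.properties``:

.. code-block:: text

    kafka.protobuf-any-support-enabled=true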

The descriptor for each distinct URL is cached for performance reasons, so any
modification made to the type returned by the URL requires a restart of
Trino.

For example, given the following Protobuf schema, which defines ``MyMessage``
with three fields:

.. code-block:: text

syntax = "proto3";

message MyMessage {
string stringColumn = 1;
uint32 integerColumn = 2;
uint64 longColumn = 3;
}

And a separate schema that uses an ``Any`` field containing a packed message of
the above type and a valid type URL:

.. code-block:: text

syntax = "proto3";

import "google/protobuf/any.proto";

message schema {
google.protobuf.Any any_message = 1;
}

The corresponding Trino column is named ``any_message``, has type ``JSON``, and
contains a JSON-serialized representation of the Protobuf message:

.. code-block:: text

{
  "@type":"file:///path/to/schemas/MyMessage",
  "integerColumn":1234,
  "longColumn":"493857959588286460",
  "stringColumn":"Trino"
}
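
The resulting ``JSON`` column can be queried with Trino's JSON functions. A
minimal example, assuming the topic is exposed as a table named
``example_table``:

.. code-block:: text

    SELECT json_extract_scalar(any_message, '$.stringColumn')
    FROM example_table;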

Protobuf schema evolution
+++++++++++++++++++++++++

@@ -1481,7 +1536,6 @@ The schema evolution behavior is as follows:
Protobuf limitations
++++++++++++++++++++

* Protobuf specific types like ``any``, ``oneof`` are not supported.
* Protobuf ``Timestamp`` has nanosecond precision, but Trino supports
  decoding/encoding at microsecond precision.

13 changes: 13 additions & 0 deletions lib/trino-record-decoder/pom.xml
@@ -17,6 +17,12 @@
</properties>

<dependencies>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@@ -63,6 +69,12 @@
<artifactId>slice</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-cache</artifactId>
</dependency>

<!-- Trino SPI -->
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
@@ -116,6 +128,7 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<!-- This is under the Confluent Community License and should not be used with compile scope -->
<scope>test</scope>
</dependency>

45 changes: 45 additions & 0 deletions lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/DescriptorProvider.java
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.decoder.protobuf;

import com.google.protobuf.Descriptors.Descriptor;
import io.trino.spi.TrinoException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Optional;

import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

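/**
 * Resolves Protobuf message descriptors from {@code Any} type URLs so that the
 * packed message can be converted to JSON.
 */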
public interface DescriptorProvider
{
Optional<Descriptor> getDescriptorFromTypeUrl(String url);

default String getContents(String url)
{
requireNonNull(url, "url is null");
ByteArrayOutputStream typeBytes = new ByteArrayOutputStream();
try (InputStream stream = new URL(url).openStream()) {
stream.transferTo(typeBytes);
}
catch (IOException e) {
throw new TrinoException(GENERIC_USER_ERROR, "Failed to read schema from URL", e);
}
return typeBytes.toString(UTF_8);
}
}
28 changes: 28 additions & 0 deletions lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/DummyDescriptorProvider.java
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.decoder.protobuf;

import com.google.protobuf.Descriptors.Descriptor;

import java.util.Optional;

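/**
 * No-op provider that never resolves a descriptor, leaving {@code Any} fields unresolved.
 */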
public class DummyDescriptorProvider
implements DescriptorProvider
{
@Override
public Optional<Descriptor> getDescriptorFromTypeUrl(String url)
{
return Optional.empty();
}
}
68 changes: 68 additions & 0 deletions lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/FileDescriptorProvider.java
@@ -0,0 +1,68 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.decoder.protobuf;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import io.trino.spi.TrinoException;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.decoder.protobuf.ProtobufErrorCode.INVALID_PROTO_FILE;
import static io.trino.decoder.protobuf.ProtobufRowDecoderFactory.DEFAULT_MESSAGE;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

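/**
 * Resolves descriptors by fetching the .proto contents from the type URL, caching
 * the result per distinct URL; changes to the served type require a restart.
 */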
public class FileDescriptorProvider
implements DescriptorProvider
{
private final LoadingCache<String, Descriptor> protobufTypeUrlCache;

public FileDescriptorProvider()
{
protobufTypeUrlCache = buildNonEvictableCache(
CacheBuilder.newBuilder().maximumSize(1000),
CacheLoader.from(this::loadDescriptorFromType));
}

@Override
public Optional<Descriptor> getDescriptorFromTypeUrl(String url)
{
try {
requireNonNull(url, "url is null");
return Optional.of(protobufTypeUrlCache.get(url));
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

private Descriptor loadDescriptorFromType(String url)
{
try {
Descriptor descriptor = ProtobufUtils.getFileDescriptor(getContents(url)).findMessageTypeByName(DEFAULT_MESSAGE);
checkState(descriptor != null, format("Message %s not found", DEFAULT_MESSAGE));
return descriptor;
}
catch (Descriptors.DescriptorValidationException e) {
throw new TrinoException(INVALID_PROTO_FILE, "Unable to parse protobuf schema", e);
}
}
}
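
A minimal usage sketch for the provider follows; the URL is hypothetical, and the
.proto file served at that URL must define the default message type expected by
the decoder:

// Sketch: resolve the descriptor for an Any type URL and print its full name.
// The provider fetches the .proto contents from the URL, builds the descriptor,
// and caches it per distinct URL.
DescriptorProvider provider = new FileDescriptorProvider();
Optional<Descriptor> descriptor =
        provider.getDescriptorFromTypeUrl("file:///path/to/schemas/MyMessage");
descriptor.ifPresent(d -> System.out.println(d.getFullName()));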
lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufColumnDecoder.java
@@ -13,13 +13,19 @@
*/
package io.trino.decoder.protobuf;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.OneofDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.TypeRegistry;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.decoder.DecoderColumnHandle;
@@ -45,10 +51,12 @@
import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
@@ -60,6 +68,9 @@

public class ProtobufColumnDecoder
{
// Trino JSON types are expected to be sorted by key; ORDER_MAP_ENTRIES_BY_KEYS
// makes the mapper sort all map entries, including nested ones, when serializing (see sorted() below)
private static final ObjectMapper mapper = JsonMapper.builder().configure(ORDER_MAP_ENTRIES_BY_KEYS, true).build();
Review comment (Member): if the ordering is relevant, please add a comment
explaining why. Copy the one from the sorted method in this class?

private static final String ANY_TYPE_NAME = "google.protobuf.Any";
private static final Slice EMPTY_JSON = Slices.utf8Slice("{}");

private static final Set<Type> SUPPORTED_PRIMITIVE_TYPES = ImmutableSet.of(
@@ -76,13 +87,15 @@ public class ProtobufColumnDecoder
private final String columnMapping;
private final String columnName;
private final TypeManager typeManager;
private final DescriptorProvider descriptorProvider;
private final Type jsonType;

public ProtobufColumnDecoder(DecoderColumnHandle columnHandle, TypeManager typeManager)
public ProtobufColumnDecoder(DecoderColumnHandle columnHandle, TypeManager typeManager, DescriptorProvider descriptorProvider)
{
try {
requireNonNull(columnHandle, "columnHandle is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.descriptorProvider = requireNonNull(descriptorProvider, "descriptorProvider is null");
this.jsonType = typeManager.getType(new TypeSignature(JSON));
this.columnType = columnHandle.getType();
this.columnMapping = columnHandle.getMapping();
@@ -141,7 +154,7 @@ public FieldValueProvider decodeField(DynamicMessage dynamicMessage)
}

@Nullable
private static Object locateField(DynamicMessage message, String columnMapping)
private Object locateField(DynamicMessage message, String columnMapping)
{
Object value = message;
Optional<Descriptor> valueDescriptor = Optional.of(message.getDescriptorForType());
@@ -160,6 +173,11 @@ private static Object locateField(DynamicMessage message, String columnMapping)
value = ((DynamicMessage) value).getField(fieldDescriptor);
valueDescriptor = getDescriptor(fieldDescriptor);
}

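// Any messages are rendered as JSON text rather than returned as a raw message value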
if (valueDescriptor.isPresent() && valueDescriptor.get().getFullName().equals(ANY_TYPE_NAME)) {
return createAnyJson((Message) value, valueDescriptor.get());
}

return value;
}

@@ -206,4 +224,34 @@ private static Object createOneofJson(DynamicMessage message, OneofDescriptor descriptor)
}
return EMPTY_JSON;
}

private Object createAnyJson(Message value, Descriptor valueDescriptor)
{
try {
String typeUrl = (String) value.getField(valueDescriptor.findFieldByName("type_url"));
Optional<Descriptor> descriptor = descriptorProvider.getDescriptorFromTypeUrl(typeUrl);
if (descriptor.isPresent()) {
return Slices.utf8Slice(sorted(JsonFormat.printer()
.usingTypeRegistry(TypeRegistry.newBuilder().add(descriptor.get()).build())
.omittingInsignificantWhitespace()
.print(value)));
}
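// Without a resolvable descriptor, the Any message cannot be converted, so the column is null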
return null;
}
catch (InvalidProtocolBufferException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to print JSON from 'any' message type", e);
}
}

private static String sorted(String json)
{
try {
// Trino JSON types are expected to be sorted by key
// This routine takes an input JSON string and sorts the entire tree by key, including nested maps
return mapper.writeValueAsString(mapper.treeToValue(mapper.readTree(json), Map.class));
}
catch (JsonProcessingException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to process JSON", e);
}
}
}
lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufDecoderModule.java
@@ -27,6 +27,7 @@ public class ProtobufDecoderModule
public void configure(Binder binder)
{
binder.bind(DynamicMessageProvider.Factory.class).to(FixedSchemaDynamicMessageProvider.Factory.class).in(SINGLETON);
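// Bind the no-op provider by default, which leaves Any type URLs unresolved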
binder.bind(DescriptorProvider.class).to(DummyDescriptorProvider.class).in(SINGLETON);
newMapBinder(binder, String.class, RowDecoderFactory.class).addBinding(ProtobufRowDecoder.NAME).to(ProtobufRowDecoderFactory.class).in(SINGLETON);
}
}