Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions rfc/rfc-57/rfc-57.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# RFC-57: DeltaStreamer Protobuf Support



## Proposers

- @the-other-tim-brown

## Approvers
- @bhasudha
- @vinothchandar

## Status

JIRA: https://issues.apache.org/jira/browse/HUDI-4399

> Please keep the status updated in `rfc/README.md`.

## Abstract

Support consuming Protobuf messages from Kafka with the DeltaStreamer.

## Background
Hudi's DeltaStreamer currently supports consuming Avro and JSON data from Kafka but it does not support Protobuf. Adding support will require:
1. Parsing the data from Kafka into Protobuf Messages
2. Generating a schema from a Protobuf Message class
3. Converting from Protobuf to Avro and Row

## Implementation

### Parsing Data from Kafka
Users will provide a classname for the Protobuf Message that is contained within a jar that is on the path. We will then implement a deserializer that parses the bytes from the kafka message into a protobuf Message.

Configuration options:
hoodie.deltastreamer.schemaprovider.proto.className - The class to use
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be helpful if we can support cases where the schema is present in Schema registry, this way users don't have to provide the class name every time. Similar to what we do today for Avro source.

Or should this be taken into the next cut ? wdyt @codope @vinothchandar ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have experience using that confluent value deserializer? We can add that in as an option but I don't have experience with it so may need your help.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's take it in next cut

Copy link
Member

@codope codope Aug 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@the-other-tim-brown Let's track it in the epic. Can you please add all tasks for this RFC in the epic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


### ProtobufClassBasedSchemaProvider
This new SchemaProvider will allow the user to provide a Protobuf Message class and get an Avro Schema. In the proto world, there is no concept of a nullable field so people use wrapper types such as Int32Value and StringValue to represent a nullable field. The schema provider will also allow the user to treat these wrapper fields as nullable versions of the fields they are wrapping instead of treating them as a nested message. In practice, this means that the user can choose between representing a field `Int32Value my_int = 1;` as `my_int.value` or simply `my_int` when writing the data out to the file system.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's elaborate more on certain schema aspects. You have already covered nullable fields. Other things to consider:

  1. Unsigned types: Avro doesn't support unsigned types. So, probably Long type.
  2. Schema evolution: Both handle schema evolution differently. I think adding and removing fields should be ok as long as we consider default value in avro while converting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more details on both points to the RFC

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we also taken into consideration nested Message types here ? how would that map here ?
Also repeated fields like repeated Message type, ENUMS etc.

May be we call that out explicitly as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a mapping of proto to avro types here


#### Field Mappings
Protobuf -> Avro
bool -> boolean
float -> float
double -> double
enum -> enum
string -> string
bytes -> bytes
int32 -> int
sint32 -> int
fixed32 -> int
sfixed32 -> int
uint32 -> long [1]
int64 -> long
uint64 -> long
sint64 -> long
fixed64 -> long
sfixed64 -> long
message -> record [2]
repeated -> array
map -> array [3]


[1] Handling of Unsigned Integers and Longs: Protobuf provides support for unsigned integers and longs while Avro does not. The schema provider will convert unsigned integers and longs to Avro long type in the schema definition.
[2] All messages will be translated to a union[null, record] with a default of null.
[3] Protobuf maps allow non-string keys while avro does not so we convert maps to an array of records containing a key and a value.

#### Schema Evolution
**Adding a Field:**
Protobuf has a default value for all fields and the translation from proto to avro schema will carry over this default value so there are no errors when adding a new field to the proto definition.
**Removing a Field:**
If a user removes a field in the Protobuf schema, the schema provider will not be able to add this field to the avro schema it generates. To avoid issues when writing data, users must use `hoodie.datasource.write.reconcile.schema=true` to properly reconcile the schemas if a field is removed from the proto definition. Users can avoid this situation by using `deprecated` field option in proto instead of removing the field from the schema.

Configuration Options:
hoodie.deltastreamer.schemaprovider.proto.className - The class to use
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would this work when we want to read from Multiple Kafka topics ? for example use MultiDeltaStreamer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding of the MultiDeltaStreamer is that the tables are configured individually. I would expect that means you can configure a schema provider and in our case, proto class, for each of the tables.

hoodie.deltastreamer.schemaprovider.proto.flattenWrappers (Default: false) - By default the wrapper classes will be treated like any other message and have a nested `value` field. When this is set to true, we do not have a nested `value` field and treat the field as nullable in the generated Schema

### ProtoToAvroConverter and ProtoToRowConverter

A utility will be provided that can take in a Protobuf Message and convert it to an Avro GenericRecord. This will be used inside the SourceFormatAdapter to properly convert to an avro RDD. This change will be adding a new `Source.SourceType` as well so other sources in the future can implement this source type, for example Protobuf messages on PubSub.

To convert to `Dataset<Row>` we will initially convert to Avro and then to Row to reduce the amount of changes required to initially ship the feature. Then we will do a fast-follow with a direct proto to row conversion that follows a similar approach as the proto to avro converter.

Special handling for maps:
Protobuf allows you to use any integral or string type as a key while Avro requires a string. To account for this, we convert all maps to lists of entries in that map.

## Rollout/Adoption Plan

This change simply adds new functionality and will not impact an existing users. The changes to the SourceFormatAdapter will only impact Proto source types and this is the first of its kind. Users will need to start running a new DeltaStreamer to get this functionality.

## Test Plan

- The new source will have testing that mirrors what we are currently doing for the TestJsonKafkaSource. This will exercise both reading the records from kafka into an `JavaRDD<GenericRecord>` and a `DataSet<Row>`
- The converter code will also be executed on the above path, but it will be more thoroughly tested in its own unit tests. The unit tests will use a sample protobuf message that covers nested messages, primitive, and wrapped field types.
- The schema provider will similarly be tested within its own unit tests to validate the behavior matches expectations.