[HUDI-73]: implemented vanilla AvroKafkaSource #1565

Conversation
Codecov Report
@@ Coverage Diff @@
## master #1565 +/- ##
============================================
- Coverage 71.82% 71.79% -0.04%
- Complexity 294 303 +9
============================================
Files 383 386 +3
Lines 16549 16598 +49
Branches 1663 1667 +4
============================================
+ Hits 11887 11916 +29
- Misses 3930 3946 +16
- Partials 732 736 +4
Continue to review full report at Codecov.
protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
  try {
    ByteBuffer buffer = this.getByteBuffer(payload);
    int id = buffer.getInt();
this looks specific to the Confluent schema registry. Is it a good idea to assume all the messages will start with the magic int?
it also looks like an older version of https://github.com/confluentinc/schema-registry/commits/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java
any reason it is not inspired by the latest version?
> this looks specific to the Confluent schema registry. Is it a good idea to assume all the messages will start with the magic int?

Ok, the actual purpose of this PR was to support kafka installations without schema-registry. I messed it up at this point. Thank you for pointing this out. Will handle this accordingly.

> any reason it is not inspired by the latest version?

No specific reason. The implementation is inspired by the kafka-avro-serializer version we are currently using. :)
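For reference, the framing the snippet above assumes is Confluent's wire format: one magic byte, then a 4-byte schema id, then the Avro binary payload. A minimal sketch of that framing follows; the class and method names are illustrative, not the PR's actual code:

```java
import java.nio.ByteBuffer;

// Sketch of the Confluent wire format assumed by the snippet above:
//   byte 0     -> magic byte (0x0)
//   bytes 1-4  -> schema id from the schema registry (the "magic int")
//   bytes 5..n -> Avro binary-encoded record
public class WireFormatSketch {
  private static final byte MAGIC_BYTE = 0x0;

  static ByteBuffer getByteBuffer(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != MAGIC_BYTE) {
      throw new IllegalArgumentException("Unknown magic byte!");
    }
    return buffer; // now positioned at the 4-byte schema id
  }

  public static void main(String[] args) {
    // 0x0 magic byte, schema id 42, then (truncated) Avro bytes
    byte[] payload = {0x0, 0, 0, 0, 42, 1, 2};
    int schemaId = getByteBuffer(payload).getInt();
    System.out.println("schema id = " + schemaId); // 42
  }
}
```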
@vinothchandar @afilipchik I explored a bit around this. Deserialization has to happen exactly opposite to how the data was serialized. Also, working with Avro and Kafka without a schema-registry is highly discouraged [1][2][3].
That said, I guess if someone wants to use the vanilla AvroKafkaSource, they will have to serialize the data without an actual schema-registry setup. Hence, as a matter of practice, a good alternative can be mandating use of the MockSchemaRegistryClient class (see the sketch after the references below), in which case the schema-registry-specific code written above should be fine.
I am suggesting the MockSchemaRegistryClient class since Confluent uses it for writing their test cases, and this will establish a standard practice for using the HoodieKafkaAvroDecoder class introduced with this PR.
If we agree to this, I will mention the same as a comment in the code above.
[1] confluentinc/confluent-kafka-dotnet#530
[2] https://medium.com/@igorvlahek1/no-need-for-schema-registry-in-your-spring-kafka-tests-a5b81468a0e1
[3] https://stackoverflow.com/questions/45635726/kafkaavroserializer-for-serializing-avro-without-schema-registry-url
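To make the suggestion concrete, here is a rough sketch of producing registry-framed bytes with Confluent's in-memory MockSchemaRegistryClient, so no real registry endpoint is needed. It assumes the Confluent 5.x-era API (register taking an Avro Schema); the topic name and schema are made up for the example:

```java
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class MockRegistrySketch {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}");

    // In-memory registry: schemas are registered locally, no HTTP calls.
    SchemaRegistryClient registry = new MockSchemaRegistryClient();
    registry.register("my-topic-value", schema);

    KafkaAvroSerializer serializer = new KafkaAvroSerializer(registry);

    GenericRecord record = new GenericData.Record(schema);
    record.put("name", "alice");

    // Output is framed with the magic byte + schema id, matching the
    // deserializer discussed above.
    byte[] payload = serializer.serialize("my-topic", record);
    System.out.println("framed payload length = " + payload.length);
  }
}
```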
if we are reusing code, we also need to be mindful of updates needed for NOTICE and LICENSE.
}

protected Object deserialize(byte[] payload) throws SerializationException {
  return deserialize(null, null, payload, sourceSchema);
it looks like sourceSchema is used as both the writer and the reader schema. Why? Avro is painful when the writer schema is lost, as some evolutions become impossible.
Did not get this point. What do you mean by writer schema getting lost?
So basically what I want to understand here is: what specific advantages do you get by having separate writer and reader schemas, apart from being able to handle field renames using aliases? I tried to go through the Avro library, but am still not convinced about this. If you could point me to some useful documentation or a blog regarding this, that would be great.
let me find it. We had to patch the kafka -> avro deser in Hudi 0.5 as it was breaking in some situations.
In general, based on the spec at https://avro.apache.org/docs/current/spec.html:

> Data Serialization and Deserialization
> Binary encoded Avro data does not include type information or field names. The benefit is that the serialized data is small, but as a result a schema must always be used in order to read Avro data correctly. The best way to ensure that the schema is structurally identical to the one used to write the data is to use the exact same schema.
> Therefore, files or systems that store Avro data should always include the writer's schema for that data. Avro-based remote procedure call (RPC) systems must also guarantee that remote recipients of data have a copy of the schema used to write that data. In general, it is advisable that any reader of Avro data should use a schema that is the same (as defined more fully in Parsing Canonical Form for Schemas) as the schema that was used to write the data in order to deserialize it correctly. Deserializing data into a newer schema is accomplished by specifying an additional schema, the results of which are described in Schema Resolution.
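A small illustration of the point the spec is making: Avro resolves between the writer schema and a (possibly newer) reader schema at decode time, so losing the writer schema forecloses evolutions such as adding a field with a default. The schemas below are made up for the example:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionSketch {
  public static void main(String[] args) throws Exception {
    Schema writer = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}");
    // Reader schema evolved: a new field with a default value.
    Schema reader = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\",\"default\":-1}]}");

    GenericRecord rec = new GenericData.Record(writer);
    rec.put("name", "alice");

    // Encode with the writer schema, as a producer would.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writer).write(rec, encoder);
    encoder.flush();

    // Resolution needs BOTH schemas; with only the reader schema this
    // decode would fail or mis-read the bytes.
    GenericDatumReader<GenericRecord> datumReader =
        new GenericDatumReader<>(writer, reader);
    GenericRecord decoded = datumReader.read(null,
        DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
    System.out.println(decoded); // {"name": "alice", "age": -1}
  }
}
```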
@afilipchik got a chance to explore this?
result = bytes;
} else {
  int start = buffer.position() + buffer.arrayOffset();
  DatumReader reader = this.getDatumReader(sourceSchema, readerSchema);
what's the difference here between sourceSchema and readerSchema?
They are both the same. :)
Overall, this PR is nice in the sense that it lets us read data from Kafka using Avro, with a fixed schema.
So handling schema evolution without schema-registry is going to be really tricky. I tried googling around this stuff, and found the below two links; these might be useful for what we want to achieve.
Particularly the second repository aims at serializing and deserializing Avro data without schema-registry using the Confluent and Avro libraries. At a high level, it looks like they are also not handling schema evolution in their code. I would need some time to go through it in depth though.
Hey, any way to push it over the line? We are upgrading to 0.6, and in the current state evolutions on top of the Confluent Schema Registry are borked.
@pratyakshsharma any comments? Do you have cycles in the next couple of weeks to get this over the finish line? We can target this for 0.7.0 as well.
Hi, I am currently facing issues with my laptop. A replacement is on its way. I will pick this up as soon as I get the new laptop, most probably in another two days. :)
@afilipchik @pratyakshsharma: can you folks summarize what's pending w.r.t. this patch? I can take this up and try to get it into the upcoming release. Would appreciate it if you could go over what needs to be done to bring this to closure.
Here is my understanding so far:
For those who don't want to use the schema registry, this patch adds support for using a file-based schema to deserialize Kafka Avro data.
AbstractHoodieKafkaAvroDeserializer is the class that has the core deserialization logic, and HoodieKafkaAvroDecoder is the decoder impl which leverages AbstractHoodieKafkaAvroDeserializer.
One downside of this approach is with regard to schema evolution, since the user has to keep updating the schema as and when required; but based on Pratyaksh's last comment, it looks like we can't get past that.
Do I need to research more along these lines, i.e. how to support schema evolution?
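For the registry-less path summarized above, the reader schema comes from a file instead of the registry. A minimal sketch under that assumption follows; the file path is a placeholder, and in the patch this role is played by the file-based schema provider:

```java
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

public class FileSchemaDeserSketch {
  // Deserialize raw (un-framed) Avro bytes using a schema loaded from a
  // file, i.e. no schema registry and no magic byte / schema id prefix.
  static GenericRecord deserialize(byte[] payload, Schema fileSchema) throws Exception {
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(fileSchema);
    return reader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
  }

  public static void main(String[] args) throws Exception {
    // Placeholder path; the schema must match what the producer wrote,
    // which is exactly the evolution limitation discussed above.
    Schema schema = new Schema.Parser().parse(new File("/path/to/source.avsc"));
    // byte[] payload = <bytes consumed from Kafka>;
    // GenericRecord record = deserialize(payload, schema);
  }
}
```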
@nsivabalan Should we close this PR in favor of the recent Kafka Avro Source that was landed?
@n3nash can you please point me to the specific PR you are referring to here?
@pratyakshsharma I'm referring to this commit -> 900de34
@n3nash I went through the commit you are referring to. It basically takes care of passing in a custom reader schema for deserialization. It does not take care of a Kafka setup without schema-registry, so there is still some work that needs to be done in this regard. I will sync up with @nsivabalan for the same.
@nsivabalan Basically we need to figure out / decide on the below items -
Any thoughts are welcome. :)
@pratyakshsharma Thanks for providing that context. @nsivabalan Once you have shared your thoughts, we can redo the scope of this PR and either rebase and fix this one or open a new one.
Closing due to inactivity |
Tips
What is the purpose of the pull request
Supports Kafka installations without schema-registry by allowing FileBasedSchemaProvider to be integrated with AvroKafkaSource.
Brief change log
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.