[Hudi 73] Adding support for vanilla AvroKafkaSource #2380
Conversation
Codecov Report
```diff
@@             Coverage Diff              @@
##             master    #2380      +/-   ##
============================================
- Coverage     52.20%   52.01%    -0.19%
- Complexity     2659     2660        +1
============================================
  Files           335      338        +3
  Lines         14981    15043       +62
  Branches       1505     1509        +4
============================================
+ Hits           7821     7825        +4
- Misses         6535     6593       +58
  Partials        625      625
```
@afilipchik @vinothchandar: I have taken a stab at #1565. I did not have permission to update Pratyaksh's repo, hence created a new one. Basically, AbstractHoodieKafkaAvroDeserializer initializes a SchemaProvider based on configs to fetch the source schema and target schema; in other words, I have combined #1562 and #1565. If deserialize() is called with a reader schema, that schema is used; if not, the one from the schema provider is used. In either case, the writer schema is fetched from the schema provider. In the previous patch from Pratyaksh, we were using the passed-in schema as both the reader and writer schema, so schema evolution could run into issues. AbstractHoodieKafkaAvroDeserializer is inspired by the Confluent schema-registry repo, and I am not sure how to make it generic (as of now it assumes the schema id at the beginning, followed by length and data). I haven't worked with schema registries or with Kafka/Avro before, so I will have to research what other ways we could deserialize Kafka Avro data. But per Pratyaksh's comment, it looks like non-schema-registry flows are discouraged in general, so I am not sure how much value we would add by supporting all the different ways to deserialize Kafka Avro data (i.e., beyond the Confluent way). Let me know your thoughts. I am looking to get this into 0.7.0 (will be cutting a release in a week's time), so I would appreciate a response whenever you can.
@afilipchik could you take a pass at this PR this week?
```java
  return deserialize(null, null, payload, readerSchema);
}

protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
```
The original AbstractKafkaAvroDeserializer decoder has a check:

```java
if (payload == null) {
  return null;
}
```

Is removal of this check safe?
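For context, Kafka delivers a null value for tombstone (deleted-key) records, which is likely why the upstream guard exists. A minimal sketch of what the guard protects against; the class and method names below are illustrative, not Hudi's:

```java
import java.nio.ByteBuffer;

public class NullPayloadGuard {

  // Simplified stand-in for a Kafka Avro deserializer. Without the null
  // check, ByteBuffer.wrap(payload) below would throw a NullPointerException
  // on tombstone records instead of passing the null through.
  static Object deserialize(byte[] payload) {
    if (payload == null) {
      return null; // tombstone record: nothing to decode
    }
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    return buffer; // real code would parse the wire format and decode Avro here
  }

  public static void main(String[] args) {
    System.out.println(deserialize(null) == null);          // true
    System.out.println(deserialize(new byte[]{0}) == null); // false
  }
}
```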
```java
  props.put("value.deserializer", KafkaAvroDeserializer.class);
} else {
  DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_FILE_PROP));
  props.put("value.deserializer", HoodieKafkaAvroDecoder.class);
```
It would be nice to have the ability to configure which decoder class to use for value.deserializer, so that users can handle internal data-decoding specifics.
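One possible shape for this suggestion: read the decoder class name from a property and fall back to the current default. The config key and default class path below are hypothetical, not an existing Hudi property:

```java
import java.util.Properties;

public class DeserializerConfigExample {

  // Hypothetical property name for overriding the Kafka value deserializer.
  static final String DESERIALIZER_CLASS_PROP =
      "hoodie.deltastreamer.source.kafka.value.deserializer.class";

  // Hypothetical default, standing in for the hardcoded HoodieKafkaAvroDecoder.
  static final String DEFAULT_DESERIALIZER =
      "org.apache.hudi.utilities.deser.HoodieKafkaAvroDecoder";

  static Properties buildConsumerProps(Properties userProps) {
    Properties props = new Properties();
    // Use the user-supplied decoder class if present, else the default.
    String decoderClass =
        userProps.getProperty(DESERIALIZER_CLASS_PROP, DEFAULT_DESERIALIZER);
    props.put("value.deserializer", decoderClass);
    return props;
  }

  public static void main(String[] args) {
    Properties user = new Properties();
    System.out.println(buildConsumerProps(user).get("value.deserializer"));
    user.setProperty(DESERIALIZER_CLASS_PROP, "com.example.MyDecoder");
    System.out.println(buildConsumerProps(user).get("value.deserializer"));
  }
}
```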
```java
  return deserialize(null, null, payload, readerSchema);
}

protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
```
Do you know where readerSchema comes from?
It should be coming from the configured schema provider.
```java
protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
  try {
    ByteBuffer buffer = this.getByteBuffer(payload);
    int id = buffer.getInt();
```
This assumes the message starts with the schema id (the code looks like it came from the Confluent deserializer). It doesn't belong in AbstractHoodieKafkaAvroDeserializer.java.
On making AbstractHoodieKafkaAvroDeserializer abstract: it looks like a modified Confluent deserializer, so I believe it should be named accordingly. If we want to support the Confluent schema registry, we need to use the schema id to acquire the writer's schema; otherwise schema evolution will be a pain. That is, to deserialize an Avro message we need two things: the schema it was written with (it can come from properties, but with Confluent the id at the beginning of the message tells you the exact version and can be used to fetch it from the schema registry) and the reader schema (the schema on the reader side, which comes from the schema provider).
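For reference, the Confluent wire format being discussed is a magic byte (0x0), then a 4-byte big-endian schema id, then the Avro-encoded body; the id resolves the writer schema in the registry, while the reader schema arrives separately (here, from the schema provider). A minimal sketch of extracting the writer-schema id; the class and method names are illustrative, not part of Hudi or Confluent's code:

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {

  // Parse the Confluent framing: [magic 0x0][4-byte schema id][Avro body].
  // The returned id would be looked up in the schema registry to obtain
  // the exact writer schema version for schema-resolution on read.
  static int writerSchemaId(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload); // big-endian by default
    byte magic = buffer.get();
    if (magic != 0) {
      throw new IllegalArgumentException("Unknown magic byte: " + magic);
    }
    return buffer.getInt();
  }

  public static void main(String[] args) {
    // Build a fake message: magic byte 0, schema id 7, one body byte.
    byte[] msg = ByteBuffer.allocate(6).put((byte) 0).putInt(7).put((byte) 1).array();
    System.out.println(writerSchemaId(msg)); // 7
  }
}
```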
Closing this in favor of #2598
What is the purpose of the pull request
Redo of #1565
Brief change log
Verify this pull request
This change added tests and can be verified as follows:
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.