-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[WIP] Fix KafkaAvroSource to use the latest schema #765
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
haiminh87
commented
Jun 27, 2019
- When we read data from Kafka, we want to always read with the latest schema.
- This allows us to make assumption throughout the rest of the pipeline that every record has the same schema.
- We create a custom KafkaAvroDecoder that use the latest schema as read schema.
- This does not work with all SchemaProvider yet.
- When we read data from Kafka, we want to always read with the latest schema. - This allows us to make assumption throughout the rest of the pipeline that every record has the same schema. - We create a custom KafkaAvroDecoder that use the latest schema as read schema.
|
cc @pratyakshsharma this has been around for long.. Could you take a look to understand if we still need this and may be retarget this for the next release |
|
ack. Will take a pass. |
|
|
||
| private final Schema sourceSchema; | ||
|
|
||
| public SourceSchemaKafkaAvroDecoder(VerifiableProperties props) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any specific reason why you are using VerifiableProperties here and not TypedProperties?
| return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, readerSchema); | ||
| } | ||
|
|
||
| return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@haiminh87 I was thinking if we could make this configurable in sense that have a boolean like readUsingLatestSchema with a default value of true and can be overridden via TypedProperties instance.
| this.configure(new KafkaAvroDeserializerConfig(props.props())); | ||
|
|
||
| TypedProperties typedProperties = new TypedProperties(); | ||
| copyProperties(typedProperties, props.props()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we do away with this function?
|
@haiminh87 Thank you for submitting this PR. Have left few comments. Apart from them, this PR needs a rebase and it would be great if we could add test cases as well. :) |
@vinothchandar this is a good feature to have. Have left few comments. |
pratyakshsharma
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us add test cases, and rebase once @haiminh87
|
@haiminh87 Still working on this? |
|
@pratyakshsharma Nope.. all yours if you want to take a run at it |
Sure, will be working on it next then :) |
|
@vinothchandar I raised #1562 for this feature. I guess we can close this PR then. |