Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,13 @@ public KafkaOffsetGen(TypedProperties props) {
this.props = props;

kafkaParams = new HashMap<>();
for (Object prop : props.keySet()) {
props.keySet().stream().filter(prop -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@li36909 This change does not change any logging unlike what the title of the PR says, can you correct the title and explain this change please ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about change to this: "DeltaStream print many unnecessary warn log because of passing hoodie config to kafka consumer". the warn log is printed by kafkaconsumer. when hudi new the kafka consumer, hudi pass some non-kafka parameter to the kafka comsumer, then lead to these warn log, to solve this problem we just need to filter the hoodie config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I find a UT fail cause by concurrent write to a hudi table, I will try to analyze it later

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@li36909 Okay, I see. The current title makes sense to me now. Can you leave the above comment you made in the code as to why we are "removing the hoodie properties" ? Once that is done and the CI is green, we can land this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@n3nash I had added comment at the code, thank you

// In order to prevent printing unnecessary warn logs, here filter out the hoodie
// configuration items before passing to kafkaParams
return !prop.toString().startsWith("hoodie.");
}).forEach(prop -> {
kafkaParams.put(prop.toString(), props.get(prop.toString()));
}
});
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_RESET_OFFSETS, Config.DEFAULT_KAFKA_AUTO_RESET_OFFSETS.name().toLowerCase());
Expand Down