KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 #14567
Hi Chris, thanks for this PR. We have also implemented similar functionality based on version 2.8.1. I left a minor comment here.
log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()
        .filter(this::isUncommitted).count());

log.trace("Seeking offsets: {}", topicPartitionOffsets.entrySet().stream()
topicPartitionOffsets here may not hold the correct offset for committed partitions (it actually needs to be incremented by 1), right? So the log here may be misleading. I think line 285 is correct. Maybe we can keep just one of the two.
Good point. Considering they're both at TRACE level, I think it's fine to remove this one.
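To illustrate the off-by-one raised in this thread, here is a minimal sketch. It assumes the stored offset is that of the last record already replicated, so consumption resumes at stored + 1. The names SeekPositionSketch, nextPosition and seekTargets are hypothetical, and partitions are keyed by plain strings rather than TopicPartition so the snippet runs without Kafka on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SeekPositionSketch {
    // Assumption: the stored value is the offset of the last record already
    // replicated, so the next record to read is one past it.
    static long nextPosition(long storedOffset) {
        return storedOffset + 1L;
    }

    // Builds the positions a trace log should report: the seek targets,
    // not the raw stored offsets.
    static Map<String, Long> seekTargets(Map<String, Long> storedOffsets) {
        Map<String, Long> targets = new LinkedHashMap<>();
        storedOffsets.forEach((partition, offset) -> targets.put(partition, nextPosition(offset)));
        return targets;
    }

    public static void main(String[] args) {
        Map<String, Long> stored = new LinkedHashMap<>();
        stored.put("topic-0", 41L);
        stored.put("topic-1", 0L);
        System.out.println("Stored offsets: " + stored);
        System.out.println("Seek targets:   " + seekTargets(stored));
    }
}
```

Logging the raw stored map and the seek targets side by side shows why a single trace line that reports the actual seek position is less likely to mislead.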
Thanks @C0urante. I was also tracking this one. Left a few minor comments.
void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
    Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
    consumer.assign(topicPartitionOffsets.keySet());
    log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()
- log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()
+ log.debug("Found {} uncommitted partitions.", topicPartitionOffsets.values().stream()
IMO the existing log is fine--can you elaborate on why you believe this change is necessary? Logging at INFO level seems okay since this should happen once per task lifecycle, and "Starting with" also seems warranted given that this should only occur during MirrorSourceTask::start.
I just thought that we already have an info level message at the end of MirrorSourceTask::start and this message was not that useful, but I don't have a strong opinion, so I'm fine with leaving it as it is.
@@ -302,6 +320,10 @@ private static int byteSize(byte[] bytes) {
        }
    }

    private boolean isUncommitted(Long offset) {
        return offset == null || offset < 0;
Looks like unwrapOffset takes care of the null check. We can simply use the primitive type here and check equal to -1L (it can't have any other value). Wdyt?
I liked this suggestion initially and played around with changing MirrorUtils::unwrapOffset to return a primitive long type, but we end up being forced to convert offsets back to boxed Long types when using them to populate a Map<TopicPartition, Long> in MirrorSourceTask::loadOffsets.

IMO, without the ability to guarantee via the type system that values are non-null, we should try to handle them in cases like this, so I'd prefer to keep it as-is. Let me know if there are any implications I'm missing that might still warrant a direct comparison to -1L, though.
Ok. Thanks for exploring that suggestion.
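The trade-off discussed in this thread can be sketched as follows. This is an illustration under stated assumptions, not the code from the PR: UncommittedCheckSketch and isUncommittedPrimitive are hypothetical names, and string keys stand in for TopicPartition. The first method mirrors the null-tolerant check shown in the diff above; the second is the primitive variant that compares against -1L.

```java
import java.util.HashMap;
import java.util.Map;

public class UncommittedCheckSketch {
    // Null-tolerant variant: treats both null and negative values as "no offset".
    static boolean isUncommitted(Long offset) {
        return offset == null || offset < 0;
    }

    // Primitive variant: a caller passing a boxed null fails with
    // NullPointerException during auto-unboxing, before the method body runs.
    static boolean isUncommittedPrimitive(long offset) {
        return offset == -1L;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 10L);
        offsets.put("topic-1", -1L);
        offsets.put("topic-2", null); // nothing in Map<String, Long> rules this out

        long uncommitted = offsets.values().stream()
                .filter(UncommittedCheckSketch::isUncommitted)
                .count();
        System.out.println("Uncommitted partitions: " + uncommitted);

        try {
            isUncommittedPrimitive(offsets.get("topic-2")); // auto-unboxing a null Long
        } catch (NullPointerException e) {
            System.out.println("Primitive variant threw NullPointerException on a null value");
        }
    }
}
```

Because the map's value type is the boxed Long, the compiler cannot prove that values are non-null, which is the argument made above for keeping the defensive check.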
LGTM, thanks for this PR!
Hi @C0urante, LGTM. Thanks.
@gharris1727 @showuon @mimaison would any of you have time to take a look? Hoping we can get this one merged in time for 3.7.0.
LGTM, Thanks @C0urante for taking this over the finish line.
Also thanks to @Justinwins and @blacktooth for the earlier PRs, @yuz10 for review, and @electrical for stitching together the history of this change.
Thanks Greg!
…#14567) Reviewers: hudeqi <[email protected]>, Federico Valeri <[email protected]>, Greg Harris <[email protected]>
…apache#14567) Reviewers: hudeqi <[email protected]>, Federico Valeri <[email protected]>, Greg Harris <[email protected]>
Jira
Based off of #13905, which itself appears to be based off of #12358. The only changes applied here are the ones proposed during review of #13905.
This PR tweaks how the MirrorSourceTask class interacts with its consumer on startup. With this change, instead of manually seeking to offset 0 when no committed offset for a topic partition is found, no seek is performed at all. This allows users to configure MM2 to begin replicating from the ends of topics (as opposed to the beginning, which is and remains the default behavior) by configuring its consumer with auto.offset.reset set to latest.
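The startup behavior described above can be modeled with a small, Kafka-free sketch. Everything here is an assumption made for illustration: StartupSeekSketch, ResetPolicy, seekTarget and startPosition are hypothetical names, the log start and end offsets are passed in by hand, and the reset policy is reduced to two values. The point is only that an explicit seek happens when a stored offset exists, and that otherwise the consumer's reset policy decides where reading begins.

```java
import java.util.Optional;

public class StartupSeekSketch {
    // Simplified stand-in for the consumer's auto.offset.reset setting.
    enum ResetPolicy { EARLIEST, LATEST }

    // Returns the explicit seek target, or empty when no seek should be issued
    // because no offset was stored for the partition.
    static Optional<Long> seekTarget(Long storedOffset) {
        if (storedOffset == null || storedOffset < 0) {
            return Optional.empty();
        }
        // Assumption: the stored offset is the last replicated record, so resume one past it.
        return Optional.of(storedOffset + 1L);
    }

    // Where reading would begin: the seek target if there is one,
    // otherwise whatever the reset policy selects.
    static long startPosition(Long storedOffset, ResetPolicy policy, long logStart, long logEnd) {
        return seekTarget(storedOffset)
                .orElse(policy == ResetPolicy.LATEST ? logEnd : logStart);
    }

    public static void main(String[] args) {
        long logStart = 0L;
        long logEnd = 100L;
        System.out.println("No stored offset, earliest: "
                + startPosition(null, ResetPolicy.EARLIEST, logStart, logEnd));
        System.out.println("No stored offset, latest:   "
                + startPosition(null, ResetPolicy.LATEST, logStart, logEnd));
        System.out.println("Stored offset 41, latest:   "
                + startPosition(41L, ResetPolicy.LATEST, logStart, logEnd));
    }
}
```

In this model, the earlier behavior of always seeking to 0 would correspond to ignoring the policy for uncommitted partitions, which is why replicating from the end of a topic was not previously possible.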