-
Notifications
You must be signed in to change notification settings - Fork 652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Rebalance events behavior for static membership and fix for consu… #757
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good stuff!
for run { | ||
c.Poll(100) | ||
select { | ||
case <-doneChan: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will block until doneChan is written to, which means if Poll() is triggered once for some other reason, e.g., a stats callback, then it will hang here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use int(C.rd_kafka_queue_length(c.handle.rkq)) == 0
to exist the loop which is same with the previous way?
for run {
c.Poll(100)
select {
case <-doneChan:
run = false
default:
if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 {
run = false
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I'm also confused why it will hang, I tested with this code with the existing change in this PR, it doesn't hang
func TestConsumerClose1(t *testing.T) {
c, err := NewConsumer(&ConfigMap{
"group.id": "gotest"})
if err != nil {
t.Fatalf("%s", err)
}
startClose := time.Now()
err = c.Close()
if err != nil {
panic(err)
}
fmt.Println(fmt.Sprintf("consumer closed. closing time: %v", time.Since(startClose)))
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think with the following sequence of events it will hang:
- application calls Close()
- Close() is blocking on c.Poll()
- A stats event (or something else which is not a rebalance event) triggers the Poll and Poll() wakes up and returns
- Close() is now blocking on doneChan
- librdkafka emits a rebalance event and waits for the app/go to ack it
- .. but since Close() is no longer calling Poll() the rebalance event is never handled.
- indefinite hang
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is easily solved by moving the Poll to the default case in the select, like in other places in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for run {
select {
case <-doneChan:
**-------------->// Is it possible that after the doneChan, there are still other events we need to handle? We always have to wait to read the doneChan value. If we exit from default, the doneChan will report some error**
run = false
default:
c.Poll(100)
if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 {
run = false
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, when doneChan is done we don't need to call Poll no more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the queue_length() check needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need it, I already removed it from my code.
kafka/consumer_test.go
Outdated
switch ev := event.(type) { | ||
case AssignedPartitions: | ||
atomic.AddInt32(&assignedEvents, 1) | ||
fmt.Fprintf(os.Stderr, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Tests should use t.Logf if possible, but that might be tricky here in the callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, is there any good way to use t *testing.T
in the callback?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know, really. Go 🤷
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do all existing tests pass now?
@@ -429,3 +431,277 @@ func TestConsumerLog(t *testing.T) { | |||
} | |||
} | |||
} | |||
|
|||
func wrapRebalanceCb(assignedEvents *int32, revokedEvents *int32, t *testing.T) func(c *Consumer, event Event) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 on the wrapping to provide the testing.T
kafka/consumer_test.go
Outdated
return | ||
} | ||
|
||
wg := &sync.WaitGroup{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is better to keep this as a non-pointer here, and then pass it as pointer when needed (to testPoll()).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we keep this as a non-pointer here, we need to use it as pointer each time, and there are a lot of use cases in the same function.
For example:
wg.Add(1)
wg.Wait()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pointer receiver methods apply also to non pointer variables, based on this.
So pointer should be needed to pass the WaitGroup struct by reference, but not to call its methods with pointer receiver.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we keep this as a non-pointer here, we need to use it as pointer each time, and there are a lot of use cases in the same function.
I still believe it is more correct to keep it as a non-pointer here and pass it as a pointer where needed.
But it is a nit, so do as you wish.
Yes:)
|
…mer close takes longer than 10s
fe62e71
to
f9ff23f
Compare
This is the CI test: https://app.travis-ci.com/github/confluentinc/confluent-kafka-go/builds/249581975 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still some unaddressed comments in the test file.
kafka/consumer_test.go
Outdated
return | ||
} | ||
|
||
wg := &sync.WaitGroup{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we keep this as a non-pointer here, we need to use it as pointer each time, and there are a lot of use cases in the same function.
I still believe it is more correct to keep it as a non-pointer here and pass it as a pointer where needed.
But it is a nit, so do as you wish.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Is there a plan to make a release with this fix? |
Yes, this one will be released soon. |
…mer close takes longer than 10s
Manually tested for the
Close
, before this fix, the consumer takes more than 10s, with this fix, closing time: 730.602µs