Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow consumer methods to run while closing #1073

Conversation

milindl
Copy link
Contributor

@milindl milindl commented Oct 6, 2023

See #1062
#901 (comment)

With #901, such flows were no longer working after the final Close().

rebalanceCb() {
   ...
   ...
   if (partitions_are_being_revoked) { 
     commit(some_offsets) 
   }

This was because the consumer would be counted as closed and commit would return an error.

it's based on the dev_go_flush_ignore_queue_buffering_time since I'm using the same file

kafka/consumer.go Show resolved Hide resolved
kafka/consumer.go Show resolved Hide resolved
@@ -545,6 +553,9 @@ func (c *Consumer) Close() (err error) {
c.Poll(100)
}

// After this point, no more consumer methods may be called.
atomic.StoreUint32(&c.isClosed, 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be just before returning? rd_kafka_destroy can stuck due to various reasons.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're destroying an important queue below, which is necessary for most public API to run (it's the queue on which we're getting all the events from, from librdkafka side).

So most functions called after this point will panic due to segfault

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just talking about atomic.StoreUint32(&c.isClosed, 1)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, storing that will allow a small window of usage after the queue destruction - methods like Poll() can be called until that store is called.

@milindl milindl merged commit 8740152 into dev_go_flush_ignore_queue_buffering_time Oct 18, 2023
@milindl milindl deleted the dev_fix_methods_allowed_in_final_close branch October 18, 2023 14:09
@milindl
Copy link
Contributor Author

milindl commented Oct 18, 2023

It's merged but only in the other branch. I did it on top of that because of test file organization related changes.

@jdemeyer
Copy link
Contributor

Is this by chance related to #1016 ?

@milindl
Copy link
Contributor Author

milindl commented Oct 27, 2023

I'm guessing it would not be. I saw the stack traces in those functions, we've already made it to the librdkafka functions and are waiting for those to return, the internal state of the Go consumer wouldn't come into the picture

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants