Should consumer.poll() first verify the consumer isn't closed? #1219

Closed
jeffwidman opened this issue Sep 17, 2017 · 2 comments · Fixed by #1978

jeffwidman (Collaborator) commented Sep 17, 2017

Is it expected behavior that, depending on the choice of selector, you can successfully poll for messages after closing the consumer?

If you call consumer.close() followed by consumer.poll(), you'll generally (but not always) get an exception. The exception is raised by the underlying selector implementation: EpollSelector and SelectSelector both raise an error about the file descriptor being closed.

However, I noticed that PollSelector does not raise any error and, in a surprising twist, I can continue to consume messages by calling consumer.poll() after calling consumer.close(). I observed this behavior specifically within a Docker container while running some simple tests; we need to pin the selector at my day job because of a gevent issue:

from kafka import KafkaConsumer, KafkaProducer
from kafka.client_async import selectors

bootstrap_server = '172.18.0.6'  # docker IP

kp = KafkaProducer(bootstrap_servers=[bootstrap_server])

# values must be bytes unless a value_serializer is configured
kp.send('test_topic', b'auto create the topic')
kp.flush()

kc = KafkaConsumer('test_topic',
                   group_id='testing-kp-selectors',
                   bootstrap_servers=[bootstrap_server],
                   enable_auto_commit=False,
                   selector=selectors.PollSelector)

# force the connection to kafka
p0 = kc.poll(timeout_ms=1000)

# start the test
kp.send('test_topic', b't1')
kp.flush()

# confirm poll successful; poll() returns {TopicPartition: [ConsumerRecord, ...]}
p1 = kc.poll(timeout_ms=1000)

assert list(p1.values())[0][0].value == b't1'

kc.close()

# send message after consumer closed
kp.send('test_topic', b't2')
kp.flush()

# continue consuming from closed consumer
p2 = kc.poll(timeout_ms=1000)
assert list(p2.values())[0][0].value == b't2'

print("Finished")
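Part of the selector dependence can be seen with the standard library alone. The sketch below closes a fresh selector and then calls select() on it. In CPython 3, EpollSelector.close() also closes the underlying epoll object, so the next select() raises ValueError, while PollSelector has no such object to close and simply returns an empty list. It assumes (not confirmed in this thread) that closing the consumer ends up closing the client's selector, and it does not by itself explain why the PollSelector-backed consumer still delivered records.

```python
import selectors


def select_after_close(selector_cls):
    """Close a fresh selector, call select() on it, and describe the result."""
    sel = selector_cls()
    sel.close()
    try:
        ready = sel.select(timeout=0)
    except (ValueError, OSError) as exc:
        return 'raised {}'.format(type(exc).__name__)
    return 'returned {!r}'.format(ready)


# PollSelector and EpollSelector are platform-specific (EpollSelector is
# Linux-only), so skip whichever one this platform does not provide.
for name in ('PollSelector', 'EpollSelector'):
    cls = getattr(selectors, name, None)
    if cls is not None:
        print(name, '->', select_after_close(cls))
```

On Linux, expect PollSelector to report an empty list and EpollSelector to report ValueError.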

Should we change this behavior?

On the one hand, it seems like a natural safeguard for the client to raise a straightforward, human-readable error if the connection is closed, rather than an indirect error about an unavailable file descriptor or, in the case of PollSelector, continuing to return records.

On the other hand, this check would run on every single poll() call, and I'm loath to waste the world's compute energy on something that only matters in edge cases.
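To put a rough number on the per-call cost, here is a minimal timeit sketch comparing a bare poll()-like method with one that first checks a `_closed` flag. Both classes are hypothetical stand-ins rather than kafka-python code, and RuntimeError is a placeholder for whatever error type the client would actually raise. The measured difference depends on interpreter and hardware, and timing noise can even make it come out slightly negative.

```python
import timeit


class UnguardedPoller:
    """Hypothetical stand-in: poll() with no closed check."""

    def poll(self):
        return {}


class GuardedPoller:
    """Hypothetical stand-in: poll() refuses to run after close()."""

    def __init__(self):
        self._closed = False

    def close(self):
        self._closed = True

    def poll(self):
        # The proposed safeguard: one attribute lookup plus one branch.
        if self._closed:
            raise RuntimeError('KafkaConsumer is closed')
        return {}


def per_call_overhead_ns(number=200_000):
    """Estimate the extra time, in nanoseconds, the guard adds per call."""
    plain, guarded = UnguardedPoller(), GuardedPoller()
    t_plain = min(timeit.repeat(plain.poll, number=number, repeat=5))
    t_guarded = min(timeit.repeat(guarded.poll, number=number, repeat=5))
    return (t_guarded - t_plain) / number * 1e9


if __name__ == '__main__':
    print('guard overhead: ~{:.1f} ns per call'.format(per_call_overhead_ns()))
```

A real poll() does network I/O and may block for up to timeout_ms, so a constant-time flag check is likely negligible by comparison.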

Thoughts?

dpkp added the consumer label Oct 22, 2017
jeffwidman (Collaborator, Author) commented May 31, 2019

I just tested this with EpollSelector and it loops forever rather than raising ValueError.

I'm not sure which of the changes over the past year and a half would have changed this behavior.

I do think that calling poll() on a closed consumer should raise an exception. I'm not sure whether it doesn't because of an edge case in the design or an actual bug in the code.

Regardless, it doesn't seem like anyone is interested in tracking this down, so I'm closing it for now...

jeffwidman (Collaborator, Author) commented May 31, 2019

Actually, re-opening this... I think it's probably worth making the consumer/producer/client raise an exception at some point when there are no active connections because it was closed. I suspect that would be fairly straightforward to add.
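The proposal above could look roughly like this toy sketch. `SketchClient` and `ClientClosedError` are hypothetical names, not kafka-python's API. One design point it illustrates: an empty connection set on its own cannot tell a closed client apart from one that simply hasn't connected yet, so the sketch also keeps an explicit closed flag.

```python
class ClientClosedError(Exception):
    """Hypothetical error type for operations on a closed client."""


class SketchClient:
    """Toy client that tracks connections by node id.

    Not kafka-python's KafkaClient; connections are placeholder objects.
    """

    def __init__(self):
        self._conns = {}
        self._closed = False

    def connect(self, node_id):
        self._check_open('connect()')
        self._conns[node_id] = object()  # placeholder for a broker connection

    def close(self):
        self._conns.clear()
        self._closed = True

    def poll(self, timeout_ms=0):
        # timeout_ms is accepted for signature parity but unused here.
        # An empty self._conns alone is ambiguous: a client that has not
        # bootstrapped yet has no connections either, so check the flag.
        self._check_open('poll()')
        # Stand-in for real I/O: report which nodes would be polled.
        return sorted(self._conns)

    def _check_open(self, operation):
        if self._closed:
            raise ClientClosedError(
                '{} called on a closed client'.format(operation))
```

A fresh SketchClient returns an empty list from poll() without error; after close(), the same call raises ClientClosedError.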
