set socket timeout for the wake_w #1577
Conversation
kafka/client_async.py
```diff
@@ -198,6 +199,7 @@ def __init__(self, **configs):
         self._bootstrap_fails = 0
         self._wake_r, self._wake_w = socket.socketpair()
         self._wake_r.setblocking(False)
+        self._wake_w.settimeout(self.config['max_block_ms'] / 1000.0)
```
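A minimal standalone sketch of the behavior this one-line change relies on (illustrative, not kafka-python code): once a timeout is set on the write side of a socketpair, a send that cannot complete raises `socket.timeout` instead of blocking forever.

```python
import socket

# Standalone sketch: settimeout() turns a blocked send on a socketpair
# into a socket.timeout after roughly the configured interval.
r, w = socket.socketpair()
w.settimeout(0.01)  # analogous to max_block_ms / 1000.0 with max_block_ms=10

try:
    # Nobody reads from `r`, so the OS buffer eventually fills and the
    # send can no longer complete within the timeout.
    while True:
        w.sendall(b'x' * 65536)
except socket.timeout:
    print('sendall timed out instead of blocking forever')
```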
I remember we set setblocking(True) in code sometimes. That operation resets the timeout, if I remember correctly. Could you make sure we don't do it on this socket?
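For reference, in CPython `setblocking(True)` is equivalent to `settimeout(None)`, so it would indeed discard a previously set timeout; a quick standalone check:

```python
import socket

# setblocking(True) is equivalent to settimeout(None), so it silently
# discards any previously configured timeout.
r, w = socket.socketpair()
w.settimeout(0.01)
print(w.gettimeout())   # 0.01
w.setblocking(True)
print(w.gettimeout())   # None -- the timeout is gone
```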
yes, here are all the `setblocking()` calls in 1.4.3:
$ git grep 'setblocking'
kafka/client_async.py: self._wake_r.setblocking(False)
kafka/conn.py: self._sock.setblocking(False)
kafka/vendor/socketpair.py: # setblocking(False) that prevents us from having to create a thread.
kafka/vendor/socketpair.py: csock.setblocking(False)
kafka/vendor/socketpair.py: csock.setblocking(True)
the `csock` in socketpair.py has `setblocking(True)` called on it before being returned as `self._wake_w` in `KafkaClient.__init__()`, but `setblocking()` is never called on it again.
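A small standalone sketch of that order of operations (assuming the vendored pair behaves like `socket.socketpair()`): the `setblocking(True)` happens before the new `settimeout()`, so the timeout sticks.

```python
import socket

# Sketch of the order described above: setblocking(True) runs inside the
# vendored socketpair before the pair is returned, and the new settimeout()
# in KafkaClient.__init__ runs after that, so nothing resets the timeout.
r, csock = socket.socketpair()
csock.setblocking(True)   # done inside kafka/vendor/socketpair.py
csock.settimeout(0.01)    # the new line in KafkaClient.__init__
assert csock.gettimeout() == 0.01  # no later setblocking() call resets it
```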
```python
from kafka import KafkaProducer
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

KAFKA_CONFIG = {
    'batch_size': 163840,
    'linger_ms': 10000,  # enlarge linger_ms to run into the batch-is-full condition
    'max_block_ms': 10,
    'reconnect_backoff_ms': 300,
}

producer = KafkaProducer(bootstrap_servers='kafka03.dev.rack.xxxxxxx.com:3390',
                         **KAFKA_CONFIG)

i = 0
while True:
    producer.send('log.franz.test', b'some_message_bytes')
    i += 1
    if i % 1000 == 0:
        print('1000')  # progress marker: another 1000 messages sent
```
this is the script to reproduce the hang: while it is looping, turn off the wifi connection, and we will see it hang, and get this traceback on pressing ^C:

after setting this socket timeout, the script can quit almost immediately, after `max_block_ms`.
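A self-contained sketch of the failure mode and the fix (the socketpair pattern here mimics the client's wake mechanism; it is not kafka-python code): a background thread drains `wake_r` for a while and then stalls, and the writer either blocks forever or, with the timeout set, fails fast.

```python
import socket
import threading
import time

wake_r, wake_w = socket.socketpair()
wake_r.setblocking(False)
wake_w.settimeout(10 / 1000.0)  # max_block_ms = 10

def sender_loop(stall_after):
    # Drain wakeups like the sender's event loop, then stall (standing in
    # for a sender thread blocked in a sync operation).
    deadline = time.time() + stall_after
    while time.time() < deadline:
        try:
            wake_r.recv(1024)
        except BlockingIOError:
            time.sleep(0.001)

threading.Thread(target=sender_loop, args=(0.05,), daemon=True).start()

sent = 0
try:
    while True:
        wake_w.sendall(b'x')  # one wake per producer.send()
        sent += 1
except socket.timeout:
    print('wake_w blocked after %d sends; timed out instead of hanging' % sent)
```

Without the `settimeout()` call, the final `sendall` simply blocks forever once the sender thread stops draining, which is the hang the reproduction script demonstrates.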
Thanks for the PR! I have a few thoughts here:

(1) We should definitely add a timeout to the `wake_w` socket.

(2) Are you expecting the KafkaProducer to raise an exception if there is a connection error? That is not our current design, and I am hesitant to add that without a lot of thought. I would prefer to log a warning, the same as we do elsewhere.

(3) But in normal operation the wake socketpair should be drained very frequently, and we should not see blocked writes. So the fact that this happens at all is strange. It suggests that the background producer thread is blocked (or crashed) somewhere as well. I would love to figure out where that is and fix it, because that seems like the root cause. In your local testing, can you print the sender thread's backtrace when this happens?

Thanks again for helping improve this library!
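One way to capture that backtrace while the reproduction script hangs (a sketch written for this discussion; `dump_threads` is not part of kafka-python, and SIGUSR1 is POSIX-only):

```python
import signal
import sys
import threading
import traceback

def dump_threads(signum=None, frame=None):
    # Print the current stack of every live thread, so we can see where
    # the sender thread is stuck while producer.send() hangs.
    names = {t.ident: t.name for t in threading.enumerate()}
    for ident, stack in sys._current_frames().items():
        print('--- thread %s (%s) ---' % (ident, names.get(ident, '?')))
        print(''.join(traceback.format_stack(stack)))

# e.g. trigger with `kill -USR1 <pid>` while the script above is hung
signal.signal(signal.SIGUSR1, dump_threads)
```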
Thank you for your advice!

(1) Agreed, it's much better to separate the timeout fix from the question of how errors surface.

(2) In fact this PR does not introduce any new exception for KafkaProducer. In my opinion, it's better to fail fast and fail loudly in production systems. The cascading failure caused by timeouts is hard to diagnose: the available backend instances are drained quickly by timeouts, incoming requests queue up in the proxy, and the huge queue of requests increases the load pressure on the backend instances. Even a timeout as small as 100ms can overload the backend instances quickly, because timeouts reduce the available computing resources. As far as we know, we can escape this with the Circuit Breaker pattern, which requires an exception to catch and break on.

(3) Agreed, the root cause might be that the sender thread hangs for some reason, so it cannot run the event loop in time to consume the wakeup() IO event. In my understanding, the KafkaSender works in another thread to decouple the KafkaProducer from unpredictable network errors. The sender thread has some sync operations, which is fine because it works in a separate thread that does not block the user threads, but those sync operations might block the event loop from consuming the wakeups from `wake_r`. Here is the sender thread's backtrace as it prints:
Regards.
nudge @dpkp ^^
hi Dana,

we're running kafka-python on a SNS site in China. Yesterday we hit an issue where the Kafka host hung because of some kernel gotchas, and the application containers hung after Kafka did, so the site was down until we restarted Kafka.

After some investigation, we found that the writing side of the `wake_w` socketpair is blocking, with no socket timeout. If the sender thread cannot execute `wake_r.recv()` but gets blocked, `wake_w.sendall(b'x')` will block the `producer.send()` method without any timeout. We have a config option `max_block_ms` which applies when the buffer is full or metadata is unavailable, but it does not yet apply to the `wake_w.send()`.

We'd prefer the `wake_w.send()` method to fail fast with a timeout error instead of blocking forever on the producer thread, so we can save ourselves from hanging all the containers by using a CircuitBreaker.

thank you