diff --git a/kafka/client_async.py b/kafka/client_async.py index 5a16f6bba..5c9e07cf4 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -151,6 +151,7 @@ class KafkaClient(object): 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__, 'request_timeout_ms': 30000, + 'wakeup_timeout_ms': 3000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 1000, @@ -198,6 +199,7 @@ def __init__(self, **configs): self._bootstrap_fails = 0 self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) + self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) self._wake_lock = threading.Lock() self._lock = threading.RLock() @@ -847,6 +849,9 @@ def wakeup(self): with self._wake_lock: try: self._wake_w.sendall(b'x') + except socket.timeout: + log.warning('Timeout to send to wakeup socket!') + raise Errors.KafkaTimeoutError() except socket.error: log.warning('Unable to send to wakeup socket!') diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 719acef59..45b3a3374 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -364,6 +364,7 @@ def __init__(self, **configs): self._metrics = Metrics(metric_config, reporters) client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', + wakeup_timeout_ms=self.config['max_block_ms'], **self.config) # Get auto-discovered version from client if necessary