From 6fe3bb708ccd91d06e52f7e3992017d699819c5e Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 17 Aug 2018 11:49:29 +0800 Subject: [PATCH 1/2] set socket timeout for the wake_w --- kafka/client_async.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index 5a16f6bba..2efbea645 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -151,6 +151,7 @@ class KafkaClient(object): 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__, 'request_timeout_ms': 30000, + 'max_block_ms': 60000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 1000, @@ -198,6 +199,7 @@ def __init__(self, **configs): self._bootstrap_fails = 0 self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) + self._wake_w.settimeout(self.config['max_block_ms'] / 1000.0) self._wake_lock = threading.Lock() self._lock = threading.RLock() @@ -847,6 +849,9 @@ def wakeup(self): with self._wake_lock: try: self._wake_w.sendall(b'x') + except socket.timeout: + log.warning('Timeout to send to wakeup socket!') + raise Errors.KafkaTimeoutError() except socket.error: log.warning('Unable to send to wakeup socket!') From 0fbe6a574701dfef6b893b9d1c353850b95b70f0 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 3 Sep 2018 15:36:20 +0800 Subject: [PATCH 2/2] add wakeup_timeout_ms instead of max_block_ms in KafkaClient --- kafka/client_async.py | 4 ++-- kafka/producer/kafka.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 2efbea645..5c9e07cf4 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -151,7 +151,7 @@ class KafkaClient(object): 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__, 'request_timeout_ms': 30000, - 'max_block_ms': 60000, + 'wakeup_timeout_ms': 3000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 1000, @@ -199,7 +199,7 @@ def __init__(self, **configs): self._bootstrap_fails = 0 self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) - self._wake_w.settimeout(self.config['max_block_ms'] / 1000.0) + self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) self._wake_lock = threading.Lock() self._lock = threading.RLock() diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 719acef59..45b3a3374 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -364,6 +364,7 @@ def __init__(self, **configs): self._metrics = Metrics(metric_config, reporters) client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', + wakeup_timeout_ms=self.config['max_block_ms'], **self.config) # Get auto-discovered version from client if necessary