Skip to content

Commit 5253d39

Browse files
wbarnhashimonturjeman
authored andcommitted
client_async: Allow throwing an exception upon socket error during (dpkp#134)
wakeup When wakeup() is called, we sometime notice that we get an endless prints: "Unable to send to wakeup socket!". Those prints are spamming the logs. This commit aims to address it by allowing restating the application via an intentional exception raise. This behavior is configurable and its default is backward compatible. Signed-off-by: shimon-armis <[email protected]> Co-authored-by: shimon-armis <[email protected]>
1 parent 3714dee commit 5253d39

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

kafka/client_async.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ class KafkaClient(object):
154154
sasl mechanism handshake. Default: one of bootstrap servers
155155
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
156156
instance. (See kafka.oauth.abstract). Default: None
157+
raise_upon_socket_err_during_wakeup (bool): If set to True, raise an exception
158+
upon socket error during wakeup(). Default: False
157159
"""
158160

159161
DEFAULT_CONFIG = {
@@ -192,7 +194,8 @@ class KafkaClient(object):
192194
'sasl_plain_password': None,
193195
'sasl_kerberos_service_name': 'kafka',
194196
'sasl_kerberos_domain_name': None,
195-
'sasl_oauth_token_provider': None
197+
'sasl_oauth_token_provider': None,
198+
'raise_upon_socket_err_during_wakeup': False
196199
}
197200

198201
def __init__(self, **configs):
@@ -243,6 +246,8 @@ def __init__(self, **configs):
243246
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
244247
self.config['api_version'] = self.check_version(timeout=check_timeout)
245248

249+
self._raise_upon_socket_err_during_wakeup = self.config['raise_upon_socket_err_during_wakeup']
250+
246251
def _can_bootstrap(self):
247252
effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts
248253
backoff_factor = 2 ** effective_failures
@@ -936,8 +941,10 @@ def wakeup(self):
936941
except socket.timeout:
937942
log.warning('Timeout to send to wakeup socket!')
938943
raise Errors.KafkaTimeoutError()
939-
except socket.error:
944+
except socket.error as e:
940945
log.warning('Unable to send to wakeup socket!')
946+
if self._raise_upon_socket_err_during_wakeup:
947+
raise e
941948

942949
def _clear_wake_fd(self):
943950
# reading from wake socket should only happen in a single thread

0 commit comments

Comments
 (0)