Skip to content

Commit

Permalink
Dont wakeup during maybe_refresh_metadata -- it is only called by pol…
Browse files Browse the repository at this point in the history
…l() (#1769)
  • Loading branch information
dpkp authored Mar 31, 2019
1 parent de6e9d3 commit b1effa2
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
8 changes: 4 additions & 4 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ def send(self, node_id, request, wakeup=True):
Future: resolves to Response struct or Error
"""
if not self._can_send_request(node_id):
self.maybe_connect(node_id)
self.maybe_connect(node_id, wakeup=wakeup)
return Future().failure(Errors.NodeNotReadyError(node_id))

# conn.send will queue the request internally
Expand Down Expand Up @@ -761,7 +761,7 @@ def add_topic(self, topic):
return self.cluster.request_update()

# This method should be locked when running multi-threaded
def _maybe_refresh_metadata(self):
def _maybe_refresh_metadata(self, wakeup=False):
"""Send a metadata request if needed.
Returns:
Expand Down Expand Up @@ -792,7 +792,7 @@ def _maybe_refresh_metadata(self):
api_version = 0 if self.config['api_version'] < (0, 10) else 1
request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request)
future = self.send(node_id, request, wakeup=wakeup)
future.add_callback(self.cluster.update_metadata)
future.add_errback(self.cluster.failed_update)

Expand All @@ -809,7 +809,7 @@ def refresh_done(val_or_error):
if self._connecting:
return self.config['reconnect_backoff_ms']

if self.maybe_connect(node_id):
if self.maybe_connect(node_id, wakeup=wakeup):
log.debug("Initializing connection to node %s for metadata request", node_id)
return self.config['reconnect_backoff_ms']

Expand Down
4 changes: 2 additions & 2 deletions test/test_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def test_maybe_refresh_metadata_update(mocker, client):
client._poll.assert_called_with(9999.999) # request_timeout_ms
assert client._metadata_refresh_in_progress
request = MetadataRequest[0]([])
send.assert_called_once_with('foobar', request)
send.assert_called_once_with('foobar', request, wakeup=False)


def test_maybe_refresh_metadata_cant_send(mocker, client):
Expand All @@ -348,7 +348,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
# first poll attempts connection
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(2.222) # reconnect backoff
client.maybe_connect.assert_called_once_with('foobar')
client.maybe_connect.assert_called_once_with('foobar', wakeup=False)

# poll while connecting should not attempt a new connection
client._connecting.add('foobar')
Expand Down

0 comments on commit b1effa2

Please sign in to comment.