Issue #1780 - Consumer hangs indefinitely in fetcher._retrieve_offsets() due to topic deletion while rebalancing #1782

Merged: 2 commits, Sep 30, 2019
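Background for the changes below: `Fetcher._retrieve_offsets()` defaults to `timeout_ms=float("inf")`, so if a topic is deleted while the group is rebalancing, the offset lookup keeps failing with stale metadata and the retry loop never exits. A minimal, self-contained sketch of that failure mode and of the fix follows; it is a simplified model, not the actual kafka-python code, and `cluster_leaders` is a stand-in for the refreshed cluster metadata:

```python
import time

def retrieve_offsets(timestamps, cluster_leaders, timeout_ms=float("inf")):
    """Simplified model of Fetcher._retrieve_offsets()."""
    start = time.time()
    remaining_ms = timeout_ms
    while remaining_ms > 0:
        if not timestamps:  # the new guard: nothing left to look up
            return {}
        # Partitions with a known leader resolve; the rest are "stale".
        stale = [tp for tp in timestamps if cluster_leaders.get(tp) is None]
        if not stale:
            return {tp: (ts, None) for tp, ts in timestamps.items()}
        # The fix: after a metadata refresh, prune partitions that no
        # longer exist instead of retrying them forever.
        for tp in stale:
            timestamps.pop(tp)
        remaining_ms = timeout_ms - (time.time() - start) * 1000
    return {}

# A deleted topic has no leader; without the pruning above this call
# would never return, because the default timeout is infinite.
print(retrieve_offsets({("deleted-topic", 0): -1}, {}))  # -> {}
```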
28 changes: 21 additions & 7 deletions kafka/consumer/fetcher.py
@@ -235,14 +235,16 @@ def _reset_offset(self, partition):
         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
         offsets = self._retrieve_offsets({partition: timestamp})
-        if partition not in offsets:
-            raise NoOffsetForPartitionError(partition)
-        offset = offsets[partition][0]
-
-        # we might lose the assignment while fetching the offset,
-        # so check it is still active
-        if self._subscriptions.is_assigned(partition):
-            self._subscriptions.seek(partition, offset)
+        if partition in offsets:
+            offset = offsets[partition][0]
+
+            # we might lose the assignment while fetching the offset,
+            # so check it is still active
+            if self._subscriptions.is_assigned(partition):
+                self._subscriptions.seek(partition, offset)
+        else:
+            log.debug("Could not find offset for partition %s since it is probably deleted" % (partition,))

     def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
         """Fetch offset for each partition passed in ``timestamps`` map.
@@ -267,6 +269,9 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
         start_time = time.time()
         remaining_ms = timeout_ms
         while remaining_ms > 0:
+            if not timestamps:
+                return {}
+
             future = self._send_offset_requests(timestamps)
             self._client.poll(future=future, timeout_ms=remaining_ms)

@@ -283,6 +288,15 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
             if future.exception.invalid_metadata:
                 refresh_future = self._client.cluster.request_update()
                 self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
+
+                # Issue #1780
+                # Recheck partition existence after a successful metadata refresh
+                if refresh_future.succeeded() and isinstance(future.exception, Errors.StaleMetadata):
+                    log.debug("Stale metadata was raised, and we now have an updated metadata. Rechecking partition existence")
+                    unknown_partition = future.exception.args[0]  # TopicPartition from StaleMetadata
+                    if not self._client.cluster.leader_for_partition(unknown_partition):
+                        log.debug("Removed partition %s from offsets retrieval" % (unknown_partition,))
+                        timestamps.pop(unknown_partition)
             else:
                 time.sleep(self.config['retry_backoff_ms'] / 1000.0)

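For context on `future.exception.args[0]` above: `Errors.StaleMetadata` is raised with the unresolved `TopicPartition` as its first argument, so the new branch can ask the refreshed metadata whether that partition still has a leader. An illustration of just the pruning step, standalone, with `leader` standing in for the `cluster.leader_for_partition()` result:

```python
from kafka.structs import TopicPartition

# Illustration only (not part of the PR): the pruning step above.
timestamps = {TopicPartition("deleted-topic", 0): -1}
unknown_partition = TopicPartition("deleted-topic", 0)  # StaleMetadata args[0]
leader = None  # a deleted partition has no leader after the refresh

if not leader:
    timestamps.pop(unknown_partition)

assert not timestamps  # the next loop iteration now returns {} immediately
```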
6 changes: 5 additions & 1 deletion kafka/coordinator/consumer.py
@@ -225,7 +225,11 @@ def _on_join_complete(self, generation, member_id, protocol,
         self._subscription.needs_fetch_committed_offsets = True

         # update partition assignment
-        self._subscription.assign_from_subscribed(assignment.partitions())
+        try:
+            self._subscription.assign_from_subscribed(assignment.partitions())
+        except ValueError as e:
+            log.warning("%s. Probably due to a deleted topic. Requesting re-join" % e)
+            self.request_rejoin()

         # give the assignor a chance to update internal state
         # based on the received assignment
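The `ValueError` caught here is the one `SubscriptionState.assign_from_subscribed()` raises when the broker's assignment still names a topic that has dropped out of the local subscription, e.g. because it was deleted mid-rebalance. A small sketch of that behavior, assuming kafka-python's `SubscriptionState` API (constructor defaults and method names may vary across versions):

```python
from kafka.consumer.subscription_state import SubscriptionState
from kafka.structs import TopicPartition

state = SubscriptionState()
state.subscribe(topics=["live-topic"])

try:
    # An assignment that still references a topic we no longer subscribe to.
    state.assign_from_subscribed([TopicPartition("deleted-topic", 0)])
except ValueError as e:
    # This is the path the coordinator now handles by logging a warning
    # and calling request_rejoin() instead of failing the join.
    print(e)
```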
4 changes: 0 additions & 4 deletions test/test_fetcher.py
@@ -138,10 +138,6 @@ def test__reset_offset(fetcher, mocker):
     fetcher._subscriptions.need_offset_reset(tp)
     mocked = mocker.patch.object(fetcher, '_retrieve_offsets')

-    mocked.return_value = {}
-    with pytest.raises(NoOffsetForPartitionError):
-        fetcher._reset_offset(tp)
-
     mocked.return_value = {tp: (1001, None)}
     fetcher._reset_offset(tp)
     assert not fetcher._subscriptions.assignment[tp].awaiting_reset
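The deleted assertions covered the old contract, where an empty result from `_retrieve_offsets()` raised `NoOffsetForPartitionError`. With this PR, `_reset_offset()` just logs and returns, so a hypothetical follow-up test (not part of this PR) could pin down the new behavior instead:

```python
# Hypothetical follow-up (not in this PR): an empty offsets result should
# no longer raise; the partition is assumed deleted and left alone.
mocked.return_value = {}
fetcher._reset_offset(tp)  # must not raise NoOffsetForPartitionError
```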