Skip to content

Commit 298cb0d

Browse files
royantman and dpkp
authored and committed
Issue #1780 - Consumer hang indefinitely in fetcher._retrieve_offsets() due to topic deletion while rebalancing (#1782)
1 parent 0f929bd commit 298cb0d

File tree

3 files changed

+26
-12
lines changed

3 files changed

+26
-12
lines changed

kafka/consumer/fetcher.py

+21-7
Original file line number | Diff line number | Diff line change
@@ -235,14 +235,16 @@ def _reset_offset(self, partition):
235235
log.debug("Resetting offset for partition %s to %s offset.",
236236
partition, strategy)
237237
offsets = self._retrieve_offsets({partition: timestamp})
238-
if partition not in offsets:
239-
raise NoOffsetForPartitionError(partition)
240-
offset = offsets[partition][0]
241238

242-
# we might lose the assignment while fetching the offset,
243-
# so check it is still active
244-
if self._subscriptions.is_assigned(partition):
245-
self._subscriptions.seek(partition, offset)
239+
if partition in offsets:
240+
offset = offsets[partition][0]
241+
242+
# we might lose the assignment while fetching the offset,
243+
# so check it is still active
244+
if self._subscriptions.is_assigned(partition):
245+
self._subscriptions.seek(partition, offset)
246+
else:
247+
log.debug("Could not find offset for partition %s since it is probably deleted" % (partition,))
246248

247249
def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
248250
"""Fetch offset for each partition passed in ``timestamps`` map.
@@ -267,6 +269,9 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
267269
start_time = time.time()
268270
remaining_ms = timeout_ms
269271
while remaining_ms > 0:
272+
if not timestamps:
273+
return {}
274+
270275
future = self._send_offset_requests(timestamps)
271276
self._client.poll(future=future, timeout_ms=remaining_ms)
272277

@@ -283,6 +288,15 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
283288
if future.exception.invalid_metadata:
284289
refresh_future = self._client.cluster.request_update()
285290
self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
291+
292+
# Issue #1780
293+
# Recheck partition existence after a successful metadata refresh
294+
if refresh_future.succeeded() and isinstance(future.exception, Errors.StaleMetadata):
295+
log.debug("Stale metadata was raised, and we now have an updated metadata. Rechecking partition existance")
296+
unknown_partition = future.exception.args[0] # TopicPartition from StaleMetadata
297+
if not self._client.cluster.leader_for_partition(unknown_partition):
298+
log.debug("Removed partition %s from offsets retrieval" % (unknown_partition, ))
299+
timestamps.pop(unknown_partition)
286300
else:
287301
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
288302

kafka/coordinator/consumer.py

+5-1
Original file line number | Diff line number | Diff line change
@@ -225,7 +225,11 @@ def _on_join_complete(self, generation, member_id, protocol,
225225
self._subscription.needs_fetch_committed_offsets = True
226226

227227
# update partition assignment
228-
self._subscription.assign_from_subscribed(assignment.partitions())
228+
try:
229+
self._subscription.assign_from_subscribed(assignment.partitions())
230+
except ValueError as e:
231+
log.warning("%s. Probably due to a deleted topic. Requesting Re-join" % e)
232+
self.request_rejoin()
229233

230234
# give the assignor a chance to update internal state
231235
# based on the received assignment

test/test_fetcher.py

-4
Original file line number | Diff line number | Diff line change
@@ -138,10 +138,6 @@ def test__reset_offset(fetcher, mocker):
138138
fetcher._subscriptions.need_offset_reset(tp)
139139
mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
140140

141-
mocked.return_value = {}
142-
with pytest.raises(NoOffsetForPartitionError):
143-
fetcher._reset_offset(tp)
144-
145141
mocked.return_value = {tp: (1001, None)}
146142
fetcher._reset_offset(tp)
147143
assert not fetcher._subscriptions.assignment[tp].awaiting_reset

0 commit comments

Comments
 (0)