Skip to content

Commit 23534b4

Browse files
authored
Raise AssertionError if consumer closed in poll() (#1978)
1 parent 3aada77 commit 23534b4

File tree

1 file changed

+3
-0
lines changed

1 file changed

+3
-0
lines changed

kafka/consumer/group.py

+3
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
638638
max_records = self.config['max_poll_records']
639639
assert isinstance(max_records, int), 'max_records must be an integer'
640640
assert max_records > 0, 'max_records must be positive'
641+
assert not self._closed, 'KafkaConsumer is closed'
641642

642643
# Poll for new data until the timeout expires
643644
start = time.time()
@@ -1173,6 +1174,8 @@ def __iter__(self): # pylint: disable=non-iterator-returned
11731174
return self
11741175

11751176
def __next__(self):
1177+
if self._closed:
1178+
raise StopIteration('KafkaConsumer closed')
11761179
# Now that the heartbeat thread runs in the background
11771180
# there should be no reason to maintain a separate iterator
11781181
# but we'll keep it available for a few releases just in case

0 commit comments

Comments
 (0)