Skip to content

Commit 7965460

Browse files
keithksdpkp
authored and committed
1701 use last offset from fetch v4 if available (#1724)
1 parent 1904b53 commit 7965460

File tree

3 files changed

+28
-0
lines changed

3 files changed

+28
-0
lines changed

kafka/consumer/fetcher.py

+19
Original file line number | Diff line number | Diff line change
@@ -439,6 +439,14 @@ def _unpack_message_set(self, tp, records):
439439
try:
440440
batch = records.next_batch()
441441
while batch is not None:
442+
443+
# LegacyRecordBatch cannot access either base_offset or last_offset_delta
444+
try:
445+
self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \
446+
batch.last_offset_delta
447+
except AttributeError:
448+
pass
449+
442450
for record in batch:
443451
key_size = len(record.key) if record.key is not None else -1
444452
value_size = len(record.value) if record.value is not None else -1
@@ -643,6 +651,17 @@ def _create_fetch_requests(self):
643651

644652
for partition in self._fetchable_partitions():
645653
node_id = self._client.cluster.leader_for_partition(partition)
654+
655+
# advance position for any deleted compacted messages if required
656+
if self._subscriptions.assignment[partition].last_offset_from_message_batch:
657+
next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1
658+
if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
659+
log.debug(
660+
"Advance position for partition %s from %s to %s (last message batch location plus one)"
661+
" to correct for deleted compacted messages",
662+
partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
663+
self._subscriptions.assignment[partition].position = next_offset_from_batch_header
664+
646665
position = self._subscriptions.assignment[partition].position
647666

648667
# fetch if there is a leader and no in-flight requests

kafka/consumer/subscription_state.py

+5
Original file line number | Diff line number | Diff line change
@@ -382,6 +382,9 @@ def __init__(self):
382382
self._position = None # offset exposed to the user
383383
self.highwater = None
384384
self.drop_pending_message_set = False
385+
# The last message offset hint available from a message batch with
386+
# magic=2 which includes deleted compacted messages
387+
self.last_offset_from_message_batch = None
385388

386389
def _set_position(self, offset):
387390
assert self.has_valid_position, 'Valid position required'
@@ -396,6 +399,7 @@ def await_reset(self, strategy):
396399
self.awaiting_reset = True
397400
self.reset_strategy = strategy
398401
self._position = None
402+
self.last_offset_from_message_batch = None
399403
self.has_valid_position = False
400404

401405
def seek(self, offset):
@@ -404,6 +408,7 @@ def seek(self, offset):
404408
self.reset_strategy = None
405409
self.has_valid_position = True
406410
self.drop_pending_message_set = True
411+
self.last_offset_from_message_batch = None
407412

408413
def pause(self):
409414
self.paused = True

kafka/record/default_records.py

+4
Original file line number | Diff line number | Diff line change
@@ -140,6 +140,10 @@ def crc(self):
140140
def attributes(self):
141141
return self._header_data[5]
142142

143+
@property
144+
def last_offset_delta(self):
145+
return self._header_data[6]
146+
143147
@property
144148
def compression_type(self):
145149
return self.attributes & self.CODEC_MASK

0 commit comments

Comments
 (0)