Commit fbea5f0

Refactor MessageSet and Message into LegacyRecordBatch to later support v2 message format
1 parent: f04435c

21 files changed: +1142 / -355 lines

kafka/consumer/fetcher.py

+27 / -72
@@ -13,10 +13,10 @@
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
-from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import (
     OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
 )
+from kafka.record import MemoryRecords
 from kafka.serializer import Deserializer
 from kafka.structs import TopicPartition, OffsetAndTimestamp

@@ -295,7 +295,7 @@ def fetched_records(self, max_records=None):

         Raises:
             OffsetOutOfRangeError: if no subscription offset_reset_strategy
-            InvalidMessageError: if message crc validation fails (check_crcs
+            CorruptRecordException: if message crc validation fails (check_crcs
                 must be set to True)
             RecordTooLargeError: if a message is larger than the currently
                 configured max_partition_fetch_bytes
@@ -440,57 +440,25 @@ def _message_generator(self):

         self._next_partition_records = None

-    def _unpack_message_set(self, tp, messages):
+    def _unpack_message_set(self, tp, records):
         try:
-            for offset, size, msg in messages:
-                if self.config['check_crcs'] and not msg.validate_crc():
-                    raise Errors.InvalidMessageError(msg)
-
-                if not msg.is_compressed():
-                    yield self._parse_record(tp, offset, msg.timestamp, msg)
-
-                else:
-                    # If relative offset is used, we need to decompress the entire message first
-                    # to compute the absolute offset.
-                    inner_mset = msg.decompress()
-
-                    # There should only ever be a single layer of compression
-                    if inner_mset[0][-1].is_compressed():
-                        log.warning('MessageSet at %s offset %d appears '
-                                    ' double-compressed. This should not'
-                                    ' happen -- check your producers!',
-                                    tp, offset)
-                        if self.config['skip_double_compressed_messages']:
-                            log.warning('Skipping double-compressed message at'
-                                        ' %s %d', tp, offset)
-                            continue
-
-                    if msg.magic > 0:
-                        last_offset, _, _ = inner_mset[-1]
-                        absolute_base_offset = offset - last_offset
-                    else:
-                        absolute_base_offset = -1
-
-                    for inner_offset, inner_size, inner_msg in inner_mset:
-                        if msg.magic > 0:
-                            # When magic value is greater than 0, the timestamp
-                            # of a compressed message depends on the
-                            # typestamp type of the wrapper message:
-
-                            if msg.timestamp_type == 0: # CREATE_TIME (0)
-                                inner_timestamp = inner_msg.timestamp
-
-                            elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
-                                inner_timestamp = msg.timestamp
-
-                            else:
-                                raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
-                        else:
-                            inner_timestamp = msg.timestamp
-
-                        if absolute_base_offset >= 0:
-                            inner_offset += absolute_base_offset
-                        yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg)
+            batch = records.next_batch()
+            while batch is not None:
+                for record in batch:
+                    key_size = len(record.key) if record.key is not None else -1
+                    value_size = len(record.value) if record.value is not None else -1
+                    key = self._deserialize(
+                        self.config['key_deserializer'],
+                        tp.topic, record.key)
+                    value = self._deserialize(
+                        self.config['value_deserializer'],
+                        tp.topic, record.value)
+                    yield ConsumerRecord(
+                        tp.topic, tp.partition, record.offset, record.timestamp,
+                        record.timestamp_type, key, value, record.checksum,
+                        key_size, value_size)
+
+                batch = records.next_batch()

         # If unpacking raises StopIteration, it is erroneously
         # caught by the generator. We want all exceptions to be raised
@@ -505,15 +473,6 @@ def _unpack_message_set(self, tp, messages):
            log.exception('AssertionError raised unpacking messageset: %s', e)
            raise

-    def _parse_record(self, tp, offset, timestamp, msg):
-        key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key)
-        value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value)
-        return ConsumerRecord(tp.topic, tp.partition, offset,
-                              timestamp, msg.timestamp_type,
-                              key, value, msg.crc,
-                              len(msg.key) if msg.key is not None else -1,
-                              len(msg.value) if msg.value is not None else -1)
-
    def __iter__(self): # pylint: disable=non-iterator-returned
        return self

@@ -783,7 +742,7 @@ def _parse_fetched_data(self, completed_fetch):

         error_code, highwater = completed_fetch.partition_data[:2]
         error_type = Errors.for_code(error_code)
-        messages = completed_fetch.partition_data[-1]
+        records = MemoryRecords(completed_fetch.partition_data[-1])

         try:
             if not self._subscriptions.is_fetchable(tp):
@@ -807,21 +766,17 @@ def _parse_fetched_data(self, completed_fetch):
                           position)
                 return None

-            partial = None
-            if messages and isinstance(messages[-1][-1], PartialMessage):
-                partial = messages.pop()
-
-            if messages:
+            if records.has_next():
                 log.debug("Adding fetched record for partition %s with"
                           " offset %d to buffered record list", tp,
                           position)
-                unpacked = list(self._unpack_message_set(tp, messages))
+                unpacked = list(self._unpack_message_set(tp, records))
                 parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
-                last_offset, _, _ = messages[-1]
+                last_offset = unpacked[-1].offset
                 self._sensors.records_fetch_lag.record(highwater - last_offset)
-                num_bytes = sum(msg[1] for msg in messages)
-                records_count = len(messages)
-            elif partial:
+                num_bytes = records.valid_bytes()
+                records_count = len(unpacked)
+            elif records.size_in_bytes() > 0:
                 # we did not read a single message from a non-empty
                 # buffer because that message's size is larger than
                 # fetch size, in this case record this exception

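Note (not part of the commit): the new read path above drives everything off kafka.record.MemoryRecords instead of decoded MessageSets. A minimal sketch of the same batch-iteration pattern, using only the calls visible in this diff (next_batch() and the record's offset/key/value attributes), with raw_bytes standing in for a hypothetical raw partition payload from a FetchResponse:

    from kafka.record import MemoryRecords

    def iter_records(raw_bytes):
        # raw_bytes: hypothetical buffer of fetched message-set bytes
        records = MemoryRecords(raw_bytes)
        batch = records.next_batch()
        while batch is not None:
            for record in batch:
                # mirrors what _unpack_message_set consumes above
                yield record.offset, record.key, record.value
            batch = records.next_batch()
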
kafka/errors.py

+5 / -2
@@ -101,12 +101,15 @@ class OffsetOutOfRangeError(BrokerResponseError):
                    ' maintained by the server for the given topic/partition.')


-class InvalidMessageError(BrokerResponseError):
+class CorruptRecordException(BrokerResponseError):
     errno = 2
-    message = 'INVALID_MESSAGE'
+    message = 'CORRUPT_MESSAGE'
     description = ('This message has failed its CRC checksum, exceeds the'
                    ' valid size, or is otherwise corrupt.')

+# Backward compatibility
+InvalidMessageError = CorruptRecordException
+

 class UnknownTopicOrPartitionError(BrokerResponseError):
     errno = 3

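Note (illustrative, not part of the commit): because InvalidMessageError is rebound to CorruptRecordException rather than removed, existing except clauses keep catching the same class:

    import kafka.errors as Errors

    assert Errors.InvalidMessageError is Errors.CorruptRecordException

    try:
        raise Errors.CorruptRecordException('crc check failed')
    except Errors.InvalidMessageError:
        pass  # the backward-compatibility alias catches the renamed exception
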
kafka/producer/buffer.py

+1 / -125
@@ -5,133 +5,9 @@
 import threading
 import time

-from ..codec import (has_gzip, has_snappy, has_lz4,
-                     gzip_encode, snappy_encode,
-                     lz4_encode, lz4_encode_old_kafka)
-from .. import errors as Errors
 from ..metrics.stats import Rate
-from ..protocol.types import Int32, Int64
-from ..protocol.message import MessageSet, Message

-
-
-class MessageSetBuffer(object):
-    """Wrap a buffer for writing MessageSet batches.
-
-    Arguments:
-        buf (IO stream): a buffer for writing data. Typically BytesIO.
-        batch_size (int): maximum number of bytes to write to the buffer.
-
-    Keyword Arguments:
-        compression_type ('gzip', 'snappy', None): compress messages before
-            publishing. Default: None.
-    """
-    _COMPRESSORS = {
-        'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
-        'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
-        'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
-        'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
-    }
-    def __init__(self, buf, batch_size, compression_type=None, message_version=0):
-        if compression_type is not None:
-            assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
-
-            # Kafka 0.8/0.9 had a quirky lz4...
-            if compression_type == 'lz4' and message_version == 0:
-                compression_type = 'lz4-old-kafka'
-
-            checker, encoder, attributes = self._COMPRESSORS[compression_type]
-            assert checker(), 'Compression Libraries Not Found'
-            self._compressor = encoder
-            self._compression_attributes = attributes
-        else:
-            self._compressor = None
-            self._compression_attributes = None
-
-        self._message_version = message_version
-        self._buffer = buf
-        # Init MessageSetSize to 0 -- update on close
-        self._buffer.seek(0)
-        self._buffer.write(Int32.encode(0))
-        self._batch_size = batch_size
-        self._closed = False
-        self._messages = 0
-        self._bytes_written = 4 # Int32 header is 4 bytes
-        self._final_size = None
-
-    def append(self, offset, message):
-        """Append a Message to the MessageSet.
-
-        Arguments:
-            offset (int): offset of the message
-            message (Message or bytes): message struct or encoded bytes
-
-        Returns: bytes written
-        """
-        if isinstance(message, Message):
-            encoded = message.encode()
-        else:
-            encoded = bytes(message)
-        msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
-        self._buffer.write(msg)
-        self._messages += 1
-        self._bytes_written += len(msg)
-        return len(msg)
-
-    def has_room_for(self, key, value):
-        if self._closed:
-            return False
-        if not self._messages:
-            return True
-        needed_bytes = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
-        if key is not None:
-            needed_bytes += len(key)
-        if value is not None:
-            needed_bytes += len(value)
-        return self._buffer.tell() + needed_bytes < self._batch_size
-
-    def is_full(self):
-        if self._closed:
-            return True
-        return self._buffer.tell() >= self._batch_size
-
-    def close(self):
-        # This method may be called multiple times on the same batch
-        # i.e., on retries
-        # we need to make sure we only close it out once
-        # otherwise compressed messages may be double-compressed
-        # see Issue 718
-        if not self._closed:
-            if self._compressor:
-                # TODO: avoid copies with bytearray / memoryview
-                uncompressed_size = self._buffer.tell()
-                self._buffer.seek(4)
-                msg = Message(self._compressor(self._buffer.read(uncompressed_size - 4)),
-                              attributes=self._compression_attributes,
-                              magic=self._message_version)
-                encoded = msg.encode()
-                self._buffer.seek(4)
-                self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
-                self._buffer.write(Int32.encode(len(encoded)))
-                self._buffer.write(encoded)
-
-            # Update the message set size (less the 4 byte header),
-            # and return with buffer ready for full read()
-            self._final_size = self._buffer.tell()
-            self._buffer.seek(0)
-            self._buffer.write(Int32.encode(self._final_size - 4))
-
-            self._buffer.seek(0)
-            self._closed = True
-
-    def size_in_bytes(self):
-        return self._final_size or self._buffer.tell()
-
-    def compression_rate(self):
-        return self.size_in_bytes() / self._bytes_written
-
-    def buffer(self):
-        return self._buffer
+import kafka.errors as Errors


 class SimpleBufferPool(object):

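Note (sketch, not part of the commit): the removed MessageSetBuffer.append() framed each entry as an Int64 offset plus an Int32 size prefix in front of the encoded message; that legacy framing is now produced inside kafka.record instead of the producer buffer. Roughly the same layout with the standard library, where encoded is assumed to be an already-encoded legacy message:

    import struct

    def frame_legacy_entry(offset, encoded):
        # [offset: int64][size: int32][message bytes], big-endian as on the wire
        return struct.pack('>qi', offset, len(encoded)) + encoded
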
kafka/producer/kafka.py

+34 / -9
@@ -12,9 +12,10 @@

 from .. import errors as Errors
 from ..client_async import KafkaClient, selectors
+from ..codec import has_gzip, has_snappy, has_lz4
 from ..metrics import MetricConfig, Metrics
 from ..partitioner.default import DefaultPartitioner
-from ..protocol.message import Message, MessageSet
+from ..record.legacy_records import LegacyRecordBatchBuilder
 from ..serializer import Serializer
 from ..structs import TopicPartition
 from .future import FutureRecordMetadata, FutureProduceResult
@@ -310,6 +311,13 @@ class KafkaProducer(object):
         'sasl_plain_password': None,
     }

+    _COMPRESSORS = {
+        'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
+        'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
+        'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+        None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
+    }
+
     def __init__(self, **configs):
         log.debug("Starting the Kafka producer") # trace
         self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -355,7 +363,16 @@ def __init__(self, **configs):
         if self.config['compression_type'] == 'lz4':
             assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'

-        message_version = 1 if self.config['api_version'] >= (0, 10) else 0
+        # Check compression_type for library support
+        ct = self.config['compression_type']
+        if ct not in self._COMPRESSORS:
+            raise ValueError("Not supported codec: {}".format(ct))
+        else:
+            checker, compression_attrs = self._COMPRESSORS[ct]
+            assert checker(), "Libraries for {} compression codec not found".format(ct)
+            self.config['compression_type'] = compression_attrs
+
+        message_version = self._max_usable_produce_magic()
         self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
         self._metadata = client.cluster
         guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
@@ -465,6 +482,17 @@ def partitions_for(self, topic):
         max_wait = self.config['max_block_ms'] / 1000.0
         return self._wait_on_metadata(topic, max_wait)

+    def _max_usable_produce_magic(self):
+        if self.config['api_version'] >= (0, 10):
+            return 1
+        else:
+            return 0
+
+    def _estimate_size_in_bytes(self, key, value):
+        magic = self._max_usable_produce_magic()
+        return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+            magic, self.config['compression_type'], key, value)
+
     def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
         """Publish a message to a topic.

@@ -514,11 +542,7 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
         partition = self._partition(topic, partition, key, value,
                                     key_bytes, value_bytes)

-        message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
-        if key_bytes is not None:
-            message_size += len(key_bytes)
-        if value_bytes is not None:
-            message_size += len(value_bytes)
+        message_size = self._estimate_size_in_bytes(key, value)
         self._ensure_valid_record_size(message_size)

         tp = TopicPartition(topic, partition)
@@ -527,11 +551,12 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
         log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
         result = self._accumulator.append(tp, timestamp_ms,
                                           key_bytes, value_bytes,
-                                          self.config['max_block_ms'])
+                                          self.config['max_block_ms'],
+                                          estimated_size=message_size)
         future, batch_is_full, new_batch_created = result
         if batch_is_full or new_batch_created:
             log.debug("Waking up the sender since %s is either full or"
-                       " getting a new batch", tp)
+                      " getting a new batch", tp)
             self._sender.wakeup()

         return future

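Note (sketch under assumptions, not part of the commit): the producer now maps the configured codec to a LegacyRecordBatchBuilder attribute and estimates record sizes through the builder, as in the hunks above. Roughly the same logic outside the class, using only the names that appear in this diff:

    from kafka.codec import has_gzip, has_snappy, has_lz4
    from kafka.record.legacy_records import LegacyRecordBatchBuilder

    _COMPRESSORS = {
        'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
        'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
        'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
        None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
    }

    def resolve_codec(compression_type):
        # Mirrors the check in KafkaProducer.__init__: unknown codecs raise,
        # known codecs must have their compression library installed.
        if compression_type not in _COMPRESSORS:
            raise ValueError("Not supported codec: {}".format(compression_type))
        checker, attrs = _COMPRESSORS[compression_type]
        assert checker(), "Libraries for {} compression codec not found".format(compression_type)
        return attrs

    # magic 0 for brokers < 0.10, magic 1 otherwise (see _max_usable_produce_magic)
    size = LegacyRecordBatchBuilder.estimate_size_in_bytes(
        1, resolve_codec(None), b'key', b'value')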