
Error when consuming: "'PartialMessage' object has no attribute 'validate_crc'" #672

Closed
zoltan-fedor opened this issue Apr 27, 2016 · 10 comments


@zoltan-fedor

In the last few days I have upgraded to Kafka 0.9 and kafka-python 1.1.1, and have started intermittently receiving the following error when consuming from Kafka:
'PartialMessage' object has no attribute 'validate_crc'

<FrameSummary file /data/apps/kabinet-dev/kabinet/jobs/avro.py, line 259 in run_job>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/group.py, line 844 in __next__>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/group.py, line 805 in _message_generator>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/fetcher.py, line 445 in __next__>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/fetcher.py, line 410 in _message_generator>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/fetcher.py, line 360 in _unpack_message_set>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/fetcher.py, line 360 in _unpack_message_set>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/fetcher.py, line 357 in _unpack_message_set>
@zoltan-fedor
Author

zoltan-fedor commented Apr 27, 2016

It seems it is always the same messages that cause this problem - when the consumer gets to those message(s) it fails with the above error instead of just skipping over them (assuming they are malformed messages).

@zoltan-fedor
Author

If I read the code correctly, the problem is that https://github.com/dpkp/kafka-python/blame/aefafd270a75b9a3d21c148eefba5ba56cbc622b/kafka/consumer/fetcher.py#L357 does not check for PartialMessage, but assumes that every msg has a validate_crc() method.

I assume similar handling of PartialMessages should be added to fetcher.py, like the one simple.py used to have:

if resp.messages and isinstance(resp.messages[-1].message, PartialMessage):
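
For context, a rough sketch of the kind of guard that one-liner belongs to (not the actual simple.py code, just an illustration of the idea, assuming resp.messages is the list of fetched offset/message pairs returned by the broker):

from kafka.protocol.message import PartialMessage

# If the broker truncated the fetch at the size boundary, the last entry
# is a PartialMessage that has no validate_crc(); drop it so the next
# fetch can re-request those bytes instead of crashing on it here.
if resp.messages and isinstance(resp.messages[-1].message, PartialMessage):
    resp.messages.pop()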

@dpkp
Owner

dpkp commented Apr 27, 2016

Strange: https://github.com/dpkp/kafka-python/blob/1.1.1/kafka/consumer/fetcher.py#L615 should guarantee that no PartialMessages are put on the internal _records queue, which is what the fetcher consumes from when unpacking.

Are you using compressed messages by chance?

@zoltan-fedor
Author

zoltan-fedor commented Apr 27, 2016

Our Kafka cluster stores the data snappy-compressed, but I am not sure whether that means the messages themselves count as compressed.
I have already increased all the buffers, thinking that the messages might be getting cut off, but it did not help.
This is how the consumer setup currently looks:

kafka_consumer = KafkaConsumer(
    settings['topic'],
    group_id=settings['kafka_group_id'],
    bootstrap_servers=settings['kafka_servers'],
    max_partition_fetch_bytes=10*1048576,  # increased from the default 1048576 so large messages can come through
    receive_buffer_bytes=5*32768,
    enable_auto_commit=True,
    consumer_timeout_ms=10000,  # when there is no more data, how quickly to exit the loop
    fetch_max_wait_ms=2000,
    auto_offset_reset='earliest',
    request_timeout_ms=60*60*1000
)

Interestingly, I have no problem with another topic on the same Kafka cluster; for some reason this issue is so far specific to one topic only.

@zoltan-fedor
Author

zoltan-fedor commented Apr 27, 2016

I can see that you are checking for PartialMessages at https://github.com/dpkp/kafka-python/blob/1.1.1/kafka/consumer/fetcher.py#L615, but only as the last message of the set.
I suspected that there might be PartialMessages elsewhere, so I added the following if statement to fetcher.py (line #357) to look for them:

            for offset, size, msg in messages:
                if isinstance(msg, PartialMessage):
                    print("This is a partial msg")
                    continue
                if self.config['check_crcs'] and not msg.validate_crc():
                    raise Errors.InvalidMessageError(msg)

And yes, I was seeing a few "This is a partial msg" prints, right up until I got stopped by another error:
unpack requires a bytes object of length 4

<FrameSummary file /data/apps/kabinet-dev/kabinet/jobs/avro.py, line 267 in run_job>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/group.py, line 844 in __next__>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/group.py, line 805 in _message_generator>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/fetcher.py, line 451 in __next__>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/fetcher.py, line 416 in _message_generator>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/fetcher.py, line 366 in _unpack_message_set>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/consumer/fetcher.py, line 366 in _unpack_message_set>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/protocol/message.py, line 79 in decompress>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/protocol/message.py, line 148 in decode>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/protocol/message.py, line 50 in decode>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/protocol/message.py, line 50 in <listcomp>>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/protocol/types.py, line 79 in decode>,
<FrameSummary file /data/apps/kabinet-dev/kabinet/venv1/lib64/python3.5/site-packages/kafka/protocol/types.py, line 37 in decode>

So it seems I need some more verification of the messages.

@dpkp
Owner

dpkp commented Apr 27, 2016

PartialMessage should only ever be at the end of a message set. It is/was an artifact of the broker cutting off message sets strictly at the max_partition_fetch_bytes boundary and not "pruning" the dangling bytes from the message set before returning to the client. It should be impossible to get a partial message anywhere else.

But if you are consuming compressed message sets then you should never ever see a partial message.

What code is producing messages to your brokers? Are the messages sent w/ compression enabled?

@zoltan-fedor
Author

We are using kafka-python to produce these messages to the brokers. It produces to two different topics, and I have no problem with the other topic.
This was the topic I was using to test after the upgrade, though, when I was having issues producing yesterday (and am now consuming that data back), so maybe some sections of it got damaged somehow.
I will try to skip over that section and try again.

Yes, the producer is sending the messages with compression enabled (compression_type='snappy').
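
For reference, a minimal sketch of such a producer setup (hypothetical names; only compression_type='snappy' comes from the thread, and it requires the python-snappy package to be installed):

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=settings['kafka_servers'],
    compression_type='snappy',  # message batches are sent snappy-compressed
)
producer.send(settings['topic'], b'some-avro-encoded-payload')
producer.flush()  # block until outstanding messages are delivered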

@zoltan-fedor
Author

After skipping the problematic section of the topic (by manually moving the consumer offset forward), the issue no longer comes up, so it seems it was some kind of problem with the topic data, probably related to the Kafka cluster and kafka-python upgrade.

In either case, it seems this was just a one-time thing (multiple messages, but only in one section of the topic feed) and I can't reproduce it, so I am closing this.
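
For anyone hitting the same thing, a rough sketch of the "manually moving the consumer offset forward" workaround, assuming the bad messages live in partition 0 and FIRST_GOOD_OFFSET is the first known-good offset after them (both hypothetical):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    group_id=settings['kafka_group_id'],
    bootstrap_servers=settings['kafka_servers'],
    enable_auto_commit=False,
)
tp = TopicPartition(settings['topic'], 0)
consumer.assign([tp])
consumer.seek(tp, FIRST_GOOD_OFFSET)  # jump past the damaged section
consumer.poll(timeout_ms=1000)        # fetch from the new position
consumer.commit()                     # persist the advanced offset for the group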

@benauthor
Contributor

I seem to be following closely in @zoltan-fedor's footsteps in digging up obscure occasional compression-related bugs -- I am seeing this one today. I'll attempt to show something reproducible...

@dpkp dpkp reopened this Jun 29, 2016
@dpkp
Owner

dpkp commented Jul 15, 2016

Looking back at this issue, I believe it is the same root issue as #718.

@dpkp dpkp closed this as completed Jul 15, 2016