New error messages seen in faust app with aiokafka 0.7.1 #166

Open
patkivikram opened this issue Jun 29, 2021 · 30 comments
Labels
bug Something isn't working

Comments

@patkivikram
Collaborator

patkivikram commented Jun 29, 2021

## Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

## Steps to reproduce

Bring up the Faust app and let it run for a while. After running for some time, the application starts logging the following error messages:

2021-06-28 10:57:33 [1] [ERROR] faust.transport.drivers.aiokafka [^----AIOKafkaConsumerThread]: Has not committed TP(topic='test_topic', partition=27) at all since worker start (started 5.52 minutes ago).

There are multiple possible explanations for this:

1) The processing of a single event in the stream
   is taking too long.

    The timeout for this is defined by the broker_commit_livelock_soft_timeout setting,
    currently set to 300.0.  If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.

## Expected behavior

These errors should only happen if the commit offset is not progressing.

## Actual behavior
The app is running fine and consumer offsets are actually being updated.

# Versions

* Python version 3.9
* Faust version 0.6.7
* Operating system Centos75
* Kafka version 2.4.1


patkivikram added a commit that referenced this issue Jun 29, 2021
patkivikram added a commit that referenced this issue Jun 29, 2021
patkivikram added a commit that referenced this issue Jun 29, 2021
* fix for consumer errors in app #166

* fix for consumer errors in app #166
patkivikram added a commit that referenced this issue Jul 6, 2021
@bluefatter

Thanks for fixing this. [Party]

@bitdivision

We're still seeing this issue with v0.6.9

@patkivikram
Collaborator Author

@bitdivision what are you seeing? Is this on a single worker or on multiple?

@bitdivision

bitdivision commented Jul 12, 2021

We've now ignored all of the following logs in our Sentry configuration; however, we are still seeing them logged in 0.6.9:

        "[^---AIOKafkaConsumerThread]: Has not committed",
        "[^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for",
        "[^---AIOKafkaConsumerThread]: Stream has not started processing",
        "[^---AIOKafkaConsumerThread]: Stream stopped processing, or is slow for",

We're running with 6 workers. I can give more details on specific logs if that's helpful?

Edit: to add to this, we've seen these messages for topics which are clearly being processed correctly. We would have seen lag increases if a topic stopped processing, but that's not been the case.
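
For anyone who wants to silence these messages at the logging layer rather than in Sentry, here is a minimal sketch using only the standard library. It assumes the records are emitted on the `faust.transport.drivers.aiokafka` logger (the name shown in the error output in this issue) and matches on the message fragments listed above. `LivelockNoiseFilter` and `NOISY_FRAGMENTS` are made-up names for illustration, not part of Faust. Note that this hides genuine stalls as well as false alarms.

```python
import logging

# Message fragments copied from the list above; extend as needed.
NOISY_FRAGMENTS = (
    "Has not committed",
    "Aiokafka has not sent fetch request for",
    "Stream has not started processing",
    "Stream stopped processing, or is slow for",
)


class LivelockNoiseFilter(logging.Filter):
    """Drop records whose rendered message contains a known noisy fragment."""

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()
        # Returning False tells the logging machinery to discard the record.
        return not any(fragment in message for fragment in NOISY_FRAGMENTS)


# Logger name as it appears in the error output quoted in this issue.
logging.getLogger("faust.transport.drivers.aiokafka").addFilter(LivelockNoiseFilter())
```

A filter attached to a logger only applies to records logged on that exact logger, so other Faust loggers are unaffected.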

@patkivikram
Collaborator Author

Can you share the logs you are seeing in 0.6.9 with more than 1 worker? It should not happen with 1 worker, I hope :)

@popadi
Contributor

popadi commented Nov 29, 2021

Any update on this? It still happens and it's hard to tell if it's a real issue or not. I'm using 0.6.10.

Edit:
Apparently one of my two agents was publishing thousands of messages every second and it was taking all the resources away from the other agents. The error stopped appearing after I added more replicas to my app and bumped the resources a bit.

patkivikram added a commit that referenced this issue Dec 13, 2021
* fix race condition when buffers are full

* fix race condition when buffers are full

* Fix error messages in faust app #166
@richardhundt
Collaborator

If you run a worker with 1 topic and N partitions, it only consumes from a single partition. After 300 seconds the monitor starts complaining that N-1 partitions are idle.

I've had to set isolated_partitions=True but this can't possibly be the intended behaviour, can it?

@daigotanaka

daigotanaka commented Jul 21, 2022

Hi all,

I am having what seems to be the same problem with faust-streaming==0.8.5.

Here is my setup:

I have a source topic with 6 partitions.
As a test, I started a worker, and it's moving the offsets for Partitions 2, 3, 4, and 5. But 0 and 1 aren't processed, and they keep on lagging.
I added/removed a couple more workers. Each time a worker joins or leaves the group, a reassignment happens. No matter which worker got assigned to Partition 0 or 1, the same partitions never move their offsets.

Our Kafka cluster is on Confluent Cloud. (Don't think it matters?)

I am also wondering if there is a bad record that's causing something like an infinite loop? If that is the case, the fix is on us, of course.
But I could not even do

tp = TP(<source_topic_name>, 0) 
await app.consumer.seek(tp, <some_offset>)

to skip to a certain offset.

So, just wanted to see if the problem in this issue is still alive just to eliminate the possibility that our issue is based on a Faust bug.

@richardhundt: You mentioned the isolated_partitions param. I checked the docs but could not understand what it does. Can you tell me how it helped you fix your issue? Thanks!

Update: I just noticed my issue happens even with a single worker. This time, Partition 0 is stuck at offset=1. Other partitions are moving forward nicely.

@richardhundt
Collaborator

@daigotanaka

As far as I can tell, the isolated_partitions parameter causes an actor to be spawned for each partition.

I expected that concurrency lets you create N actors for M partitions, including running N actors for a single partition or 1 actor for M partitions, but that doesn't seem to be how it works. The docs are kind of hand-wavy on the details, so I'm not sure if my understanding is correct.

I also found that in order to seek I needed to use an AIOKafkaConsumer directly. Something like this:

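from aiokafka.consumer import AIOKafkaConsumer
from aiokafka import TopicPartition
from faust import TopicT

async def seek_topic_partition(topic: TopicT, partition: int, offset: int):
    app = topic.app
    # No bootstrap_servers is passed, so aiokafka falls back to its default (localhost).
    consumer = AIOKafkaConsumer(loop=app.loop, group_id=app.conf.id)
    tp = TopicPartition(topic.get_topic_name(), partition)
    await consumer.start()
    try:
        consumer.assign([tp])
        # seek() is a plain method in aiokafka, not a coroutine, so it is not awaited.
        consumer.seek(tp, offset)
    finally:
        # Always release the connection, even if assign() or seek() raises.
        await consumer.stop()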

@daigotanaka

@richardhundt Thank you for the details! I'll try AIOKafkaConsumer :)

@joekohlsdorf
Contributor

joekohlsdorf commented Jul 22, 2022

This was introduced in 0.6.5 when we actually started calling the verification: 7a45b2b#diff-5704609ad5592d977f497ac5defed2c54606a1bf7e42f0677ddf88f59f47938bR278

The code doesn't care whether commits go through; offsets are set in a dictionary, and that is all the check looks at.

This probably never worked. I didn't have time to look into this in detail, but my guess is that the global variable is read and updated from different threads and isn't really global.
Committing to Kafka works fine, offsets advance on all partitions.

In #153 people also complained about a significant performance regression when this additional check was enabled.

Until we find the issue you can go back to 0.6.4 or patch this check out.

@daigotanaka

A follow-up to this comment:
#166 (comment)

I stopped seeing messages like "Has not committed TP(topic='test_topic', partition=27) at all since worker start (started 5.52 minutes ago)", and all the partitions started to be processed as expected as multiple workers join/leave, after finding the following misconfiguration on our end:

The issue was a mismatch in the number of replicas between the app config and the Topic object.
We were using the MyTopic class workaround for changing the number of Topic replicas, which we adapted from here:
#76

This was causing a mismatch in the replica settings between the app.config and the topic. Properly aligning them via the env var TOPIC_REPLICATION_FACTOR resolved our issue.

This might be a novice mistake, but just leaving a note here anyways in case it's useful.

Thanks @richardhundt and @joekohlsdorf for providing the pointers! Reading those helped to narrow down the issue :)


@wbarnha wbarnha closed this as completed Oct 5, 2022
@joekohlsdorf
Contributor

joekohlsdorf commented Oct 5, 2022

@wbarnha Could you please explain why you closed this issue?

I don't see any recent changes to the problematic verification code I showed in #166 (comment)

I can still reproduce the problem and the solution posted by @daigotanaka does not work for me.

@wbarnha
Member

wbarnha commented Oct 5, 2022

Thanks for getting back to me, I thought this was fixed by @daigotanaka but I'll go ahead and re-investigate.

@wbarnha wbarnha reopened this Oct 5, 2022
@wbarnha wbarnha added the bug Something isn't working label Oct 5, 2022
@patkivikram
Collaborator Author

patkivikram commented Oct 10, 2022

This should be fixed with #380. Can you please test it, @joekohlsdorf?

@patkivikram
Collaborator Author

Anyone still seeing this with the latest release?

@JonathanSerafini

JonathanSerafini commented Nov 11, 2022

Most if not all of our faust-streaming 0.9.2 consumers are spitting out a bunch of these errors on and off ...
Interestingly, this also includes the assignor leader topic:

[^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='findings-processor-__assignor-__leader', partition=0) since start (started 7.57 hours ago)

@bhanuka-ilume

Still seeing the issue in faust-streaming 0.9.5

[ERROR] [^---AIOKafkaConsumerThread]: Stream has not started processing TP(topic='', partition=0) (started 18.25 hours ago).

@wbarnha
Member

wbarnha commented Feb 6, 2023

> Still seeing the issue in faust-streaming 0.9.5
>
> [ERROR] [^---AIOKafkaConsumerThread]: Stream has not started processing TP(topic='', partition=0) (started 18.25 hours ago).

I've also seen this error come up while Faust is actually running normally, so it's a bit hard to troubleshoot. I think the solution lies in reviewing our aiokafka drivers to log when everything is running nominally.

@alihoseiny

For the record, we are facing the exact same issue using faust-streaming 0.10.13 and Python 3.11.3.

@richardhundt
Collaborator

richardhundt commented Jul 19, 2023

Something to consider: if you have a large consumer_max_fetch_size (default is 1024 ** 2), have small messages, and your agent takes a long time to process each one, then you could see this issue.

What happens is that you fetch a chunk of 1048576 bytes in a single poll of Kafka. If your messages are 1 KB on average, then you'll have about 1k messages. If each message takes 1 second to process, then you end up polling Kafka once every ~17 minutes, and that'll trigger this error.
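
To make the arithmetic concrete, here is a rough back-of-the-envelope sketch. It assumes a deliberately simplified model in which the agent drains one fetched chunk sequentially before the next poll; `seconds_between_polls` is a made-up helper for illustration, not a Faust or aiokafka function.

```python
def seconds_between_polls(
    fetch_bytes: int, avg_message_bytes: int, seconds_per_message: float
) -> float:
    """Estimate how long one fetched chunk keeps a sequential agent busy."""
    messages_per_fetch = fetch_bytes // avg_message_bytes
    return messages_per_fetch * seconds_per_message


# Default fetch size quoted above (1024 ** 2 bytes), 1 KB messages, 1 s each.
gap = seconds_between_polls(1024 ** 2, 1024, 1.0)
print(f"{gap:.0f} s between polls (~{gap / 60:.0f} min)")  # 1024 s between polls (~17 min)

# The same workload with a 2048-byte fetch size: about two messages per poll.
small_gap = seconds_between_polls(2048, 1024, 1.0)
print(f"{small_gap:.0f} s between polls")  # 2 s between polls
```

Under this model the default fetch size leaves a gap well above the 300-second timeout mentioned in the error message, while the smaller fetch size stays far below it.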

Try setting consumer_max_fetch_size to something much smaller. There are a couple of other settings you can play with. Here's my setup for feeding an OCR system with file names where the OCR backend takes several seconds per message:

app = faust.App(
    ...
    broker_heartbeat_interval=3,
    broker_session_timeout=120,
    broker_request_timeout=240,
    broker_max_poll_records=30,
    broker_max_poll_interval=120000,
    broker_commit_livelock_soft_timeout=30 * 60,
    stream_processing_timeout=30 * 60,
    consumer_max_fetch_size=2048,
    ...
)

Here I'm trying to increase the polling frequency by limiting max poll records and max fetch size, while increasing intervals and timeouts.

@joekohlsdorf
Contributor

This is true but it also happens in environments which process millions of small messages.
We already know that this problem was introduced with this change. I don't know what's wrong with this validation but removing it fixes the problem (and speeds up processing significantly): 7a45b2b#diff-5704609ad5592d977f497ac5defed2c54606a1bf7e42f0677ddf88f59f47938bR278

@qlhai

qlhai commented Nov 7, 2023

I also face this error msg in my project.
faust-streaming 0.10.16
aiokafka 0.8.1
kafka-python 2.0.2
Python 3.9.15
Debian GNU/Linux 11 (bullseye)

@rezblaze

We have been seeing this error for a while:

  1. The agent processing the stream is hanging (waiting for network, I/O or infinite loop).

2023-12-25 23:59:55,291 process=3269118 loglevel=ERROR request_id= correlation_id= logger=faust.transport.drivers.aiokafka _log_slow_processing() L909 [^--AIOKafkaConsumerThread]: Stream has not started processing TP(topic='build_events-BuildEvent.build_id-repartition', partition=4) (started 6.51 days ago).

There are multiple possible explanations for this:

  1. The processing of a single event in the stream
    is taking too long.

    The timeout for this is defined by the stream_processing_timeout setting,
    currently set to 300.0. If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.

  2. The stream has stopped processing events for some reason.

  3. The agent processing the stream is hanging (waiting for network, I/O or infinite loop).

2023-12-25 23:59:55,291 process=3269118 loglevel=ERROR request_id= correlation_id= logger=faust.transport.drivers.aiokafka _log_slow_processing() L909 [^--AIOKafkaConsumerThread]: Stream has not started processing TP(topic='build_events-BuildEvent.build_id-repartition', partition=1) (started 6.51 days ago).

There are multiple possible explanations for this:

  1. The processing of a single event in the stream
    is taking too long.

    The timeout for this is defined by the stream_processing_timeout setting,
    currently set to 300.0. If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.

  2. The stream has stopped processing events for some reason.

We get hundreds or even thousands of these messages during a large run.

fastapi==0.90.1
uvicorn==0.14.0
python-dateutil==2.8.2
python-dotenv==1.0.0
faust-streaming==0.10.14
starlette-exporter==0.15.1
prometheus_fastapi_instrumentator==5.11.0
schedule==1.2.1
Python 3.11.3

@fonty422
Collaborator

Just noting that we have the same thing, however for us it is a case where we send 7K messages on initialising the app, but thereafter may go several minutes without sending a single message. Then when one does send (after the 300s timer has elapsed) it triggers this. Is it simply triggering because it thinks there should have been a commit in the last 300s even though the topic hasn't changed the end offset?
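
To illustrate the distinction being asked about, here is a hypothetical sketch of a check that separates "idle" from "stuck" by also comparing the committed offset against the partition's end offset. Nothing in this thread establishes that Faust's check considers the end offset at all; `PartitionState` and `looks_stuck` are invented names, and the 300-second default simply mirrors the timeout in the error message.

```python
from dataclasses import dataclass


@dataclass
class PartitionState:
    committed_offset: int    # last offset the app committed
    end_offset: int          # latest offset available on the broker
    last_commit_time: float  # timestamp (seconds) of the last commit


def looks_stuck(state: PartitionState, now: float, timeout: float = 300.0) -> bool:
    """Flag a partition only if the commit is stale AND data is waiting."""
    commit_is_stale = (now - state.last_commit_time) > timeout
    has_backlog = state.committed_offset < state.end_offset
    return commit_is_stale and has_backlog


# 7K messages were committed at startup, then the topic went quiet for 10 minutes.
idle = PartitionState(committed_offset=7000, end_offset=7000, last_commit_time=0.0)
print(looks_stuck(idle, now=600.0))   # False

# Same silence, but nearly all of the data is still uncommitted.
stuck = PartitionState(committed_offset=1, end_offset=7000, last_commit_time=0.0)
print(looks_stuck(stuck, now=600.0))  # True
```

A check based purely on elapsed time would flag both partitions, which matches the behaviour described above.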

@rahmed-hartree

rahmed-hartree commented Jul 4, 2024

This is an issue when the stream is using transactions, i.e. when processing_guarantee is set to exactly_once.

There are two distinct methods used for committing here:
https://github.com/faust-streaming/faust/blob/master/faust/transport/consumer.py#L1043-L1050

The checks at https://github.com/faust-streaming/faust/blob/master/faust/transport/drivers/aiokafka.py#L800 depend on the last committed offsets being updated at https://github.com/faust-streaming/faust/blob/master/faust/transport/drivers/aiokafka.py#L720.

However, if you are using transactions, this method is not used when committing offsets. Instead, https://github.com/faust-streaming/faust/blob/master/faust/transport/consumer.py#L324 is used, where the last committed offsets are not updated.

This results in false warnings being logged as it sees no offsets being committed.
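
The mechanism described above can be shown with a toy model. This is a simplified sketch, not Faust's actual code: `CommitTracker` and its methods are invented for illustration, and the two commit methods merely stand in for the non-transactional and transactional paths linked above.

```python
from typing import Dict


class CommitTracker:
    """Toy model: a monitor that only sees commits recorded in a dict."""

    def __init__(self) -> None:
        self.broker_offsets: Dict[str, int] = {}      # what the broker holds
        self.last_commit_seen: Dict[str, float] = {}  # what the monitor reads

    def commit_plain(self, tp: str, offset: int, now: float) -> None:
        self.broker_offsets[tp] = offset
        self.last_commit_seen[tp] = now  # bookkeeping is updated on this path

    def commit_in_transaction(self, tp: str, offset: int, now: float) -> None:
        # The commit itself succeeds, but the bookkeeping is never touched.
        self.broker_offsets[tp] = offset

    def is_flagged(
        self, tp: str, now: float, started: float, timeout: float = 300.0
    ) -> bool:
        # With no recorded commit, fall back to the worker start time.
        last = self.last_commit_seen.get(tp, started)
        return now - last > timeout


tracker = CommitTracker()
tracker.commit_plain("topic-0", 10, now=350.0)
tracker.commit_in_transaction("topic-1", 10, now=350.0)

print(tracker.broker_offsets)                                  # {'topic-0': 10, 'topic-1': 10}
print(tracker.is_flagged("topic-0", now=400.0, started=0.0))   # False
print(tracker.is_flagged("topic-1", now=400.0, started=0.0))   # True
```

Both partitions have the same committed offset on the broker, yet only the one committed through the transactional path is flagged, because the monitor never learns about that commit.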

@fonty422
Collaborator

fonty422 commented Jul 4, 2024

So we can ignore these warnings? They're not impacting business?
Ideally, it would be fixed so it doesn't come up unless necessary, but in the meantime we ignore these?

@rahmed-hartree

@fonty422 if you are using transactions, then yes - you can ignore these for now.

@thedevd

thedevd commented Aug 1, 2024

I am facing a similar problem in my app, where all of a sudden, after some time, the consumer stopped.

^--AIOKafkaConsumerThread]: Has not committed TP(topic='workerInput', partition=0) (last commit 6.26 minutes ago).
There are multiple possible explanations for this:

  1. The processing of a single event in the stream is taking too long.
    The timeout for this is defined by the broker_commit_livelock_soft_timeout setting, currently set to 300.0. If you expect the time required to process an event, to be greater than this then please increase the timeout.
  2. The commit handler background thread has stopped working (report as bug).

"code_file": "/usr/local/lib/python3.10/dist-packages/faust/transport/drivers/aiokafka.py",
"code_line": 904,
"code_func": "_log_slow_processing"

I am using

  1. python 3.10
  2. faust-streaming 0.10.13
  3. aiokafka - 0.8.1

@MedAzizTousli

Same issue here with faust 0.11.2
