Async producer retries for failed messages #331
Conversation
There are several exceptions that could be raised within the send_produce_request() call chain besides FailedPayloadsError.
Thanks a lot for the clarifications; it seems we need smarter retrying logic, I'll think about it. So far I've updated the code a little bit to retry all failed requests without exclusions.
I don't think we can do blanket retries. Many of the errors require resyncing cluster metadata before retry. The error handling logic should handle that.
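A sketch of the distinction being drawn here, assuming the error classes exposed by kafka.common in this era of the library (the exact split below is illustrative, not the merged policy):

    from kafka.common import (FailedPayloadsError,
                              LeaderNotAvailableError,
                              NotLeaderForPartitionError,
                              RequestTimedOutError)

    # Some failures are safe to retry as-is; others signal stale cluster
    # metadata that must be resynced before any retry can succeed.
    RETRIABLE = (FailedPayloadsError, RequestTimedOutError)
    REFRESH_FIRST = (LeaderNotAvailableError, NotLeaderForPartitionError)

    def classify(client, exc):
        if isinstance(exc, REFRESH_FIRST):
            client.load_metadata_for_topics()  # resync, then retry
            return 'retry'
        if isinstance(exc, RETRIABLE):
            return 'retry'
        return 'fail'  # blanket retries would mask real errors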
force-pushed from bc8965a to 8f45421
Sorry, removed/restored the branch by accident, I'm on this issue.
force-pushed from 400f10e to 59faeca
@dpkp I'm still getting some odd errors from Travis, but could you check the approach, please? I'm not sure if we're fine with having a namedtuple as an argument for a producer.
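For context, the namedtuple argument under discussion could look like this (a sketch only; the field names follow the commit summary further down and the producer class name is assumed for illustration):

    from collections import namedtuple

    # Hypothetical shape of the retry-options argument being questioned.
    RetryOptions = namedtuple(
        "RetryOptions", ["limit", "backoff_ms", "retry_on_timeouts"])

    # `client` and the producer signature are assumptions, not the final API.
    producer = Producer(client,
                        retry_options=RetryOptions(limit=5,
                                                   backoff_ms=300,
                                                   retry_on_timeouts=True))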
    except Exception:
        log.exception("Unable to send message")

    except FailedPayloadsError as ex:
now that #366 is merged, we can pass fail_on_error=False to client.send_produce_request and get a list of responses/errors. This would allow us to loop through the responses and only retry the reqs that failed. Currently this exception would cause all reqs in the batch to retry, even if only a single req caused the error.
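A sketch of that per-request loop, assuming (as after #366) that fail_on_error=False yields a list mixing successful responses with error instances, and that a failed-payload error carries the offending request on a payload attribute:

    from kafka.common import FailedPayloadsError

    responses = client.send_produce_request(requests, fail_on_error=False)

    # Collect only the requests that actually failed, instead of
    # re-sending the entire batch.
    to_retry = [r.payload for r in responses
                if isinstance(r, FailedPayloadsError)]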
Ok, thanks for pointing me to it, I missed that PR!
@dpkp Thanks for all your comments, they're extremely helpful, I'll try to fix everything asap.
- send_produce_request with fail_on_error=False to retry only the failed reqs
- use an internal dict with namedtuple keys for retry counters
- refresh metadata on refresh_error irrespective of retries options
- removed infinite retries (retry_options.limit=None) as an over-feature
- separate producer init args for retries options (limit, backoff, on_timeouts)
- AsyncProducerQueueFull returns a list of failed messages
- producer tests improved thanks to @rogaha and @toli
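The "internal dict with namedtuple keys" works because namedtuples are hashable; a minimal sketch of the retry counters (the key type and limit handling are assumptions, not the merged code):

    from collections import defaultdict, namedtuple

    # Any namedtuple that identifies a request can index a plain dict.
    TopicAndPartition = namedtuple("TopicAndPartition",
                                   ["topic", "partition"])

    retry_counts = defaultdict(int)

    def should_retry(req, limit=5):
        key = TopicAndPartition(req.topic, req.partition)
        retry_counts[key] += 1
        return retry_counts[key] <= limit  # drop once the limit is hit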
force-pushed from 93ef46d to 4474a50
Rebased and prepared a draft for
force-pushed from 8916dbc to 7d6f3f5
awesome! will take a look shortly!
There are a handful of issues remaining -- we need to re-enable the test_switch_leader_async test in test/test_failover_integration.py and then fix it, along with the lint issues failing the Travis build. But you've put in a ton of work and I think it is basically there, so I am going to merge and push a few fixes directly. Thanks so much for working on this one!
Cool, thank you @dpkp! If I have some additional time, I'll fix the other issues (like additional tests and docstrings) as well in a separate PR.
Thanks again for your help on this. I put together some edits in #388 (merged yesterday).
Another approach to add retries for failed messages (for asynchronous mode only).

- Changed ProduceRequest a little bit by adding a retries count to its fields (sketched below). It's backward compatible: you can omit this param when creating the initial ProduceRequest, it's 0 initially. (It's not the cleanest solution in terms of code, but storing a retries count per request sounds reasonable.)
- batch_retry_backoff_ms - backoff timeout to wait before the next retry (300 by default);
- batch_retries_limit - upper limit of retries per request (5 by default, None means infinite retries, 0 means no retries).
- Catching FailedPayloadsException from "Improve fault tolerance by handling leadership election and other metadata changes" #55 to get the failed requests and increment retries for them, or drop them depending on settings.

What do you think about the approach? I'm going to add some additional tests for it asap.
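A sketch of the backward-compatible ProduceRequest change described above; the default-value idiom is an assumption about the wiring, not a quote of the PR:

    from collections import namedtuple

    # Original fields plus a trailing `retries` counter; defaulting it
    # to 0 keeps existing three-argument call sites working unchanged.
    ProduceRequest = namedtuple(
        "ProduceRequest", ["topic", "partition", "messages", "retries"])
    ProduceRequest.__new__.__defaults__ = (0,)  # only `retries` defaults

    req = ProduceRequest("my-topic", 0, [b"payload"])
    assert req.retries == 0
    req = req._replace(retries=req.retries + 1)  # bumped on a failed send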