Skip to content

Commit

Permalink
Fix recovery of partitions for large volume writes (#335)
Browse files Browse the repository at this point in the history
* commit inital fix for #333

* manually lint comment

* allow multiple partitions to resume

* minor cleanup
  • Loading branch information
wbarnha authored Aug 10, 2022
1 parent 9ec1858 commit 5a2ba13
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,16 @@ async def _restart_recovery(self) -> None:
# The changelog partitions only in the active_tps set need to be resumed
active_only_partitions = active_tps - standby_tps
if active_only_partitions:
T(consumer.resume_partitions)(active_only_partitions)
T(self.app.flow_control.resume)()
T(consumer.resume_flow)()
# Support for the specific scenario where recovery_buffer=1
tps_resuming = [
tp
for tp in active_only_partitions
if self.tp_to_table[tp].recovery_buffer_size == 1
]
if tps_resuming:
T(consumer.resume_partitions)(tps_resuming)
T(self.app.flow_control.resume)()
T(consumer.resume_flow)()

self.log.info("Recovery complete")
if span:
Expand Down

0 comments on commit 5a2ba13

Please sign in to comment.