From 5a2ba13ddc45bcfdf7948d6f77daaba34dcaef97 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Wed, 10 Aug 2022 11:59:59 -0400 Subject: [PATCH] Fix recovery of partitions for large volume writes (#335) * commit inital fix for #333 * manually lint comment * allow multiple partitions to resume * minor cleanup --- faust/tables/recovery.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 1adbcf4b5..a9615d52e 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -474,9 +474,16 @@ async def _restart_recovery(self) -> None: # The changelog partitions only in the active_tps set need to be resumed active_only_partitions = active_tps - standby_tps if active_only_partitions: - T(consumer.resume_partitions)(active_only_partitions) - T(self.app.flow_control.resume)() - T(consumer.resume_flow)() + # Support for the specific scenario where recovery_buffer=1 + tps_resuming = [ + tp + for tp in active_only_partitions + if self.tp_to_table[tp].recovery_buffer_size == 1 + ] + if tps_resuming: + T(consumer.resume_partitions)(tps_resuming) + T(self.app.flow_control.resume)() + T(consumer.resume_flow)() self.log.info("Recovery complete") if span: