diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 1adbcf4b5..a9615d52e 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -474,9 +474,16 @@ async def _restart_recovery(self) -> None: # The changelog partitions only in the active_tps set need to be resumed active_only_partitions = active_tps - standby_tps if active_only_partitions: - T(consumer.resume_partitions)(active_only_partitions) - T(self.app.flow_control.resume)() - T(consumer.resume_flow)() + # Support for the specific scenario where recovery_buffer=1 + tps_resuming = [ + tp + for tp in active_only_partitions + if self.tp_to_table[tp].recovery_buffer_size == 1 + ] + if tps_resuming: + T(consumer.resume_partitions)(tps_resuming) + T(self.app.flow_control.resume)() + T(consumer.resume_flow)() self.log.info("Recovery complete") if span: