From cd136ad791546c3b0e1fdac44a27cb6cd2caea21 Mon Sep 17 00:00:00 2001 From: Bob Haddleton Date: Mon, 22 Feb 2021 12:53:33 -0600 Subject: [PATCH] fix table rebalance issue (#110) --- faust/stores/base.py | 2 +- faust/stores/rocksdb.py | 7 +++---- faust/types/stores.py | 1 - tests/unit/stores/test_rocksdb.py | 2 +- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/faust/stores/base.py b/faust/stores/base.py index e224c65b7..c684b8511 100644 --- a/faust/stores/base.py +++ b/faust/stores/base.py @@ -68,10 +68,10 @@ async def need_active_standby_for(self, tp: TP) -> bool: async def on_rebalance( self, - table: CollectionT, assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], + generation_id: int = 0, ) -> None: """Handle rebalancing of the cluster.""" ... diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py index 649e56991..17e8a9ef8 100644 --- a/faust/stores/rocksdb.py +++ b/faust/stores/rocksdb.py @@ -329,7 +329,6 @@ def _del(self, key: bytes) -> None: async def on_rebalance( self, - table: CollectionT, assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], @@ -338,16 +337,16 @@ async def on_rebalance( """Rebalance occurred. Arguments: - table: The table that we store data for. assigned: Set of all assigned topic partitions. revoked: Set of newly revoked topic partitions. newly_assigned: Set of newly assigned topic partitions, for which we were not assigned the last time. + generation_id: the metadata generation identifier for the re-balance """ self.rebalance_ack = False async with self.db_lock: - self.revoke_partitions(table, revoked) - await self.assign_partitions(table, newly_assigned, generation_id) + self.revoke_partitions(self.table, revoked) + await self.assign_partitions(self.table, newly_assigned, generation_id) async def stop(self) -> None: self.logger.info("Closing rocksdb on stop") diff --git a/faust/types/stores.py b/faust/types/stores.py index 93572f957..a9cbac723 100644 --- a/faust/types/stores.py +++ b/faust/types/stores.py @@ -89,7 +89,6 @@ def reset_state(self) -> None: @abc.abstractmethod async def on_rebalance( self, - table: _CollectionT, assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], diff --git a/tests/unit/stores/test_rocksdb.py b/tests/unit/stores/test_rocksdb.py index 69d665799..b4907898a 100644 --- a/tests/unit/stores/test_rocksdb.py +++ b/tests/unit/stores/test_rocksdb.py @@ -323,7 +323,7 @@ async def test_on_rebalance(self, *, store, table): newly_assigned = {TP2} generation_id = 1 await store.on_rebalance( - table, assigned, revoked, newly_assigned, generation_id=generation_id + assigned, revoked, newly_assigned, generation_id=generation_id ) store.revoke_partitions.assert_called_once_with(table, revoked)