Skip to content

Commit

Permalink
fix table rebalance issue (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
bobh66 authored Feb 22, 2021
1 parent a552a03 commit cd136ad
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 7 deletions.
2 changes: 1 addition & 1 deletion faust/stores/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ async def need_active_standby_for(self, tp: TP) -> bool:

async def on_rebalance(
self,
table: CollectionT,
assigned: Set[TP],
revoked: Set[TP],
newly_assigned: Set[TP],
generation_id: int = 0,
) -> None:
"""Handle rebalancing of the cluster."""
...
Expand Down
7 changes: 3 additions & 4 deletions faust/stores/rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ def _del(self, key: bytes) -> None:

async def on_rebalance(
self,
table: CollectionT,
assigned: Set[TP],
revoked: Set[TP],
newly_assigned: Set[TP],
Expand All @@ -338,16 +337,16 @@ async def on_rebalance(
"""Rebalance occurred.
Arguments:
table: The table that we store data for.
assigned: Set of all assigned topic partitions.
revoked: Set of newly revoked topic partitions.
newly_assigned: Set of newly assigned topic partitions,
for which we were not assigned the last time.
generation_id: the metadata generation identifier for the re-balance
"""
self.rebalance_ack = False
async with self.db_lock:
self.revoke_partitions(table, revoked)
await self.assign_partitions(table, newly_assigned, generation_id)
self.revoke_partitions(self.table, revoked)
await self.assign_partitions(self.table, newly_assigned, generation_id)

async def stop(self) -> None:
self.logger.info("Closing rocksdb on stop")
Expand Down
1 change: 0 additions & 1 deletion faust/types/stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ def reset_state(self) -> None:
@abc.abstractmethod
async def on_rebalance(
self,
table: _CollectionT,
assigned: Set[TP],
revoked: Set[TP],
newly_assigned: Set[TP],
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/stores/test_rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ async def test_on_rebalance(self, *, store, table):
newly_assigned = {TP2}
generation_id = 1
await store.on_rebalance(
table, assigned, revoked, newly_assigned, generation_id=generation_id
assigned, revoked, newly_assigned, generation_id=generation_id
)

store.revoke_partitions.assert_called_once_with(table, revoked)
Expand Down

0 comments on commit cd136ad

Please sign in to comment.