Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix rooms not being properly excluded from incremental sync #13408

Merged
merged 10 commits into from
Aug 4, 2022
1 change: 1 addition & 0 deletions changelog.d/13408.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.57.0 where rooms listed in `exclude_rooms_from_sync` in the configuration file would not be properly excluded from incremental syncs.
25 changes: 14 additions & 11 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1536,15 +1536,13 @@ async def _generate_sync_entry_for_rooms(
ignored_users = await self.store.ignored_users(user_id)
if since_token:
room_changes = await self._get_rooms_changed(
sync_result_builder, ignored_users, self.rooms_to_exclude
sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
room_changes = await self._get_all_rooms(
sync_result_builder, ignored_users, self.rooms_to_exclude
)
room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
tags_by_room = await self.store.get_tags_for_user(user_id)

log_kv({"rooms_changed": len(room_changes.room_entries)})
Expand Down Expand Up @@ -1623,13 +1621,14 @@ async def _get_rooms_changed(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
excluded_rooms: List[str],
) -> _RoomChanges:
"""Determine the changes in rooms to report to the user.

This function is a first pass at generating the rooms part of the sync response.
It determines which rooms have changed during the sync period, and categorises
them into four buckets: "knock", "invite", "join" and "leave".
them into four buckets: "knock", "invite", "join" and "leave". It also excludes
from that list any room that appears in the list of rooms to exclude from sync
results in the server configuration.

1. Finds all membership changes for the user in the sync period (from
`since_token` up to `now_token`).
Expand All @@ -1655,7 +1654,7 @@ async def _get_rooms_changed(
# _have_rooms_changed. We could keep the results in memory to avoid a
# second query, at the cost of more complicated source code.
membership_change_events = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key, excluded_rooms
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
)

mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
Expand Down Expand Up @@ -1862,7 +1861,6 @@ async def _get_all_rooms(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
ignored_rooms: List[str],
) -> _RoomChanges:
"""Returns entries for all rooms for the user.

Expand All @@ -1884,7 +1882,7 @@ async def _get_all_rooms(
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
membership_list=Membership.LIST,
excluded_rooms=ignored_rooms,
excluded_rooms=self.rooms_to_exclude,
)

room_entries = []
Expand Down Expand Up @@ -2150,7 +2148,9 @@ async def _generate_room_entry(
raise Exception("Unrecognized rtype: %r", room_builder.rtype)

async def get_rooms_for_user_at(
self, user_id: str, room_key: RoomStreamToken
self,
user_id: str,
room_key: RoomStreamToken,
) -> FrozenSet[str]:
"""Get set of joined rooms for a user at the given stream ordering.

Expand All @@ -2165,7 +2165,10 @@ async def get_rooms_for_user_at(
ReturnValue:
Set of room_ids the user is in at given stream_ordering.
"""
joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(
user_id,
exclude_for_sync=True,
)

joined_room_ids = set()

Expand Down
7 changes: 6 additions & 1 deletion synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,13 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)

if data.type == EventTypes.Member:
# We need to invalidate the cache for get_rooms_for_user_with_stream_ordering
# both with and without excluding rooms for sync results.
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
(data.state_key, False)
)
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key, True)
)
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
Expand Down
8 changes: 7 additions & 1 deletion synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1172,9 +1172,15 @@ def _update_current_state_txn(
# Invalidate the various caches

for member in members_changed:
# We need to invalidate the cache for get_rooms_for_user_with_stream_ordering
# both with and without excluding rooms for sync results.
txn.call_after(
self.store.get_rooms_for_user_with_stream_ordering.invalidate,
(member,),
(member, False),
)
txn.call_after(
self.store.get_rooms_for_user_with_stream_ordering.invalidate,
(member, True),
)
Copy link
Contributor Author

@babolivier babolivier Jul 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a huge fan of this bit but I couldn't find out another way to invalidate the cache for member for any value of the second parameter.


self.store._invalidate_state_caches_and_stream(
Expand Down
15 changes: 13 additions & 2 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def __init__(
lambda: self._known_servers_count,
)

self._rooms_excluded_from_sync = hs.config.server.rooms_to_exclude_from_sync

@wrap_as_background_process("_count_known_servers")
async def _count_known_servers(self) -> int:
"""
Expand Down Expand Up @@ -565,7 +567,9 @@ async def get_local_current_membership_for_user_in_room(

@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user_with_stream_ordering(
self, user_id: str
self,
user_id: str,
exclude_for_sync: bool = False,
) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
"""Returns a set of room_ids the user is currently joined to.

Expand All @@ -574,6 +578,8 @@ async def get_rooms_for_user_with_stream_ordering(

Args:
user_id
exclude_for_sync: Whether to exclude rooms that the configuration says
should be excluded from sync results.

Returns:
Returns the rooms the user is in currently, along with the stream
Expand All @@ -584,10 +590,11 @@ async def get_rooms_for_user_with_stream_ordering(
"get_rooms_for_user_with_stream_ordering",
self._get_rooms_for_user_with_stream_ordering_txn,
user_id,
exclude_for_sync,
)

def _get_rooms_for_user_with_stream_ordering_txn(
self, txn: LoggingTransaction, user_id: str
self, txn: LoggingTransaction, user_id: str, exclude_for_sync: bool
) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
# We use `current_state_events` here and not `local_current_membership`
# as a) this gets called with remote users and b) this only gets called
Expand Down Expand Up @@ -620,6 +627,10 @@ def _get_rooms_for_user_with_stream_ordering_txn(
room_id, PersistedEventPosition(instance, stream_id)
)
for room_id, instance, stream_id in txn
if (
exclude_for_sync is False
or room_id not in self._rooms_excluded_from_sync
)
babolivier marked this conversation as resolved.
Show resolved Hide resolved
)

@cachedList(
Expand Down
21 changes: 21 additions & 0 deletions tests/rest/client/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,3 +948,24 @@ def test_invite(self) -> None:

self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["invite"])
self.assertIn(self.included_room_id, channel.json_body["rooms"]["invite"])

def test_incremental_sync(self) -> None:
"""Tests that activity in the room is properly filtered out of incremental
syncs.
"""
channel = self.make_request("GET", "/sync", access_token=self.tok)
self.assertEqual(channel.code, 200, channel.result)
next_batch = channel.json_body["next_batch"]

self.helper.send(self.excluded_room_id, tok=self.tok)
self.helper.send(self.included_room_id, tok=self.tok)

channel = self.make_request(
"GET",
f"/sync?since={next_batch}",
access_token=self.tok,
)
self.assertEqual(channel.code, 200, channel.result)

self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["join"])
self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"])