Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add is_encrypted filtering to Sliding Sync /sync #17281

Merged
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
76ce7a9
Add `is_dm` filtering to Sliding Sync `/sync`
MadLittleMods Jun 6, 2024
360f05c
Move changelog
MadLittleMods Jun 6, 2024
d8e2b1d
Add docstring
MadLittleMods Jun 6, 2024
dd43938
Reference actual filter code
MadLittleMods Jun 6, 2024
88fe201
Condense true/false tests
MadLittleMods Jun 6, 2024
44088bd
Add `is_encrypted` filtering to Sliding Sync `/sync`
MadLittleMods Jun 6, 2024
4412dbd
Update changelog number
MadLittleMods Jun 6, 2024
35b18be
Fix lints
MadLittleMods Jun 6, 2024
61f86e0
Add future todo
MadLittleMods Jun 10, 2024
578b44a
Move get_state_at() to area we can share from
MadLittleMods Jun 10, 2024
7dec930
Filter based on state at to_token
MadLittleMods Jun 10, 2024
945197b
Update docstring
MadLittleMods Jun 10, 2024
48eca7d
Less test bulk
MadLittleMods Jun 10, 2024
7aa0519
Incorporate `to_token` to filters
MadLittleMods Jun 10, 2024
a6e5798
Explain why no `to_token` for global account data
MadLittleMods Jun 10, 2024
5dd6d37
Add docstring
MadLittleMods Jun 10, 2024
d0d198f
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 10, 2024
271ae6f
Remove import workaround
MadLittleMods Jun 10, 2024
2e4627b
Merge branch 'madlittlemods/msc3575-sliding-sync-filter-dms' into mad…
MadLittleMods Jun 10, 2024
355de36
Remove import workaround
MadLittleMods Jun 10, 2024
f69d1c5
Remove sneaky log
MadLittleMods Jun 10, 2024
bb5dfc3
Merge branch 'madlittlemods/msc3575-sliding-sync-filter-dms' into mad…
MadLittleMods Jun 10, 2024
d752b8a
Comment no longer as useful
MadLittleMods Jun 10, 2024
9896478
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 12, 2024
eaaf408
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 13, 2024
aff2e82
Merge branch 'madlittlemods/msc3575-sliding-sync-filter-dms' into mad…
MadLittleMods Jun 13, 2024
0ea4fdd
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 13, 2024
810e9af
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 17, 2024
8965f3b
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 17, 2024
c73391d
Fix tests
MadLittleMods Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17277.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `is_dm` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
1 change: 1 addition & 0 deletions changelog.d/17281.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `is_encrypted` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
139 changes: 133 additions & 6 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@

from immutabledict import immutabledict

from synapse.api.constants import Membership
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.events import EventBase
from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.state import StateFilter

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -57,6 +58,7 @@ class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self.storage_controllers = hs.get_storage_controllers()
self.auth_blocking = hs.get_auth_blocking()
self.notifier = hs.get_notifier()
self.event_sources = hs.get_event_sources()
Expand All @@ -69,9 +71,19 @@ async def wait_for_sync_for_user(
from_token: Optional[StreamToken] = None,
timeout_ms: int = 0,
) -> SlidingSyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
"""
Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.

Args:
requester: The user making the request
sync_config: Sync configuration
from_token: The point in the stream to sync from. Token of the end of the
previous batch. May be `None` if this is the initial sync request.
timeout_ms: The time in milliseconds to wait for new data to arrive. If 0,
we will immediately but there might not be any new data so we just return an
empty response.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
Expand Down Expand Up @@ -143,6 +155,14 @@ async def current_sync_for_user(
"""
Generates the response body of a Sliding Sync result, represented as a
`SlidingSyncResult`.

We fetch data according to the token range (> `from_token` and <= `to_token`).

Args:
sync_config: Sync configuration
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from. Token of the end of the
previous batch. May be `None` if this is the initial sync request.
"""
user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
Expand All @@ -163,11 +183,12 @@ async def current_sync_for_user(
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
if sync_config.lists:
for list_key, list_config in sync_config.lists.items():
# TODO: Apply filters
#
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`
# Apply filters
filtered_room_ids = room_id_set
if list_config.filters is not None:
filtered_room_ids = await self.filter_rooms(
sync_config.user, room_id_set, list_config.filters, to_token
)
# TODO: Apply sorts
sorted_room_ids = sorted(filtered_room_ids)

Expand Down Expand Up @@ -217,6 +238,12 @@ async def get_sync_room_ids_for_user(
`forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
to tell when a room was forgotten at the moment so we can't factor it into the
from/to range.


Args:
user: User to fetch rooms for
to_token: The token to fetch rooms up to.
from_token: The point in the stream to sync from.
"""
user_id = user.to_string()

Expand Down Expand Up @@ -439,3 +466,103 @@ async def get_sync_room_ids_for_user(
sync_room_id_set.add(room_id)

return sync_room_id_set

async def filter_rooms(
self,
user: UserID,
room_id_set: AbstractSet[str],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
) -> AbstractSet[str]:
"""
Filter rooms based on the sync request.

Args:
user: User to filter rooms for
room_id_set: Set of room IDs to filter down
filters: Filters to apply
to_token: We filter based on the state of the room at this token
"""
user_id = user.to_string()

# TODO: Apply filters
#
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`

filtered_room_id_set = set(room_id_set)

# Filter for Direct-Message (DM) rooms
if filters.is_dm is not None:
# We're using global account data (`m.direct`) instead of checking for
# `is_direct` on membership events because that property only appears for
# the invitee membership event (doesn't show up for the inviter). Account
# data is set by the client so it needs to be scrutinized.
#
# We're unable to take `to_token` into account for global account data since
# we only keep track of the latest account data for the user.
dm_map = await self.store.get_global_account_data_by_type_for_user(
user_id, AccountDataTypes.DIRECT
)

# Flatten out the map
dm_room_id_set = set()
if dm_map:
for room_ids in dm_map.values():
# Account data should be a list of room IDs. Ignore anything else
if isinstance(room_ids, list):
for room_id in room_ids:
if isinstance(room_id, str):
dm_room_id_set.add(room_id)

if filters.is_dm:
# Only DM rooms please
filtered_room_id_set = filtered_room_id_set.intersection(dm_room_id_set)
else:
# Only non-DM rooms please
filtered_room_id_set = filtered_room_id_set.difference(dm_room_id_set)

if filters.spaces:
raise NotImplementedError()

# Filter for encrypted rooms
if filters.is_encrypted is not None:
# Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items
for room_id in list(filtered_room_id_set):
# TODO: Is there a good method to look up all rooms at once? (N+1 query problem)
state_at_to_token = await self.storage_controllers.state.get_state_at(
room_id,
to_token,
state_filter=StateFilter.from_types(
[(EventTypes.RoomEncryption, "")]
),
)
is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, ""))

# If we're looking for encrypted rooms, filter out rooms that are not
# encrypted and vice versa
if (filters.is_encrypted and not is_encrypted) or (
not filters.is_encrypted and is_encrypted
):
filtered_room_id_set.remove(room_id)

MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
if filters.is_invite:
raise NotImplementedError()

if filters.room_types:
raise NotImplementedError()

if filters.not_room_types:
raise NotImplementedError()

if filters.room_name_like:
raise NotImplementedError()

if filters.tags:
raise NotImplementedError()

if filters.not_tags:
raise NotImplementedError()

return filtered_room_id_set
107 changes: 13 additions & 94 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,89 +981,6 @@ async def _load_filtered_recents(
bundled_aggregations=bundled_aggregations,
)

async def get_state_after_event(
self,
event_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""
Get the room state after the given event

Args:
event_id: event of interest
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the event and `state_filter` is not satisfied by partial state.
Defaults to `True`.
"""
state_ids = await self._state_storage_controller.get_state_ids_for_event(
event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)

# using get_metadata_for_events here (instead of get_event) sidesteps an issue
# with redactions: if `event_id` is a redaction event, and we don't have the
# original (possibly because it got purged), get_event will refuse to return
# the redaction event, which isn't terribly helpful here.
#
# (To be fair, in that case we could assume it's *not* a state event, and
# therefore we don't need to worry about it. But still, it seems cleaner just
# to pull the metadata.)
m = (await self.store.get_metadata_for_events([event_id]))[event_id]
if m.state_key is not None and m.rejection_reason is None:
state_ids = dict(state_ids)
state_ids[(m.event_type, m.state_key)] = event_id

return state_ids

async def get_state_at(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving get_state_at() to a shared storage controller so that we can use it in Sync v2 and Sliding sync

self,
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""Get the room state at a particular stream position

Args:
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the last event in the room before `stream_position` and
`state_filter` is not satisfied by partial state. Defaults to `True`.
"""
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
# of the stream token if there were multiple forward extremities at the time.
last_event_id = await self.store.get_last_event_in_room_before_stream_ordering(
room_id,
end_token=stream_position.room_key,
)

if last_event_id:
state = await self.get_state_after_event(
last_event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)

else:
# no events in this room - so presumably no state
state = {}

# (erikj) This should be rarely hit, but we've had some reports that
# we get more state down gappy syncs than we should, so let's add
# some logging.
logger.info(
"Failed to find any events in room %s at %s",
room_id,
stream_position.room_key,
)
return state

async def compute_summary(
self,
room_id: str,
Expand Down Expand Up @@ -1437,7 +1354,7 @@ async def _compute_state_delta_for_full_sync(
await_full_state = True
lazy_load_members = False

state_at_timeline_end = await self.get_state_at(
state_at_timeline_end = await self._state_storage_controller.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
Expand Down Expand Up @@ -1565,7 +1482,7 @@ async def _compute_state_delta_for_incremental_sync(
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
state_at_timeline_start = await self._state_storage_controller.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
Expand All @@ -1587,14 +1504,14 @@ async def _compute_state_delta_for_incremental_sync(
# about them).
state_filter = StateFilter.all()

state_at_previous_sync = await self.get_state_at(
state_at_previous_sync = await self._state_storage_controller.get_state_at(
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

state_at_timeline_end = await self.get_state_at(
state_at_timeline_end = await self._state_storage_controller.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
Expand Down Expand Up @@ -2593,7 +2510,7 @@ async def _get_room_changes_for_incremental_sync(
continue

if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = await self.get_state_at(
old_state_ids = await self._state_storage_controller.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
Expand Down Expand Up @@ -2623,12 +2540,14 @@ async def _get_room_changes_for_incremental_sync(
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types(
[(EventTypes.Member, user_id)]
),
old_state_ids = (
await self._state_storage_controller.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types(
[(EventTypes.Member, user_id)]
),
)
)
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
Expand Down
Loading
Loading