Skip to content

Commit

Permalink
Make /key/changes and SS use same path as sync
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Aug 7, 2024
1 parent 6576ecd commit c786948
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 111 deletions.
187 changes: 76 additions & 111 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
)

from synapse.api import errors
from synapse.api.constants import EduTypes, EventTypes
from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import (
Codes,
FederationDeniedError,
Expand All @@ -48,6 +48,7 @@
wrap_as_background_process,
)
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import (
DeviceListUpdates,
JsonDict,
Expand Down Expand Up @@ -232,129 +233,93 @@ async def get_user_ids_changed(

set_tag("user_id", user_id)
set_tag("from_token", str(from_token))
now_room_key = self.store.get_room_max_token()

room_ids = await self.store.get_rooms_for_user(user_id)

changed = await self.get_device_changes_in_shared_rooms(
user_id, room_ids, from_token
)

# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)

member_events = await self.store.get_membership_changes_for_user(
user_id, from_token.room_key, now_room_key
)
rooms_changed.update(event.room_id for event in member_events)
now_token = self._event_sources.get_current_token()

stream_ordering = from_token.room_key.stream
joined_room_ids = await self.store.get_rooms_for_user(user_id)

possibly_changed = set(changed)
possibly_left = set()
for room_id in rooms_changed:
# Check if the forward extremities have changed. If not then we know
# the current state won't have changed, and so we can skip this room.
try:
if not await self.store.have_room_forward_extremities_changed_since(
room_id, stream_ordering
):
continue
except errors.StoreError:
pass

current_state_ids = await self._state_storage.get_current_state_ids(
room_id, await_full_state=False
membership_changes = (
await self.store.get_current_state_delta_membership_changes_for_user(
user_id, from_key=from_token.room_key, to_key=now_token.room_key
)
)

# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
if room_id not in room_ids:
for etype, state_key in current_state_ids.keys():
if etype != EventTypes.Member:
continue
possibly_left.add(state_key)
continue
newly_joined_rooms = set()
newly_left_rooms = set()
for change in membership_changes:
if change.membership == Membership.JOIN:
if change.prev_membership != Membership.JOIN:
newly_joined_rooms.add(change.room_id)
elif change.prev_membership == Membership.JOIN:
newly_left_rooms.add(change.room_id)

# Fetch the current state at the time.
try:
event_ids = await self.store.get_forward_extremities_for_room_at_stream_ordering(
room_id, stream_ordering=stream_ordering
)
except errors.StoreError:
# we have purged the stream_ordering index since the stream
# ordering: treat it the same as a new room
event_ids = []

# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
for etype, state_key in current_state_ids.keys():
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
continue
newly_left_rooms -= newly_joined_rooms

current_member_id = current_state_ids.get((EventTypes.Member, user_id))
if not current_member_id:
continue
# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(
joined_room_ids, from_token.room_key
)

# mapping from event_id -> state_dict
prev_state_ids = await self._state_storage.get_state_ids_for_events(
event_ids,
await_full_state=False,
room_to_deltas: Dict[str, List[StateDelta]] = {}
memberships_to_fetch: Set[str] = set()
for room_id in rooms_changed:
# TODO: Only pull out membership events?
state_changes = await self.store.get_current_state_deltas_for_room(
room_id, from_token=from_token.room_key, to_token=now_token.room_key
)

# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
for state_dict in prev_state_ids.values():
member_event = state_dict.get((EventTypes.Member, user_id), None)
if not member_event or member_event != current_member_id:
for etype, state_key in current_state_ids.keys():
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
break

# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
for delta in state_changes:
if delta.event_type != EventTypes.Member:
continue

# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
for state_dict in prev_state_ids.values():
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
if state_key != user_id:
possibly_changed.add(state_key)
break

if possibly_changed or possibly_left:
possibly_joined = possibly_changed
possibly_left = possibly_changed | possibly_left

# Double check if we still share rooms with the given user.
users_rooms = await self.store.get_rooms_for_users(possibly_left)
for changed_user_id, entries in users_rooms.items():
if any(rid in room_ids for rid in entries):
possibly_left.discard(changed_user_id)
else:
possibly_joined.discard(changed_user_id)
room_to_deltas.setdefault(room_id, []).append(delta)
if delta.event_id:
memberships_to_fetch.add(delta.event_id)
if delta.prev_event_id:
memberships_to_fetch.add(delta.prev_event_id)

else:
possibly_joined = set()
possibly_left = set()
event_id_to_memberships = await self.store.get_membership_from_event_ids(
memberships_to_fetch
)

device_list_updates = DeviceListUpdates(
changed=possibly_joined,
left=possibly_left,
joined_invited_knocked = (
Membership.JOIN,
Membership.INVITE,
Membership.KNOCK,
)

newly_joined_or_invited_or_knocked_users = set()
newly_left_users = set()
for _, deltas in room_to_deltas.items():
for delta in deltas:
new_membership = None
prev_membership = None

if delta.event_id:
m = event_id_to_memberships.get(delta.event_id)
if m is not None:
new_membership = m.membership
if delta.prev_event_id:
m = event_id_to_memberships.get(delta.prev_event_id)
if m is not None:
prev_membership = m.membership

if new_membership in joined_invited_knocked:
if prev_membership not in joined_invited_knocked:
newly_joined_or_invited_or_knocked_users.add(delta.state_key)
elif prev_membership in joined_invited_knocked:
newly_left_users.add(delta.state_key)

newly_left_users -= newly_joined_or_invited_or_knocked_users

device_list_updates = await self.generate_sync_entry_for_device_list(
user_id=user_id,
since_token=from_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)

log_kv(
Expand Down
5 changes: 5 additions & 0 deletions synapse/storage/databases/main/state_deltas.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ async def get_current_state_deltas_for_room(
) -> List[StateDelta]:
"""Get the state deltas between two tokens."""

if not self._curr_state_delta_stream_cache.has_entity_changed(
room_id, from_token.stream
):
return []

def get_current_state_deltas_for_room_txn(
txn: LoggingTransaction,
) -> List[StateDelta]:
Expand Down

0 comments on commit c786948

Please sign in to comment.