Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Increase perf of handling presence when joining large rooms. (#9916)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored May 5, 2021
1 parent e2a4435 commit 37623e3
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 82 deletions.
1 change: 1 addition & 0 deletions changelog.d/9916.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance after joining a large room when presence is enabled.
154 changes: 82 additions & 72 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,16 @@ async def _unsafe_process(self) -> None:
max_pos, deltas = await self.store.get_current_state_deltas(
self._event_pos, room_max_stream_ordering
)
await self._handle_state_delta(deltas)

# We may get multiple deltas for different rooms, but we want to
# handle them on a room by room basis, so we batch them up by
# room.
deltas_by_room: Dict[str, List[JsonDict]] = {}
for delta in deltas:
deltas_by_room.setdefault(delta["room_id"], []).append(delta)

for room_id, deltas_for_room in deltas_by_room.items():
await self._handle_state_delta(room_id, deltas_for_room)

self._event_pos = max_pos

Expand All @@ -1192,17 +1201,21 @@ async def _unsafe_process(self) -> None:
max_pos
)

async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
"""Process current state deltas to find new joins that need to be
handled.
async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
"""Process current state deltas for the room to find new joins that need
to be handled.
"""
# A map of destination to a set of user state that they should receive
presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]

# Sets of newly joined users. Note that if the local server is
# joining a remote room for the first time we'll see both the joining
# user and all remote users as newly joined.
newly_joined_users = set()

for delta in deltas:
assert room_id == delta["room_id"]

typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]

Expand Down Expand Up @@ -1231,72 +1244,55 @@ async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
# Ignore changes to join events.
continue

# Retrieve any user presence state updates that need to be sent as a result,
# and the destinations that need to receive it
destinations, user_presence_states = await self._on_user_joined_room(
room_id, state_key
)

# Insert the destinations and respective updates into our destinations dict
for destination in destinations:
presence_destinations.setdefault(destination, set()).update(
user_presence_states
)

# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
self._federation_queue.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)

async def _on_user_joined_room(
self, room_id: str, user_id: str
) -> Tuple[List[str], List[UserPresenceState]]:
"""Called when we detect a user joining the room via the current state
delta stream. Returns the destinations that need to be updated and the
presence updates to send to them.
Args:
room_id: The ID of the room that the user has joined.
user_id: The ID of the user that has joined the room.
Returns:
A tuple of destinations and presence updates to send to them.
"""
if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)

# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user

remote_hosts = await self.state.get_current_hosts_in_room(room_id)
newly_joined_users.add(state_key)

# Filter out ourselves.
filtered_remote_hosts = [
host for host in remote_hosts if host != self.server_name
]

state = await self.current_state_for_user(user_id)
return filtered_remote_hosts, [state]
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
# 2. If so send any presence they don't already have for
# local users in the room.

# TODO: We should be able to filter the users down to those that
# the server hasn't previously seen

# TODO: Check that this is actually a new server joining the
# room.

remote_host = get_domain_from_id(user_id)
if not newly_joined_users:
# If nobody has joined then there's nothing to do.
return

users = await self.store.get_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, users))
# We want to send:
# 1. presence states of all local users in the room to newly joined
# remote servers
# 2. presence states of newly joined users to all remote servers in
# the room.
#
# TODO: Only send presence states to remote hosts that don't already
# have them (because they already share rooms).

# Get all the users who were already in the room, by fetching the
# current users in the room and removing the newly joined users.
users = await self.store.get_users_in_room(room_id)
prev_users = set(users) - newly_joined_users

# Construct sets for all the local users and remote hosts that were
# already in the room
prev_local_users = []
prev_remote_hosts = set()
for user_id in prev_users:
if self.is_mine_id(user_id):
prev_local_users.append(user_id)
else:
prev_remote_hosts.add(get_domain_from_id(user_id))

# Similarly, construct sets for all the local users and remote hosts
# that were *not* already in the room. Care needs to be taken with the
# calculating the remote hosts, as a host may have already been in the
# room even if there is a newly joined user from that host.
newly_joined_local_users = []
newly_joined_remote_hosts = set()
for user_id in newly_joined_users:
if self.is_mine_id(user_id):
newly_joined_local_users.append(user_id)
else:
host = get_domain_from_id(user_id)
if host not in prev_remote_hosts:
newly_joined_remote_hosts.add(host)

states_d = await self.current_state_for_users(user_ids)
# Send presence states of all local users in the room to newly joined
# remote servers. (We actually only send states for local users already
# in the room, as we'll send states for newly joined local users below.)
if prev_local_users and newly_joined_remote_hosts:
local_states = await self.current_state_for_users(prev_local_users)

# Filter out old presence, i.e. offline presence states where
# the user hasn't been active for a week. We can change this
Expand All @@ -1306,13 +1302,27 @@ async def _on_user_joined_room(
now = self.clock.time_msec()
states = [
state
for state in states_d.values()
for state in local_states.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None
]

return [remote_host], states
self._federation_queue.send_presence_to_destinations(
destinations=newly_joined_remote_hosts,
states=states,
)

# Send presence states of newly joined users to all remote servers in
# the room
if newly_joined_local_users and (
prev_remote_hosts or newly_joined_remote_hosts
):
local_states = await self.current_state_for_users(newly_joined_local_users)
self._federation_queue.send_presence_to_destinations(
destinations=prev_remote_hosts | newly_joined_remote_hosts,
states=list(local_states.values()),
)


def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
Expand Down
14 changes: 4 additions & 10 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ def test_remote_joins(self):
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations=["server2"], states={expected_state}
destinations={"server2"}, states=[expected_state]
)

#
Expand All @@ -740,7 +740,7 @@ def test_remote_joins(self):
self._add_new_user(room_id, "@bob:server3")

self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations=["server3"], states={expected_state}
destinations={"server3"}, states=[expected_state]
)

def test_remote_gets_presence_when_local_user_joins(self):
Expand Down Expand Up @@ -788,14 +788,8 @@ def test_remote_gets_presence_when_local_user_joins(self):
self.presence_handler.current_state_for_user("@test2:server")
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.assertEqual(
self.federation_sender.send_presence_to_destinations.call_count, 2
)
self.federation_sender.send_presence_to_destinations.assert_any_call(
destinations=["server3"], states={expected_state}
)
self.federation_sender.send_presence_to_destinations.assert_any_call(
destinations=["server2"], states={expected_state}
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations={"server2", "server3"}, states=[expected_state]
)

def _add_new_user(self, room_id, user_id):
Expand Down

0 comments on commit 37623e3

Please sign in to comment.