Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up /keys/changes #17548

Merged
merged 3 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17548.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix performance of device lists in `/key/changes` and sliding sync.
29 changes: 13 additions & 16 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,29 +269,26 @@ async def get_user_ids_changed(
# We now work out if any other users have since joined or left the rooms
# the user is currently in. First we filter out rooms that we know
# haven't changed recently.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is unnecessary now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at least the filter part

rooms_changed = self.store.get_rooms_that_changed(
joined_room_ids, from_token.room_key
)

# List of membership changes per room
room_to_deltas: Dict[str, List[StateDelta]] = {}
# The set of event IDs of membership events (so we can fetch their
# associated membership).
memberships_to_fetch: Set[str] = set()
for room_id in rooms_changed:
# TODO: Only pull out membership events?
state_changes = await self.store.get_current_state_deltas_for_room(
room_id, from_token=from_token.room_key, to_token=now_token.room_key
)
for delta in state_changes:
if delta.event_type != EventTypes.Member:
continue

room_to_deltas.setdefault(room_id, []).append(delta)
if delta.event_id:
memberships_to_fetch.add(delta.event_id)
if delta.prev_event_id:
memberships_to_fetch.add(delta.prev_event_id)
# TODO: Only pull out membership events?
state_changes = await self.store.get_current_state_deltas_for_rooms(
joined_room_ids, from_token=from_token.room_key, to_token=now_token.room_key
)
for delta in state_changes:
if delta.event_type != EventTypes.Member:
continue

room_to_deltas.setdefault(delta.room_id, []).append(delta)
if delta.event_id:
memberships_to_fetch.add(delta.event_id)
if delta.prev_event_id:
memberships_to_fetch.add(delta.prev_event_id)

# Fetch all the memberships for the membership events
event_id_to_memberships = await self.store.get_membership_from_event_ids(
Expand Down
64 changes: 62 additions & 2 deletions synapse/storage/databases/main/state_deltas.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@

from synapse.logging.opentracing import trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
from synapse.storage.databases.main.stream import _filter_results_by_stream
from synapse.types import RoomStreamToken
from synapse.types import RoomStreamToken, StrCollection
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -200,3 +201,62 @@ def get_current_state_deltas_for_room_txn(
return await self.db_pool.runInteraction(
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
)

@trace
async def get_current_state_deltas_for_rooms(
self,
room_ids: StrCollection,
from_token: RoomStreamToken,
to_token: RoomStreamToken,
) -> List[StateDelta]:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a batch version of the DB function directly above

"""Get the state deltas between two tokens for the set of rooms."""

room_ids = self._curr_state_delta_stream_cache.get_entities_changed(
room_ids, from_token.stream
)
if not room_ids:
return []

def get_current_state_deltas_for_rooms_txn(
txn: LoggingTransaction,
room_ids: StrCollection,
) -> List[StateDelta]:
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", room_ids
)

sql = f"""
SELECT instance_name, stream_id, room_id, type, state_key, event_id, prev_event_id
FROM current_state_delta_stream
WHERE {clause} AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
"""
args.append(from_token.stream)
args.append(to_token.get_max_stream_pos())

txn.execute(sql, args)

return [
StateDelta(
stream_id=row[1],
room_id=row[2],
event_type=row[3],
state_key=row[4],
event_id=row[5],
prev_event_id=row[6],
)
for row in txn
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
]

results = []
for batch in batch_iter(room_ids, 1000):
deltas = await self.db_pool.runInteraction(
"get_current_state_deltas_for_rooms",
get_current_state_deltas_for_rooms_txn,
batch,
)

results.extend(deltas)

return results
Loading