Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor SyncResultBuilder assembly to its own function #17202

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17202.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `SyncResultBuilder` assembly to its own function.
266 changes: 149 additions & 117 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1518,128 +1518,17 @@ async def generate_sync_result(
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()

# Note: we get the users room list *before* we get the current token, this
# avoids checking back in history if rooms are joined after the token is fetched.
token_before_rooms = self.event_sources.get_current_token()
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))

# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})

# Since we fetched the users room list before the token, there's a small window
# during which membership events may have been persisted, so we fetch these now
# and modify the joined room list for any changes between the get_rooms_for_user
# call and the get_current_token call.
membership_change_events = []
if since_token:
membership_change_events = await self.store.get_membership_changes_for_user(
user_id,
since_token.room_key,
now_token.room_key,
self.rooms_to_exclude_globally,
)

mem_last_change_by_room_id: Dict[str, EventBase] = {}
for event in membership_change_events:
mem_last_change_by_room_id[event.room_id] = event

# For the latest membership event in each room found, add/remove the room ID
# from the joined room list accordingly. In this case we only care if the
# latest change is JOIN.

for room_id, event in mem_last_change_by_room_id.items():
assert event.internal_metadata.stream_ordering
if (
event.internal_metadata.stream_ordering
< token_before_rooms.room_key.stream
):
continue

logger.info(
"User membership change between getting rooms and current token: %s %s %s",
user_id,
event.membership,
room_id,
)
# User joined a room - we have to then check the room state to ensure we
# respect any bans if there's a race between the join and ban events.
if event.membership == Membership.JOIN:
user_ids_in_room = await self.store.get_users_in_room(room_id)
if user_id in user_ids_in_room:
mutable_joined_room_ids.add(room_id)
# The user left the room, or left and was re-invited but not joined yet
else:
mutable_joined_room_ids.discard(room_id)

# Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
if not sync_config.filter_collection.lazy_load_members():
# Non-lazy syncs should never include partially stated rooms.
# Exclude all partially stated rooms from this sync.
results = await self.store.is_partial_state_room_batched(
mutable_joined_room_ids
)
mutable_rooms_to_exclude.update(
room_id
for room_id, is_partial_state in results.items()
if is_partial_state
)
membership_change_events = [
event
for event in membership_change_events
if not results.get(event.room_id, False)
]

# Incremental eager syncs should additionally include rooms that
# - we are joined to
# - are full-stated
# - became fully-stated at some point during the sync period
# (These rooms will have been omitted during a previous eager sync.)
forced_newly_joined_room_ids: Set[str] = set()
if since_token and not sync_config.filter_collection.lazy_load_members():
un_partial_stated_rooms = (
await self.store.get_un_partial_stated_rooms_between(
since_token.un_partial_stated_rooms_key,
now_token.un_partial_stated_rooms_key,
mutable_joined_room_ids,
)
)
results = await self.store.is_partial_state_room_batched(
un_partial_stated_rooms
)
forced_newly_joined_room_ids.update(
room_id
for room_id, is_partial_state in results.items()
if not is_partial_state
)

# Now we have our list of joined room IDs, exclude as configured and freeze
joined_room_ids = frozenset(
room_id
for room_id in mutable_joined_room_ids
if room_id not in mutable_rooms_to_exclude
sync_result_builder = await self.get_sync_result_builder(
sync_config,
since_token,
full_state,
)

logger.debug(
"Calculating sync response for %r between %s and %s",
sync_config.user,
since_token,
now_token,
)

sync_result_builder = SyncResultBuilder(
sync_config,
full_state,
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
excluded_room_ids=frozenset(mutable_rooms_to_exclude),
forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
membership_change_events=membership_change_events,
sync_result_builder.since_token,
sync_result_builder.now_token,
)

logger.debug("Fetching account data")
Expand Down Expand Up @@ -1751,6 +1640,149 @@ async def generate_sync_result(
next_batch=sync_result_builder.now_token,
)

async def get_sync_result_builder(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> "SyncResultBuilder":
"""
Assemble a `SyncResultBuilder` with all of the initial context to
start building up the sync response:

- Membership changes between the last sync and the current sync.
- Joined room IDs (minus any rooms to exclude).
- Rooms that became fully-stated/un-partial stated since the last sync.

Args:
sync_config: Config/info necessary to process the sync request.
since_token: The point in the stream to sync from.
full_state: Whether to return the full state for each room.

Returns:
`SyncResultBuilder` ready to start generating parts of the sync response.
"""
user_id = sync_config.user.to_string()

# NB: The `now_token` gets changed by some of the `generate_sync_*` methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})

# Note: we get the users room list *before* we get the current token, this
# avoids checking back in history if rooms are joined after the token is fetched.
token_before_rooms = self.event_sources.get_current_token()
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))

# Since we fetched the users room list before the token, there's a small window
# during which membership events may have been persisted, so we fetch these now
# and modify the joined room list for any changes between the get_rooms_for_user
# call and the get_current_token call.
membership_change_events = []
if since_token:
membership_change_events = await self.store.get_membership_changes_for_user(
user_id,
since_token.room_key,
now_token.room_key,
self.rooms_to_exclude_globally,
)

mem_last_change_by_room_id: Dict[str, EventBase] = {}
for event in membership_change_events:
mem_last_change_by_room_id[event.room_id] = event

# For the latest membership event in each room found, add/remove the room ID
# from the joined room list accordingly. In this case we only care if the
# latest change is JOIN.

for room_id, event in mem_last_change_by_room_id.items():
assert event.internal_metadata.stream_ordering
if (
event.internal_metadata.stream_ordering
< token_before_rooms.room_key.stream
):
continue

logger.info(
"User membership change between getting rooms and current token: %s %s %s",
user_id,
event.membership,
room_id,
)
# User joined a room - we have to then check the room state to ensure we
# respect any bans if there's a race between the join and ban events.
if event.membership == Membership.JOIN:
user_ids_in_room = await self.store.get_users_in_room(room_id)
if user_id in user_ids_in_room:
mutable_joined_room_ids.add(room_id)
# The user left the room, or left and was re-invited but not joined yet
else:
mutable_joined_room_ids.discard(room_id)

# Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
if not sync_config.filter_collection.lazy_load_members():
# Non-lazy syncs should never include partially stated rooms.
# Exclude all partially stated rooms from this sync.
results = await self.store.is_partial_state_room_batched(
mutable_joined_room_ids
)
mutable_rooms_to_exclude.update(
room_id
for room_id, is_partial_state in results.items()
if is_partial_state
)
membership_change_events = [
event
for event in membership_change_events
if not results.get(event.room_id, False)
]

# Incremental eager syncs should additionally include rooms that
# - we are joined to
# - are full-stated
# - became fully-stated at some point during the sync period
# (These rooms will have been omitted during a previous eager sync.)
forced_newly_joined_room_ids: Set[str] = set()
if since_token and not sync_config.filter_collection.lazy_load_members():
un_partial_stated_rooms = (
await self.store.get_un_partial_stated_rooms_between(
since_token.un_partial_stated_rooms_key,
now_token.un_partial_stated_rooms_key,
mutable_joined_room_ids,
)
)
results = await self.store.is_partial_state_room_batched(
un_partial_stated_rooms
)
forced_newly_joined_room_ids.update(
room_id
for room_id, is_partial_state in results.items()
if not is_partial_state
)

# Now we have our list of joined room IDs, exclude as configured and freeze
joined_room_ids = frozenset(
room_id
for room_id in mutable_joined_room_ids
if room_id not in mutable_rooms_to_exclude
)

sync_result_builder = SyncResultBuilder(
sync_config,
full_state,
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
excluded_room_ids=frozenset(mutable_rooms_to_exclude),
forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
membership_change_events=membership_change_events,
)

return sync_result_builder

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is just a copy and paste right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct 👍

@measure_func("_generate_sync_entry_for_device_list")
async def _generate_sync_entry_for_device_list(
self,
Expand Down
Loading