Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor SyncResultBuilder assembly to its own function #17202

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17202.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `SyncResultBuilder` assembly to its own function.
264 changes: 148 additions & 116 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1569,12 +1569,158 @@ async def generate_sync_result(
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()

sync_result_builder = await self.get_sync_result_builder(
sync_config,
since_token,
full_state,
)

logger.debug(
"Calculating sync response for %r between %s and %s",
sync_config.user,
sync_result_builder.since_token,
sync_result_builder.now_token,
)

logger.debug("Fetching account data")

# Global account data is included if it is not filtered out.
if not sync_config.filter_collection.blocks_all_global_account_data():
await self._generate_sync_entry_for_account_data(sync_result_builder)

# Presence data is included if the server has it enabled and not filtered out.
include_presence_data = bool(
self.hs_config.server.presence_enabled
and not sync_config.filter_collection.blocks_all_presence()
)
# Device list updates are sent if a since token is provided.
include_device_list_updates = bool(since_token and since_token.device_list_key)

# If we do not care about the rooms or things which depend on the room
# data (namely presence and device list updates), then we can skip
# this process completely.
device_lists = DeviceListUpdates()
if (
not sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
or include_presence_data
or include_device_list_updates
):
logger.debug("Fetching room data")

# Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
# is used in calculate_user_changes below.
(
newly_joined_rooms,
newly_left_rooms,
) = await self._generate_sync_entry_for_rooms(sync_result_builder)

# Work out which users have joined or left rooms we're in. We use this
# to build the presence and device_list parts of the sync response in
# `_generate_sync_entry_for_presence` and
# `_generate_sync_entry_for_device_list` respectively.
if include_presence_data or include_device_list_updates:
# This uses the sync_result_builder.joined which is set in
# `_generate_sync_entry_for_rooms`, if that didn't find any joined
# rooms for some reason it is a no-op.
(
newly_joined_or_invited_or_knocked_users,
newly_left_users,
) = sync_result_builder.calculate_user_changes()

if include_presence_data:
logger.debug("Fetching presence data")
await self._generate_sync_entry_for_presence(
sync_result_builder,
newly_joined_rooms,
newly_joined_or_invited_or_knocked_users,
)

if include_device_list_updates:
device_lists = await self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)

logger.debug("Fetching to-device data")
await self._generate_sync_entry_for_to_device(sync_result_builder)

logger.debug("Fetching OTK data")
device_id = sync_config.device_id
one_time_keys_count: JsonMapping = {}
unused_fallback_key_types: List[str] = []
if device_id:
# TODO: We should have a way to let clients differentiate between the states of:
# * no change in OTK count since the provided since token
# * the server has zero OTKs left for this device
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
one_time_keys_count = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
unused_fallback_key_types = list(
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
)

num_events = 0

# debug for https://github.com/matrix-org/synapse/issues/9424
for joined_room in sync_result_builder.joined:
num_events += len(joined_room.timeline.events)

log_kv(
{
"joined_rooms_in_result": len(sync_result_builder.joined),
"events_in_result": num_events,
}
)

logger.debug("Sync response calculation complete")
return SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
joined=sync_result_builder.joined,
invited=sync_result_builder.invited,
knocked=sync_result_builder.knocked,
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
device_one_time_keys_count=one_time_keys_count,
device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)

async def get_sync_result_builder(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> "SyncResultBuilder":
"""
Assemble a `SyncResultBuilder` with all of the initial context to
start building up the sync response:

- Membership changes between the last sync and the current sync.
- Joined room IDs (minus any rooms to exclude).
- Rooms that became fully-stated/un-partial stated since the last sync.

Args:
sync_config: Config/info necessary to process the sync request.
since_token: The point in the stream to sync from.
full_state: Whether to return the full state for each room.

Returns:
`SyncResultBuilder` ready to start generating parts of the sync response.
"""
user_id = sync_config.user.to_string()

# Note: we get the users room list *before* we get the current token, this
# avoids checking back in history if rooms are joined after the token is fetched.
token_before_rooms = self.event_sources.get_current_token()
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))

# NB: The now_token gets changed by some of the generate_sync_* methods,
# NB: The `now_token` gets changed by some of the `generate_sync_*` methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
Expand Down Expand Up @@ -1675,13 +1821,6 @@ async def generate_sync_result(
if room_id not in mutable_rooms_to_exclude
)

logger.debug(
"Calculating sync response for %r between %s and %s",
sync_config.user,
since_token,
now_token,
)

sync_result_builder = SyncResultBuilder(
sync_config,
full_state,
Expand All @@ -1693,114 +1832,7 @@ async def generate_sync_result(
membership_change_events=membership_change_events,
)

logger.debug("Fetching account data")

# Global account data is included if it is not filtered out.
if not sync_config.filter_collection.blocks_all_global_account_data():
await self._generate_sync_entry_for_account_data(sync_result_builder)

# Presence data is included if the server has it enabled and not filtered out.
include_presence_data = bool(
self.hs_config.server.presence_enabled
and not sync_config.filter_collection.blocks_all_presence()
)
# Device list updates are sent if a since token is provided.
include_device_list_updates = bool(since_token and since_token.device_list_key)

# If we do not care about the rooms or things which depend on the room
# data (namely presence and device list updates), then we can skip
# this process completely.
device_lists = DeviceListUpdates()
if (
not sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
or include_presence_data
or include_device_list_updates
):
logger.debug("Fetching room data")

# Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
# is used in calculate_user_changes below.
(
newly_joined_rooms,
newly_left_rooms,
) = await self._generate_sync_entry_for_rooms(sync_result_builder)

# Work out which users have joined or left rooms we're in. We use this
# to build the presence and device_list parts of the sync response in
# `_generate_sync_entry_for_presence` and
# `_generate_sync_entry_for_device_list` respectively.
if include_presence_data or include_device_list_updates:
# This uses the sync_result_builder.joined which is set in
# `_generate_sync_entry_for_rooms`, if that didn't find any joined
# rooms for some reason it is a no-op.
(
newly_joined_or_invited_or_knocked_users,
newly_left_users,
) = sync_result_builder.calculate_user_changes()

if include_presence_data:
logger.debug("Fetching presence data")
await self._generate_sync_entry_for_presence(
sync_result_builder,
newly_joined_rooms,
newly_joined_or_invited_or_knocked_users,
)

if include_device_list_updates:
device_lists = await self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)

logger.debug("Fetching to-device data")
await self._generate_sync_entry_for_to_device(sync_result_builder)

logger.debug("Fetching OTK data")
device_id = sync_config.device_id
one_time_keys_count: JsonMapping = {}
unused_fallback_key_types: List[str] = []
if device_id:
# TODO: We should have a way to let clients differentiate between the states of:
# * no change in OTK count since the provided since token
# * the server has zero OTKs left for this device
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
one_time_keys_count = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
unused_fallback_key_types = list(
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
)

num_events = 0

# debug for https://github.com/matrix-org/synapse/issues/9424
for joined_room in sync_result_builder.joined:
num_events += len(joined_room.timeline.events)

log_kv(
{
"joined_rooms_in_result": len(sync_result_builder.joined),
"events_in_result": num_events,
}
)

logger.debug("Sync response calculation complete")
return SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
joined=sync_result_builder.joined,
invited=sync_result_builder.invited,
knocked=sync_result_builder.knocked,
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
device_one_time_keys_count=one_time_keys_count,
device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)
return sync_result_builder

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is just a copy and paste right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct 👍

@measure_func("_generate_sync_entry_for_device_list")
async def _generate_sync_entry_for_device_list(
Expand Down
Loading