Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Sync handler to be able to return different sync responses (SyncVersion) #17200

Merged
merged 3 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17200.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prepare sync handler to be able to return different sync responses (`SyncVersion`).
65 changes: 58 additions & 7 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#
import itertools
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand Down Expand Up @@ -112,6 +113,23 @@
SyncRequestKey = Tuple[Any, ...]


class SyncVersion(Enum):
"""
Enum for specifying the version of sync request. This is used to key which type of
sync response that we are generating.

This is different than the `sync_type` you might see used in other code below; which
specifies the sub-type sync request (e.g. initial_sync, full_state_sync,
incremental_sync) and is really only relevant for the `/sync` v2 endpoint.
"""

# These string values are semantically significant because they are used in the the
# metrics

# Traditional `/sync` endpoint
SYNC_V2 = "sync_v2"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
user: UserID
Expand Down Expand Up @@ -309,13 +327,25 @@ async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.

Args:
requester: The user requesting the sync response.
sync_config: Config/info necessary to process the sync request.
sync_version: Determines what kind of sync response to generate.
since_token: The point in the stream to sync from.
timeout: How long to wait for new data to arrive before giving up.
full_state: Whether to return the full state for each room.

Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
Expand All @@ -327,6 +357,7 @@ async def wait_for_sync_for_user(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config,
sync_version,
since_token,
timeout,
full_state,
Expand All @@ -338,6 +369,7 @@ async def wait_for_sync_for_user(
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
Expand All @@ -363,9 +395,11 @@ async def _wait_for_sync_for_user(
else:
sync_type = "incremental_sync"

sync_label = f"{sync_version}:{sync_type}"

context = current_context()
if context:
context.tag = sync_type
context.tag = sync_label

# if we have a since token, delete any to-device messages before that token
# (since we now know that the device has received them)
Expand All @@ -384,14 +418,16 @@ async def _wait_for_sync_for_user(
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result: SyncResult = await self.current_sync_for_user(
sync_config, since_token, full_state=full_state
sync_config, sync_version, since_token, full_state=full_state
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SyncResult:
return await self.current_sync_for_user(sync_config, since_token)
return await self.current_sync_for_user(
sync_config, sync_version, since_token
)

result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
Expand All @@ -416,13 +452,14 @@ async def current_sync_callback(
lazy_loaded = "true"
else:
lazy_loaded = "false"
non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
non_empty_sync_counter.labels(sync_label, lazy_loaded).inc()

return result

async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
Expand All @@ -431,12 +468,26 @@ async def current_sync_for_user(
This is a wrapper around `generate_sync_result` which starts an open tracing
span to track the sync. See `generate_sync_result` for the next part of your
indoctrination.

Args:
sync_config: Config/info necessary to process the sync request.
sync_version: Determines what kind of sync response to generate.
since_token: The point in the stream to sync from.p.
full_state: Whether to return the full state for each room.
Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)
# Go through the `/sync` v2 path
if sync_version == SyncVersion.SYNC_V2:
sync_result: SyncResult = await self.generate_sync_result(
sync_config, since_token, full_state
)
else:
raise Exception(
f"Unknown sync_version (this is a Synapse problem): {sync_version}"
)

set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
Expand Down
2 changes: 2 additions & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
KnockedSyncResult,
SyncConfig,
SyncResult,
SyncVersion,
)
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
Expand Down Expand Up @@ -232,6 +233,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
since_token=since_token,
timeout=timeout,
full_state=full_state,
Expand Down
4 changes: 2 additions & 2 deletions tests/events/test_presence_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from synapse.types import JsonDict, StreamToken, create_requester
from synapse.util import Clock

from tests.handlers.test_sync import generate_sync_config
from tests.handlers.test_sync import SyncVersion, generate_sync_config
from tests.unittest import (
FederatingHomeserverTestCase,
HomeserverTestCase,
Expand Down Expand Up @@ -521,7 +521,7 @@ def sync_presence(
sync_config = generate_sync_config(requester.user.to_string())
sync_result = testcase.get_success(
testcase.hs.get_sync_handler().wait_for_sync_for_user(
requester, sync_config, since_token
requester, sync_config, SyncVersion.SYNC_V2, since_token
)
)

Expand Down
Loading
Loading