Skip to content

Commit

Permalink
Refactor Sync handler to be able to return different sync responses (…
Browse files Browse the repository at this point in the history
…`SyncVersion`) (#17200)

Refactor Sync handler to be able to be able to return different sync
responses (`SyncVersion`). Preparation to be able support sync v2 and a
new Sliding Sync `/sync/e2ee` endpoint which returns a subset of sync
v2.

Split upon request:
#17167 (comment)

Split from #17167 where we
will add `SyncVersion.E2EE_SYNC` and a new type of sync response.
  • Loading branch information
MadLittleMods authored May 16, 2024
1 parent 2359c64 commit d2d48cc
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 25 deletions.
1 change: 1 addition & 0 deletions changelog.d/17200.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prepare sync handler to be able to return different sync responses (`SyncVersion`).
65 changes: 58 additions & 7 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#
import itertools
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand Down Expand Up @@ -112,6 +113,23 @@
SyncRequestKey = Tuple[Any, ...]


class SyncVersion(Enum):
"""
Enum for specifying the version of sync request. This is used to key which type of
sync response that we are generating.
This is different than the `sync_type` you might see used in other code below; which
specifies the sub-type sync request (e.g. initial_sync, full_state_sync,
incremental_sync) and is really only relevant for the `/sync` v2 endpoint.
"""

# These string values are semantically significant because they are used in the the
# metrics

# Traditional `/sync` endpoint
SYNC_V2 = "sync_v2"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
user: UserID
Expand Down Expand Up @@ -309,13 +327,25 @@ async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Args:
requester: The user requesting the sync response.
sync_config: Config/info necessary to process the sync request.
sync_version: Determines what kind of sync response to generate.
since_token: The point in the stream to sync from.
timeout: How long to wait for new data to arrive before giving up.
full_state: Whether to return the full state for each room.
Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
Expand All @@ -327,6 +357,7 @@ async def wait_for_sync_for_user(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config,
sync_version,
since_token,
timeout,
full_state,
Expand All @@ -338,6 +369,7 @@ async def wait_for_sync_for_user(
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
Expand All @@ -363,9 +395,11 @@ async def _wait_for_sync_for_user(
else:
sync_type = "incremental_sync"

sync_label = f"{sync_version}:{sync_type}"

context = current_context()
if context:
context.tag = sync_type
context.tag = sync_label

# if we have a since token, delete any to-device messages before that token
# (since we now know that the device has received them)
Expand All @@ -384,14 +418,16 @@ async def _wait_for_sync_for_user(
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result: SyncResult = await self.current_sync_for_user(
sync_config, since_token, full_state=full_state
sync_config, sync_version, since_token, full_state=full_state
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SyncResult:
return await self.current_sync_for_user(sync_config, since_token)
return await self.current_sync_for_user(
sync_config, sync_version, since_token
)

result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
Expand All @@ -416,13 +452,14 @@ async def current_sync_callback(
lazy_loaded = "true"
else:
lazy_loaded = "false"
non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
non_empty_sync_counter.labels(sync_label, lazy_loaded).inc()

return result

async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
Expand All @@ -431,12 +468,26 @@ async def current_sync_for_user(
This is a wrapper around `generate_sync_result` which starts an open tracing
span to track the sync. See `generate_sync_result` for the next part of your
indoctrination.
Args:
sync_config: Config/info necessary to process the sync request.
sync_version: Determines what kind of sync response to generate.
since_token: The point in the stream to sync from.p.
full_state: Whether to return the full state for each room.
Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)
# Go through the `/sync` v2 path
if sync_version == SyncVersion.SYNC_V2:
sync_result: SyncResult = await self.generate_sync_result(
sync_config, since_token, full_state
)
else:
raise Exception(
f"Unknown sync_version (this is a Synapse problem): {sync_version}"
)

set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
Expand Down
2 changes: 2 additions & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
KnockedSyncResult,
SyncConfig,
SyncResult,
SyncVersion,
)
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
Expand Down Expand Up @@ -232,6 +233,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
since_token=since_token,
timeout=timeout,
full_state=full_state,
Expand Down
4 changes: 2 additions & 2 deletions tests/events/test_presence_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from synapse.types import JsonDict, StreamToken, create_requester
from synapse.util import Clock

from tests.handlers.test_sync import generate_sync_config
from tests.handlers.test_sync import SyncVersion, generate_sync_config
from tests.unittest import (
FederatingHomeserverTestCase,
HomeserverTestCase,
Expand Down Expand Up @@ -521,7 +521,7 @@ def sync_presence(
sync_config = generate_sync_config(requester.user.to_string())
sync_result = testcase.get_success(
testcase.hs.get_sync_handler().wait_for_sync_for_user(
requester, sync_config, since_token
requester, sync_config, SyncVersion.SYNC_V2, since_token
)
)

Expand Down
Loading

0 comments on commit d2d48cc

Please sign in to comment.