Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle to-device extensions to Sliding Sync #17416

Merged
merged 9 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17416.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add to-device support to experimental sliding sync implementation.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
89 changes: 88 additions & 1 deletion synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,11 +526,15 @@ async def current_sync_for_user(

rooms[room_id] = room_sync_result

extensions = await self.get_extensions_response(
sync_config=sync_config, to_token=to_token
)

return SlidingSyncResult(
next_pos=to_token,
lists=lists,
rooms=rooms,
extensions={},
extensions=extensions,
)

async def get_sync_room_ids_for_user(
Expand Down Expand Up @@ -1367,3 +1371,86 @@ async def get_room_sync_data(
notification_count=0,
highlight_count=0,
)

async def get_extensions_response(
self,
sync_config: SlidingSyncConfig,
to_token: StreamToken,
) -> SlidingSyncResult.Extensions:
"""Handle extension requests."""
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

if sync_config.extensions is None:
return SlidingSyncResult.Extensions()

to_device_response = None
if sync_config.extensions.to_device:
to_device_response = await self.get_to_device_extensions_response(
sync_config=sync_config,
to_device_request=sync_config.extensions.to_device,
to_token=to_token,
)

return SlidingSyncResult.Extensions(to_device=to_device_response)

async def get_to_device_extensions_response(
self,
sync_config: SlidingSyncConfig,
to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension,
to_token: StreamToken,
) -> SlidingSyncResult.Extensions.ToDeviceExtension:
"""Handle to-device extension (MSC3885)"""
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

user_id = sync_config.user.to_string()
device_id = sync_config.device_id

# Check that this request has a valid device ID (not all requests have
# to belong to a device, and so device_id is None), and that the
# extension is enabled.
if device_id is None or not to_device_request.enabled:
return SlidingSyncResult.Extensions.ToDeviceExtension(
next_batch=f"{to_token.to_device_key}",
events=[],
)

since_stream_id = 0
if to_device_request.since:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# We've already validated this is an int.
since_stream_id = int(to_device_request.since)

if to_token.to_device_key < since_stream_id:
# The since token is ahead of our current token, so we return an
# empty response.
logger.warning(
"Got to-device.since from the future. Next_batch: %r, current: %r",
since_stream_id,
to_token.to_device_key,
)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
return SlidingSyncResult.Extensions.ToDeviceExtension(
next_batch=to_device_request.since,
events=[],
)

# Delete everything before the given since token, as we know the
# device must have received them.
deleted = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
up_to_stream_id=since_stream_id,
)

logger.debug(
"Deleted %d to-device messages up to %d", deleted, since_stream_id
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
)

messages, stream_id = await self.store.get_messages_for_device(
user_id=user_id,
device_id=device_id,
from_stream_id=since_stream_id,
to_stream_id=to_token.to_device_key,
limit=min(to_device_request.limit, 100), # Limit to at most 100 events
)

return SlidingSyncResult.Extensions.ToDeviceExtension(
next_batch=f"{stream_id}",
events=messages,
)
17 changes: 16 additions & 1 deletion synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,9 @@ async def encode_response(
response["rooms"] = await self.encode_rooms(
requester, sliding_sync_result.rooms
)
response["extensions"] = {} # TODO: sliding_sync_result.extensions
response["extensions"] = await self.encode_extensions(
requester, sliding_sync_result.extensions
)

return response

Expand Down Expand Up @@ -1053,6 +1055,19 @@ async def encode_rooms(

return serialized_rooms

async def encode_extensions(
self, requester: Requester, extensions: SlidingSyncResult.Extensions
) -> JsonDict:
result = {}

if extensions.to_device is not None:
result["to_device"] = {
"next_batch": extensions.to_device.next_batch,
"events": extensions.to_device.events,
}

return result


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)
Expand Down
23 changes: 20 additions & 3 deletions synapse/types/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#
#
from enum import Enum
from typing import TYPE_CHECKING, Dict, Final, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, Final, List, Optional, Sequence, Tuple

import attr
from typing_extensions import TypedDict
Expand Down Expand Up @@ -244,10 +244,27 @@ class Operation:
count: int
ops: List[Operation]

@attr.s(slots=True, frozen=True, auto_attribs=True)
class Extensions:
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ToDeviceExtension:
"""The to-device extension (MSC3885)"""
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

next_batch: str
events: Sequence[JsonMapping]

def __bool__(self) -> bool:
return bool(self.events)

to_device: Optional[ToDeviceExtension] = None

def __bool__(self) -> bool:
return bool(self.to_device)

next_pos: StreamToken
lists: Dict[str, SlidingWindowList]
rooms: Dict[str, RoomResult]
extensions: JsonMapping
extensions: Extensions

def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
Expand All @@ -263,5 +280,5 @@ def empty(next_pos: StreamToken) -> "SlidingSyncResult":
next_pos=next_pos,
lists={},
rooms={},
extensions={},
extensions=SlidingSyncResult.Extensions(),
)
37 changes: 32 additions & 5 deletions synapse/types/rest/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,45 @@ class Filters(RequestBodyModel):
class RoomSubscription(CommonRoomParameters):
pass

class Extension(RequestBodyModel):
enabled: Optional[StrictBool] = False
lists: Optional[List[StrictStr]] = None
rooms: Optional[List[StrictStr]] = None
class Extensions(RequestBodyModel):
"""The extensions section of the request."""
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

class ToDeviceExtension(RequestBodyModel):
"""The to-device extension (MSC3885)

Args:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
enabled
limit: Maximum number of to-device messages to return
since: The `next_batch` from the previous sync response
"""

enabled: Optional[StrictBool] = False
limit: StrictInt = 100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add some sort of limit here: conint(le=100, strict=True) (see other examples in this file)

It looks like we try to enforce this in the downstream code right now but probably better consolidated here with the rest of things we can figure out right away.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec says that clients can request whatever limit they want, but the server is free to return fewer messages if it wants. So I don't think we should reject requests with higher limits.

since: Optional[StrictStr] = None

@validator("since")
def lists_length_check(
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
cls, value: Optional[StrictStr]
) -> Optional[StrictStr]:
if value is None:
return value

try:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
int(value)
except ValueError:
raise ValueError("'extensions.to_device.since' is invalid")
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

return value

to_device: Optional[ToDeviceExtension] = None

# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
if TYPE_CHECKING:
lists: Optional[Dict[str, SlidingSyncList]] = None
else:
lists: Optional[Dict[constr(max_length=64, strict=True), SlidingSyncList]] = None # type: ignore[valid-type]
room_subscriptions: Optional[Dict[StrictStr, RoomSubscription]] = None
extensions: Optional[Dict[StrictStr, Extension]] = None
extensions: Optional[Extensions] = None

@validator("lists")
def lists_length_check(
Expand Down
Loading
Loading