Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Sync: Add typing notification extension (MSC3961) #17505

Merged
merged 10 commits into from
Jul 31, 2024
4 changes: 4 additions & 0 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ async def get_new_events(
explicit_room_id: Optional[str] = None,
to_key: Optional[MultiWriterStreamToken] = None,
) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
"""
Find read receipts for given rooms (> `from_token` and <= `to_token`)
"""

if to_key is None:
to_key = self.get_current_key()

Expand Down
78 changes: 78 additions & 0 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2284,11 +2284,24 @@ async def get_extensions_response(
from_token=from_token,
)

typing_response = None
if sync_config.extensions.typing is not None:
typing_response = await self.get_typing_extension_response(
sync_config=sync_config,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
actual_room_response_map=actual_room_response_map,
typing_request=sync_config.extensions.typing,
to_token=to_token,
from_token=from_token,
)

return SlidingSyncResult.Extensions(
to_device=to_device_response,
e2ee=e2ee_response,
account_data=account_data_response,
receipts=receipts_response,
typing=typing_response,
)

def find_relevant_room_ids_for_extension(
Expand Down Expand Up @@ -2615,6 +2628,8 @@ async def get_receipts_extension_response(

room_id_to_receipt_map: Dict[str, JsonMapping] = {}
if len(relevant_room_ids) > 0:
# TODO: Take connection tracking into account so that when a room comes back
# into range we can send the receipts that were missed.
receipt_source = self.event_sources.sources.receipt
receipts, _ = await receipt_source.get_new_events(
user=sync_config.user,
Expand All @@ -2636,6 +2651,8 @@ async def get_receipts_extension_response(
type = receipt["type"]
content = receipt["content"]

# For `inital: True` rooms, we only want to include receipts for events
# in the timeline.
room_result = actual_room_response_map.get(room_id)
if room_result is not None:
if room_result.initial:
Expand All @@ -2659,6 +2676,67 @@ async def get_receipts_extension_response(
room_id_to_receipt_map=room_id_to_receipt_map,
)

async def get_typing_extension_response(
self,
sync_config: SlidingSyncConfig,
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
actual_room_ids: Set[str],
actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
typing_request: SlidingSyncConfig.Extensions.TypingExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.TypingExtension]:
"""Handle Typing Notification extension (MSC3961)

Args:
sync_config: Sync configuration
actual_lists: Sliding window API. A map of list key to list results in the
Sliding Sync response.
actual_room_ids: The actual room IDs in the the Sliding Sync response.
actual_room_response_map: A map of room ID to room results in the the
Sliding Sync response.
account_data_request: The account_data extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""
# Skip if the extension is not enabled
if not typing_request.enabled:
return None

relevant_room_ids = self.find_relevant_room_ids_for_extension(
requested_lists=typing_request.lists,
requested_room_ids=typing_request.rooms,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
)

room_id_to_typing_map: Dict[str, JsonMapping] = {}
if len(relevant_room_ids) > 0:
# TODO: Take connection tracking into account so that when a room comes back
# into range we can send the typing notifications that were missed.
typing_source = self.event_sources.sources.typing
typing_notifications, _ = await typing_source.get_new_events(
user=sync_config.user,
from_key=(from_token.stream_token.typing_key if from_token else 0),
to_key=to_token.typing_key,
# This is a dummy value and isn't used in the function
limit=0,
room_ids=relevant_room_ids,
is_guest=False,
)

for typing_notification in typing_notifications:
# These fields should exist for every typing notification
room_id = typing_notification["room_id"]
type = typing_notification["type"]
content = typing_notification["content"]

room_id_to_typing_map[room_id] = {"type": type, "content": content}

return SlidingSyncResult.Extensions.TypingExtension(
room_id_to_typing_map=room_id_to_typing_map,
)


class HaveSentRoomFlag(Enum):
"""Flag for whether we have sent the room down a sliding sync connection.
Expand Down
9 changes: 8 additions & 1 deletion synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,12 @@ async def get_new_events(
room_ids: Iterable[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
to_key: Optional[int] = None,
) -> Tuple[List[JsonMapping], int]:
"""
Find typing notifications for given rooms (> `from_token` and <= `to_token`)
"""

with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
handler = self.get_typing_handler()
Expand All @@ -574,7 +579,9 @@ async def get_new_events(
for room_id in room_ids:
if room_id not in handler._room_serials:
continue
if handler._room_serials[room_id] <= from_key:
if handler._room_serials[room_id] <= from_key or (
to_key is not None and handler._room_serials[room_id] > to_key
):
continue

events.append(self._make_event_for(room_id))
Expand Down
6 changes: 5 additions & 1 deletion synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1152,10 +1152,14 @@ async def encode_extensions(

if extensions.receipts is not None:
serialized_extensions["receipts"] = {
# Same as the the top-level `account_data.events` field in Sync v2.
"rooms": extensions.receipts.room_id_to_receipt_map,
}

if extensions.typing is not None:
serialized_extensions["typing"] = {
"rooms": extensions.typing.room_id_to_typing_map,
}

return serialized_extensions


Expand Down
24 changes: 22 additions & 2 deletions synapse/types/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,22 +366,42 @@ class ReceiptsExtension:
"""The Receipts extension (MSC3960)

Attributes:
room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content)
room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
event (type, content)
"""

room_id_to_receipt_map: Mapping[str, JsonMapping]

def __bool__(self) -> bool:
return bool(self.room_id_to_receipt_map)

@attr.s(slots=True, frozen=True, auto_attribs=True)
class TypingExtension:
"""The Typing Notification extension (MSC3961)

Attributes:
room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
event (type, content)
"""

room_id_to_typing_map: Mapping[str, JsonMapping]

def __bool__(self) -> bool:
return bool(self.room_id_to_typing_map)

to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
typing: Optional[TypingExtension] = None

def __bool__(self) -> bool:
return bool(
self.to_device or self.e2ee or self.account_data or self.receipts
self.to_device
or self.e2ee
or self.account_data
or self.receipts
or self.typing
)

next_pos: SlidingSyncStreamToken
Expand Down
18 changes: 18 additions & 0 deletions synapse/types/rest/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,28 @@ class ReceiptsExtension(RequestBodyModel):
# Process all room subscriptions defined in the Room Subscription API. (This is the default.)
rooms: Optional[List[StrictStr]] = ["*"]

class TypingExtension(RequestBodyModel):
"""The Typing Notification extension (MSC3961)

Attributes:
enabled
lists: List of list keys (from the Sliding Window API) to apply this
extension to.
rooms: List of room IDs (from the Room Subscription API) to apply this
extension to.
"""

enabled: Optional[StrictBool] = False
# Process all lists defined in the Sliding Window API. (This is the default.)
lists: Optional[List[StrictStr]] = ["*"]
# Process all room subscriptions defined in the Room Subscription API. (This is the default.)
rooms: Optional[List[StrictStr]] = ["*"]

to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
typing: Optional[TypingExtension] = None

conn_id: Optional[str]

Expand Down
20 changes: 20 additions & 0 deletions tests/rest/client/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
Loading
Loading