Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SS: Reset connection if token is unrecognized #17529

Merged
merged 4 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17529.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reset the sliding sync connection if we don't recognize the per-connection state position.
18 changes: 18 additions & 0 deletions synapse/api/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class Codes(str, Enum):
# MSC2677
DUPLICATE_ANNOTATION = "M_DUPLICATE_ANNOTATION"

# MSC3575 we are telling the client they need to reset their sliding sync
# connection.
UNKNOWN_POS = "M_UNKNOWN_POS"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec calls this "expiring connections" vs "resetting connections". Probably should just align all of the language to the spec because it also explains how "To handle expired connections".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(this still says "reset")

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh nyaryiyr, will fix. (i thought I had got them all 🙁 )

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



class CodeMessageException(RuntimeError):
"""An exception with integer code, a message string attributes and optional headers.
Expand Down Expand Up @@ -847,3 +851,17 @@ def __init__(self) -> None:
msg=PartialStateConflictError.message(),
errcode=Codes.UNKNOWN,
)


class SlidingSyncUnknownPosition(SynapseError):
"""An error that Synapse can return to signal to the client to expire their
sliding sync connection (i.e. send a new request without a `?since=`
param).
"""

def __init__(self) -> None:
super().__init__(
HTTPStatus.BAD_REQUEST,
msg="Unknown position",
errcode=Codes.UNKNOWN_POS,
)
27 changes: 27 additions & 0 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
EventTypes,
Membership,
)
from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.events import EventBase, StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event, strip_event
from synapse.handlers.relations import BundledAggregations
Expand Down Expand Up @@ -491,6 +492,22 @@ async def current_sync_for_user(
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()

if from_token:
# Check that we recognize the connection position, if not tell the
# clients that they need to start again.
Comment on lines +496 to +497
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the context for why we do this:

If we don't do this and the client asks for the full range of rooms, we end up sending down all rooms and their state from scratch (which can be very slow)

-> We do this to avoid sending down all rooms and their state from scratch (initial = True ) which can be very slow.

Comment on lines +496 to +497
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the following explanation correct?

Before this change, if the connection_position is unknown, the /sync request should just behave like an incremental sync without the cache layer and return initial = True rooms.

With this new change, we're forcing them to send another initial sync request where we will also return rooms with initial = True. The only difference is that if we reset them, they can request a smaller range and expand over time which will get them some results sooner but will end up taking the same amount of time (more with round-trips and re-processing) in the end to get everything again.


We should consider adding this context as well.

comment example

This allows the client a chance to do an initial request with a smaller range of rooms to get them some results sooner but will end up taking the same amount of time (more with round-trips and re-processing) in the end to get everything again.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup!

#
# If we don't do this and the client asks for the full range of
# rooms, we end up sending down all rooms and their state from
# scratch (which can be very slow). By expiring the connection we
# allow the client a chance to do an initial request with a smaller
# range of rooms to get them some results sooner but will end up
# taking the same amount of time (more with round-trips and
# re-processing) in the end to get everything again.
if not await self.connection_store.is_valid_token(
sync_config, from_token.connection_position
):
raise SlidingSyncUnknownPosition()

await self.connection_store.mark_token_seen(
sync_config=sync_config,
from_token=from_token,
Expand Down Expand Up @@ -2821,6 +2838,16 @@ class SlidingSyncConnectionStore:
attr.Factory(dict)
)

async def is_valid_token(
self, sync_config: SlidingSyncConfig, connection_token: int
) -> bool:
"""Return whether the connection token is valid/recognized"""
if connection_token == 0:
return True

conn_key = self._get_connection_key(sync_config)
return connection_token in self._connections.get(conn_key, {})

async def have_sent_room(
self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
) -> HaveSentRoom:
Expand Down
28 changes: 11 additions & 17 deletions tests/rest/client/sliding_sync/test_rooms_required_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ def test_rooms_required_state_incremental_sync(self) -> None:
self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))

def test_rooms_required_state_incremental_sync_restart(self) -> None:
def test_rooms_incremental_sync_restart(self) -> None:
"""
Test `rooms.required_state` returns requested state events in the room during an
incremental sync, after a restart (and so the in memory caches are reset).
Test that after a restart (and so the in memory caches are reset) that
we correctly return an `M_UNKNOWN_POS`
"""

user1_id = self.register_user("user1", "pass")
Expand Down Expand Up @@ -195,22 +195,16 @@ def test_rooms_required_state_incremental_sync_restart(self) -> None:
self.hs.get_sliding_sync_handler().connection_store._connections.clear()

# Make the Sliding Sync request
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

# If the cache has been cleared then we do expect the state to come down
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
channel = self.make_request(
method="POST",
path=self.sync_endpoint + f"?pos={from_token}",
content=sync_body,
access_token=user1_tok,
)

self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
},
exact=True,
self.assertEqual(channel.code, 400, channel.json_body)
self.assertEqual(
channel.json_body["errcode"], "M_UNKNOWN_POS", channel.json_body
)
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))

def test_rooms_required_state_wildcard(self) -> None:
"""
Expand Down
Loading