Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add basic tests for sync/pagination with vector clock tokens. #8488

Merged
merged 6 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8488.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow events to be sent to clients sooner when using sharded event persisters.
20 changes: 17 additions & 3 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ async def create_room(
config: JsonDict,
ratelimit: bool = True,
creator_join_profile: Optional[JsonDict] = None,
requested_room_id: Optional[str] = None,
) -> Tuple[dict, int]:
""" Creates a new room.

Expand All @@ -576,6 +577,10 @@ async def create_room(
values to go in the body of the 'join' event (typically
`avatar_url` and/or `displayname`.

requested_room_id: Allow callers to request a particular room ID.
This is useful for testing event persistence sharding, and
*should not* be exposed to clients.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this feels like a bit of a footgun waiting to be misused. Can you call store_room and _send_events_for_new_room from the test?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alternatively: patch out _generate_room_id.

Returns:
First, a dict containing the keys `room_id` and, if an alias
was requested, `room_alias`. Secondly, the stream_id of the
Expand Down Expand Up @@ -678,9 +683,18 @@ async def create_room(
visibility = config.get("visibility", None)
is_public = visibility == "public"

room_id = await self._generate_room_id(
creator_id=user_id, is_public=is_public, room_version=room_version,
)
if requested_room_id:
await self.store.store_room(
room_id=requested_room_id,
room_creator_user_id=user_id,
is_public=is_public,
room_version=room_version,
)
room_id = requested_room_id
else:
room_id = await self._generate_room_id(
creator_id=user_id, is_public=is_public, room_version=room_version,
)

# Check whether this visibility value is blocked by a third party module
allowed_by_third_party_rules = await (
Expand Down
171 changes: 171 additions & 0 deletions tests/replication/test_sharded_event_persister.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.rest.client.v2_alpha import sync
from synapse.types import create_requester

from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.utils import USE_POSTGRES_FOR_TESTS
Expand All @@ -36,6 +38,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
login.register_servlets,
sync.register_servlets,
]

def prepare(self, reactor, clock, hs):
Expand Down Expand Up @@ -100,3 +103,171 @@ def test_basic(self):

self.assertTrue(persisted_on_1)
self.assertTrue(persisted_on_2)

def test_vector_clock_token(self):
    """Tests that using a stream token with a vector clock component works
    correctly with basic /sync and /messages usage.

    Spins up two event-persister workers plus a dedicated sync worker,
    forces events for two rooms onto different persisters, and then checks
    that /sync and /messages behave correctly across the resulting
    multi-writer ("vector clock") stream tokens.
    """

    self.make_worker_hs(
        "synapse.app.generic_worker", {"worker_name": "worker1"},
    )

    self.make_worker_hs(
        "synapse.app.generic_worker", {"worker_name": "worker2"},
    )

    sync_hs = self.make_worker_hs(
        "synapse.app.generic_worker", {"worker_name": "sync"},
    )

    # Specially selected room IDs that get persisted on different workers.
    room_id1 = "!foo:test"
    room_id2 = "!baz:test"

    # Sanity-check that the shard config really does route these two rooms
    # to different event persisters; the rest of the test depends on it.
    self.assertEqual(
        self.hs.config.worker.events_shard_config.get_instance(room_id1), "worker1"
    )
    self.assertEqual(
        self.hs.config.worker.events_shard_config.get_instance(room_id2), "worker2"
    )

    user_id = self.register_user("user", "pass")
    access_token = self.login("user", "pass")
    requester = create_requester(user_id, access_token)

    store = self.hs.get_datastore()

    # Create two rooms on the different workers.
    # NOTE(review): the `by=0.1` advances the reactor clock while waiting,
    # which is needed because room creation sends HTTP replication requests.
    room_creator = self.hs.get_room_creation_handler()
    self.get_success(
        room_creator.create_room(requester, {}, requested_room_id=room_id1), by=0.1
    )
    self.get_success(
        room_creator.create_room(requester, {}, requested_room_id=room_id2), by=0.1
    )

    # The other user joins
    self.helper.join(
        room=room_id1, user=self.other_user_id, tok=self.other_access_token
    )
    self.helper.join(
        room=room_id2, user=self.other_user_id, tok=self.other_access_token
    )

    # Do an initial sync so that we're up to date.
    request, channel = self.make_request("GET", "/sync", access_token=access_token)
    self.render_on_worker(sync_hs, request)
    next_batch = channel.json_body["next_batch"]

    # We now fetch and throw away a stream ID so that there will be a gap in
    # the stream orderings. This means that the MultiWriterIdGenerators
    # won't be able to intelligently "roll forward" the persisted upto
    # position, resulting in a RoomStreamToken that has a non-empty instance
    # map component.
    #
    # Note: ideally we'd try to simulate one event persister getting behind
    # in a more realistic way, but that involves adding quite a bit of code
    # to support doing that.
    self.get_success(
        store.db_pool.execute(
            "test", None, "SELECT nextval(?)", "events_stream_seq"
        )
    )

    response = self.helper.send(room_id1, body="Hi!", tok=self.other_access_token)
    first_event_in_room1 = response["event_id"]

    # Assert that the current stream token has an instance map component, as
    # we are trying to test vector clock tokens.
    room_stream_token = store.get_room_max_token()
    self.assertNotEqual(len(room_stream_token.instance_map), 0)

    # Check that syncing still gets the new event, despite the gap in the
    # stream IDs.
    request, channel = self.make_request(
        "GET", "/sync?since={}".format(next_batch), access_token=access_token
    )
    self.render_on_worker(sync_hs, request)

    # We should only see the new event and nothing else
    self.assertIn(room_id1, channel.json_body["rooms"]["join"])
    self.assertNotIn(room_id2, channel.json_body["rooms"]["join"])

    events = channel.json_body["rooms"]["join"][room_id1]["timeline"]["events"]
    self.assertListEqual(
        [first_event_in_room1], [event["event_id"] for event in events]
    )

    # Get the next batch and make sure it's a vector clock style token
    # (serialized vector clock tokens start with "m").
    vector_clock_token = channel.json_body["next_batch"]
    self.assertTrue(vector_clock_token.startswith("m"))

    # Now try and send an event to the other room so that we can test that
    # the vector clock style token works as a `since` token.
    response = self.helper.send(room_id2, body="Hi!", tok=self.other_access_token)
    first_event_in_room2 = response["event_id"]

    request, channel = self.make_request(
        "GET",
        "/sync?since={}".format(vector_clock_token),
        access_token=access_token,
    )
    self.render_on_worker(sync_hs, request)

    # Only the room with new activity since the vector clock token should
    # appear in the incremental sync.
    self.assertNotIn(room_id1, channel.json_body["rooms"]["join"])
    self.assertIn(room_id2, channel.json_body["rooms"]["join"])

    events = channel.json_body["rooms"]["join"][room_id2]["timeline"]["events"]
    self.assertListEqual(
        [first_event_in_room2], [event["event_id"] for event in events]
    )

    next_batch = channel.json_body["next_batch"]

    # We also want to test that the vector clock style token works with
    # pagination. We do this by sending a couple of new events into the room
    # and syncing again to get a prev_batch token for each room, then
    # paginating from there back to the vector clock token.
    self.helper.send(room_id1, body="Hi again!", tok=self.other_access_token)
    self.helper.send(room_id2, body="Hi again!", tok=self.other_access_token)

    request, channel = self.make_request(
        "GET", "/sync?since={}".format(next_batch), access_token=access_token
    )
    self.render_on_worker(sync_hs, request)

    prev_batch1 = channel.json_body["rooms"]["join"][room_id1]["timeline"][
        "prev_batch"
    ]
    prev_batch2 = channel.json_body["rooms"]["join"][room_id2]["timeline"][
        "prev_batch"
    ]

    # Paginating back in the first room should not produce any results, as
    # no events have happened in it. This tests that we are correctly
    # filtering results based on the vector clock portion.
    request, channel = self.make_request(
        "GET",
        "/rooms/{}/messages?from={}&to={}&dir=b".format(
            room_id1, prev_batch1, vector_clock_token
        ),
        access_token=access_token,
    )
    self.render_on_worker(sync_hs, request)
    self.assertListEqual([], channel.json_body["chunk"])

    # Paginating back on the second room should produce the first event
    # again. This tests that pagination isn't completely broken.
    request, channel = self.make_request(
        "GET",
        "/rooms/{}/messages?from={}&to={}&dir=b".format(
            room_id2, prev_batch2, vector_clock_token
        ),
        access_token=access_token,
    )
    self.render_on_worker(sync_hs, request)
    self.assertEqual(len(channel.json_body["chunk"]), 1)
    self.assertEqual(
        channel.json_body["chunk"][0]["event_id"], first_event_in_room2
    )
32 changes: 31 additions & 1 deletion tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import inspect
import logging
import time
from typing import Optional, Tuple, Type, TypeVar, Union
from typing import Optional, Tuple, Type, TypeVar, Union, overload

from mock import Mock, patch

Expand Down Expand Up @@ -357,6 +357,36 @@ def prepare(self, reactor, clock, homeserver):
Function to optionally be overridden in subclasses.
"""

# Annoyingly mypy doesn't seem to pick up the fact that T is SynapseRequest
# when the `request` arg isn't given, so we define an explicit override to
# cover that case.
@overload
def make_request(
    self,
    method: Union[bytes, str],
    path: Union[bytes, str],
    content: Union[bytes, dict] = b"",
    access_token: Optional[str] = None,
    shorthand: bool = True,
    federation_auth_origin: Optional[str] = None,
    content_is_form: bool = False,
) -> Tuple[SynapseRequest, FakeChannel]:
    # Overload for the common case where no `request` class is supplied:
    # the returned request is a plain SynapseRequest.
    ...

@overload
def make_request(
    self,
    method: Union[bytes, str],
    path: Union[bytes, str],
    content: Union[bytes, dict] = b"",
    access_token: Optional[str] = None,
    request: Type[T] = SynapseRequest,
    shorthand: bool = True,
    federation_auth_origin: Optional[str] = None,
    content_is_form: bool = False,
) -> Tuple[T, FakeChannel]:
    # Overload for callers that pass an explicit `request` class: the
    # returned request is an instance of that class (generic in T).
    ...

def make_request(
self,
method: Union[bytes, str],
Expand Down