From 98cee16024b214aae6a877576c06b4a78f4c902e Mon Sep 17 00:00:00 2001 From: dav-is Date: Tue, 30 Aug 2022 14:00:14 -0400 Subject: [PATCH 01/12] Add /messages admin API --- synapse/handlers/pagination.py | 23 ++++--- synapse/rest/admin/rooms.py | 106 +++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index a0c39778abba..776ac1918b5d 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -32,6 +32,7 @@ from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client +from synapse.rest.admin._base import assert_user_is_admin if TYPE_CHECKING: from synapse.server import HomeServer @@ -423,6 +424,7 @@ async def get_messages( pagin_config: PaginationConfig, as_client_event: bool = True, event_filter: Optional[Filter] = None, + use_admin_priviledge: bool = False, ) -> JsonDict: """Get messages in a room. @@ -432,10 +434,16 @@ async def get_messages( pagin_config: The pagination config rules to apply, if any. as_client_event: True to get events in client-server format. event_filter: Filter to apply to results or None + use_admin_priviledge: if `True`, return all events, regardless + of whether `user` has access to them. To be used **ONLY** + from the admin API. 
Returns: Pagination API results """ + if use_admin_priviledge: + await assert_user_is_admin(self.auth, requester) + user_id = requester.user.to_string() if pagin_config.from_token: @@ -528,12 +536,13 @@ async def get_messages( if event_filter: events = await event_filter.filter(events) - events = await filter_events_for_client( - self._storage_controllers, - user_id, - events, - is_peeking=(member_event_id is None), - ) + if not use_admin_priviledge: + events = await filter_events_for_client( + self._storage_controllers, + user_id, + events, + is_peeking=(member_event_id is None), + ) # if after the filter applied there are no more events # return immediately - but there might be more in next_token batch @@ -562,7 +571,7 @@ async def get_messages( state = state_dict.values() aggregations = await self._relations_handler.get_bundled_aggregations( - events, user_id + events, user_id # TODO: filters based on user ) time_now = self.clock.time_msec() diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 3d870629c414..45fa4b1140a7 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -37,6 +37,7 @@ from synapse.storage.state import StateFilter from synapse.types import JsonDict, RoomID, UserID, create_requester from synapse.util import json_decoder +from synapse.streams.config import PaginationConfig if TYPE_CHECKING: from synapse.api.auth import Auth @@ -858,3 +859,108 @@ async def on_PUT( await self._store.unblock_room(room_id) return HTTPStatus.OK, {"block": block} + + +class RoomMessagesRestServlet(RestServlet): + """ + Get messages list of a room. 
+ """ + + PATTERNS = admin_patterns("/rooms/(?P[^/]*)/messages$") + + def __init__(self, hs: "HomeServer"): + super().__init__() + self._hs = hs + self.clock = hs.get_clock() + self.pagination_handler = hs.get_pagination_handler() + self.auth = hs.get_auth() + self.store = hs.get_datastores().main + + async def on_GET( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester) + + pagination_config = await PaginationConfig.from_request( + self.store, request, default_limit=10 + ) + # Twisted will have processed the args by now. + assert request.args is not None + as_client_event = b"raw" not in request.args + filter_str = parse_string(request, "filter", encoding="utf-8") + if filter_str: + filter_json = urlparse.unquote(filter_str) + event_filter: Optional[Filter] = Filter( + self._hs, json_decoder.decode(filter_json) + ) + if ( + event_filter + and event_filter.filter_json.get("event_format", "client") + == "federation" + ): + as_client_event = False + else: + event_filter = None + + msgs = await self.pagination_handler.get_messages( + room_id=room_id, + requester=requester, + pagin_config=pagination_config, + as_client_event=as_client_event, + event_filter=event_filter, + use_admin_priviledge=True, + ) + + return 200, msgs + + +class RoomTimestampToEventRestServlet(RestServlet): + """ + API endpoint to fetch the `event_id` of the closest event to the given + timestamp (`ts` query parameter) in the given direction (`dir` query + parameter). + + Useful for cases like jump to date so you can start paginating messages from + a given date in the archive. + + `ts` is a timestamp in milliseconds where we will find the closest event in + the given direction. + + `dir` can be `f` or `b` to indicate forwards and backwards in time from the + given timestamp. 
+ + GET /_matrix/client/unstable/org.matrix.msc3030/rooms//timestamp_to_event?ts=&dir= + { + "event_id": ... + } + """ + + PATTERNS = admin_patterns("/rooms/(?P[^/]*)/timestamp_to_event$") + + def __init__(self, hs: "HomeServer"): + super().__init__() + self._auth = hs.get_auth() + self._store = hs.get_datastores().main + self.timestamp_lookup_handler = hs.get_timestamp_lookup_handler() + + async def on_GET( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester) + + timestamp = parse_integer(request, "ts", required=True) + direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"]) + + ( + event_id, + origin_server_ts, + ) = await self.timestamp_lookup_handler.get_event_for_timestamp( + requester, room_id, timestamp, direction + ) + + return HTTPStatus.OK, { + "event_id": event_id, + "origin_server_ts": origin_server_ts, + } From aaf77377e43dc44a6e4f537acd0022bd069d411c Mon Sep 17 00:00:00 2001 From: dav-is Date: Thu, 1 Sep 2022 12:41:12 -0400 Subject: [PATCH 02/12] Add some tests --- synapse/handlers/pagination.py | 25 ++++--- synapse/rest/admin/__init__.py | 4 + synapse/rest/admin/rooms.py | 2 +- tests/rest/admin/test_room.py | 132 +++++++++++++++++++++++++++++++++ 4 files changed, 152 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 776ac1918b5d..6c17f90fddef 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -466,12 +466,14 @@ async def get_messages( room_token = from_token.room_key async with self.pagination_lock.read(room_id): - ( - membership, - member_event_id, - ) = await self.auth.check_user_in_room_or_world_readable( - room_id, requester, allow_departed_users=True - ) + (membership, member_event_id) = (None, None) + if not use_admin_priviledge: + ( + membership, + member_event_id, + ) = await 
self.auth.check_user_in_room_or_world_readable( + room_id, requester, allow_departed_users=True + ) if pagin_config.direction == "b": # if we're going backwards, we might need to backfill. This @@ -483,7 +485,7 @@ async def get_messages( room_id, room_token.stream ) - if membership == Membership.LEAVE: + if not use_admin_priviledge and membership == Membership.LEAVE: # If they have left the room then clamp the token to be before # they left the room, to save the effort of loading from the # database. @@ -570,9 +572,12 @@ async def get_messages( state_dict = await self.store.get_events(list(state_ids.values())) state = state_dict.values() - aggregations = await self._relations_handler.get_bundled_aggregations( - events, user_id # TODO: filters based on user - ) + if not use_admin_priviledge: + aggregations = await self._relations_handler.get_bundled_aggregations( + events, user_id + ) + else: + aggregations = None # TODO: an admin might want aggregations time_now = self.clock.time_msec() diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index fa3266720bcc..c442971da7d5 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -64,6 +64,8 @@ RoomRestServlet, RoomRestV2Servlet, RoomStateRestServlet, + RoomMessagesRestServlet, + RoomTimestampToEventRestServlet, ) from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet @@ -271,6 +273,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: DestinationResetConnectionRestServlet(hs).register(http_server) DestinationRestServlet(hs).register(http_server) ListDestinationsRestServlet(hs).register(http_server) + RoomMessagesRestServlet(hs).register(http_server) + RoomTimestampToEventRestServlet(hs).register(http_server) # Some servlets only get registered for the main process. 
if hs.config.worker.worker_app is None: diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 45fa4b1140a7..47974cf430c7 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -930,7 +930,7 @@ class RoomTimestampToEventRestServlet(RestServlet): `dir` can be `f` or `b` to indicate forwards and backwards in time from the given timestamp. - GET /_matrix/client/unstable/org.matrix.msc3030/rooms//timestamp_to_event?ts=&dir= + GET /_synapse/admin/v1/rooms//timestamp_to_event?ts=&dir= { "event_id": ... } diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index fd6da557c1c9..10dc1a7e42bf 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -15,6 +15,10 @@ from typing import List, Optional from unittest.mock import Mock +from http import HTTPStatus +import json +from synapse.util.stringutils import random_string +from synapse.handlers.pagination import PurgeStatus from parameterized import parameterized from twisted.test.proto_helpers import MemoryReactor @@ -1783,6 +1787,134 @@ def test_get_joined_members_after_leave_room(self) -> None: self.assertEqual(403, channel.code, msg=channel.json_body) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) + def test_topo_token_is_accepted(self) -> None: + user = self.register_user("foo1", "pass") + user_tok = self.login("foo1", "pass") + room_id = self.helper.create_room_as(user, tok=user_tok) + + token = "t1-0_0_0_0_0_0_0_0_0" + channel = self.make_request( + "GET", + "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s" + % (room_id, self.admin_user_tok, token), + ) + self.assertEqual(HTTPStatus.OK, channel.code) + self.assertTrue("start" in channel.json_body) + self.assertEqual(token, channel.json_body["start"]) + self.assertTrue("chunk" in channel.json_body) + self.assertTrue("end" in channel.json_body) + + def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: + user = self.register_user("foo2", 
"pass") + user_tok = self.login("foo2", "pass") + room_id = self.helper.create_room_as(user, tok=user_tok) + + token = "s0_0_0_0_0_0_0_0_0" + channel = self.make_request( + "GET", + "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s" + % (room_id, self.admin_user_tok, token), + ) + self.assertEqual(HTTPStatus.OK, channel.code) + self.assertTrue("start" in channel.json_body) + self.assertEqual(token, channel.json_body["start"]) + self.assertTrue("chunk" in channel.json_body) + self.assertTrue("end" in channel.json_body) + + def test_room_messages_purge(self) -> None: + user = self.register_user("foo3", "pass") + user_tok = self.login("foo3", "pass") + room_id = self.helper.create_room_as(user, tok=user_tok) + + store = self.hs.get_datastores().main + pagination_handler = self.hs.get_pagination_handler() + + # Send a first message in the room, which will be removed by the purge. + first_event_id = self.helper.send(room_id, body="message 1", tok=user_tok)[ + "event_id" + ] + first_token = self.get_success( + store.get_topological_token_for_event(first_event_id) + ) + first_token_str = self.get_success(first_token.to_string(store)) + + # Send a second message in the room, which won't be removed, and which we'll + # use as the marker to purge events before. + second_event_id = self.helper.send(room_id, body="message 2", tok=user_tok)[ + "event_id" + ] + second_token = self.get_success( + store.get_topological_token_for_event(second_event_id) + ) + second_token_str = self.get_success(second_token.to_string(store)) + + # Send a third event in the room to ensure we don't fall under any edge case + # due to our marker being the latest forward extremity in the room. + self.helper.send(room_id, body="message 3", tok=user_tok) + + # Check that we get the first and second message when querying /messages. 
+ channel = self.make_request( + "GET", + "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s&dir=b&filter=%s" + % ( + room_id, + self.admin_user_tok, + second_token_str, + json.dumps({"types": [EventTypes.Message]}), + ), + ) + self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + + chunk = channel.json_body["chunk"] + self.assertEqual(len(chunk), 2, [event["content"] for event in chunk]) + + # Purge every event before the second event. + purge_id = random_string(16) + pagination_handler._purges_by_id[purge_id] = PurgeStatus() + self.get_success( + pagination_handler._purge_history( + purge_id=purge_id, + room_id=room_id, + token=second_token_str, + delete_local_events=True, + ) + ) + + # Check that we only get the second message through /message now that the first + # has been purged. + channel = self.make_request( + "GET", + "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s&dir=b&filter=%s" + % ( + room_id, + self.admin_user_tok, + second_token_str, + json.dumps({"types": [EventTypes.Message]}), + ), + ) + self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + + chunk = channel.json_body["chunk"] + self.assertEqual(len(chunk), 1, [event["content"] for event in chunk]) + + # Check that we get no event, but also no error, when querying /messages with + # the token that was pointing at the first event, because we don't have it + # anymore. 
+ channel = self.make_request( + "GET", + "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s&dir=b&filter=%s" + % ( + room_id, + self.admin_user_tok, + first_token_str, + json.dumps({"types": [EventTypes.Message]}), + ), + ) + self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + + chunk = channel.json_body["chunk"] + self.assertEqual(len(chunk), 0, [event["content"] for event in chunk]) + class JoinAliasRoomTestCase(unittest.HomeserverTestCase): From 7ba50617dee2fb9d1409abd72a5d1f12823c17af Mon Sep 17 00:00:00 2001 From: dav-is Date: Thu, 1 Sep 2022 13:02:32 -0400 Subject: [PATCH 03/12] Fix lint Signed-off-by: dav-is --- synapse/handlers/pagination.py | 2 +- synapse/rest/admin/__init__.py | 2 +- synapse/rest/admin/rooms.py | 2 +- tests/rest/admin/test_room.py | 9 ++++----- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 6c17f90fddef..4050bb26490d 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -26,13 +26,13 @@ from synapse.handlers.room import ShutdownRoomResponse from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.rest.admin._base import assert_user_is_admin from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, Requester, StreamKeyType from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client -from synapse.rest.admin._base import assert_user_is_admin if TYPE_CHECKING: from synapse.server import HomeServer diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index c442971da7d5..bac754e1b1d5 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -61,10 +61,10 @@ MakeRoomAdminRestServlet, 
RoomEventContextServlet, RoomMembersRestServlet, + RoomMessagesRestServlet, RoomRestServlet, RoomRestV2Servlet, RoomStateRestServlet, - RoomMessagesRestServlet, RoomTimestampToEventRestServlet, ) from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 47974cf430c7..c8685267f598 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -35,9 +35,9 @@ ) from synapse.storage.databases.main.room import RoomSortOrder from synapse.storage.state import StateFilter +from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, RoomID, UserID, create_requester from synapse.util import json_decoder -from synapse.streams.config import PaginationConfig if TYPE_CHECKING: from synapse.api.auth import Auth diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 0c6e505a7f88..34d9053e7534 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -11,14 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import json import urllib.parse +from http import HTTPStatus from typing import List, Optional from unittest.mock import Mock -from http import HTTPStatus -import json -from synapse.util.stringutils import random_string -from synapse.handlers.pagination import PurgeStatus from parameterized import parameterized from twisted.test.proto_helpers import MemoryReactor @@ -26,10 +24,11 @@ import synapse.rest.admin from synapse.api.constants import EventTypes, Membership, RoomTypes from synapse.api.errors import Codes -from synapse.handlers.pagination import PaginationHandler +from synapse.handlers.pagination import PaginationHandler, PurgeStatus from synapse.rest.client import directory, events, login, room from synapse.server import HomeServer from synapse.util import Clock +from synapse.util.stringutils import random_string from tests import unittest From 7ce4d4ea4b0a5d311cd6d48257ccbf3a7e42a330 Mon Sep 17 00:00:00 2001 From: Connor Davis Date: Thu, 1 Sep 2022 13:15:01 -0400 Subject: [PATCH 04/12] Add changelog Signed-off-by: Connor Davis --- changelog.d/13672.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13672.feature diff --git a/changelog.d/13672.feature b/changelog.d/13672.feature new file mode 100644 index 000000000000..b29931c115f2 --- /dev/null +++ b/changelog.d/13672.feature @@ -0,0 +1 @@ +Add Admin APIs to Fetch Messages Within a Particular Window. 
From a876d47104f6ca1fe37dc5a253a43b082909873e Mon Sep 17 00:00:00 2001 From: Connor Davis Date: Thu, 1 Sep 2022 13:43:49 -0400 Subject: [PATCH 05/12] Apply suggestions from code review Co-authored-by: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> --- synapse/rest/admin/rooms.py | 13 ++++++------- tests/rest/admin/test_room.py | 13 +++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index c8685267f598..c9b6d10d6690 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -869,12 +869,11 @@ class RoomMessagesRestServlet(RestServlet): PATTERNS = admin_patterns("/rooms/(?P[^/]*)/messages$") def __init__(self, hs: "HomeServer"): - super().__init__() self._hs = hs - self.clock = hs.get_clock() - self.pagination_handler = hs.get_pagination_handler() - self.auth = hs.get_auth() - self.store = hs.get_datastores().main + self._clock = hs.get_clock() + self._pagination_handler = hs.get_pagination_handler() + self._auth = hs.get_auth() + self._store = hs.get_datastores().main async def on_GET( self, request: SynapseRequest, room_id: str @@ -912,7 +911,7 @@ async def on_GET( use_admin_priviledge=True, ) - return 200, msgs + return HTTPStatus.OK, msgs class RoomTimestampToEventRestServlet(RestServlet): @@ -942,7 +941,7 @@ def __init__(self, hs: "HomeServer"): super().__init__() self._auth = hs.get_auth() self._store = hs.get_datastores().main - self.timestamp_lookup_handler = hs.get_timestamp_lookup_handler() + self._timestamp_lookup_handler = hs.get_timestamp_lookup_handler() async def on_GET( self, request: SynapseRequest, room_id: str diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 34d9053e7534..9f8736bb4c0b 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -1806,7 +1806,7 @@ def test_topo_token_is_accepted(self) -> None: "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s" % (room_id, 
self.admin_user_tok, token), ) - self.assertEqual(HTTPStatus.OK, channel.code) + self.assertEqual(200, channel.code) self.assertTrue("start" in channel.json_body) self.assertEqual(token, channel.json_body["start"]) self.assertTrue("chunk" in channel.json_body) @@ -1820,10 +1820,11 @@ def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: token = "s0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", - "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s" + "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (room_id, self.admin_user_tok, token), + access_token=self.admin_user_tok ) - self.assertEqual(HTTPStatus.OK, channel.code) + self.assertEqual(200, channel.code) self.assertTrue("start" in channel.json_body) self.assertEqual(token, channel.json_body["start"]) self.assertTrue("chunk" in channel.json_body) @@ -1871,7 +1872,7 @@ def test_room_messages_purge(self) -> None: json.dumps({"types": [EventTypes.Message]}), ), ) - self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + self.assertEqual(channel.code, 200, channel.json_body) chunk = channel.json_body["chunk"] self.assertEqual(len(chunk), 2, [event["content"] for event in chunk]) @@ -1900,7 +1901,7 @@ def test_room_messages_purge(self) -> None: json.dumps({"types": [EventTypes.Message]}), ), ) - self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + self.assertEqual(channel.code, 200, channel.json_body) chunk = channel.json_body["chunk"] self.assertEqual(len(chunk), 1, [event["content"] for event in chunk]) @@ -1918,7 +1919,7 @@ def test_room_messages_purge(self) -> None: json.dumps({"types": [EventTypes.Message]}), ), ) - self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + self.assertEqual(channel.code, 200, channel.json_body) chunk = channel.json_body["chunk"] self.assertEqual(len(chunk), 0, [event["content"] for event in chunk]) From ca47515524b940fa3901f4e1dfc2094d8d477696 Mon Sep 17 00:00:00 2001 From: Connor Davis Date: Thu, 1 Sep 2022 13:47:37 
-0400 Subject: [PATCH 06/12] Don't use access token query string --- tests/rest/admin/test_room.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 9f8736bb4c0b..d1dd84456960 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -13,7 +13,6 @@ # limitations under the License. import json import urllib.parse -from http import HTTPStatus from typing import List, Optional from unittest.mock import Mock @@ -1803,8 +1802,8 @@ def test_topo_token_is_accepted(self) -> None: token = "t1-0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", - "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s" - % (room_id, self.admin_user_tok, token), + "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (room_id, token), + access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code) self.assertTrue("start" in channel.json_body) @@ -1820,9 +1819,8 @@ def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: token = "s0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", - "/_synapse/admin/v1/rooms/%s/messages?from=%s" - % (room_id, self.admin_user_tok, token), - access_token=self.admin_user_tok + "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (room_id, token), + access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code) self.assertTrue("start" in channel.json_body) @@ -1864,13 +1862,13 @@ def test_room_messages_purge(self) -> None: # Check that we get the first and second message when querying /messages. 
channel = self.make_request( "GET", - "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s&dir=b&filter=%s" + "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s" % ( room_id, - self.admin_user_tok, second_token_str, json.dumps({"types": [EventTypes.Message]}), ), + access_token=self.admin_user_tok, ) self.assertEqual(channel.code, 200, channel.json_body) @@ -1893,13 +1891,13 @@ def test_room_messages_purge(self) -> None: # has been purged. channel = self.make_request( "GET", - "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s&dir=b&filter=%s" + "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s" % ( room_id, - self.admin_user_tok, second_token_str, json.dumps({"types": [EventTypes.Message]}), ), + access_token=self.admin_user_tok, ) self.assertEqual(channel.code, 200, channel.json_body) @@ -1911,13 +1909,13 @@ def test_room_messages_purge(self) -> None: # anymore. channel = self.make_request( "GET", - "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s&dir=b&filter=%s" + "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s" % ( room_id, - self.admin_user_tok, first_token_str, json.dumps({"types": [EventTypes.Message]}), ), + access_token=self.admin_user_tok, ) self.assertEqual(channel.code, 200, channel.json_body) From f58d23552e39898633e0811b7cac2f715957853e Mon Sep 17 00:00:00 2001 From: Connor Davis Date: Thu, 1 Sep 2022 14:37:34 -0400 Subject: [PATCH 07/12] Add timestamp to event test --- synapse/rest/admin/rooms.py | 10 +++++----- tests/rest/admin/test_room.py | 26 ++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index c9b6d10d6690..16e69833ddea 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -878,11 +878,11 @@ def __init__(self, hs: "HomeServer"): async def on_GET( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: - requester = await 
self.auth.get_user_by_req(request) - await assert_user_is_admin(self.auth, requester) + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester) pagination_config = await PaginationConfig.from_request( - self.store, request, default_limit=10 + self._store, request, default_limit=10 ) # Twisted will have processed the args by now. assert request.args is not None @@ -902,7 +902,7 @@ async def on_GET( else: event_filter = None - msgs = await self.pagination_handler.get_messages( + msgs = await self._pagination_handler.get_messages( room_id=room_id, requester=requester, pagin_config=pagination_config, @@ -955,7 +955,7 @@ async def on_GET( ( event_id, origin_server_ts, - ) = await self.timestamp_lookup_handler.get_event_for_timestamp( + ) = await self._timestamp_lookup_handler.get_event_for_timestamp( requester, room_id, timestamp, direction ) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index d1dd84456960..f76febc65925 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import json +import time import urllib.parse from typing import List, Optional from unittest.mock import Mock @@ -1794,7 +1795,30 @@ def test_get_joined_members_after_leave_room(self) -> None: self.assertEqual(403, channel.code, msg=channel.json_body) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) + def test_timestamp_to_event(self) -> None: + """Test that providing the current timestamp can get the last event.""" + user = self.register_user("foo", "pass") + user_tok = self.login("foo", "pass") + room_id = self.helper.create_room_as(user, tok=user_tok) + + self.helper.send(room_id, body="message 1", tok=user_tok) + second_event_id = self.helper.send(room_id, body="message 2", tok=user_tok)[ + "event_id" + ] + ts = str(round(time.time() * 1000)) + + channel = self.make_request( + "GET", + "/_synapse/admin/v1/rooms/%s/timestamp_to_event?dir=b&ts=%s" + % (room_id, ts), + access_token=self.admin_user_tok, + ) + self.assertEqual(200, channel.code) + self.assertTrue("event_id" in channel.json_body) + self.assertEqual(second_event_id, channel.json_body["event_id"]) + def test_topo_token_is_accepted(self) -> None: + """Test Topo Token is accepted.""" user = self.register_user("foo1", "pass") user_tok = self.login("foo1", "pass") room_id = self.helper.create_room_as(user, tok=user_tok) @@ -1812,6 +1836,7 @@ def test_topo_token_is_accepted(self) -> None: self.assertTrue("end" in channel.json_body) def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: + """Test that stream token is accepted for forward pagination.""" user = self.register_user("foo2", "pass") user_tok = self.login("foo2", "pass") room_id = self.helper.create_room_as(user, tok=user_tok) @@ -1829,6 +1854,7 @@ def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: self.assertTrue("end" in channel.json_body) def test_room_messages_purge(self) -> None: + """Test room messages can be retrieved by an admin that isn't in the room.""" user = self.register_user("foo3", 
"pass") user_tok = self.login("foo3", "pass") room_id = self.helper.create_room_as(user, tok=user_tok) From 90f8d88b0b943c092360dd05d18df96211c859fd Mon Sep 17 00:00:00 2001 From: Connor Davis Date: Thu, 1 Sep 2022 17:05:52 -0400 Subject: [PATCH 08/12] Add docs --- docs/admin_api/rooms.md | 145 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 7526956bec39..81f0dd16696b 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -393,6 +393,151 @@ A response body like the following is returned: } ``` +# Room Messages API + +The Room Messages admin API allows server admins to get all messages +sent to a room in a given timeframe. There are various parameters available +that allow for filtering and ordering the returned list. This API supports pagination. + +To use it, you will need to authenticate by providing an `access_token` +for a server admin: see [Admin API](../usage/administration/admin_api). + +This endpoint mirrors the [Matrix Spec defined Messages API](https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3roomsroomidmessages). + +The API is: +``` +GET /_synapse/admin/v1/rooms/<room_id>/messages +``` + +**Parameters** + +The following path parameters are required: + +* `room_id` - The ID of the room you wish to fetch messages from. + +The following query parameters are available: + +* `from` (required) - The token to start returning events from. This token can be obtained from a prev_batch + or next_batch token returned by the /sync endpoint, or from an end token returned by a previous request to this endpoint. +* `to` - The token to stop returning events at. +* `limit` - The maximum number of events to return. Defaults to `10`. +* `filter` - A JSON RoomEventFilter to filter returned events with. +* `dir` - The direction to return events from. Either `f` for forwards or `b` for backwards. Setting + this value to `b` will reverse the above sort order.
Defaults to `f`. + +**Response** + +The following fields are possible in the JSON response body: + +* `chunk` - A list of room events. The order depends on the dir parameter. + Note that an empty chunk does not necessarily imply that no more events are available. Clients should continue to paginate until no end property is returned. +* `end` - A token corresponding to the end of chunk. This token can be passed back to this endpoint to request further events. + If no further events are available, this property is omitted from the response. +* `start` - A token corresponding to the start of chunk. +* `state` - A list of state events relevant to showing the chunk. + +**Example** + +For more details on each chunk, read [the Matrix specification](https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3roomsroomidmessages). + +```json +{ + "chunk": [ + { + "content": { + "body": "This is an example text message", + "format": "org.matrix.custom.html", + "formatted_body": "This is an example text message", + "msgtype": "m.text" + }, + "event_id": "$143273582443PhrSn:example.org", + "origin_server_ts": 1432735824653, + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "type": "m.room.message", + "unsigned": { + "age": 1234 + } + }, + { + "content": { + "name": "The room name" + }, + "event_id": "$143273582443PhrSn:example.org", + "origin_server_ts": 1432735824653, + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "state_key": "", + "type": "m.room.name", + "unsigned": { + "age": 1234 + } + }, + { + "content": { + "body": "Gangnam Style", + "info": { + "duration": 2140786, + "h": 320, + "mimetype": "video/mp4", + "size": 1563685, + "thumbnail_info": { + "h": 300, + "mimetype": "image/jpeg", + "size": 46144, + "w": 300 + }, + "thumbnail_url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe", + "w": 480 + }, + "msgtype": "m.video", + "url": "mxc://example.org/a526eYUSFFxlgbQYZmo442" + }, + "event_id": 
"$143273582443PhrSn:example.org", + "origin_server_ts": 1432735824653, + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "type": "m.room.message", + "unsigned": { + "age": 1234 + } + } + ], + "end": "t47409-4357353_219380_26003_2265", + "start": "t47429-4392820_219380_26003_2265" +} +``` + +# Room Timestamp to Event API + +The Room Timestamp to Event API endpoint fetches the `event_id` of the closest event to the given +timestamp (`ts` query parameter) in the given direction (`dir` query parameter). + +Useful for cases like jump to date so you can start paginating messages from +a given date in the archive. + +The API is: +``` + GET /_synapse/admin/v1/rooms/<room_id>/timestamp_to_event +``` + +**Parameters** + +The following path parameters are required: + +* `room_id` - The ID of the room you wish to check. + +The following query parameters are available: + +* `ts` - a timestamp in milliseconds where we will find the closest event in + the given direction. +* `dir` - can be `f` or `b` to indicate forwards and backwards in time from the + given timestamp. + +**Response** + +* `event_id` - The ID of the event closest to the given timestamp in the given direction. + # Block Room API The Block Room admin API allows server admins to block and unblock rooms, and query to see if a given room is blocked.
From cbbbc856952cedc6a6fe427e629922e7b8b9f858 Mon Sep 17 00:00:00 2001 From: Connor Davis Date: Thu, 1 Sep 2022 17:18:23 -0400 Subject: [PATCH 09/12] From code review --- synapse/rest/admin/rooms.py | 1 - tests/rest/admin/test_room.py | 82 +++++++++++++++++------------------ 2 files changed, 41 insertions(+), 42 deletions(-) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 16e69833ddea..747e6fda8360 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -938,7 +938,6 @@ class RoomTimestampToEventRestServlet(RestServlet): PATTERNS = admin_patterns("/rooms/(?P[^/]*)/timestamp_to_event$") def __init__(self, hs: "HomeServer"): - super().__init__() self._auth = hs.get_auth() self._store = hs.get_datastores().main self._timestamp_lookup_handler = hs.get_timestamp_lookup_handler() diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index f76febc65925..d156be82b04d 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -1795,77 +1795,77 @@ def test_get_joined_members_after_leave_room(self) -> None: self.assertEqual(403, channel.code, msg=channel.json_body) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) + +class RoomMessagesTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + self.user = self.register_user("foo", "pass") + self.user_tok = self.login("foo", "pass") + self.room_id = self.helper.create_room_as(self.user, tok=self.user_tok) + def test_timestamp_to_event(self) -> None: """Test that providing the current timestamp can get the last event.""" - user = self.register_user("foo", "pass") - user_tok = self.login("foo", "pass") - room_id = 
self.helper.create_room_as(user, tok=user_tok) - - self.helper.send(room_id, body="message 1", tok=user_tok) - second_event_id = self.helper.send(room_id, body="message 2", tok=user_tok)[ - "event_id" - ] + self.helper.send(self.room_id, body="message 1", tok=self.user_tok) + second_event_id = self.helper.send( + self.room_id, body="message 2", tok=self.user_tok + )["event_id"] ts = str(round(time.time() * 1000)) channel = self.make_request( "GET", "/_synapse/admin/v1/rooms/%s/timestamp_to_event?dir=b&ts=%s" - % (room_id, ts), + % (self.room_id, ts), access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code) - self.assertTrue("event_id" in channel.json_body) + self.assertIn("event_id", channel.json_body) self.assertEqual(second_event_id, channel.json_body["event_id"]) def test_topo_token_is_accepted(self) -> None: """Test Topo Token is accepted.""" - user = self.register_user("foo1", "pass") - user_tok = self.login("foo1", "pass") - room_id = self.helper.create_room_as(user, tok=user_tok) - token = "t1-0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", - "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (room_id, token), + "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code) - self.assertTrue("start" in channel.json_body) + self.assertIn("start", channel.json_body) self.assertEqual(token, channel.json_body["start"]) - self.assertTrue("chunk" in channel.json_body) - self.assertTrue("end" in channel.json_body) + self.assertIn("chunk", channel.json_body) + self.assertIn("end", channel.json_body) def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: """Test that stream token is accepted for forward pagination.""" - user = self.register_user("foo2", "pass") - user_tok = self.login("foo2", "pass") - room_id = self.helper.create_room_as(user, tok=user_tok) - token = "s0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", - 
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (room_id, token), + "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code) - self.assertTrue("start" in channel.json_body) + self.assertIn("start", channel.json_body) self.assertEqual(token, channel.json_body["start"]) - self.assertTrue("chunk" in channel.json_body) - self.assertTrue("end" in channel.json_body) + self.assertIn("chunk", channel.json_body) + self.assertIn("end", channel.json_body) def test_room_messages_purge(self) -> None: """Test room messages can be retrieved by an admin that isn't in the room.""" - user = self.register_user("foo3", "pass") - user_tok = self.login("foo3", "pass") - room_id = self.helper.create_room_as(user, tok=user_tok) - store = self.hs.get_datastores().main pagination_handler = self.hs.get_pagination_handler() # Send a first message in the room, which will be removed by the purge. - first_event_id = self.helper.send(room_id, body="message 1", tok=user_tok)[ - "event_id" - ] + first_event_id = self.helper.send( + self.room_id, body="message 1", tok=self.user_tok + )["event_id"] first_token = self.get_success( store.get_topological_token_for_event(first_event_id) ) @@ -1873,9 +1873,9 @@ def test_room_messages_purge(self) -> None: # Send a second message in the room, which won't be removed, and which we'll # use as the marker to purge events before. - second_event_id = self.helper.send(room_id, body="message 2", tok=user_tok)[ - "event_id" - ] + second_event_id = self.helper.send( + self.room_id, body="message 2", tok=self.user_tok + )["event_id"] second_token = self.get_success( store.get_topological_token_for_event(second_event_id) ) @@ -1883,14 +1883,14 @@ def test_room_messages_purge(self) -> None: # Send a third event in the room to ensure we don't fall under any edge case # due to our marker being the latest forward extremity in the room. 
- self.helper.send(room_id, body="message 3", tok=user_tok) + self.helper.send(self.room_id, body="message 3", tok=self.user_tok) # Check that we get the first and second message when querying /messages. channel = self.make_request( "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s" % ( - room_id, + self.room_id, second_token_str, json.dumps({"types": [EventTypes.Message]}), ), @@ -1907,7 +1907,7 @@ def test_room_messages_purge(self) -> None: self.get_success( pagination_handler._purge_history( purge_id=purge_id, - room_id=room_id, + room_id=self.room_id, token=second_token_str, delete_local_events=True, ) @@ -1919,7 +1919,7 @@ def test_room_messages_purge(self) -> None: "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s" % ( - room_id, + self.room_id, second_token_str, json.dumps({"types": [EventTypes.Message]}), ), @@ -1937,7 +1937,7 @@ def test_room_messages_purge(self) -> None: "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s" % ( - room_id, + self.room_id, first_token_str, json.dumps({"types": [EventTypes.Message]}), ), From e38bc2f37fc9309289a3af3a310394de91971a92 Mon Sep 17 00:00:00 2001 From: Connor Davis Date: Thu, 1 Sep 2022 17:20:20 -0400 Subject: [PATCH 10/12] Fix caps --- changelog.d/13672.feature | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/13672.feature b/changelog.d/13672.feature index b29931c115f2..2334e6fe1563 100644 --- a/changelog.d/13672.feature +++ b/changelog.d/13672.feature @@ -1 +1 @@ -Add Admin APIs to Fetch Messages Within a Particular Window. +Add admin APIs to fetch messages within a particular window of time. 
From cc2887788bed4c6147bf2b7a77d9810a13c4dc1c Mon Sep 17 00:00:00 2001 From: Connor Davis Date: Fri, 2 Sep 2022 09:05:43 -0400 Subject: [PATCH 11/12] Update docs/admin_api/rooms.md Co-authored-by: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> --- docs/admin_api/rooms.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 81f0dd16696b..8f727b363eb8 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -532,7 +532,7 @@ The following query parameters are available: * `ts` - a timestamp in milliseconds where we will find the closest event in the given direction. * `dir` - can be `f` or `b` to indicate forwards and backwards in time from the - given timestamp. + given timestamp. Defaults to `f`. **Response** From ca613841184455a3b2ce62823f2d2ad98a0a246b Mon Sep 17 00:00:00 2001 From: Connor Davis Date: Tue, 6 Sep 2022 08:39:25 -0400 Subject: [PATCH 12/12] Add back aggregations --- synapse/handlers/pagination.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 4050bb26490d..1f83bab836e6 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -572,12 +572,9 @@ async def get_messages( state_dict = await self.store.get_events(list(state_ids.values())) state = state_dict.values() - if not use_admin_priviledge: - aggregations = await self._relations_handler.get_bundled_aggregations( - events, user_id - ) - else: - aggregations = None # TODO: an admin might want aggregations + aggregations = await self._relations_handler.get_bundled_aggregations( + events, user_id + ) time_now = self.clock.time_msec()