Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add Admin API to Fetch Messages Within a Particular Window #13672

Merged
merged 18 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13672.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add Admin APIs to Fetch Messages Within a Particular Window.
46 changes: 30 additions & 16 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from synapse.handlers.room import ShutdownRoomResponse
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamKeyType
Expand Down Expand Up @@ -423,6 +424,7 @@ async def get_messages(
pagin_config: PaginationConfig,
as_client_event: bool = True,
event_filter: Optional[Filter] = None,
use_admin_priviledge: bool = False,
) -> JsonDict:
"""Get messages in a room.

Expand All @@ -432,10 +434,16 @@ async def get_messages(
pagin_config: The pagination config rules to apply, if any.
as_client_event: True to get events in client-server format.
event_filter: Filter to apply to results or None
use_admin_priviledge: if `True`, return all events, regardless
of whether `user` has access to them. To be used **ONLY**
from the admin API.

Returns:
Pagination API results
"""
if use_admin_priviledge:
await assert_user_is_admin(self.auth, requester)

user_id = requester.user.to_string()

if pagin_config.from_token:
Expand All @@ -458,12 +466,14 @@ async def get_messages(
room_token = from_token.room_key

async with self.pagination_lock.read(room_id):
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
)
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
)

if pagin_config.direction == "b":
# if we're going backwards, we might need to backfill. This
Expand All @@ -475,7 +485,7 @@ async def get_messages(
room_id, room_token.stream
)

if membership == Membership.LEAVE:
if not use_admin_priviledge and membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
Expand Down Expand Up @@ -528,12 +538,13 @@ async def get_messages(
if event_filter:
events = await event_filter.filter(events)

events = await filter_events_for_client(
self._storage_controllers,
user_id,
events,
is_peeking=(member_event_id is None),
)
if not use_admin_priviledge:
events = await filter_events_for_client(
self._storage_controllers,
user_id,
events,
is_peeking=(member_event_id is None),
)

# if after the filter applied there are no more events
# return immediately - but there might be more in next_token batch
Expand Down Expand Up @@ -561,9 +572,12 @@ async def get_messages(
state_dict = await self.store.get_events(list(state_ids.values()))
state = state_dict.values()

aggregations = await self._relations_handler.get_bundled_aggregations(
events, user_id
)
if not use_admin_priviledge:
aggregations = await self._relations_handler.get_bundled_aggregations(
events, user_id
)
else:
aggregations = None # TODO: an admin might want aggregations
dav-is marked this conversation as resolved.
Show resolved Hide resolved

time_now = self.clock.time_msec()

Expand Down
4 changes: 4 additions & 0 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@
MakeRoomAdminRestServlet,
RoomEventContextServlet,
RoomMembersRestServlet,
RoomMessagesRestServlet,
RoomRestServlet,
RoomRestV2Servlet,
RoomStateRestServlet,
RoomTimestampToEventRestServlet,
)
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
Expand Down Expand Up @@ -271,6 +273,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
DestinationResetConnectionRestServlet(hs).register(http_server)
DestinationRestServlet(hs).register(http_server)
ListDestinationsRestServlet(hs).register(http_server)
RoomMessagesRestServlet(hs).register(http_server)
RoomTimestampToEventRestServlet(hs).register(http_server)

# Some servlets only get registered for the main process.
if hs.config.worker.worker_app is None:
Expand Down
106 changes: 106 additions & 0 deletions synapse/rest/admin/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from synapse.storage.databases.main.room import RoomSortOrder
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, RoomID, UserID, create_requester
from synapse.util import json_decoder

Expand Down Expand Up @@ -858,3 +859,108 @@ async def on_PUT(
await self._store.unblock_room(room_id)

return HTTPStatus.OK, {"block": block}


class RoomMessagesRestServlet(RestServlet):
    """
    Admin API endpoint to fetch a page of messages from a room.

    The requester must be a server admin. The request is forwarded to the
    pagination handler with `use_admin_priviledge=True`, so the handler is
    asked to return events regardless of the requester's own membership.

    Query parameters handled here:

    * `raw`: if present, events are not converted to the client event format.
    * `filter`: a URL-encoded JSON filter. A filter whose `event_format` is
      `"federation"` also disables the client event format.

    Pagination parameters are parsed by `PaginationConfig.from_request`, with a
    default limit of 10.

    GET /_synapse/admin/v1/rooms/<roomID>/messages
    """

    PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/messages$")

    def __init__(self, hs: "HomeServer"):
        super().__init__()
        self._hs = hs
        self.clock = hs.get_clock()
        self.pagination_handler = hs.get_pagination_handler()
        self.auth = hs.get_auth()
        self.store = hs.get_datastores().main

    async def on_GET(
        self, request: SynapseRequest, room_id: str
    ) -> Tuple[int, JsonDict]:
        """Return a page of messages for `room_id` to an admin requester.

        Args:
            request: The incoming HTTP request.
            room_id: The room ID captured from the URL path.

        Returns:
            A tuple of the HTTP status code and the pagination handler's
            response body.
        """
        requester = await self.auth.get_user_by_req(request)
        await assert_user_is_admin(self.auth, requester)

        pagination_config = await PaginationConfig.from_request(
            self.store, request, default_limit=10
        )
        # Twisted will have processed the args by now.
        assert request.args is not None
        as_client_event = b"raw" not in request.args
        filter_str = parse_string(request, "filter", encoding="utf-8")
        if filter_str:
            filter_json = urlparse.unquote(filter_str)
            event_filter: Optional[Filter] = Filter(
                self._hs, json_decoder.decode(filter_json)
            )
            # A filter asking for the federation format overrides the default
            # client formatting, just like the `raw` query parameter does.
            if (
                event_filter
                and event_filter.filter_json.get("event_format", "client")
                == "federation"
            ):
                as_client_event = False
        else:
            event_filter = None

        msgs = await self.pagination_handler.get_messages(
            room_id=room_id,
            requester=requester,
            pagin_config=pagination_config,
            as_client_event=as_client_event,
            event_filter=event_filter,
            use_admin_priviledge=True,
        )

        # Use the named constant, as the other servlets in this module do.
        return HTTPStatus.OK, msgs
dav-is marked this conversation as resolved.
Show resolved Hide resolved


class RoomTimestampToEventRestServlet(RestServlet):
    """
    Admin API endpoint that resolves a timestamp to the closest event in a
    room, searching in a given direction.

    This is handy for "jump to date" style features: look up the event nearest
    to a date, then start paginating messages from there.

    Query parameters:

    * `ts` (required): a timestamp in milliseconds; the closest event in the
      requested direction is returned.
    * `dir`: `f` to search forwards in time from `ts`, `b` to search backwards.
      Defaults to `f`.

    GET /_synapse/admin/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
    {
        "event_id": ...,
        "origin_server_ts": ...
    }
    """

    PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/timestamp_to_event$")

    def __init__(self, hs: "HomeServer"):
        super().__init__()
        self._auth = hs.get_auth()
        self._store = hs.get_datastores().main
        self.timestamp_lookup_handler = hs.get_timestamp_lookup_handler()

    async def on_GET(
        self, request: SynapseRequest, room_id: str
    ) -> Tuple[int, JsonDict]:
        """Look up the event closest to the `ts` query parameter in `room_id`."""
        # Only server admins may call this endpoint.
        requester = await self._auth.get_user_by_req(request)
        await assert_user_is_admin(self._auth, requester)

        ts_ms = parse_integer(request, "ts", required=True)
        search_dir = parse_string(
            request, "dir", default="f", allowed_values=["f", "b"]
        )

        lookup_result = await self.timestamp_lookup_handler.get_event_for_timestamp(
            requester, room_id, ts_ms, search_dir
        )
        found_event_id, found_origin_server_ts = lookup_result

        response_body: JsonDict = {
            "event_id": found_event_id,
            "origin_server_ts": found_origin_server_ts,
        }
        return HTTPStatus.OK, response_body
133 changes: 132 additions & 1 deletion tests/rest/admin/test_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import urllib.parse
from http import HTTPStatus
from typing import List, Optional
from unittest.mock import Mock

Expand All @@ -22,10 +24,11 @@
import synapse.rest.admin
from synapse.api.constants import EventTypes, Membership, RoomTypes
from synapse.api.errors import Codes
from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.pagination import PaginationHandler, PurgeStatus
from synapse.rest.client import directory, events, login, room
from synapse.server import HomeServer
from synapse.util import Clock
from synapse.util.stringutils import random_string

from tests import unittest

Expand Down Expand Up @@ -1792,6 +1795,134 @@ def test_get_joined_members_after_leave_room(self) -> None:
self.assertEqual(403, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

def test_topo_token_is_accepted(self) -> None:
    """The admin messages API accepts a `t`-prefixed `from` token."""
    creator = self.register_user("foo1", "pass")
    creator_tok = self.login("foo1", "pass")
    room_id = self.helper.create_room_as(creator, tok=creator_tok)

    token = "t1-0_0_0_0_0_0_0_0_0"
    url = "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s" % (
        room_id,
        self.admin_user_tok,
        token,
    )
    channel = self.make_request("GET", url)

    self.assertEqual(HTTPStatus.OK, channel.code)

    # The response must echo our token back as `start` and carry the usual
    # pagination fields.
    body = channel.json_body
    self.assertIn("start", body)
    self.assertEqual(token, body["start"])
    self.assertIn("chunk", body)
    self.assertIn("end", body)

def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
    """The admin messages API accepts an `s`-prefixed `from` token."""
    creator = self.register_user("foo2", "pass")
    creator_tok = self.login("foo2", "pass")
    room_id = self.helper.create_room_as(creator, tok=creator_tok)

    token = "s0_0_0_0_0_0_0_0_0"
    url = "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s" % (
        room_id,
        self.admin_user_tok,
        token,
    )
    channel = self.make_request("GET", url)

    self.assertEqual(HTTPStatus.OK, channel.code)

    # The response must echo our token back as `start` and carry the usual
    # pagination fields.
    body = channel.json_body
    self.assertIn("start", body)
    self.assertEqual(token, body["start"])
    self.assertIn("chunk", body)
    self.assertIn("end", body)

def test_room_messages_purge(self) -> None:
    """Purged events no longer show up in the admin messages API.

    Sends three messages, purges everything before the second one, and checks
    the backwards-paginated results from the second and first event tokens
    before and after the purge.
    """
    user = self.register_user("foo3", "pass")
    user_tok = self.login("foo3", "pass")
    room_id = self.helper.create_room_as(user, tok=user_tok)

    store = self.hs.get_datastores().main
    pagination_handler = self.hs.get_pagination_handler()

    # Only look at message events. The JSON is percent-encoded because it
    # contains characters (quotes, braces, spaces) that are not valid in a
    # query string; the servlet URL-decodes the `filter` parameter.
    message_filter = urllib.parse.quote(
        json.dumps({"types": [EventTypes.Message]})
    )

    def get_message_chunk(from_token: str) -> List[dict]:
        """Paginate backwards from `from_token` and return the event chunk."""
        channel = self.make_request(
            "GET",
            "/_synapse/admin/v1/rooms/%s/messages?access_token=%s&from=%s&dir=b&filter=%s"
            % (
                room_id,
                self.admin_user_tok,
                from_token,
                message_filter,
            ),
        )
        self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
        return channel.json_body["chunk"]

    # Send a first message in the room, which will be removed by the purge.
    first_event_id = self.helper.send(room_id, body="message 1", tok=user_tok)[
        "event_id"
    ]
    first_token = self.get_success(
        store.get_topological_token_for_event(first_event_id)
    )
    first_token_str = self.get_success(first_token.to_string(store))

    # Send a second message in the room, which won't be removed, and which we'll
    # use as the marker to purge events before.
    second_event_id = self.helper.send(room_id, body="message 2", tok=user_tok)[
        "event_id"
    ]
    second_token = self.get_success(
        store.get_topological_token_for_event(second_event_id)
    )
    second_token_str = self.get_success(second_token.to_string(store))

    # Send a third event in the room to ensure we don't fall under any edge case
    # due to our marker being the latest forward extremity in the room.
    self.helper.send(room_id, body="message 3", tok=user_tok)

    # Check that we get the first and second message when querying /messages.
    chunk = get_message_chunk(second_token_str)
    self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])

    # Purge every event before the second event.
    purge_id = random_string(16)
    pagination_handler._purges_by_id[purge_id] = PurgeStatus()
    self.get_success(
        pagination_handler._purge_history(
            purge_id=purge_id,
            room_id=room_id,
            token=second_token_str,
            delete_local_events=True,
        )
    )

    # Check that we only get the second message through /messages now that the
    # first has been purged.
    chunk = get_message_chunk(second_token_str)
    self.assertEqual(len(chunk), 1, [event["content"] for event in chunk])

    # Check that we get no event, but also no error, when querying /messages with
    # the token that was pointing at the first event, because we don't have it
    # anymore.
    chunk = get_message_chunk(first_token_str)
    self.assertEqual(len(chunk), 0, [event["content"] for event in chunk])


class JoinAliasRoomTestCase(unittest.HomeserverTestCase):

Expand Down