Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Federation API for Space summary #9652

Merged
merged 7 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 61 additions & 9 deletions synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import functools
import logging
import re
from typing import Optional, Tuple, Type
from typing import TYPE_CHECKING, Container, Mapping, Optional, Sequence, Tuple, Type

import synapse
from synapse.api.constants import MAX_GROUP_CATEGORYID_LENGTH, MAX_GROUP_ROLEID_LENGTH
Expand All @@ -29,7 +29,7 @@
FEDERATION_V1_PREFIX,
FEDERATION_V2_PREFIX,
)
from synapse.http.server import JsonResource
from synapse.http.server import HttpServer, JsonResource
from synapse.http.servlet import (
parse_boolean_from_args,
parse_integer_from_args,
Expand All @@ -44,10 +44,14 @@
whitelisted_homeserver,
)
from synapse.server import HomeServer
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.types import JsonDict, ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import parse_and_validate_server_name
from synapse.util.versionstring import get_version_string

if TYPE_CHECKING:
import synapse.server

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -1376,6 +1380,40 @@ async def on_PUT(self, origin, content, query, group_id):
return 200, new_content


class FederationSpaceSummaryServlet(BaseFederationServlet):
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
PATH = "/spaces/(?P<room_id>[^/]*)"

async def on_POST(
self,
origin: str,
content: JsonDict,
query: Mapping[bytes, Sequence[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
suggested_only = content.get("suggested_only", False)
if not isinstance(suggested_only, bool):
raise SynapseError(
400, "'suggested_only' must be a boolean", Codes.BAD_JSON
richvdh marked this conversation as resolved.
Show resolved Hide resolved
)

exclude_rooms = content.get("exclude_rooms", [])
if not isinstance(exclude_rooms, list) or any(
not isinstance(x, str) for x in exclude_rooms
):
raise SynapseError(400, "bad value for 'exclude_rooms'", Codes.BAD_JSON)

max_rooms_per_space = content.get("max_rooms_per_space")
if max_rooms_per_space is not None and not isinstance(max_rooms_per_space, int):
raise SynapseError(
400, "bad value for 'max_rooms_per_space'", Codes.BAD_JSON
)

return 200, await self.handler.federation_space_summary(
room_id, suggested_only, max_rooms_per_space, exclude_rooms
)


class RoomComplexityServlet(BaseFederationServlet):
"""
Indicates to other servers how complex (and therefore likely
Expand Down Expand Up @@ -1474,18 +1512,24 @@ async def on_GET(self, origin, content, query, room_id):
)


def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=None):
def register_servlets(
hs: "synapse.server.HomeServer",
richvdh marked this conversation as resolved.
Show resolved Hide resolved
resource: HttpServer,
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
servlet_groups: Optional[Container[str]] = None,
):
"""Initialize and register servlet classes.

Will by default register all servlets. For custom behaviour, pass in
a list of servlet_groups to register.

Args:
hs (synapse.server.HomeServer): homeserver
resource (JsonResource): resource class to register to
authenticator (Authenticator): authenticator to use
ratelimiter (util.ratelimitutils.FederationRateLimiter): ratelimiter to use
servlet_groups (list[str], optional): List of servlet groups to register.
hs: homeserver
resource: resource class to register to
authenticator: authenticator to use
ratelimiter: ratelimiter to use
servlet_groups: List of servlet groups to register.
Defaults to ``DEFAULT_SERVLET_GROUPS``.
"""
if not servlet_groups:
Expand All @@ -1500,6 +1544,14 @@ def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=N
server_name=hs.hostname,
).register(resource)

if hs.config.experimental.spaces_enabled:
FederationSpaceSummaryServlet(
handler=hs.get_space_summary_handler(),
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
).register(resource)

if "openid" in servlet_groups:
for servletclass in OPENID_SERVLET_CLASSES:
servletclass(
Expand Down
184 changes: 139 additions & 45 deletions synapse/handlers/space_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import itertools
import logging
from collections import deque
from typing import TYPE_CHECKING, Iterable, List, Optional, Set
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple

import attr

from synapse.api.constants import EventContentFields, EventTypes, HistoryVisibility
from synapse.api.errors import AuthError
Expand Down Expand Up @@ -54,7 +56,7 @@ async def get_space_summary(
max_rooms_per_space: Optional[int] = None,
) -> JsonDict:
"""
Implementation of the space summary API
Implementation of the space summary C-S API

Args:
requester: user id of the user making this request
Expand All @@ -66,7 +68,7 @@ async def get_space_summary(

max_rooms_per_space: an optional limit on the number of child rooms we will
return. This does not apply to the root room (ie, room_id), and
is overridden by ROOMS_PER_SPACE_LIMIT.
is overridden by MAX_ROOMS_PER_SPACE.

Returns:
summary dict to return
Expand All @@ -76,67 +78,154 @@ async def get_space_summary(
await self._auth.check_user_in_room_or_world_readable(room_id, requester)

# the queue of rooms to process
room_queue = deque((room_id,))
room_queue = deque((_RoomQueueEntry(room_id),))

processed_rooms = set() # type: Set[str]

rooms_result = [] # type: List[JsonDict]
events_result = [] # type: List[JsonDict]

now = self._clock.time_msec()
while room_queue and len(rooms_result) < MAX_ROOMS:
queue_entry = room_queue.popleft()
room_id = queue_entry.room_id
logger.debug("Processing room %s", room_id)
processed_rooms.add(room_id)

# The client-specified max_rooms_per_space limit doesn't apply to the
# room_id specified in the request, so we ignore it if this is the
# first room we are processing.
max_children = max_rooms_per_space if processed_rooms else None

rooms, events = await self._summarize_local_room(
requester, room_id, suggested_only, max_children
)

rooms_result.extend(rooms)
events_result.extend(events)

# add any children that we haven't already processed to the queue
for edge_event in events:
if edge_event["state_key"] not in processed_rooms:
room_queue.append(_RoomQueueEntry(edge_event["state_key"]))

return {"rooms": rooms_result, "events": events_result}

async def federation_space_summary(
self,
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: Iterable[str],
) -> JsonDict:
richvdh marked this conversation as resolved.
Show resolved Hide resolved
"""
Implementation of the space summary Federation API

Args:
room_id: room id to start the summary at

suggested_only: whether we should only return children with the "suggested"
flag set.

max_rooms_per_space: an optional limit on the number of child rooms we will
return. Unlike the C-S API, this applies to the root room (room_id).
It is clipped to MAX_ROOMS_PER_SPACE.

exclude_rooms: a list of rooms to skip over (presumably because the
calling server has already seen them).

Returns:
summary dict to return
"""
# the queue of rooms to process
room_queue = deque((room_id,))

# the set of rooms that we should not walk further. Initialise it with the
# excluded-rooms list; we will add other rooms as we process them so that
# we do not loop.
processed_rooms = set(exclude_rooms) # type: Set[str]

rooms_result = [] # type: List[JsonDict]
events_result = [] # type: List[JsonDict]

while room_queue and len(rooms_result) < MAX_ROOMS:
room_id = room_queue.popleft()
logger.debug("Processing room %s", room_id)
processed_rooms.add(room_id)

try:
await self._auth.check_user_in_room_or_world_readable(
room_id, requester
)
except AuthError:
logger.info(
"user %s cannot view room %s, omitting from summary",
requester,
room_id,
)
continue
rooms, events = await self._summarize_local_room(
None, room_id, suggested_only, max_rooms_per_space
)

room_entry = await self._build_room_entry(room_id)
rooms_result.append(room_entry)
rooms_result.extend(rooms)
events_result.extend(events)

# look for child rooms/spaces.
child_events = await self._get_child_events(room_id)
# add any children that we haven't already processed to the queue
for edge_event in events:
if edge_event["state_key"] not in processed_rooms:
room_queue.append(edge_event["state_key"])

if suggested_only:
# we only care about suggested children
child_events = filter(_is_suggested_child_event, child_events)
return {"rooms": rooms_result, "events": events_result}

# The client-specified max_rooms_per_space limit doesn't apply to the
# room_id specified in the request, so we ignore it if this is the
# first room we are processing. Otherwise, apply any client-specified
# limit, capping to our built-in limit.
if max_rooms_per_space is not None and len(processed_rooms) > 1:
max_rooms = min(MAX_ROOMS_PER_SPACE, max_rooms_per_space)
else:
max_rooms = MAX_ROOMS_PER_SPACE

for edge_event in itertools.islice(child_events, max_rooms):
edge_room_id = edge_event.state_key

events_result.append(
await self._event_serializer.serialize_event(
edge_event,
time_now=now,
event_format=format_event_for_client_v2,
)
async def _summarize_local_room(
self,
requester: Optional[str],
room_id: str,
suggested_only: bool,
max_children: Optional[int],
) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
if not await self._is_room_accessible(room_id, requester):
return (), ()

room_entry = await self._build_room_entry(room_id)

# look for child rooms/spaces.
child_events = await self._get_child_events(room_id)

if suggested_only:
# we only care about suggested children
child_events = filter(_is_suggested_child_event, child_events)

if max_children is None or max_children > MAX_ROOMS_PER_SPACE:
max_children = MAX_ROOMS_PER_SPACE

now = self._clock.time_msec()
events_result = [] # type: List[JsonDict]
for edge_event in itertools.islice(child_events, max_children):
events_result.append(
await self._event_serializer.serialize_event(
edge_event,
time_now=now,
event_format=format_event_for_client_v2,
)
)
return (room_entry,), events_result
richvdh marked this conversation as resolved.
Show resolved Hide resolved

# if we haven't yet visited the target of this link, add it to the queue
if edge_room_id not in processed_rooms:
room_queue.append(edge_room_id)
async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool:
# if we have an authenticated requesting user, first check if they are in the
# room
if requester:
try:
await self._auth.check_user_in_room(room_id, requester)
return True
except AuthError:
pass

return {"rooms": rooms_result, "events": events_result}
# otherwise, check if the room is peekable
hist_vis = ""
hist_vis_ev = await self._state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if hist_vis_ev:
hist_vis = hist_vis_ev.content.get("history_visibility")
if hist_vis == HistoryVisibility.WORLD_READABLE:
return True

logger.info(
"room %s is unpeekable and user %s is not a member, omitting from summary",
room_id,
requester,
)
return False

async def _build_room_entry(self, room_id: str) -> JsonDict:
"""Generate en entry suitable for the 'rooms' list in the summary response"""
Expand Down Expand Up @@ -191,6 +280,11 @@ async def _get_child_events(self, room_id: str) -> Iterable[EventBase]:
return (e for e in events if e.content.get("via"))


@attr.s(frozen=True, slots=True)
class _RoomQueueEntry:
room_id = attr.ib(type=str)
richvdh marked this conversation as resolved.
Show resolved Hide resolved


def _is_suggested_child_event(edge_event: EventBase) -> bool:
suggested = edge_event.content.get("suggested")
if isinstance(suggested, bool) and suggested:
Expand Down