Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

WIP: Dmr/unblock catchup #15228

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,15 +392,15 @@ async def _maybe_backfill_inner(
get_prev_content=False,
)

# We set `check_history_visibility_only` as we might otherwise get false
# We unset `filter_out_erased_senders` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = await filter_events_for_server(
self._storage_controllers,
self.server_name,
self.server_name,
events_to_check,
redact=False,
check_history_visibility_only=True,
filter_out_erased_senders=False,
)
if filtered_extremities:
extremities_to_request.append(bp.event_id)
Expand Down
48 changes: 29 additions & 19 deletions synapse/visibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
import logging
from enum import Enum, auto
from typing import Collection, Dict, FrozenSet, List, Optional, Tuple
from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Sequence, Tuple

import attr
from typing_extensions import Final
Expand Down Expand Up @@ -565,29 +565,40 @@ async def filter_events_for_server(
storage: StorageControllers,
target_server_name: str,
local_server_name: str,
events: List[EventBase],
events: Sequence[EventBase],
redact: bool = True,
check_history_visibility_only: bool = False,
filter_out_erased_senders: bool = True,
) -> List[EventBase]:
"""Filter a list of events based on whether given server is allowed to
"""Filter a list of events based on whether the target server is allowed to
see them.

For a fully stated room, the target server is allowed to see an event E if:
- the state at E has world readable or shared history vis, OR
- the state at E says that the target server is in the room.

For a partially stated room, the target server is allowed to see E if:
- E was created by this homeserver, AND:
- the partial state at E has world readable or shared history vis, OR
- the partial state at E says that the target server is in the room.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

TODO: state before or state after?

Args:
storage
server_name
target_server_name
local_server_name
events
redact: Whether to return a redacted version of the event, or
to filter them out entirely.
check_history_visibility_only: Whether to only check the
history visibility, rather than things like if the sender has been
redact: Controls what to do with events which have been filtered out.
If True, include their redacted forms; if False, omit them entirely.
filter_out_erased_senders: If true, also filter out events whose sender has been
erased. This is used e.g. during pagination to decide whether to
backfill or not.

Returns
The filtered events.
"""

def is_sender_erased(event: EventBase, erased_senders: Dict[str, bool]) -> bool:
def is_sender_erased(event: EventBase, erased_senders: Mapping[str, bool]) -> bool:
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
if erased_senders and erased_senders[event.sender]:
logger.info("Sender of %s has been erased, redacting", event.event_id)
return True
Expand Down Expand Up @@ -616,7 +627,7 @@ def check_event_is_visible(
# server has no users in the room: redact
return False

if not check_history_visibility_only:
if filter_out_erased_senders:
erased_senders = await storage.main.are_users_erased(e.sender for e in events)
else:
# We don't want to check whether users are erased, which is equivalent
Expand All @@ -632,14 +643,13 @@ def check_event_is_visible(
# this check but would base the filtering on an outdated view of the membership events.

partial_state_invisible_events = set()
if not check_history_visibility_only:
Copy link
Contributor

@MatMaul MatMaul Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to some failing tests we need to keep a way to shortcut this code for the backfill use case.
In this case this function is called with backward extremities we want to use as a base for backfilling to check each extremity in turn and ignore those which users on our server wouldn't be able to see.
So I think there is no way here that we will leak events we shouldn't, so skipping this should be fine.
Why does it make the tests fail ? I haven't looked deep enough yet to understand.

for e in events:
sender_domain = get_domain_from_id(e.sender)
if (
sender_domain != local_server_name
and await storage.main.is_partial_state_room(e.room_id)
):
partial_state_invisible_events.add(e)
for e in events:
sender_domain = get_domain_from_id(e.sender)
if (
sender_domain != local_server_name
and await storage.main.is_partial_state_room(e.room_id)
):
partial_state_invisible_events.add(e)

# Let's check to see if all the events have a history visibility
# of "shared" or "world_readable". If that's the case then we don't
Expand Down
77 changes: 76 additions & 1 deletion tests/federation/test_federation_catch_up.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Callable, List, Optional, Tuple
from typing import Callable, Collection, List, Optional, Tuple
from unittest import mock
from unittest.mock import Mock

from twisted.test.proto_helpers import MemoryReactor
Expand Down Expand Up @@ -500,3 +501,77 @@ def test_not_latest_event(self) -> None:
self.assertEqual(len(sent_pdus), 1)
self.assertEqual(sent_pdus[0].event_id, event_2.event_id)
self.assertFalse(per_dest_queue._catching_up)

def test_catch_up_is_not_blocked_by_partial_state_room(self) -> None:
"""Detects (part of?) https://github.com/matrix-org/synapse/issues/15220."""
# ARRANGE:
# - a local user (u1)
# - a room with which contains u1 and two remote users, @u2:host2 and @u3:other
# - events in that room such that
# - history visibility is restricted
# - u1 sent message events
# - afterwards, u3 sent a remote event
# - catchup to begin for host2
per_dest_queue, sent_pdus = self.make_fake_destination_queue()

self.register_user("u1", "you the one")
u1_token = self.login("u1", "you the one")
room = self.helper.create_room_as("u1", tok=u1_token)
self.helper.send_state(
room_id=room,
event_type="m.room.history_visibility",
body={"history_visibility": "joined"},
tok=u1_token,
)
self.get_success(
event_injection.inject_member_event(self.hs, room, "@u2:host2", "join")
)
self.get_success(
event_injection.inject_member_event(self.hs, room, "@u3:other", "join")
)

# create some events
event_id_1 = self.helper.send(room, "hello", tok=u1_token)["event_id"]
event_id_2 = self.helper.send(room, "world", tok=u1_token)["event_id"]
# pretend that u3 changes their displayname
event_id_3 = self.get_success(
event_injection.inject_member_event(self.hs, room, "@u3:other", "join")
).event_id

# destination_rooms should already be populated, but let us pretend that we already
# sent (successfully) up to and including event id 1
event_1 = self.get_success(self.hs.get_datastores().main.get_event(event_id_1))
assert event_1.internal_metadata.stream_ordering is not None
self.get_success(
self.hs.get_datastores().main.set_destination_last_successful_stream_ordering(
"host2", event_1.internal_metadata.stream_ordering
)
)

# Mock event 3 as having partial state
self.get_success(
event_injection.mark_event_as_partial_state(self.hs, event_id_3, room)
)

# Fail the test if we block on full state for event 3.
async def mock_await_full_state(event_ids: Collection[str]) -> None:
if event_id_3 in event_ids:
raise AssertionError("Tried to await full state for event_id_3")

# ACT
with mock.patch.object(
self.hs.get_storage_controllers().state._partial_state_events_tracker,
"await_full_state",
mock_await_full_state,
):
self.get_success(per_dest_queue._catch_up_transmission_loop())

# ASSERT
# We should have:
# - not sent event 3: it's not ours, and the room is partial stated
# - fallen back to sending event 2: it's the most recent event in the room
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a little dodgy. In this situation, host2 is in the room prior to event_2 being created, so it's correct that we send event_2 as the catchup event.

BUT: suppose that the order of event was

  • event 2
  • host2 joins
  • event 3

then host 2 wouldn't be privvy to event 2. I think we'll either

  • block awaitng event2's full state, or
  • incorrectly send event 2 to host2

I need to probably make another test case and stare at this some more.

# we tried to send to host2
# - completed catch-up
self.assertEqual(len(sent_pdus), 1)
self.assertEqual(sent_pdus[0].event_id, event_id_2)
self.assertFalse(per_dest_queue._catching_up)
31 changes: 31 additions & 0 deletions tests/test_utils/event_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,34 @@ async def create_event(
context = await unpersisted_context.persist(event)

return event, context


async def mark_event_as_partial_state(
hs: synapse.server.HomeServer,
event_id: str,
room_id: str,
) -> None:
"""
(Falsely) mark an event as having partial state.

Naughty, but occasionally useful when checking that partial state doesn't
block something from happening.

If the event already has partial state, this insert will fail (event_id is unique
in this table).
"""
store = hs.get_datastores().main
await store.db_pool.simple_upsert(
table="partial_state_rooms",
keyvalues={"room_id": room_id},
values={},
insertion_values={"room_id": room_id},
)

await store.db_pool.simple_insert(
table="partial_state_events",
values={
"room_id": room_id,
"event_id": event_id,
},
)