Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Include thread information when sending receipts over federation. #14466

Merged
merged 13 commits into from
Nov 28, 2022
1 change: 1 addition & 0 deletions changelog.d/14466.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.70.0 where a receipt's thread ID was not sent over federation.
59 changes: 48 additions & 11 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
from synapse.types import JsonDict, ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.visibility import filter_events_for_server

Expand Down Expand Up @@ -136,8 +136,10 @@ def __init__(
# destination
self._pending_presence: Dict[str, UserPresenceState] = {}

# room_id -> receipt_type -> user_id -> receipt_dict
self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
# room_id -> receipt_type -> thread_id -> user_id -> receipt_dict
self._pending_rrs: Dict[
str, Dict[str, Dict[Optional[str], Dict[str, dict]]]
] = {}
self._rrs_pending_flush = False

# stream_id of last successfully sent to-device message.
Expand Down Expand Up @@ -202,9 +204,15 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None:
Args:
receipt: receipt to be queued
"""
serialized_receipt: JsonDict = {
"event_ids": receipt.event_ids,
"data": receipt.data,
}
if receipt.thread_id is not None:
serialized_receipt["data"]["thread_id"] = receipt.thread_id
self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
receipt.receipt_type, {}
)[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
).setdefault(receipt.thread_id, {})[receipt.user_id] = serialized_receipt

def flush_read_receipts_for_room(self, room_id: str) -> None:
# if we don't have any read-receipts for this room, it may be that we've already
Expand Down Expand Up @@ -549,15 +557,44 @@ def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
# not yet time for this lot
return

edu = Edu(
origin=self._server_name,
destination=self._destination,
edu_type=EduTypes.RECEIPT,
content=self._pending_rrs,
)
# Build the EDUs needed to send these receipts. This is a bit complicated
# since we can share one for each unique (room, receipt type, user), but
# need additional ones for different threads. The result is that we will
# send N EDUs where N is the number of unique threads across rooms.
#
# This could be more efficient by bundling users who have sent receipts
# for different threads.
clokep marked this conversation as resolved.
Show resolved Hide resolved
while self._pending_rrs:
# The next EDU's content.
content: JsonDict = {}

# Iterate each room's receipt types and threads, adding it to the content.
for room_id in list(self._pending_rrs.keys()):
for receipt_type in list(self._pending_rrs[room_id].keys()):
thread_ids = self._pending_rrs[room_id][receipt_type]
# The thread ID itself doesn't matter at this point.
content.setdefault(room_id, {})[
receipt_type
] = thread_ids.popitem()[1]

# If there are no threads left in this room / receipt type.
# Clear it out.
if not thread_ids:
del self._pending_rrs[room_id][receipt_type]

# Again, clear out any blank rooms.
if not self._pending_rrs[room_id]:
del self._pending_rrs[room_id]

yield Edu(
origin=self._server_name,
destination=self._destination,
edu_type=EduTypes.RECEIPT,
content=content,
)

squahtx marked this conversation as resolved.
Show resolved Hide resolved
self._pending_rrs = {}
clokep marked this conversation as resolved.
Show resolved Hide resolved
self._rrs_pending_flush = False
yield edu

def _pop_pending_edus(self, limit: int) -> List[Edu]:
pending_edus = self._pending_edus
Expand Down
1 change: 0 additions & 1 deletion synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None
continue

# Check if these receipts apply to a thread.
thread_id = None
data = user_values.get("data", {})
thread_id = data.get("thread_id")
# If the thread ID is invalid, consider it missing.
Expand Down
68 changes: 68 additions & 0 deletions tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,74 @@ def test_send_receipts(self):
],
)

@override_config({"send_federation": True})
def test_send_receipts_thread(self):
mock_send_transaction = (
self.hs.get_federation_transport_client().send_transaction
)
mock_send_transaction.return_value = make_awaitable({})

# Create receipts for:
#
# * The same room / user on multiple threads.
# * A different user in the same room.
sender = self.hs.get_federation_sender()
for user, thread in (("alice", None), ("alice", "thread"), ("bob", None)):
receipt = ReadReceipt(
"room_id",
"m.read",
user,
["event_id"],
thread_id=thread,
data={"ts": 1234},
)
self.successResultOf(
defer.ensureDeferred(sender.send_read_receipt(receipt))
)

self.pump()

# expect a call to send_transaction with two EDUs to separate threads.
mock_send_transaction.assert_called_once()
json_cb = mock_send_transaction.call_args[0][1]
data = json_cb()
# Note that the ordering of the EDUs doesn't matter.
self.assertCountEqual(
data["edus"],
[
{
"edu_type": EduTypes.RECEIPT,
"content": {
"room_id": {
"m.read": {
"alice": {
"event_ids": ["event_id"],
"data": {"ts": 1234, "thread_id": "thread"},
}
}
}
},
},
{
"edu_type": EduTypes.RECEIPT,
"content": {
"room_id": {
"m.read": {
"alice": {
"event_ids": ["event_id"],
"data": {"ts": 1234},
},
"bob": {
"event_ids": ["event_id"],
"data": {"ts": 1234},
},
}
}
},
},
],
)

@override_config({"send_federation": True})
def test_send_receipts_with_backoff(self):
"""Send two receipts in quick succession; the second should be flushed, but
Expand Down