Skip to content
This repository has been archived by the owner on Mar 26, 2024. It is now read-only.

No read receipts in large rooms #67

Open
wants to merge 8 commits into
base: beeper
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ class ReceiptTypes:
READ: Final = "m.read"
READ_PRIVATE: Final = "m.read.private"
FULLY_READ: Final = "m.fully_read"
BEEPER_INBOX_DONE: Final = "com.beeper.inbox.done"


RECEIPTS_MAX_ROOM_SIZE = 100


class PublicRoomsFilterFields:
Expand Down
10 changes: 9 additions & 1 deletion synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import logging
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple

from synapse.api.constants import EduTypes, ReceiptTypes
from synapse.api.constants import RECEIPTS_MAX_ROOM_SIZE, EduTypes, ReceiptTypes
from synapse.appservice import ApplicationService
from synapse.streams import EventSource
from synapse.types import (
Expand Down Expand Up @@ -116,6 +116,14 @@ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
min_batch_id: Optional[int] = None
max_batch_id: Optional[int] = None

# Beeper: we don't want to send read receipts to large rooms,
# so we convert messages to private, that are over RECEIPT_MAX_ROOM_SIZE.
for i, r in enumerate(receipts):
if r.receipt_type != ReceiptTypes.READ_PRIVATE:
num_users = await self.store.get_number_joined_users_in_room(r.room_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we gather all of these ahead of time? Also a little clunky awaited on these one at a time, can we bulk grab these with twisted async somehow? @Fizzadar plz

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this handles federation as well so there could be up to 100 read receipts here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to ensure we only check the size of each room once beeper/synapse@fbcd582

But would be great to hear if anyone knows how to do the equivalent of asyncio.gather in twisted, I haven't found it myself yet....

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There’s a gatherResults that roughly does the same!

Copy link
Author

@incognitobeeper incognitobeeper Mar 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I'll check that out when I next get a chance to work on this. thanks @Fizzadar

if num_users > RECEIPTS_MAX_ROOM_SIZE:
receipts[i] = r.make_private_copy()

for receipt in receipts:
res = await self.store.insert_receipt(
receipt.room_id,
Expand Down
1 change: 0 additions & 1 deletion synapse/rest/client/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
user_id,
[
ReceiptTypes.READ,
ReceiptTypes.BEEPER_INBOX_DONE,
ReceiptTypes.READ_PRIVATE,
],
)
Expand Down
11 changes: 10 additions & 1 deletion synapse/rest/client/read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, Tuple

from synapse.api.constants import ReceiptTypes
from synapse.api.constants import RECEIPTS_MAX_ROOM_SIZE, ReceiptTypes
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
Expand All @@ -38,6 +38,7 @@ def __init__(self, hs: "HomeServer"):
self.config = hs.config
self.receipts_handler = hs.get_receipts_handler()
self.read_marker_handler = hs.get_read_marker_handler()
self.store = hs.get_datastores().main
self.presence_handler = hs.get_presence_handler()

self._known_receipt_types = {
Expand All @@ -55,6 +56,14 @@ async def on_POST(

body = parse_json_object_from_request(request)

# Beeper: we don't want to send read receipts to large rooms,
# so we convert messages to private, that are over RECEIPT_MAX_ROOM_SIZE.
if ReceiptTypes.READ in body:
num_users = await self.store.get_number_joined_users_in_room(room_id)
if num_users > RECEIPTS_MAX_ROOM_SIZE:
body[ReceiptTypes.READ_PRIVATE] = body[ReceiptTypes.READ]
del body[ReceiptTypes.READ]

unrecognized_types = set(body.keys()) - self._known_receipt_types
if unrecognized_types:
# It's fine if there are unrecognized receipt types, but let's log
Expand Down
1 change: 0 additions & 1 deletion synapse/rest/client/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def __init__(self, hs: "HomeServer"):
ReceiptTypes.READ,
ReceiptTypes.READ_PRIVATE,
ReceiptTypes.FULLY_READ,
ReceiptTypes.BEEPER_INBOX_DONE,
}

async def on_POST(
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def _get_room_summary_txn(
"get_room_summary", _get_room_summary_txn
)

@cached()
@cached(max_entries=100000)
async def get_number_joined_users_in_room(self, room_id: str) -> int:
return await self.db_pool.simple_select_one_onecol(
table="current_state_events",
Expand Down
16 changes: 16 additions & 0 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import abc
import re
import string
Expand Down Expand Up @@ -52,6 +54,7 @@
IReactorTime,
)

from synapse.api.constants import ReceiptTypes
from synapse.api.errors import Codes, SynapseError
from synapse.util.cancellation import cancellable
from synapse.util.stringutils import parse_and_validate_server_name
Expand Down Expand Up @@ -853,6 +856,19 @@ class ReadReceipt:
thread_id: Optional[str]
data: JsonDict

# Beeper: we don't want to alter the frozen attr, but
# do occasionally need to make a private copy to
# avoid traffic to large rooms.
def make_private_copy(self) -> ReadReceipt:
return ReadReceipt(
room_id=self.room_id,
user_id=self.user_id,
event_ids=self.event_ids,
thread_id=self.thread_id,
data=self.data,
receipt_type=ReceiptTypes.READ_PRIVATE,
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class DeviceListUpdates:
Expand Down
87 changes: 87 additions & 0 deletions tests/rest/client/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import synapse.rest.admin
from synapse.api.constants import (
RECEIPTS_MAX_ROOM_SIZE,
EduTypes,
EventContentFields,
EventTypes,
Expand Down Expand Up @@ -382,6 +383,7 @@ class ReadReceiptsTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
read_marker.register_servlets,
receipts.register_servlets,
room.register_servlets,
sync.register_servlets,
Expand Down Expand Up @@ -499,6 +501,82 @@ def test_read_receipt_with_empty_body_is_rejected(self) -> None:
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST)
self.assertEqual(channel.json_body["errcode"], "M_NOT_JSON", channel.json_body)

def test_read_receipt_not_sent_to_large_rooms(self) -> None:
# Beeper: we don't send read receipts on rooms with more
# users than # RECEIPTS_MAX_ROOM_SIZE
for i in range(RECEIPTS_MAX_ROOM_SIZE):
user = self.register_user(f"user_{i}", f"secure_password_{i}")
tok = self.login(f"user_{i}", f"secure_password_{i}")
self.helper.join(room=self.room_id, user=user, tok=tok)

res = self.helper.send(
self.room_id, body="woah, this is a big room!", tok=self.tok
)

for receipt_type in (
ReceiptTypes.FULLY_READ,
ReceiptTypes.READ_PRIVATE,
ReceiptTypes.READ,
):
# Send a read receipt
channel = self.make_request(
"POST",
f"/rooms/{self.room_id}/receipt/{receipt_type}/{res['event_id']}",
{},
access_token=self.tok2,
)
self.assertEqual(channel.code, 200)

# Test that we didn't get a read receipt.
self.assertIsNone(self._get_read_receipt())

def test_read_marker_doesnt_send_receipt_to_large_rooms(self) -> None:
# Beeper: we don't send read receipts on rooms with over 100 users
# add another 100 users to the room
for i in range(RECEIPTS_MAX_ROOM_SIZE):
user = self.register_user(f"user_{i}", f"secure_password_{i}")
tok = self.login(f"user_{i}", f"secure_password_{i}")
self.helper.join(room=self.room_id, user=user, tok=tok)

res = self.helper.send(
self.room_id, body="woah, this is a big room!", tok=self.tok
)

# Send a read receipt
channel = self.make_request(
"POST",
f"/rooms/{self.room_id}/read_markers",
{
ReceiptTypes.FULLY_READ: res["event_id"],
ReceiptTypes.READ: res["event_id"],
},
access_token=self.tok2,
)

self.assertEqual(channel.code, 200, channel.json_body)
# Test that we didn't get a read receipt.
self.assertIsNone(self._get_read_receipt())

def test_read_marker_does_send_receipt_to_small_rooms(self) -> None:
res = self.helper.send(
self.room_id, body="woah, this is room is tiny!", tok=self.tok
)

# Send a read receipt
channel = self.make_request(
"POST",
f"/rooms/{self.room_id}/read_markers",
{
ReceiptTypes.FULLY_READ: res["event_id"],
ReceiptTypes.READ: res["event_id"],
},
access_token=self.tok2,
)

self.assertEqual(channel.code, 200, channel.json_body)
# Test that we didn't get a read receipt.
self.assertIsNotNone(self._get_read_receipt())

def _get_read_receipt(self) -> Optional[JsonDict]:
"""Syncs and returns the read receipt."""

Expand Down Expand Up @@ -728,6 +806,15 @@ def test_unread_counts(self) -> None:
self.assertEqual(channel.code, 200, channel.json_body)
self._check_unread_count(0)

def test_large_rooms_dont_alter_unread_count_behaviour(self) -> None:
# Beeper: create a lot of users and join them to the room
for i in range(RECEIPTS_MAX_ROOM_SIZE):
user = self.register_user(f"user_{i}", f"secure_password_{i}")
tok = self.login(f"user_{i}", f"secure_password_{i}")
self.helper.join(self.room_id, user=user, tok=tok)

self.test_unread_counts()

# We test for all three receipt types that influence notification counts
@parameterized.expand(
[
Expand Down