This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow multiple workers to write to receipts stream. #16432

Merged 15 commits on Oct 25, 2023
1 change: 1 addition & 0 deletions changelog.d/16432.feature
@@ -0,0 +1 @@
Allow multiple workers to write to receipts stream.
4 changes: 2 additions & 2 deletions synapse/config/workers.py
@@ -358,9 +358,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"Must only specify one instance to handle `account_data` messages."
)

if len(self.writers.receipts) != 1:
if len(self.writers.receipts) == 0:
raise ConfigError(
"Must only specify one instance to handle `receipts` messages."
"Must specify at least one instance to handle `receipts` messages."
Comment on lines +361 to +363
Member:

We should update the documentation for this?

Member Author:

I'm somewhat inclined to leave this undocumented until we can test it? But I could also be persuaded otherwise.

Member:

That's fine. 👍

)

if len(self.writers.events) == 0:
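With the check above relaxed, `stream_writers.receipts` in the worker configuration may now list more than one instance. Once several writers exist, receipt writes need to be routed to a consistent instance; Synapse generally does this by hashing a sharding key over the configured writers, though the exact key used for receipts is not shown in this diff. A minimal sketch of that kind of consistent routing, with hypothetical worker names:

```python
import hashlib
from typing import Sequence


def pick_receipts_writer(room_id: str, writers: Sequence[str]) -> str:
    """Map a room to one of the configured receipt writers, deterministically.

    Illustrative only: the real sharding helper may hash differently, but the
    property is the same, i.e. every worker computes the same writer for a
    given room without any coordination.
    """
    digest = hashlib.sha256(room_id.encode("utf8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(writers)
    return writers[index]


# Hypothetical names matching a `stream_writers: receipts: [...]` config entry.
writers = ["receipts_writer1", "receipts_writer2"]
print(pick_receipts_writer("!someroom:example.com", writers))
```

Because the mapping depends only on the room ID and the writer list, a request can be proxied to the right writer from any worker.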
42 changes: 23 additions & 19 deletions synapse/handlers/appservice.py
@@ -47,6 +47,7 @@
DeviceListUpdates,
JsonDict,
JsonMapping,
MultiWriterStreamToken,
RoomAlias,
RoomStreamToken,
StreamKeyType,
@@ -217,7 +218,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
def notify_interested_services_ephemeral(
self,
stream_key: StreamKeyType,
new_token: Union[int, RoomStreamToken],
new_token: Union[int, RoomStreamToken, MultiWriterStreamToken],
users: Collection[Union[str, UserID]],
) -> None:
"""
@@ -259,19 +260,6 @@ def notify_interested_services_ephemeral(
):
return

# Assert that new_token is an integer (and not a RoomStreamToken).
# All of the supported streams that this function handles use an
# integer to track progress (rather than a RoomStreamToken - a
# vector clock implementation) as they don't support multiple
# stream writers.
#
# As a result, we simply assert that new_token is an integer.
# If we do end up needing to pass a RoomStreamToken down here
# in the future, using RoomStreamToken.stream (the minimum stream
# position) to convert to an ascending integer value should work.
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)

# Ignore to-device messages if the feature flag is not enabled
if (
stream_key == StreamKeyType.TO_DEVICE
@@ -286,6 +274,9 @@
):
return

# We know we're not a `RoomStreamToken` at this point.
assert not isinstance(new_token, RoomStreamToken)

# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
@@ -327,7 +318,7 @@ async def _notify_interested_services_ephemeral(
self,
services: List[ApplicationService],
stream_key: StreamKeyType,
new_token: int,
new_token: Union[int, MultiWriterStreamToken],
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s", stream_key)
@@ -340,6 +331,7 @@ async def _notify_interested_services_ephemeral(
#
# Instead we simply grab the latest typing updates in _handle_typing
# and, if they apply to this application service, send it off.
assert isinstance(new_token, int)
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
@@ -350,15 +342,23 @@ async def _notify_interested_services_ephemeral(
(service.id, stream_key)
):
if stream_key == StreamKeyType.RECEIPT:
assert isinstance(new_token, MultiWriterStreamToken)

# We store appservice tokens as integers, so we ignore
# the `instance_map` components and instead simply
# follow the base stream position.
new_token = MultiWriterStreamToken(stream=new_token.stream)

events = await self._handle_receipts(service, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)

# Persist the latest handled stream token for this appservice
await self.store.set_appservice_stream_type_pos(
service, "read_receipt", new_token
service, "read_receipt", new_token.stream
)

elif stream_key == StreamKeyType.PRESENCE:
assert isinstance(new_token, int)
events = await self._handle_presence(service, users, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)

@@ -368,6 +368,7 @@
)

elif stream_key == StreamKeyType.TO_DEVICE:
assert isinstance(new_token, int)
# Retrieve a list of to-device message events, as well as the
# maximum stream token of the messages we were able to retrieve.
to_device_messages = await self._get_to_device_messages(
@@ -383,6 +384,7 @@
)

elif stream_key == StreamKeyType.DEVICE_LIST:
assert isinstance(new_token, int)
device_list_summary = await self._get_device_list_summary(
service, new_token
)
@@ -432,7 +434,7 @@ async def _handle_typing(
return typing

async def _handle_receipts(
self, service: ApplicationService, new_token: int
self, service: ApplicationService, new_token: MultiWriterStreamToken
) -> List[JsonMapping]:
"""
Return the latest read receipts that the given application service should receive.
@@ -455,15 +457,17 @@
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
if new_token is not None and new_token <= from_key:
if new_token is not None and new_token.stream <= from_key:
logger.debug(
"Rejecting token lower than or equal to stored: %s" % (new_token,)
)
return []

from_token = MultiWriterStreamToken(stream=from_key)

receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key, to_key=new_token
service=service, from_key=from_token, to_key=new_token
)
return receipts

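The handler above flattens the receipts position to `new_token.stream` because appservice progress is persisted as a single integer. The sketch below is a simplified stand-in for `MultiWriterStreamToken` (the real class in `synapse.types` has more machinery); it illustrates why keeping only the base position is safe for a resumption bookmark, under the assumption that `stream` is the minimum position all writers have reached:

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class SketchMultiWriterToken:
    """Simplified stand-in for a multi-writer stream token.

    `stream` is the lowest position every writer is known to have reached;
    `instance_map` records writers that have advanced beyond it.
    """

    stream: int
    instance_map: Dict[str, int] = field(default_factory=dict)

    def get_max_stream_pos(self) -> int:
        # The furthest position any single writer has reached.
        return max(self.instance_map.values(), default=self.stream)


token = SketchMultiWriterToken(stream=10, instance_map={"writer2": 12})

# Persisting only the base position (as `set_appservice_stream_type_pos` is now
# given `new_token.stream`) can re-deliver receipts between positions 10 and 12
# on the next pass, but it can never skip any, which is what a bookmark needs.
assert token.stream <= token.get_max_stream_pos()
```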
2 changes: 1 addition & 1 deletion synapse/handlers/initial_sync.py
@@ -145,7 +145,7 @@ async def _snapshot_all_rooms(
joined_rooms = [r.room_id for r in room_list if r.membership == Membership.JOIN]
receipt = await self.store.get_linearized_receipts_for_rooms(
joined_rooms,
to_key=int(now_token.receipt_key),
to_key=now_token.receipt_key,
)

receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id)
19 changes: 10 additions & 9 deletions synapse/handlers/receipts.py
@@ -20,6 +20,7 @@
from synapse.types import (
JsonDict,
JsonMapping,
MultiWriterStreamToken,
ReadReceipt,
StreamKeyType,
UserID,
@@ -200,7 +201,7 @@ async def received_client_receipt(
await self.federation_sender.send_read_receipt(receipt)


class ReceiptEventSource(EventSource[int, JsonMapping]):
class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.config = hs.config
@@ -273,13 +274,12 @@ def filter_out_private_receipts(
async def get_new_events(
self,
user: UserID,
from_key: int,
from_key: MultiWriterStreamToken,
limit: int,
room_ids: Iterable[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
) -> Tuple[List[JsonMapping], int]:
from_key = int(from_key)
) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
to_key = self.get_current_key()

if from_key == to_key:
@@ -296,8 +296,11 @@ async def get_new_events(
return events, to_key

async def get_new_events_as(
self, from_key: int, to_key: int, service: ApplicationService
) -> Tuple[List[JsonMapping], int]:
self,
from_key: MultiWriterStreamToken,
to_key: MultiWriterStreamToken,
service: ApplicationService,
) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
"""Returns a set of new read receipt events that an appservice
may be interested in.

@@ -312,8 +315,6 @@
appservice may be interested in.
* The current read receipt stream token.
"""
from_key = int(from_key)

if from_key == to_key:
return [], to_key

@@ -333,5 +334,5 @@

return events, to_key

def get_current_key(self) -> int:
def get_current_key(self) -> MultiWriterStreamToken:
return self.store.get_max_receipt_stream_id()
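`ReceiptEventSource` now subclasses `EventSource[MultiWriterStreamToken, JsonMapping]` instead of `EventSource[int, JsonMapping]`, so from/to positions are handled as whole tokens and the old `from_key = int(from_key)` coercions disappear. A minimal sketch of that generic-token pattern (a stand-in interface; Synapse's real `EventSource` base takes several more parameters, as the diff above shows):

```python
from typing import Generic, List, Tuple, TypeVar

K = TypeVar("K")  # stream token type
R = TypeVar("R")  # event type returned to clients


class EventSourceSketch(Generic[K, R]):
    """Stand-in base: an event source yields events between two positions
    expressed in its own token type, and returns the new position."""

    async def get_new_events(self, from_key: K, to_key: K) -> Tuple[List[R], K]:
        raise NotImplementedError


class IntTokenSource(EventSourceSketch[int, dict]):
    """What the receipts source effectively was before this PR."""

    async def get_new_events(self, from_key: int, to_key: int) -> Tuple[List[dict], int]:
        return [], to_key


# After this PR the receipts source is, in spirit,
# EventSourceSketch[MultiWriterStreamToken, JsonMapping]: the same interface,
# but positions are tokens and are never collapsed with int(from_key).
```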
7 changes: 6 additions & 1 deletion synapse/handlers/sync.py
@@ -58,6 +58,7 @@
DeviceListUpdates,
JsonDict,
JsonMapping,
MultiWriterStreamToken,
MutableStateMap,
Requester,
RoomStreamToken,
@@ -499,7 +500,11 @@ async def ephemeral_by_room(
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)

receipt_key = since_token.receipt_key if since_token else 0
receipt_key = (
since_token.receipt_key
if since_token
else MultiWriterStreamToken(stream=0)
)

receipt_source = self.event_sources.sources.receipt
receipts, receipt_key = await receipt_source.get_new_events(
45 changes: 43 additions & 2 deletions synapse/notifier.py
@@ -21,11 +21,13 @@
Dict,
Iterable,
List,
Literal,
Optional,
Set,
Tuple,
TypeVar,
Union,
overload,
)

import attr
@@ -44,6 +46,7 @@
from synapse.streams.config import PaginationConfig
from synapse.types import (
JsonDict,
MultiWriterStreamToken,
PersistedEventPosition,
RoomStreamToken,
StrCollection,
@@ -127,7 +130,7 @@ def __init__(
def notify(
self,
stream_key: StreamKeyType,
stream_id: Union[int, RoomStreamToken],
stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
time_now_ms: int,
) -> None:
"""Notify any listeners for this user of a new event from an
@@ -452,10 +455,48 @@ def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken) -> None:
except Exception:
logger.exception("Error pusher pool of event")

@overload
def on_new_event(
self,
stream_key: Literal[StreamKeyType.ROOM],
new_token: RoomStreamToken,
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
...
Comment on lines +458 to +466
Member:

I'm surprised we didn't already need this overload! Does it help avoid any asserts or anything?

Member Author:

It doesn't reduce assertions, as it only limits what is accepted as arguments (it doesn't change the return type). I added it to catch the case where we passed in an int when using StreamKeyType.RECEIPT.


@overload
def on_new_event(
self,
stream_key: Literal[StreamKeyType.RECEIPT],
new_token: MultiWriterStreamToken,
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
...

@overload
def on_new_event(
self,
stream_key: Literal[
StreamKeyType.ACCOUNT_DATA,
StreamKeyType.DEVICE_LIST,
StreamKeyType.PRESENCE,
StreamKeyType.PUSH_RULES,
StreamKeyType.TO_DEVICE,
StreamKeyType.TYPING,
StreamKeyType.UN_PARTIAL_STATED_ROOMS,
],
new_token: int,
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
...

def on_new_event(
self,
stream_key: StreamKeyType,
new_token: Union[int, RoomStreamToken],
new_token: Union[int, RoomStreamToken, MultiWriterStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
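As the review exchange above notes, the `@overload` declarations do not change runtime behaviour; they let mypy tie each `StreamKeyType` literal to the token type it must be paired with, so passing a plain `int` alongside `StreamKeyType.RECEIPT` becomes a type-check error. A self-contained sketch of the pattern, using made-up stream keys and token classes:

```python
from typing import Literal, Union, overload


class RoomToken:
    """Stand-in for RoomStreamToken."""


class ReceiptToken:
    """Stand-in for MultiWriterStreamToken."""


@overload
def on_new_event(stream_key: Literal["room"], new_token: RoomToken) -> None: ...


@overload
def on_new_event(stream_key: Literal["receipt"], new_token: ReceiptToken) -> None: ...


@overload
def on_new_event(stream_key: Literal["typing", "presence"], new_token: int) -> None: ...


def on_new_event(
    stream_key: str, new_token: Union[int, RoomToken, ReceiptToken]
) -> None:
    # Single runtime implementation; the overloads above only constrain callers.
    pass


on_new_event("receipt", ReceiptToken())  # fine
on_new_event("receipt", 5)  # runs, but mypy reports no matching overload
```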
3 changes: 2 additions & 1 deletion synapse/replication/tcp/client.py
@@ -126,8 +126,9 @@ async def on_rdata(
StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows]
)
elif stream_name == ReceiptsStream.NAME:
new_token = self.store.get_max_receipt_stream_id()
self.notifier.on_new_event(
StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows]
StreamKeyType.RECEIPT, new_token, rooms=[row.room_id for row in rows]
)
await self._pusher_pool.on_new_receipts({row.user_id for row in rows})
elif stream_name == ToDeviceStream.NAME:
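The replication change above is subtle: the `token` received for a `ReceiptsStream` row is a single writer's integer position, which is no longer a meaningful "current position" on its own, so the notifier is handed the store's combined position from `get_max_receipt_stream_id()` (which, per the receipts handler changes, now returns a `MultiWriterStreamToken`). A rough sketch of the shape of that handling, with duck-typed stand-ins for the store and notifier:

```python
from typing import Any, List


def on_receipts_rdata(store: Any, notifier: Any, token: int, rows: List[Any]) -> None:
    # `token` is one writer's position on the receipts stream. With several
    # writers that integer cannot be used as the stream's overall position,
    # so ask the store for the combined multi-writer token and notify with it.
    current_token = store.get_max_receipt_stream_id()
    notifier.on_new_event(
        "receipt", current_token, rooms=[row.room_id for row in rows]
    )
```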
2 changes: 1 addition & 1 deletion synapse/replication/tcp/streams/_base.py
@@ -371,7 +371,7 @@ def __init__(self, hs: "HomeServer"):
store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_max_receipt_stream_id),
store.get_receipt_stream_id_for_instance,
store.get_all_updated_receipts,
)

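This one-line change is what lets each worker advertise its own position on the receipts replication stream: instead of adapting a single global maximum with `current_token_without_instance` (an adapter that ignores which instance is asking), the stream now asks the store for a specific instance's position. A sketch of the difference in behaviour, with illustrative data; the adapter below mirrors what `current_token_without_instance` is understood to do, and the per-instance getter is a simplified stand-in for `get_receipt_stream_id_for_instance`:

```python
from typing import Callable, Dict


def current_token_without_instance(
    current_token: Callable[[], int]
) -> Callable[[str], int]:
    """Adapt a no-argument "max stream id" getter to the per-instance
    signature the replication stream expects, ignoring the instance name."""
    return lambda instance_name: current_token()


# After this PR the store tracks a position per writer instance instead.
_positions: Dict[str, int] = {"receipts_writer1": 12, "receipts_writer2": 10}


def get_receipt_stream_id_for_instance(instance_name: str) -> int:
    # Each writer reports how far *it* has written on the receipts stream.
    return _positions[instance_name]


old_style = current_token_without_instance(lambda: max(_positions.values()))
assert old_style("receipts_writer2") == 12   # same answer for every instance
assert get_receipt_stream_id_for_instance("receipts_writer2") == 10
```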