Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow multiple workers to write to receipts stream. #16432

Merged
merged 15 commits into from
Oct 25, 2023
Merged
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
generate_pagination_where_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, StreamKeyType, StreamToken
from synapse.types import JsonDict, MultiWriterStreamToken, StreamKeyType, StreamToken
from synapse.util.caches.descriptors import cached, cachedList

if TYPE_CHECKING:
Expand Down Expand Up @@ -314,7 +314,7 @@ def _get_recent_references_for_event_txn(
room_key=next_key,
presence_key=0,
typing_key=0,
receipt_key=0,
receipt_key=MultiWriterStreamToken(stream=0),
account_data_key=0,
push_rules_key=0,
to_device_key=0,
Expand Down
4 changes: 2 additions & 2 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from synapse.handlers.typing import TypingNotificationEventSource
from synapse.logging.opentracing import trace
from synapse.streams import EventSource
from synapse.types import StreamKeyType, StreamToken
from synapse.types import MultiWriterStreamToken, StreamKeyType, StreamToken

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -111,7 +111,7 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
room_key=await self.sources.room.get_current_key_for_room(room_id),
presence_key=0,
typing_key=0,
receipt_key=0,
receipt_key=MultiWriterStreamToken(stream=0),
account_data_key=0,
push_rules_key=0,
to_device_key=0,
Expand Down
40 changes: 36 additions & 4 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,9 @@ class StreamToken:
)
presence_key: int
typing_key: int
receipt_key: int
receipt_key: MultiWriterStreamToken = attr.ib(
validator=attr.validators.instance_of(MultiWriterStreamToken)
)
account_data_key: int
push_rules_key: int
to_device_key: int
Expand All @@ -883,8 +885,31 @@ async def from_string(cls, store: "DataStore", string: str) -> "StreamToken":
while len(keys) < len(attr.fields(cls)):
# i.e. old token from before receipt_key
keys.append("0")
clokep marked this conversation as resolved.
Show resolved Hide resolved

(
room_key,
presence_key,
typing_key,
receipt_key,
account_data_key,
push_rules_key,
to_device_key,
device_list_key,
groups_key,
un_partial_stated_rooms_key,
) = keys

return cls(
await RoomStreamToken.parse(store, keys[0]), *(int(k) for k in keys[1:])
room_key=await RoomStreamToken.parse(store, room_key),
presence_key=int(presence_key),
typing_key=int(typing_key),
receipt_key=await MultiWriterStreamToken.parse(store, receipt_key),
account_data_key=int(account_data_key),
push_rules_key=int(push_rules_key),
to_device_key=int(to_device_key),
device_list_key=int(device_list_key),
groups_key=int(groups_key),
un_partial_stated_rooms_key=int(un_partial_stated_rooms_key),
)
except CancelledError:
raise
Expand All @@ -897,7 +922,7 @@ async def to_string(self, store: "DataStore") -> str:
await self.room_key.to_string(store),
str(self.presence_key),
str(self.typing_key),
str(self.receipt_key),
await self.receipt_key.to_string(store),
str(self.account_data_key),
str(self.push_rules_key),
str(self.to_device_key),
Expand Down Expand Up @@ -925,6 +950,11 @@ def copy_and_advance(self, key: StreamKeyType, new_value: Any) -> "StreamToken":
StreamKeyType.ROOM, self.room_key.copy_and_advance(new_value)
)
return new_token
elif key == StreamKeyType.RECEIPT:
new_token = self.copy_and_replace(
StreamKeyType.RECEIPT, self.receipt_key.copy_and_advance(new_value)
)
return new_token

new_token = self.copy_and_replace(key, new_value)
new_id = new_token.get_field(key)
Expand Down Expand Up @@ -967,7 +997,9 @@ def get_field(self, key: StreamKeyType) -> Union[int, RoomStreamToken]:
return getattr(self, key.value)


StreamToken.START = StreamToken(RoomStreamToken(stream=0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
StreamToken.START = StreamToken(
RoomStreamToken(stream=0), 0, 0, MultiWriterStreamToken(stream=0), 0, 0, 0, 0, 0, 0
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
Expand Down