Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert state delta processing from a dict to attrs. (#16469)
Browse files Browse the repository at this point in the history
For improved type checking & memory usage.
  • Loading branch information
clokep authored Oct 16, 2023
1 parent 4fe73f8 commit e3e0ae4
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 109 deletions.
1 change: 1 addition & 0 deletions changelog.d/16469.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve type hints.
32 changes: 16 additions & 16 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
Expand Down Expand Up @@ -1499,9 +1500,9 @@ async def _unsafe_process(self) -> None:
# We may get multiple deltas for different rooms, but we want to
# handle them on a room by room basis, so we batch them up by
# room.
deltas_by_room: Dict[str, List[JsonDict]] = {}
deltas_by_room: Dict[str, List[StateDelta]] = {}
for delta in deltas:
deltas_by_room.setdefault(delta["room_id"], []).append(delta)
deltas_by_room.setdefault(delta.room_id, []).append(delta)

for room_id, deltas_for_room in deltas_by_room.items():
await self._handle_state_delta(room_id, deltas_for_room)
Expand All @@ -1513,7 +1514,7 @@ async def _unsafe_process(self) -> None:
max_pos
)

async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
async def _handle_state_delta(self, room_id: str, deltas: List[StateDelta]) -> None:
"""Process current state deltas for the room to find new joins that need
to be handled.
"""
Expand All @@ -1524,39 +1525,38 @@ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> Non
newly_joined_users = set()

for delta in deltas:
assert room_id == delta["room_id"]
assert room_id == delta.room_id

typ = delta["type"]
state_key = delta["state_key"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]

logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)

# Drop any event that isn't a membership join
if typ != EventTypes.Member:
if delta.event_type != EventTypes.Member:
continue

if event_id is None:
if delta.event_id is None:
# state has been deleted, so this is not a join. We only care about
# joins.
continue

event = await self.store.get_event(event_id, allow_none=True)
event = await self.store.get_event(delta.event_id, allow_none=True)
if not event or event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue

if prev_event_id:
prev_event = await self.store.get_event(prev_event_id, allow_none=True)
if delta.prev_event_id:
prev_event = await self.store.get_event(
delta.prev_event_id, allow_none=True
)
if (
prev_event
and prev_event.content.get("membership") == Membership.JOIN
):
# Ignore changes to join events.
continue

newly_joined_users.add(state_key)
newly_joined_users.add(delta.state_key)

if not newly_joined_users:
# If nobody has joined then there's nothing to do.
Expand Down
21 changes: 8 additions & 13 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import logging
import random
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple

from synapse import types
from synapse.api.constants import (
Expand Down Expand Up @@ -44,6 +44,7 @@
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import (
JsonDict,
Requester,
Expand Down Expand Up @@ -2146,24 +2147,18 @@ async def _unsafe_process(self) -> None:

await self._store.update_room_forgetter_stream_pos(max_pos)

async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
async def _handle_deltas(self, deltas: List[StateDelta]) -> None:
"""Called with the state deltas to process"""
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]

if typ != EventTypes.Member:
if delta.event_type != EventTypes.Member:
continue

if not self._hs.is_mine_id(state_key):
if not self._hs.is_mine_id(delta.state_key):
continue

change = await self._get_key_change(
prev_event_id,
event_id,
delta.prev_event_id,
delta.event_id,
key_name="membership",
public_value=Membership.JOIN,
)
Expand All @@ -2172,7 +2167,7 @@ async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
if is_leave:
try:
await self._room_member_handler.forget(
UserID.from_string(state_key), room_id
UserID.from_string(delta.state_key), delta.room_id
)
except SynapseError as e:
if e.code == 400:
Expand Down
64 changes: 32 additions & 32 deletions synapse/handlers/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import JsonDict

if TYPE_CHECKING:
Expand Down Expand Up @@ -142,7 +143,7 @@ async def _unsafe_process(self) -> None:
self.pos = max_pos

async def _handle_deltas(
self, deltas: Iterable[JsonDict]
self, deltas: Iterable[StateDelta]
) -> Tuple[Dict[str, CounterType[str]], Dict[str, CounterType[str]]]:
"""Called with the state deltas to process
Expand All @@ -157,51 +158,50 @@ async def _handle_deltas(
room_to_state_updates: Dict[str, Dict[str, Any]] = {}

for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]

logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
logger.debug(
"Handling: %r, %r %r, %s",
delta.room_id,
delta.event_type,
delta.state_key,
delta.event_id,
)

token = await self.store.get_earliest_token_for_stats("room", room_id)
token = await self.store.get_earliest_token_for_stats("room", delta.room_id)

# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
if token is not None and token >= stream_id:
if token is not None and token >= delta.stream_id:
logger.debug(
"Ignoring: %s as earlier than this room's initial ingestion event",
event_id,
delta.event_id,
)
continue

if event_id is None and prev_event_id is None:
if delta.event_id is None and delta.prev_event_id is None:
logger.error(
"event ID is None and so is the previous event ID. stream_id: %s",
stream_id,
delta.stream_id,
)
continue

event_content: JsonDict = {}

if event_id is not None:
event = await self.store.get_event(event_id, allow_none=True)
if delta.event_id is not None:
event = await self.store.get_event(delta.event_id, allow_none=True)
if event:
event_content = event.content or {}

# All the values in this dict are deltas (RELATIVE changes)
room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
room_stats_delta = room_to_stats_deltas.setdefault(delta.room_id, Counter())

room_state = room_to_state_updates.setdefault(room_id, {})
room_state = room_to_state_updates.setdefault(delta.room_id, {})

if prev_event_id is None:
if delta.prev_event_id is None:
# this state event doesn't overwrite another,
# so it is a new effective/current state event
room_stats_delta["current_state_events"] += 1

if typ == EventTypes.Member:
if delta.event_type == EventTypes.Member:
# we could use StateDeltasHandler._get_key_change here but it's
# a bit inefficient given we're not testing for a specific
# result; might as well just grab the prev_membership and
Expand All @@ -210,9 +210,9 @@ async def _handle_deltas(
# in the absence of a previous event because we do not want to
# reduce the leave count when a new-to-the-room user joins.
prev_membership = None
if prev_event_id is not None:
if delta.prev_event_id is not None:
prev_event = await self.store.get_event(
prev_event_id, allow_none=True
delta.prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
Expand Down Expand Up @@ -256,7 +256,7 @@ async def _handle_deltas(
else:
raise ValueError("%r is not a valid membership" % (membership,))

user_id = state_key
user_id = delta.state_key
if self.is_mine_id(user_id):
# this accounts for transitions like leave → ban and so on.
has_changed_joinedness = (prev_membership == Membership.JOIN) != (
Expand All @@ -272,30 +272,30 @@ async def _handle_deltas(

room_stats_delta["local_users_in_room"] += membership_delta

elif typ == EventTypes.Create:
elif delta.event_type == EventTypes.Create:
room_state["is_federatable"] = (
event_content.get(EventContentFields.FEDERATE, True) is True
)
room_type = event_content.get(EventContentFields.ROOM_TYPE)
if isinstance(room_type, str):
room_state["room_type"] = room_type
elif typ == EventTypes.JoinRules:
elif delta.event_type == EventTypes.JoinRules:
room_state["join_rules"] = event_content.get("join_rule")
elif typ == EventTypes.RoomHistoryVisibility:
elif delta.event_type == EventTypes.RoomHistoryVisibility:
room_state["history_visibility"] = event_content.get(
"history_visibility"
)
elif typ == EventTypes.RoomEncryption:
elif delta.event_type == EventTypes.RoomEncryption:
room_state["encryption"] = event_content.get("algorithm")
elif typ == EventTypes.Name:
elif delta.event_type == EventTypes.Name:
room_state["name"] = event_content.get("name")
elif typ == EventTypes.Topic:
elif delta.event_type == EventTypes.Topic:
room_state["topic"] = event_content.get("topic")
elif typ == EventTypes.RoomAvatar:
elif delta.event_type == EventTypes.RoomAvatar:
room_state["avatar"] = event_content.get("url")
elif typ == EventTypes.CanonicalAlias:
elif delta.event_type == EventTypes.CanonicalAlias:
room_state["canonical_alias"] = event_content.get("alias")
elif typ == EventTypes.GuestAccess:
elif delta.event_type == EventTypes.GuestAccess:
room_state["guest_access"] = event_content.get(
EventContentFields.GUEST_ACCESS
)
Expand Down
34 changes: 17 additions & 17 deletions synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, List, Optional, Set, Tuple

from twisted.internet.interfaces import IDelayedCall

Expand All @@ -23,6 +23,7 @@
from synapse.api.errors import Codes, SynapseError
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.databases.main.user_directory import SearchResult
from synapse.storage.roommember import ProfileInfo
from synapse.types import UserID
Expand Down Expand Up @@ -247,32 +248,31 @@ async def _unsafe_process(self) -> None:

await self.store.update_user_directory_stream_pos(max_pos)

async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
async def _handle_deltas(self, deltas: List[StateDelta]) -> None:
"""Called with the state deltas to process"""
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id: Optional[str] = delta["event_id"]
prev_event_id: Optional[str] = delta["prev_event_id"]

logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)

# For join rule and visibility changes we need to check if the room
# may have become public or not and add/remove the users in said room
if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
if delta.event_type in (
EventTypes.RoomHistoryVisibility,
EventTypes.JoinRules,
):
await self._handle_room_publicity_change(
room_id, prev_event_id, event_id, typ
delta.room_id, delta.prev_event_id, delta.event_id, delta.event_type
)
elif typ == EventTypes.Member:
elif delta.event_type == EventTypes.Member:
await self._handle_room_membership_event(
room_id,
prev_event_id,
event_id,
state_key,
delta.room_id,
delta.prev_event_id,
delta.event_id,
delta.state_key,
)
else:
logger.debug("Ignoring irrelevant type: %r", typ)
logger.debug("Ignoring irrelevant type: %r", delta.event_type)

async def _handle_room_publicity_change(
self,
Expand Down
14 changes: 2 additions & 12 deletions synapse/storage/controllers/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Collection,
Dict,
Expand All @@ -32,6 +31,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.roommember import ProfileInfo
from synapse.storage.util.partial_state_events_tracker import (
PartialCurrentStateTracker,
Expand Down Expand Up @@ -531,19 +531,9 @@ async def get_server_acl_for_room(
@tag_args
async def get_current_state_deltas(
self, prev_stream_id: int, max_stream_id: int
) -> Tuple[int, List[Dict[str, Any]]]:
) -> Tuple[int, List[StateDelta]]:
"""Fetch a list of room state changes since the given stream id
Each entry in the result contains the following fields:
- stream_id (int)
- room_id (str)
- type (str): event type
- state_key (str):
- event_id (str|None): new event_id for this state key. None if the
state has been deleted.
- prev_event_id (str|None): previous event_id for this state key. None
if it's new state.
Args:
prev_stream_id: point to get changes since (exclusive)
max_stream_id: the point that we know has been correctly persisted
Expand Down
Loading

0 comments on commit e3e0ae4

Please sign in to comment.