Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert EventStreamResult to attrs. #11574

Merged
merged 2 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11574.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert `EventStreamResult` from a `namedtuple` to `attrs` to improve type hints.
7 changes: 4 additions & 3 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,14 @@ async def get_stream(
# thundering herds on restart.
timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))

events, tokens = await self.notifier.get_events_for(
stream_result = await self.notifier.get_events_for(
auth_user,
pagin_config,
timeout,
is_guest=is_guest,
explicit_room_id=room_id,
)
events = stream_result.events

time_now = self.clock.time_msec()

Expand Down Expand Up @@ -128,8 +129,8 @@ async def get_stream(

chunk = {
"chunk": chunks,
"start": await tokens[0].to_string(self.store),
"end": await tokens[1].to_string(self.store),
"start": await stream_result.start_token.to_string(self.store),
"end": await stream_result.end_token.to_string(self.store),
}

return chunk
Expand Down
25 changes: 19 additions & 6 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import logging
from collections import namedtuple
from typing import (
Awaitable,
Callable,
Expand Down Expand Up @@ -44,7 +43,13 @@
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import PersistedEventPosition, RoomStreamToken, StreamToken, UserID
from synapse.types import (
JsonDict,
PersistedEventPosition,
RoomStreamToken,
StreamToken,
UserID,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -178,7 +183,12 @@ def new_listener(self, token: StreamToken) -> _NotificationListener:
return _NotificationListener(self.notify_deferred.observe())


class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
@attr.s(slots=True, frozen=True, auto_attribs=True)
class EventStreamResult:
events: List[Union[JsonDict, EventBase]]
start_token: StreamToken
end_token: StreamToken

def __bool__(self):
return bool(self.events)

Expand Down Expand Up @@ -582,9 +592,12 @@ async def check_for_updates(
before_token: StreamToken, after_token: StreamToken
) -> EventStreamResult:
if after_token == before_token:
return EventStreamResult([], (from_token, from_token))
return EventStreamResult([], from_token, from_token)

events: List[EventBase] = []
# The events fetched from each source are a JsonDict, EventBase, or
# UserPresenceState, but see below for UserPresenceState being
# converted to JsonDict.
events: List[Union[JsonDict, EventBase]] = []
end_token = from_token

for name, source in self.event_sources.sources.get_sources():
Expand Down Expand Up @@ -623,7 +636,7 @@ async def check_for_updates(
events.extend(new_events)
end_token = end_token.copy_and_replace(keyname, new_key)

return EventStreamResult(events, (from_token, end_token))
return EventStreamResult(events, from_token, end_token)

user_id_for_stream = user.to_string()
if is_peeking:
Expand Down