Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add a presence federation replication stream
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Apr 19, 2021
1 parent 25cb535 commit 885c3d9
Show file tree
Hide file tree
Showing 5 changed files with 381 additions and 13 deletions.
204 changes: 192 additions & 12 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import abc
import contextlib
import logging
from bisect import bisect
from contextlib import contextmanager
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -53,8 +54,9 @@
ReplicationBumpPresenceActiveTime,
ReplicationPresenceSetState,
)
from synapse.replication.http.streams import ReplicationGetStreamUpdates
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceStream
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.state import StateHandler
from synapse.storage.databases.main import DataStore
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
Expand Down Expand Up @@ -133,6 +135,8 @@ def __init__(self, hs: "HomeServer"):

self._send_federation = hs.should_send_federation()

self._federation_queue = PresenceFederationQueue(hs, self)

self._busy_presence_enabled = hs.config.experimental.msc3026_enabled

active_presence = self.store.take_presence_startup_info()
Expand Down Expand Up @@ -258,7 +262,13 @@ async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
):
"""Process streams received over replication."""
pass
await self._federation_queue.process_replication_rows(
stream_name, instance_name, token, rows
)

def get_federation_queue(self) -> "PresenceFederationQueue":
    """Return the `PresenceFederationQueue` stored on this object."""
    queue = self._federation_queue
    return queue

async def maybe_send_presence_to_interested_destinations(
self, states: List[UserPresenceState]
Expand Down Expand Up @@ -734,12 +744,10 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
self.state,
)

# Since this is master we know that we have a federation sender or
# queue, and so this will be defined.
assert self._federation

for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)
self._federation_queue.send_presence_to_destinations(
states, destinations
)

async def _handle_timeouts(self):
"""Checks the presence of users that have timed out and updates as
Expand Down Expand Up @@ -1218,13 +1226,9 @@ async def _handle_state_delta(self, deltas):
user_presence_states
)

# Since this is master we know that we have a federation sender or
# queue, and so this will be defined.
assert self._federation

# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
self._federation.send_presence_to_destinations(
self._federation_queue.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)

Expand Down Expand Up @@ -1869,3 +1873,179 @@ async def get_interested_remotes(
hosts_and_states.append(([host], states))

return hosts_and_states


class PresenceFederationQueue:
    """Handles sending ad hoc presence updates over federation, which are *not*
    due to state updates (that get handled via the presence stream), e.g.
    federation pings and sending existing present states to newly joined hosts.

    Only recent updates are queued (see `_KEEP_ITEMS_IN_QUEUE_FOR_MS`), so if a
    federation sender instance is down for longer than that then some updates
    will be dropped. This is OK as presence is ephemeral, and so it will self
    correct eventually.
    """

    # How long to keep entries in the queue for. Workers that are down for
    # longer than this duration will miss out on older updates.
    _KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000

    # How often to check if we can expire entries from the queue.
    _CLEAR_ITEMS_EVERY_MS = 60 * 1000

    def __init__(self, hs: "HomeServer", presence_handler: "BasePresenceHandler"):
        """Set up the queue and, if queuing is enabled, its periodic expiry.

        Args:
            hs: The homeserver; used to look up the clock, notifier, instance
                name, worker config and (if applicable) the federation sender.
            presence_handler: Used by `process_replication_rows` to look up
                the current presence state of the users named in incoming rows.
        """
        self._clock = hs.get_clock()
        self._notifier = hs.get_notifier()
        self._instance_name = hs.get_instance_name()
        self._presence_handler = presence_handler
        self._repl_client = ReplicationGetStreamUpdates.make_client(hs)

        # Should we keep a queue of recent presence updates? We only bother if
        # another process may be handling federation sending.
        self._queue_presence_updates = True

        # Whether this instance is a presence writer.
        self._presence_writer = hs.config.worker.worker_app is None

        # The federation sender if this instance is a federation sender.
        self._federation = None

        if hs.should_send_federation():
            self._federation = hs.get_federation_sender()

            # We don't bother queuing up presence states if only this instance
            # is sending federation.
            if hs.config.worker.federation_shard_config.instances == [
                self._instance_name
            ]:
                self._queue_presence_updates = False

        # The queue of recently queued updates as tuples of: `(timestamp,
        # stream_id, destinations, user_ids)`. We don't store the full states
        # for efficiency, and remote workers will already have the full states
        # cached.
        self._queue = []  # type: List[Tuple[int, int, Collection[str], Set[str]]]

        # The stream ID that will be given to the next queued entry. IDs are
        # handed out sequentially, one per queue entry.
        self._next_id = 1

        # Map from instance name to current token
        self._current_tokens = {}  # type: Dict[str, int]

        if self._queue_presence_updates:
            self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS)

    def _clear_queue(self):
        """Clear out older entries from the queue."""
        clear_before = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS

        # The queue is sorted by timestamp, so we can bisect to find the right
        # place to purge before. Note that we are searching using a 1-tuple with
        # the time, which does The Right Thing since the queue is a tuple where
        # the first item is a timestamp (a 1-tuple sorts before any longer
        # tuple that starts with the same timestamp).
        index = bisect(self._queue, (clear_before,))
        self._queue = self._queue[index:]

    def send_presence_to_destinations(
        self, states: "Collection[UserPresenceState]", destinations: Collection[str]
    ) -> None:
        """Send the presence states to the given destinations.

        Will forward to the local federation sender (if there is one) and queue
        to send over replication (if there are other federation sender
        instances).

        Args:
            states: The presence states to send; only their `user_id`s are
                stored in the queue.
            destinations: The remote hosts to send the states to.
        """

        # This should only be called on a presence writer.
        assert self._presence_writer

        if self._federation:
            self._federation.send_presence_to_destinations(states, destinations)

        if not self._queue_presence_updates:
            return

        now = self._clock.time_msec()

        stream_id = self._next_id
        self._next_id += 1

        self._queue.append((now, stream_id, destinations, {s.user_id for s in states}))

        self._notifier.notify_replication()

    def get_current_token(self, instance_name: str) -> int:
        """Return the latest stream ID known for the given instance.

        For the local instance this is the ID of the most recently queued
        entry (0 if nothing has been queued yet). For other instances it is the
        last token seen via `process_replication_rows`, defaulting to 0.
        """
        if instance_name == self._instance_name:
            return self._next_id - 1
        else:
            return self._current_tokens.get(instance_name, 0)

    async def get_replication_rows(
        self,
        instance_name: str,
        from_token: int,
        upto_token: int,
        target_row_count: int,
    ) -> Tuple[List[Tuple[int, Tuple[str, str]]], int, bool]:
        """Get all the updates between the two tokens.

        We return rows in the form of `(destination, user_id)` to keep the size
        of each row bounded (rather than returning the sets in a row).

        Args:
            instance_name: The instance whose rows are wanted. If this is not
                the local instance the request is forwarded to that instance
                via the replication client.
            from_token: Exclusive lower bound; only entries with a stream ID
                strictly greater than this are returned.
            upto_token: Inclusive upper bound on the stream IDs returned.
            target_row_count: Once more than this many rows have been
                collected we stop and report the result as limited.

        Returns:
            A tuple of `(rows, upto_token, limited)` where each row is
            `(stream_id, (destination, user_id))`, `upto_token` is the stream
            ID of the last entry included (or the requested `upto_token` if no
            entry was included), and `limited` is whether we stopped early.
        """
        if instance_name != self._instance_name:
            # If not local we query over replication.
            result = await self._repl_client(
                instance_name=instance_name,
                stream_name=PresenceFederationStream.NAME,
                from_token=from_token,
                upto_token=upto_token,
            )
            return result["updates"], result["upto_token"], result["limited"]

        # If the caller is already at (or beyond) our current token there is
        # nothing to return. This must be checked explicitly: the index
        # computed below would otherwise be >= 0 and slice from the *front* of
        # the queue.
        if from_token >= self._next_id - 1:
            return [], upto_token, False

        # We can find the correct position in the queue by noting that there is
        # exactly one entry per stream ID, and that the last entry has an ID of
        # `self._next_id - 1`, so we can count backwards from the end.
        #
        # Since we return entries in the range `from_token < stream_id <=
        # upto_token`, we want the index of the entry with a stream ID of
        # `from_token + 1`.
        #
        # Since the start of the queue is periodically truncated we need to
        # handle the case where `from_token` stream ID has already been dropped.
        start_idx = max(from_token + 1 - self._next_id, -len(self._queue))

        to_send = []  # type: List[Tuple[int, Tuple[str, str]]]
        limited = False
        new_id = upto_token
        for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
            if stream_id > upto_token:
                break

            new_id = stream_id

            to_send.extend(
                (stream_id, (destination, user_id))
                for destination in destinations
                for user_id in user_ids
            )

            # We always include every row of an entry, so that `new_id` never
            # points part way through an entry.
            if len(to_send) > target_row_count:
                limited = True
                break

        return to_send, new_id, limited

    async def process_replication_rows(
        self, stream_name: str, instance_name: str, token: int, rows: list
    ):
        """Handle rows received over replication for the presence federation
        stream; rows for any other stream are ignored.

        Records the sending instance's token and, if this instance is a
        federation sender, sends the current presence state of each named user
        to the named destination.
        """
        if stream_name != PresenceFederationStream.NAME:
            return

        # We keep track of the current tokens
        self._current_tokens[instance_name] = token

        # If we're a federation sender we pull out the presence states to send
        # and forward them on.
        if not self._federation:
            return

        # Group the users by destination so each host gets a single call.
        hosts_to_users = {}  # type: Dict[str, Set[str]]
        for row in rows:
            hosts_to_users.setdefault(row.destination, set()).add(row.user_id)

        for host, user_ids in hosts_to_users.items():
            states = await self._presence_handler.current_state_for_users(user_ids)
            self._federation.send_presence_to_destinations(states.values(), [host])
1 change: 0 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
AccountDataStream,
DeviceListsStream,
GroupServerStream,
PresenceStream,
PushersStream,
PushRulesStream,
ReceiptsStream,
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/tcp/streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
CachesStream,
DeviceListsStream,
GroupServerStream,
PresenceFederationStream,
PresenceStream,
PublicRoomsStream,
PushersStream,
Expand All @@ -50,6 +51,7 @@
EventsStream,
BackfillStream,
PresenceStream,
PresenceFederationStream,
TypingStream,
ReceiptsStream,
PushRulesStream,
Expand All @@ -71,6 +73,7 @@
"Stream",
"BackfillStream",
"PresenceStream",
"PresenceFederationStream",
"TypingStream",
"ReceiptsStream",
"PushRulesStream",
Expand Down
24 changes: 24 additions & 0 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,30 @@ def __init__(self, hs):
)


class PresenceFederationStream(Stream):
    """Replication stream carrying ad hoc presence updates to send over
    federation.

    Each row names a remote destination together with the user ID whose
    presence state should be sent there.
    """

    @attr.s(slots=True, auto_attribs=True)
    class PresenceFederationStreamRow:
        destination: str
        user_id: str

    NAME = "presence_federation"
    ROW_TYPE = PresenceFederationStreamRow

    def __init__(self, hs: "HomeServer"):
        # Both the current-token callback and the row-fetching callback are
        # provided by the presence handler's federation queue.
        queue = hs.get_presence_handler().get_federation_queue()
        local_instance = hs.get_instance_name()
        super().__init__(
            local_instance,
            queue.get_current_token,
            queue.get_replication_rows,
        )


class TypingStream(Stream):
TypingStreamRow = namedtuple(
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
Expand Down
Loading

0 comments on commit 885c3d9

Please sign in to comment.