Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add presence federation stream #9819

Merged
merged 12 commits into from
Apr 20, 2021
19 changes: 16 additions & 3 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ async def process_replication_rows(
)

def get_federation_queue(self) -> "PresenceFederationQueue":
"""Get the presence federation queue, if any."""
"""Get the presence federation queue."""
return self._federation_queue

async def maybe_send_presence_to_interested_destinations(
Expand Down Expand Up @@ -1878,6 +1878,10 @@ class PresenceFederationQueue:
Only the last N minutes will be queued, so if a federation sender instance
is down for longer then some updates will be dropped. This is OK as presence
is ephemeral, and so it will self correct eventually.

On workers the class tracks the last received position of the stream from
replication, and handles querying for missed updates over HTTP replication,
c.f. `get_current_token` and `get_replication_rows`.
"""

# How long to keep entries in the queue for. Workers that are down for
Expand All @@ -1901,7 +1905,7 @@ def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
# Whether this instance is a presence writer.
self._presence_writer = hs.config.worker.worker_app is None

# The federation sender if this instance is a federation sender.
# The FederationSender instance, if this process sends federation traffic directly.
self._federation = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it called _federation if its a federation sender?

Small nitpick, resolve at will, I'm just pointing at the naming

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah wait, i see, it is an interface to send to federation through

What is the actual type of this variable, though? Is there an interface type that describes this behaviour? (I'd like it to be annotated with it)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an Optional[FederationSender], aiui.


if hs.should_send_federation():
Expand Down Expand Up @@ -1946,6 +1950,8 @@ def send_presence_to_destinations(

Will forward to the local federation sender (if there is one) and queue
to send over replication (if there are other federation sender instances.).

Must only be called on the master process.
"""

# This should only be called on a presence writer.
Expand All @@ -1970,6 +1976,11 @@ def send_presence_to_destinations(
self._notifier.notify_replication()

def get_current_token(self, instance_name: str) -> int:
"""Get the current position of the stream.

On workers this returns the last stream ID received from replication.
"""

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
if instance_name == self._instance_name:
return self._next_id - 1
else:
Expand All @@ -1986,9 +1997,11 @@ async def get_replication_rows(

We return rows in the form of `(destination, user_id)` to keep the size
of each row bounded (rather than returning the sets in a row).

On workers this will query the master process via HTTP replication.
"""
if instance_name != self._instance_name:
# If not local we query over replication.
# If not local we query over http replication from the master
result = await self._repl_client(
instance_name=instance_name,
stream_name=PresenceFederationStream.NAME,
Expand Down
2 changes: 1 addition & 1 deletion tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ def make_homeserver(self, reactor, clock):
return hs

def default_config(self):
config = unittest.default_config("test")
config = super().default_config()
config["send_federation"] = True
return config

Expand Down