Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Pass all replication rows to presence handler
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Apr 19, 2021
1 parent 2b7dd21 commit 25cb535
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
16 changes: 13 additions & 3 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
ReplicationPresenceSetState,
)
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceStream
from synapse.state import StateHandler
from synapse.storage.databases.main import DataStore
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
Expand Down Expand Up @@ -253,8 +254,10 @@ async def update_external_syncs_clear(self, process_id):
"""
pass

async def process_replication_rows(self, token, rows):
"""Process presence stream rows received over replication."""
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
):
"""Process streams received over replication."""
pass

async def maybe_send_presence_to_interested_destinations(
Expand Down Expand Up @@ -421,7 +424,14 @@ async def notify_from_replication(self, states, stream_id):
# If this is a federation sender, notify about presence updates.
await self.maybe_send_presence_to_interested_destinations(states)

async def process_replication_rows(self, token, rows):
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
):
await super().process_replication_rows(stream_name, instance_name, token, rows)

if stream_name != PresenceStream.NAME:
return

states = [
UserPresenceState(
row.user_id,
Expand Down
6 changes: 4 additions & 2 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ async def on_rdata(
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
await self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == PresenceStream.NAME:
await self._presence_handler.process_replication_rows(token, rows)
elif stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
Expand Down Expand Up @@ -221,6 +219,10 @@ async def on_rdata(
membership=row.data.membership,
)

await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
)

# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
# greater than the received row position.
Expand Down

0 comments on commit 25cb535

Please sign in to comment.