Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Allow running sendToDevice on workers (#9044)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Jan 7, 2021
1 parent 5e99a94 commit b530eaa
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 105 deletions.
1 change: 1 addition & 0 deletions changelog.d/9044.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for handling and persistence of to-device messages to happen on worker processes.
27 changes: 27 additions & 0 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ class Porter(object):
await self._setup_state_group_id_seq()
await self._setup_user_id_seq()
await self._setup_events_stream_seqs()
await self._setup_device_inbox_seq()

# Step 3. Get tables.
self.progress.set_state("Fetching tables")
Expand Down Expand Up @@ -911,6 +912,32 @@ class Porter(object):
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
)

async def _setup_device_inbox_seq(self):
"""Set the device inbox sequence to the correct value.
"""
curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="device_inbox",
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)

curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="device_federation_outbox",
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)

next_id = max(curr_local_id, curr_federation_id) + 1

def r(txn):
txn.execute(
"ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,)
)

return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r)


##############################################
# The following is simply UI stuff
Expand Down
3 changes: 3 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
)
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
Expand Down Expand Up @@ -520,6 +521,8 @@ def _listen_http(self, listener_config: ListenerConfig):
room.register_deprecated_servlets(self, resource)
InitialSyncRestServlet(self).register(resource)

SendToDeviceRestServlet(self).register(resource)

user_directory.register_servlets(self, resource)

# If presence is disabled, use the stub servlet that does
Expand Down
10 changes: 9 additions & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class WriterLocations:
default=["master"], type=List[str], converter=_instance_to_list_converter
)
typing = attr.ib(default="master", type=str)
to_device = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter,
)


class WorkerConfig(Config):
Expand Down Expand Up @@ -124,7 +127,7 @@ def read_config(self, config, **kwargs):

# Check that the configured writers for events and typing also appears in
# `instance_map`.
for stream in ("events", "typing"):
for stream in ("events", "typing", "to_device"):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
Expand All @@ -133,6 +136,11 @@ def read_config(self, config, **kwargs):
% (instance, stream)
)

if len(self.writers.to_device) != 1:
raise ConfigError(
"Must only specify one instance to handle `to_device` messages."
)

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)

# Whether this worker should run background tasks or not.
Expand Down
31 changes: 23 additions & 8 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,25 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.is_mine = hs.is_mine
self.federation = hs.get_federation_sender()

hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
# We only need to poke the federation sender explicitly if its on the
# same instance. Other federation sender instances will get notified by
# `synapse.app.generic_worker.FederationSenderHandler` when it sees it
# in the to-device replication stream.
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()

# If we can handle the to device EDUs we do so, otherwise we route them
# to the appropriate worker.
if hs.get_instance_name() in hs.config.worker.writers.to_device:
hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
else:
hs.get_federation_registry().register_instances_for_edu(
"m.direct_to_device", hs.config.worker.writers.to_device,
)

# The handler to call when we think a user's device list might be out of
# sync. We do all device list resyncing on the master instance, so if
Expand Down Expand Up @@ -204,7 +218,8 @@ async def send_device_message(
)

log_kv({"remote_messages": remote_messages})
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation.send_device_messages(destination)
if self.federation_sender:
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation_sender.send_device_messages(destination)
32 changes: 1 addition & 31 deletions synapse/replication/slave/storage/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,8 @@
# limitations under the License.

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache


class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
self._device_inbox_id_gen = SlavedIdTracker(
db_conn, "device_inbox", "stream_id"
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
self._device_inbox_id_gen.get_current_token(),
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
self._device_inbox_id_gen.get_current_token(),
)

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == ToDeviceStream.NAME:
self._device_inbox_id_gen.advance(instance_name, token)
for row in rows:
if row.entity.startswith("@"):
self._device_inbox_stream_cache.entity_has_changed(
row.entity, token
)
else:
self._device_federation_outbox_stream_cache.entity_has_changed(
row.entity, token
)
return super().process_replication_rows(stream_name, instance_name, token, rows)
pass
9 changes: 9 additions & 0 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
EventsStream,
FederationStream,
Stream,
ToDeviceStream,
TypingStream,
)

Expand Down Expand Up @@ -115,6 +116,14 @@ def __init__(self, hs):

continue

if isinstance(stream, ToDeviceStream):
# Only add ToDeviceStream as a source on instances in charge of
# sending to device messages.
if hs.get_instance_name() in hs.config.worker.writers.to_device:
self._streams_to_replicate.append(stream)

continue

if isinstance(stream, TypingStream):
# Only add TypingStream as a source on the instance in charge of
# typing.
Expand Down
33 changes: 0 additions & 33 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._presence_id_gen = StreamIdGenerator(
db_conn, "presence_stream", "stream_id"
)
self._device_inbox_id_gen = StreamIdGenerator(
db_conn, "device_inbox", "stream_id"
)
self._public_room_id_gen = StreamIdGenerator(
db_conn, "public_room_list_stream", "stream_id"
)
Expand Down Expand Up @@ -189,36 +186,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
prefilled_cache=presence_cache_prefill,
)

max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_inbox",
entity_column="user_id",
stream_column="stream_id",
max_value=max_device_inbox_id,
limit=1000,
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
min_device_inbox_id,
prefilled_cache=device_inbox_prefill,
)
# The federation outbox and the local device inbox uses the same
# stream_id generator.
device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_federation_outbox",
entity_column="destination",
stream_column="stream_id",
max_value=max_device_inbox_id,
limit=1000,
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
min_device_outbox_id,
prefilled_cache=device_outbox_prefill,
)

device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache", device_list_max
Expand Down
Loading

0 comments on commit b530eaa

Please sign in to comment.