Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow running sendToDevice on workers #9044

Merged
merged 25 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8f7e6b7
Refactor add_messages_to_device_inbox
erikjohnston Jan 7, 2021
c132b39
Support routing edu's to multiple instances
erikjohnston Jan 7, 2021
691c373
Newsfile
erikjohnston Jan 7, 2021
124f415
Support resync clients off master
erikjohnston Jan 7, 2021
e81d693
Newsfile
erikjohnston Jan 7, 2021
40d9869
Only define _last_device_delete_cache once
erikjohnston Jan 7, 2021
6da60bf
Newsfile
erikjohnston Jan 7, 2021
7386f09
Merge branch 'erikj/resync_off_master' into erikj/split_to_device_sen…
erikjohnston Jan 7, 2021
08a4f88
Merge branch 'erikj/allow_routing_edus' into erikj/split_to_device_se…
erikjohnston Jan 7, 2021
1236ac3
Use MultiWriterIDGenerator for device inbox
erikjohnston Jan 7, 2021
248832e
Allow configuring which workers handle to_device messages
erikjohnston Jan 7, 2021
811cb59
Move DB write functions to worker store
erikjohnston Jan 7, 2021
eb6121a
Assert writing to device inbox only happens on appropriate workers
erikjohnston Jan 7, 2021
7d43cb7
Wire up SendToDeviceRestServlet to work on workers
erikjohnston Jan 7, 2021
5420f01
Newsfile
erikjohnston Jan 7, 2021
36f0d2b
Merge remote-tracking branch 'origin/develop' into erikj/split_to_dev…
erikjohnston Jan 7, 2021
9a91c2d
Ensure you can only config one instance to handle to device messages,…
erikjohnston Jan 7, 2021
82286cc
Merge remote-tracking branch 'origin/develop' into erikj/split_to_dev…
erikjohnston Jan 7, 2021
d371d96
Update changelog
erikjohnston Jan 7, 2021
fc38115
Fix port script to handle new sequences
erikjohnston Jan 7, 2021
2667875
Remove redundant '_device_inbox_id_gen'
erikjohnston Jan 7, 2021
171f360
Expand comment on federation_sender
erikjohnston Jan 7, 2021
867a21f
Remove spurious changelog
erikjohnston Jan 7, 2021
07f2d9a
Fixup comment
erikjohnston Jan 7, 2021
14624e7
Fix newsfile
erikjohnston Jan 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 1 addition & 31 deletions synapse/replication/slave/storage/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,8 @@
# limitations under the License.

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache


class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
self._device_inbox_id_gen = SlavedIdTracker(
db_conn, "device_inbox", "stream_id"
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
self._device_inbox_id_gen.get_current_token(),
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
self._device_inbox_id_gen.get_current_token(),
)

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == ToDeviceStream.NAME:
self._device_inbox_id_gen.advance(instance_name, token)
for row in rows:
if row.entity.startswith("@"):
self._device_inbox_stream_cache.entity_has_changed(
row.entity, token
)
else:
self._device_federation_outbox_stream_cache.entity_has_changed(
row.entity, token
)
return super().process_replication_rows(stream_name, instance_name, token, rows)
pass
30 changes: 0 additions & 30 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,36 +189,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
prefilled_cache=presence_cache_prefill,
)

max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_inbox",
entity_column="user_id",
stream_column="stream_id",
max_value=max_device_inbox_id,
limit=1000,
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
min_device_inbox_id,
prefilled_cache=device_inbox_prefill,
)
# The federation outbox and the local device inbox uses the same
# stream_id generator.
device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_federation_outbox",
entity_column="destination",
stream_column="stream_id",
max_value=max_device_inbox_id,
limit=1000,
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
min_device_outbox_id,
prefilled_cache=device_outbox_prefill,
)

device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache", device_list_max
Expand Down
70 changes: 70 additions & 0 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
from typing import List, Tuple

from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.util import json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache

logger = logging.getLogger(__name__)

Expand All @@ -29,6 +33,8 @@ class DeviceInboxWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

self._instance_name = hs.get_instance_name()

# Map of (user_id, device_id) to the last stream_id that has been
# deleted up to. This is so that we can no op deletions.
self._last_device_delete_cache = ExpiringCache(
Expand All @@ -38,6 +44,68 @@ def __init__(self, database: DatabasePool, db_conn, hs):
expiry_ms=30 * 60 * 1000,
)

if isinstance(database.engine, PostgresEngine):
self._device_inbox_id_gen = MultiWriterIdGenerator(
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
db_conn=db_conn,
db=database,
stream_name="to_device",
instance_name=self._instance_name,
table="device_inbox",
instance_column="instance_name",
id_column="stream_id",
sequence_name="device_inbox_sequence",
writers=["master"], # FIXME
)
else:
self._device_inbox_id_gen = StreamIdGenerator(
db_conn, "device_inbox", "stream_id"
)

max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_inbox",
entity_column="user_id",
stream_column="stream_id",
max_value=max_device_inbox_id,
limit=1000,
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
min_device_inbox_id,
prefilled_cache=device_inbox_prefill,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kinda wish get_cache_dict and StreamChangeCache.__init__ had better docstrings, so I could figure out what these were doing... ahwell.

)

# The federation outbox and the local device inbox uses the same
# stream_id generator.
device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_federation_outbox",
entity_column="destination",
stream_column="stream_id",
max_value=max_device_inbox_id,
limit=1000,
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
min_device_outbox_id,
prefilled_cache=device_outbox_prefill,
)

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == ToDeviceStream.NAME:
self._device_inbox_id_gen.advance(instance_name, token)
for row in rows:
if row.entity.startswith("@"):
self._device_inbox_stream_cache.entity_has_changed(
row.entity, token
)
else:
self._device_federation_outbox_stream_cache.entity_has_changed(
row.entity, token
)
return super().process_replication_rows(stream_name, instance_name, token, rows)

def get_to_device_stream_token(self):
return self._device_inbox_id_gen.get_current_token()

Expand Down Expand Up @@ -358,6 +426,7 @@ def add_messages_txn(txn, now_ms, stream_id):
"stream_id": stream_id,
"queued_ts": now_ms,
"messages_json": json_encoder.encode(edu),
"instance_name": self._instance_name,
}
for destination, edu in remote_messages_by_destination.items()
],
Expand Down Expand Up @@ -481,6 +550,7 @@ def _add_messages_to_local_device_inbox_txn(
"device_id": device_id,
"stream_id": stream_id,
"message_json": message_json,
"instance_name": self._instance_name,
}
for user_id, messages_by_device in local_by_user_then_device.items()
for device_id, message_json in messages_by_device.items()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

ALTER TABLE device_inbox ADD COLUMN instance_name TEXT;
ALTER TABLE device_federation_inbox ADD COLUMN instance_name TEXT;
ALTER TABLE device_federation_outbox ADD COLUMN instance_name TEXT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE SEQUENCE IF NOT EXISTS device_inbox_sequence;
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

-- We need to take the max across both device_inbox and device_federation_outbox
-- tables as they share the ID generator
SELECT setval('device_inbox_sequence', (
SELECT GREATEST(
(SELECT COALESCE(MAX(stream_id), 1) FROM device_inbox),
(SELECT COALESCE(MAX(stream_id), 1) FROM device_federation_outbox)
)
));