From 7fee8fe94e8b38f69725e885e3c29a8820084fac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Apr 2022 16:35:15 +0100 Subject: [PATCH 1/4] Only send out device list updates for our own users Broke in #12365 --- synapse/handlers/device.py | 10 +++++++--- synapse/storage/databases/main/devices.py | 4 +++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 958599e7b816..3c0fc756d46b 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -649,9 +649,13 @@ async def _handle_new_device_update_async(self) -> None: return for user_id, device_id, room_id, stream_id, opentracing_context in rows: - joined_user_ids = await self.store.get_users_in_room(room_id) - hosts = {get_domain_from_id(u) for u in joined_user_ids} - hosts.discard(self.server_name) + hosts = set() + + # Ignore any users that aren't ours + if self.hs.is_mine_id(user_id): + joined_user_ids = await self.store.get_users_in_room(room_id) + hosts = {get_domain_from_id(u) for u in joined_user_ids} + hosts.discard(self.server_name) # Check if we've already sent this update to some hosts if current_stream_id == stream_id: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 74e4e2122a20..318e4df376b2 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1703,7 +1703,9 @@ def _add_device_outbound_poke_to_stream_txn( next(stream_id_iterator), user_id, device_id, - False, + not self.hs.is_mine_id( + user_id + ), # We only need to send out update for *our* users now, encoded_context if whitelisted_homeserver(destination) else "{}", ) From dcdeba2bc61b6329bca51a057b7c84eee2fa2ae1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Apr 2022 16:36:58 +0100 Subject: [PATCH 2/4] Newsfile --- changelog.d/12465.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12465.feature diff --git a/changelog.d/12465.feature b/changelog.d/12465.feature new file mode 100644 index 000000000000..642dea966c44 --- /dev/null +++ b/changelog.d/12465.feature @@ -0,0 +1 @@ +Enable processing of device list updates asynchronously. From 28d9b3cdaf97871f3f03213a121702b89d141e53 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Apr 2022 16:54:58 +0100 Subject: [PATCH 3/4] Fix up tests --- tests/storage/test_devices.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py index 5491fbf6da7b..ccc3893869db 100644 --- a/tests/storage/test_devices.py +++ b/tests/storage/test_devices.py @@ -118,7 +118,7 @@ def test_get_device_updates_by_remote(self): device_ids = ["device_id1", "device_id2"] # Add two device updates with sequential `stream_id`s - self.add_device_change("user_id", device_ids, "somehost") + self.add_device_change("@user_id:test", device_ids, "somehost") # Get all device updates ever meant for this remote now_stream_id, device_updates = self.get_success( @@ -142,7 +142,7 @@ def test_get_device_updates_by_remote_can_limit_properly(self): "device_id4", "device_id5", ] - self.add_device_change("user_id", device_ids, "somehost") + self.add_device_change("@user_id:test", device_ids, "somehost") # Get device updates meant for this remote next_stream_id, device_updates = self.get_success( @@ -162,7 +162,7 @@ def test_get_device_updates_by_remote_can_limit_properly(self): # Add some more device updates to ensure it still resumes properly device_ids = ["device_id6", "device_id7"] - self.add_device_change("user_id", device_ids, "somehost") + self.add_device_change("@user_id:test", device_ids, "somehost") # Get the next batch of device updates next_stream_id, device_updates = self.get_success( From 77a987ba0b3343a9d2f3f5ad9864cf235a3bdcf4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Apr 2022 09:39:50 +0100 Subject: [PATCH 4/4] Add test --- tests/federation/test_federation_sender.py | 43 +++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 63ea4f9ee475..91f982518e69 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -162,7 +162,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): def make_homeserver(self, reactor, clock): return self.setup_test_homeserver( - federation_transport_client=Mock(spec=["send_transaction"]), + federation_transport_client=Mock( + spec=["send_transaction", "query_user_devices"] + ), ) def default_config(self): @@ -218,6 +220,45 @@ def test_send_device_updates(self): self.assertEqual(len(self.edus), 1) self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id) + def test_dont_send_device_updates_for_remote_users(self): + """Check that we don't send device updates for remote users""" + + # Send the server a device list EDU for the other user, this will cause + # it to try and resync the device lists. + self.hs.get_federation_transport_client().query_user_devices.return_value = ( + defer.succeed( + { + "stream_id": "1", + "user_id": "@user2:host2", + "devices": [{"device_id": "D1"}], + } + ) + ) + + self.get_success( + self.hs.get_device_handler().device_list_updater.incoming_device_list_update( + "host2", + { + "user_id": "@user2:host2", + "device_id": "D1", + "stream_id": "1", + "prev_ids": [], + }, + ) + ) + + self.reactor.advance(1) + + # We shouldn't see an EDU for that update + self.assertEqual(self.edus, []) + + # Check that we did successfully process the inbound EDU (otherwise this + # test would pass if we failed to process the EDU) + devices = self.get_success( + self.hs.get_datastores().main.get_cached_devices_for_user("@user2:host2") + ) + self.assertIn("D1", devices) + def test_upload_signatures(self): """Uploading signatures on some devices should produce updates for that user"""