From cf04f1aca38b80a259e2d0e508d7987db20d4695 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 4 Apr 2022 12:21:35 +0100 Subject: [PATCH] Use different stream IDs for device_list_outbound_pokes --- synapse/storage/databases/main/devices.py | 30 ++++++++++++++--------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 36c7d9abba15..0d4ccb104a93 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1553,20 +1553,16 @@ async def add_device_change_to_streams( if not device_ids: return None - num_stream_ids = len(device_ids) - if hosts: - # The `device_lists_outbound_pokes` wants a different stream ID for - # each row, which is a row per host per device update. - num_stream_ids = len(hosts) * len(device_ids) - context = get_active_span_text_map() - def add_device_changes_txn(txn, stream_ids): + def add_device_changes_txn( + txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes + ): self._add_device_change_to_stream_txn( txn, user_id, device_ids, - stream_ids, + stream_ids_for_device_change, ) self._add_device_outbound_room_poke_txn( @@ -1574,7 +1570,7 @@ def add_device_changes_txn(txn, stream_ids): user_id, device_ids, room_ids, - stream_ids, + stream_ids_for_device_change, context, hosts_have_been_calculated=hosts is not None, ) @@ -1590,15 +1586,27 @@ def add_device_changes_txn(txn, stream_ids): user_id, device_ids, hosts, - stream_ids, + stream_ids_for_outbound_pokes, context, ) + # The `device_lists_stream` wants a stream ID per device update. + num_stream_ids = len(device_ids) + + if hosts: + # The `device_lists_outbound_pokes` wants a different stream ID for + # each row, which is a row per host per device update. + num_stream_ids += len(hosts) * len(device_ids) + async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids: + stream_ids_for_device_change = stream_ids[: len(device_ids)] + stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :] + await self.db_pool.runInteraction( "add_device_change_to_stream", add_device_changes_txn, - stream_ids, + stream_ids_for_device_change, + stream_ids_for_outbound_pokes, ) return stream_ids[-1]