Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix federation stall on concurrent access errors #9639

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9639.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes #9635.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
45 changes: 9 additions & 36 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache

Expand Down Expand Up @@ -312,49 +311,23 @@ async def store_destination_rooms_entries(
stream_ordering: the stream_ordering of the event
"""

return await self.db_pool.runInteraction(
"store_destination_rooms_entries",
self._store_destination_rooms_entries_txn,
destinations,
room_id,
stream_ordering,
await self.db_pool.simple_upsert_many(
table="destinations",
key_names=("destination",),
key_values=[(d,) for d in destinations],
value_names=[],
value_values=[],
desc="store_destination_rooms_entries_dests",
)

def _store_destination_rooms_entries_txn(
self,
txn: LoggingTransaction,
destinations: Iterable[str],
room_id: str,
stream_ordering: int,
) -> None:

# ensure we have a `destinations` row for this destination, as there is
# a foreign key constraint.
if isinstance(self.database_engine, PostgresEngine):
q = """
INSERT INTO destinations (destination)
VALUES (?)
ON CONFLICT DO NOTHING;
"""
elif isinstance(self.database_engine, Sqlite3Engine):
q = """
INSERT OR IGNORE INTO destinations (destination)
VALUES (?);
"""
else:
raise RuntimeError("Unknown database engine")

txn.execute_batch(q, ((destination,) for destination in destinations))

rows = [(destination, room_id) for destination in destinations]

self.db_pool.simple_upsert_many_txn(
txn,
await self.db_pool.simple_upsert_many(
table="destination_rooms",
key_names=("destination", "room_id"),
key_values=rows,
value_names=["stream_ordering"],
value_values=[(stream_ordering,)] * len(rows),
desc="store_destination_rooms_entries_rooms",
)

async def get_destination_last_successful_stream_ordering(
Expand Down