Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert _insert_graph_receipts_txn to simple_upsert #16299

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16299.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `receipts_graph` Postgres transactions to stop error messages.
3 changes: 3 additions & 0 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,7 @@ async def simple_upsert(
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
where_clause: Optional[str] = None,
desc: str = "simple_upsert",
) -> bool:
"""Insert a row with values + insertion_values; on conflict, update with values.
Expand Down Expand Up @@ -1243,6 +1244,7 @@ async def simple_upsert(
keyvalues: The unique key columns and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert.
desc: description of the transaction, for logging and metrics
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
Expand All @@ -1263,6 +1265,7 @@ async def simple_upsert(
keyvalues,
values,
insertion_values,
where_clause,
db_autocommit=autocommit,
)
except self.engine.module.IntegrityError as e:
Expand Down
23 changes: 9 additions & 14 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,9 +795,7 @@ async def insert_receipt(
now - event_ts,
)

await self.db_pool.runInteraction(
"insert_graph_receipt",
self._insert_graph_receipt_txn,
await self._insert_graph_receipt(
room_id,
receipt_type,
user_id,
Expand All @@ -810,9 +808,8 @@ async def insert_receipt(

return stream_id, max_persisted_id

def _insert_graph_receipt_txn(
async def _insert_graph_receipt(
self,
txn: LoggingTransaction,
room_id: str,
receipt_type: str,
user_id: str,
Expand All @@ -822,13 +819,6 @@ def _insert_graph_receipt_txn(
) -> None:
assert self._can_write_to_receipts

txn.call_after(
self._get_receipts_for_user_with_orderings.invalidate,
(user_id, receipt_type),
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))

keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
Expand All @@ -840,8 +830,8 @@ def _insert_graph_receipt_txn(
else:
keyvalues["thread_id"] = thread_id

self.db_pool.simple_upsert_txn(
txn,
await self.db_pool.simple_upsert(
clokep marked this conversation as resolved.
Show resolved Hide resolved
desc="insert_graph_receipt",
table="receipts_graph",
keyvalues=keyvalues,
values={
Expand All @@ -851,6 +841,11 @@ def _insert_graph_receipt_txn(
where_clause=where_clause,
)

self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))

# FIXME: This shouldn't invalidate the whole cache
self._get_linearized_receipts_for_room.invalidate((room_id,))


class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
Expand Down
Loading