Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove remaining usage of cursor_to_dict. #16564

Merged
merged 12 commits into from
Oct 31, 2023
47 changes: 33 additions & 14 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1620,52 +1620,71 @@ async def _remove_duplicate_outbound_pokes(
#
# For each duplicate, we delete all the existing rows and put one back.

KEY_COLS = ["stream_id", "destination", "user_id", "device_id"]
last_row = progress.get(
"last_row",
{"stream_id": 0, "destination": "", "user_id": "", "device_id": ""},
)

def _txn(txn: LoggingTransaction) -> int:
clause, args = make_tuple_comparison_clause(
[(x, last_row[x]) for x in KEY_COLS]
[
("stream_id", last_row["stream_id"]),
("destination", last_row["destination"]),
("user_id", last_row["user_id"]),
("device_id", last_row["device_id"]),
]
)
sql = """
SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts
FROM device_lists_outbound_pokes
WHERE %s
GROUP BY %s
GROUP BY stream_id, destination, user_id, device_id
HAVING count(*) > 1
ORDER BY %s
ORDER BY stream_id, destination, user_id, device_id
LIMIT ?
""" % (
clause, # WHERE
clokep marked this conversation as resolved.
Show resolved Hide resolved
",".join(KEY_COLS), # GROUP BY
",".join(KEY_COLS), # ORDER BY
)
txn.execute(sql, args + [batch_size])
rows = self.db_pool.cursor_to_dict(txn)
rows = txn.fetchall()

row = None
for row in rows:
stream_id, destination, user_id, device_id = None, None, None, None
for stream_id, destination, user_id, device_id, _ in rows:
self.db_pool.simple_delete_txn(
txn,
"device_lists_outbound_pokes",
{x: row[x] for x in KEY_COLS},
{
"stream_id": stream_id,
"destination": destination,
"user_id": user_id,
"device_id": device_id,
},
)

row["sent"] = False
self.db_pool.simple_insert_txn(
txn,
"device_lists_outbound_pokes",
row,
{
"stream_id": stream_id,
"destination": destination,
"user_id": user_id,
"device_id": device_id,
"sent": False,
},
)

if row:
if rows:
self.db_pool.updates._background_update_progress_txn(
txn,
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES,
{"last_row": row},
{
"last_row": {
"stream_id": stream_id,
"destination": destination,
"user_id": user_id,
"device_id": device_id,
}
},
)

return len(rows)
Expand Down