Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add a background task to purge unused chain IDs. #9542

Merged
merged 8 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9542.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Properly purge the event chain cover index when purging history.
clokep marked this conversation as resolved.
Show resolved Hide resolved
79 changes: 79 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._chain_cover_index,
)

self.db_pool.updates.register_background_update_handler(
"purged_chain_cover",
self._purged_chain_cover_index,
)

async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
Expand Down Expand Up @@ -932,3 +937,77 @@ def _calculate_chain_cover_txn(
processed_count=count,
finished_room_map=finished_rooms,
)

async def _purged_chain_cover_index(self, progress: dict, batch_size: int) -> int:
    """
    A background update that iterates over the chain cover index and deletes
    the entries for events that have been purged.

    This may be due to fully purging a room or via setting a retention policy.

    Args:
        progress: The stored progress of this update. Its "current_event_id"
            key (defaulting to "") is the last event ID handled by a previous
            batch; only rows with a greater event ID are examined.
        batch_size: The maximum number of event_auth_chains rows to examine
            in this batch.

    Returns:
        The number of rows examined in this batch. When this is 0 the
        background update is marked as finished.
    """
    current_event_id = progress.get("current_event_id", "")

    def purged_chain_cover_txn(txn) -> int:
        # The event ID from events will be null if the chain ID / sequence
        # number points to a purged event.
        sql = """
            SELECT event_id, chain_id, sequence_number, e.event_id
            FROM event_auth_chains
            LEFT JOIN events AS e USING (event_id)
            WHERE event_id > ? ORDER BY event_id ASC LIMIT ?
        """
        txn.execute(sql, (current_event_id, batch_size))

        rows = txn.fetchall()
        if not rows:
            return 0

        # The event IDs and chain IDs / sequence numbers where the event has
        # been purged.
        #
        # Each entry is a full parameter tuple for executemany: passing a bare
        # string would be interpreted as one parameter per character.
        unreferenced_event_ids = []
        unreferenced_chain_id_tuples = []
        event_id = ""
        for event_id, chain_id, sequence_number, events_event_id in rows:
            if not events_event_id:
                unreferenced_event_ids.append((event_id,))
                unreferenced_chain_id_tuples.append(
                    (chain_id, sequence_number, chain_id, sequence_number)
                )

        # Delete the unreferenced auth chains from event_auth_chain_links and
        # event_auth_chains.
        txn.executemany(
            """
            DELETE FROM event_auth_chains WHERE event_id = ?
            """,
            unreferenced_event_ids,
        )
        txn.executemany(
            """
            DELETE FROM event_auth_chain_links WHERE
            (origin_chain_id = ? AND origin_sequence_number = ?) OR
            (target_chain_id = ? AND target_sequence_number = ?)
            """,
            unreferenced_chain_id_tuples,
        )

        # Rows are ordered by event ID, so after the loop `event_id` is the
        # greatest ID seen; the next batch resumes after it.
        progress = {
            "current_event_id": event_id,
        }

        self.db_pool.updates._background_update_progress_txn(
            txn, "purged_chain_cover", progress
        )

        return len(rows)

    result = await self.db_pool.runInteraction(
        "_purged_chain_cover_index",
        purged_chain_cover_txn,
    )

    # An empty batch means every row has been examined.
    if not result:
        await self.db_pool.updates._end_background_update("purged_chain_cover")

    return result
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Queue the 'purged_chain_cover' background update, starting with empty progress.
INSERT INTO background_updates (ordering, update_name, progress_json)
    VALUES (5910, 'purged_chain_cover', '{}');