Skip to content
This repository has been archived by the owner on Mar 26, 2024. It is now read-only.

Commit

Permalink
Add background job to populate event_stream_ordering on membership …
Browse files Browse the repository at this point in the history
…tables
  • Loading branch information
Fizzadar committed Feb 21, 2023
1 parent 9e15a7f commit da71761
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 1 deletion.
104 changes: 103 additions & 1 deletion synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import attr

from synapse.api.constants import EventContentFields, RelationTypes
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand Down Expand Up @@ -71,6 +71,10 @@ class _BackgroundUpdates:

EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"

POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = (
"populate_membership_event_stream_ordering"
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
Expand Down Expand Up @@ -99,6 +103,10 @@ def __init__(
):
super().__init__(database, db_conn, hs)

self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
self._populate_membership_event_stream_ordering,
)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME,
self._background_reindex_origin_server_ts,
Expand Down Expand Up @@ -1498,3 +1506,97 @@ def _populate_txn(txn: LoggingTransaction) -> bool:
)

return batch_size

async def _populate_membership_event_stream_ordering(
self, progress: JsonDict, batch_size: int
) -> int:
def _populate_membership_event_stream_ordering(
txn: LoggingTransaction,
) -> bool:

if "max_stream_ordering" in progress:
max_stream_ordering = progress["max_stream_ordering"]
else:
txn.execute("SELECT max(stream_ordering) FROM events")
res = txn.fetchone()
if res is None or res[0] is None:
return True
else:
max_stream_ordering = res[0]

start = progress.get("stream_ordering", 0)
stop = start + batch_size

sql = f"""
SELECT room_id, event_id, stream_ordering
FROM events
WHERE
type = '{EventTypes.Member}'
AND stream_ordering >= ?
AND stream_ordering < ?
"""
txn.execute(sql, (start, stop))

rows: List[Tuple[str, str, int]] = cast(
List[Tuple[str, str, int]], txn.fetchall()
)

event_ids: List[Tuple[str]] = []
event_stream_orderings: List[Tuple[int]] = []

for _, event_id, event_stream_ordering in rows:
event_ids.append((event_id,))
event_stream_orderings.append((event_stream_ordering,))

self.db_pool.simple_update_many_txn(
txn,
table="current_state_events",
key_names=("event_id",),
key_values=event_ids,
value_names=("event_stream_ordering",),
value_values=event_stream_orderings,
)

self.db_pool.simple_update_many_txn(
txn,
table="room_memberships",
key_names=("event_id",),
key_values=event_ids,
value_names=("event_stream_ordering",),
value_values=event_stream_orderings,
)

# NOTE: local_current_membership has no index on event_id, so only
# the room ID here will reduce the query rows read.
for room_id, event_id, event_stream_ordering in rows:
txn.execute(
"""
UPDATE local_current_membership
SET event_stream_ordering = ?
WHERE room_id = ? AND event_id = ?
""",
(event_stream_ordering, room_id, event_id),
)

self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
{
"stream_ordering": stop,
"max_stream_ordering": max_stream_ordering,
},
)

return stop > max_stream_ordering

finished = await self.db_pool.runInteraction(
"_populate_membership_event_stream_ordering",
_populate_membership_event_stream_ordering,
)

if finished:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING
)

return batch_size
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT;

INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7401, 'populate_membership_event_stream_ordering', '{}');

0 comments on commit da71761

Please sign in to comment.