This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up user directory rebuild for users some more... #15665

Merged

merged 6 commits on May 24, 2023
Changes from 5 commits
1 change: 1 addition & 0 deletions changelog.d/15665.misc
@@ -0,0 +1 @@
Speed up rebuilding of the user directory for local users.
186 changes: 110 additions & 76 deletions synapse/storage/databases/main/user_directory.py
@@ -17,6 +17,7 @@
import unicodedata
from typing import (
TYPE_CHECKING,
Collection,
Iterable,
List,
Mapping,
@@ -45,7 +46,7 @@
if TYPE_CHECKING:
from synapse.server import HomeServer

from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, UserTypes
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
@@ -356,13 +357,26 @@ async def _populate_user_directory_process_users(
Add all local users to the user directory.
"""

def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
sql = "SELECT user_id FROM %s LIMIT %s" % (
TEMP_TABLE + "_users",
str(batch_size),
)
txn.execute(sql)
user_result = cast(List[Tuple[str]], txn.fetchall())
def _populate_user_directory_process_users_txn(
txn: LoggingTransaction,
) -> Optional[int]:
if self.database_engine.supports_returning:
sql = f"""
DELETE FROM {TEMP_TABLE + "_users"}
WHERE user_id IN (
SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
)
RETURNING user_id
"""
txn.execute(sql, (batch_size,))
user_result = cast(List[Tuple[str]], txn.fetchall())
else:
sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % (
TEMP_TABLE + "_users",
str(batch_size),
)
txn.execute(sql)
user_result = cast(List[Tuple[str]], txn.fetchall())

if not user_result:
return None
@@ -378,85 +392,81 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
assert count_result is not None
progress["remaining"] = count_result[0]

return users_to_work_on

users_to_work_on = await self.db_pool.runInteraction(
"populate_user_directory_temp_read", _get_next_batch
)
if not users_to_work_on:
return None

# No more users -- complete the transaction.
if not users_to_work_on:
await self.db_pool.updates._end_background_update(
"populate_user_directory_process_users"
logger.debug(
"Processing the next %d users of %d remaining",
len(users_to_work_on),
progress["remaining"],
)
return 1

logger.debug(
"Processing the next %d users of %d remaining"
% (len(users_to_work_on), progress["remaining"])
)

# First filter down to users we want to insert into the user directory.
users_to_insert = [
user_id
for user_id in users_to_work_on
if await self.should_include_local_user_in_dir(user_id)
]
# First filter down to users we want to insert into the user directory.
users_to_insert = self._filter_local_users_for_dir_txn(
txn, users_to_work_on
)

# Next fetch their profiles. Note that the `user_id` here is the
# *localpart*, and that not all users have profiles.
profile_rows = await self.db_pool.simple_select_many_batch(
table="profiles",
column="user_id",
iterable=[get_localpart_from_id(u) for u in users_to_insert],
retcols=(
"user_id",
"displayname",
"avatar_url",
),
keyvalues={},
desc="populate_user_directory_process_users_get_profiles",
)
profiles = {
f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
f"@{row['user_id']}:{self.server_name}",
row["displayname"],
row["avatar_url"],
# Next fetch their profiles. Note that the `user_id` here is the
# *localpart*, and that not all users have profiles.
profile_rows = self.db_pool.simple_select_many_txn(
txn,
table="profiles",
column="user_id",
iterable=[get_localpart_from_id(u) for u in users_to_insert],
retcols=(
"user_id",
"displayname",
"avatar_url",
),
keyvalues={},
)
for row in profile_rows
}
profiles = {
f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
f"@{row['user_id']}:{self.server_name}",
row["displayname"],
row["avatar_url"],
)
for row in profile_rows
}

profiles_to_insert = [
profiles.get(user_id) or _UserDirProfile(user_id)
for user_id in users_to_insert
]
profiles_to_insert = [
profiles.get(user_id) or _UserDirProfile(user_id)
for user_id in users_to_insert
]

# Actually insert the users with their profiles into the directory.
self._update_profiles_in_user_dir_txn(txn, profiles_to_insert)

# We've finished processing the users. Delete it from the table, if
# we haven't already.
if not self.database_engine.supports_returning:
self.db_pool.simple_delete_many_txn(
txn,
table=TEMP_TABLE + "_users",
column="user_id",
values=users_to_work_on,
keyvalues={},
)

# Actually insert the users with their profiles into the directory.
await self.db_pool.runInteraction(
"populate_user_directory_process_users_insertion",
self._update_profiles_in_user_dir_txn,
profiles_to_insert,
)
# Update the remaining counter.
progress["remaining"] -= len(users_to_work_on)
self.db_pool.updates._background_update_progress_txn(
txn, "populate_user_directory_process_users", progress
)
return len(users_to_work_on)

# We've finished processing the users. Delete it from the table.
await self.db_pool.simple_delete_many(
table=TEMP_TABLE + "_users",
column="user_id",
iterable=users_to_work_on,
keyvalues={},
desc="populate_user_directory_process_users_delete",
processed_count = await self.db_pool.runInteraction(
"populate_user_directory_temp", _populate_user_directory_process_users_txn
)

# Update the remaining counter.
progress["remaining"] -= len(users_to_work_on)
await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_users",
progress,
)
# No more users -- complete the transaction.
if not processed_count:
await self.db_pool.updates._end_background_update(
"populate_user_directory_process_users"
)
return 1

return len(users_to_work_on)
return processed_count
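
For reference, a minimal, self-contained sketch (not part of this diff) of the batch-consume pattern the new transaction relies on: claim a batch of rows from the temporary work table with `DELETE ... RETURNING` when the engine supports it, and fall back to a plain `SELECT` plus an explicit delete otherwise. The `user_queue` table and `drain_batch` helper below are illustrative stand-ins, not Synapse code.

```python
import sqlite3
from typing import List, Optional


def drain_batch(conn: sqlite3.Connection, batch_size: int) -> Optional[List[str]]:
    """Claim and remove up to `batch_size` user IDs from a work-queue table."""
    # SQLite gained RETURNING in 3.35.0; Postgres has supported it far longer.
    supports_returning = sqlite3.sqlite_version_info >= (3, 35, 0)

    cur = conn.cursor()
    if supports_returning:
        # One statement: delete the claimed rows and hand them back to the caller.
        cur.execute(
            """
            DELETE FROM user_queue
            WHERE user_id IN (
                SELECT user_id FROM user_queue ORDER BY user_id LIMIT ?
            )
            RETURNING user_id
            """,
            (batch_size,),
        )
        rows = cur.fetchall()
    else:
        # Fallback: read a batch now, delete it explicitly afterwards.  In the
        # actual change, the delete is deferred to the end of the same
        # transaction, after the batch has been processed.
        cur.execute(
            "SELECT user_id FROM user_queue ORDER BY user_id LIMIT ?",
            (batch_size,),
        )
        rows = cur.fetchall()
        cur.executemany("DELETE FROM user_queue WHERE user_id = ?", rows)

    return [user_id for (user_id,) in rows] or None


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE user_queue (user_id TEXT PRIMARY KEY)")
    conn.executemany(
        "INSERT INTO user_queue VALUES (?)",
        [("@a:example.org",), ("@b:example.org",), ("@c:example.org",)],
    )
    print(drain_batch(conn, 2))  # ['@a:example.org', '@b:example.org']
    print(drain_batch(conn, 2))  # ['@c:example.org']
    print(drain_batch(conn, 2))  # None
```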

async def should_include_local_user_in_dir(self, user: str) -> bool:
"""Certain classes of local user are omitted from the user directory.
@@ -494,6 +504,30 @@ async def should_include_local_user_in_dir(self, user: str) -> bool:

return True

def _filter_local_users_for_dir_txn(
self, txn: LoggingTransaction, users: Collection[str]
) -> Collection[str]:
Comment on lines +511 to +513

Member:
Would it make sense to just replace should_include_local_user_in_dir (or have it call this?)

Member Author:
That was my original plan, but it turns out that the functions called in should_include_local_user_in_dir are cached. I'm really not sure how much we are relying on those caches, so decided to leave it as is 🤷

Member:
Fair enough.
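
To make the trade-off in this thread concrete, here is a rough, self-contained illustration (the toy USERS table and function names are hypothetical, not Synapse's API): a cached per-user check pays for its lookups once and serves repeat calls from cache, while a batched filter does a single pass per batch but leaves nothing cached for later callers.

```python
from functools import lru_cache
from typing import Iterable, List

# Toy stand-in for the users table; purely illustrative data.
USERS = {
    "@alice:example.org": {"deactivated": 0, "user_type": None},
    "@bot:example.org": {"deactivated": 0, "user_type": "support"},
    "@carol:example.org": {"deactivated": 1, "user_type": None},
}


@lru_cache(maxsize=None)
def include_user(user_id: str) -> bool:
    """Per-user check, analogous to the cached helpers that
    should_include_local_user_in_dir calls: the first call for a user does
    the lookup, later calls are served from the cache."""
    row = USERS.get(user_id)
    return row is not None and not row["deactivated"] and row["user_type"] != "support"


def include_users_batched(user_ids: Iterable[str]) -> List[str]:
    """Batched check, analogous to _filter_local_users_for_dir_txn: one pass
    (a single query, in the real code) covers the whole batch, but nothing
    is cached for subsequent callers."""
    return [
        user_id
        for user_id in user_ids
        if (row := USERS.get(user_id)) is not None
        and not row["deactivated"]
        and row["user_type"] != "support"
    ]


print([u for u in USERS if include_user(u)])  # per-user path, warms the cache
print(include_users_batched(USERS))           # one batched pass, no caching
```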

"""A batched version of `should_include_local_user_in_dir`"""
users = [
user
for user in users
if self.get_app_service_by_user_id(user) is None # type: ignore[attr-defined]
and not self.get_if_app_services_interested_in_user(user) # type: ignore[attr-defined]
]

rows = self.db_pool.simple_select_many_txn(
txn,
table="users",
column="name",
iterable=users,
keyvalues={
"deactivated": 0,
},
retcols=("name", "user_type"),
)

return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT]

async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
"""Check if the room is either world_readable or publically joinable"""
