Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Speed up user directory rebuild for users some more... (#15665)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored May 24, 2023
1 parent 1f55c04 commit c7e9c1d
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 76 deletions.
1 change: 1 addition & 0 deletions changelog.d/15665.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up rebuilding of the user directory for local users.
190 changes: 114 additions & 76 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import unicodedata
from typing import (
TYPE_CHECKING,
Collection,
Iterable,
List,
Mapping,
Expand Down Expand Up @@ -45,7 +46,7 @@
if TYPE_CHECKING:
from synapse.server import HomeServer

from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, UserTypes
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
Expand Down Expand Up @@ -356,13 +357,30 @@ async def _populate_user_directory_process_users(
Add all local users to the user directory.
"""

def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
sql = "SELECT user_id FROM %s LIMIT %s" % (
TEMP_TABLE + "_users",
str(batch_size),
)
txn.execute(sql)
user_result = cast(List[Tuple[str]], txn.fetchall())
def _populate_user_directory_process_users_txn(
txn: LoggingTransaction,
) -> Optional[int]:
if self.database_engine.supports_returning:
# Note: we use an ORDER BY in the SELECT to force usage of an
# index. Otherwise, postgres does a sequential scan that is
# surprisingly slow (I think due to the fact it will read/skip
# over lots of already deleted rows).
sql = f"""
DELETE FROM {TEMP_TABLE + "_users"}
WHERE user_id IN (
SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
)
RETURNING user_id
"""
txn.execute(sql, (batch_size,))
user_result = cast(List[Tuple[str]], txn.fetchall())
else:
sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % (
TEMP_TABLE + "_users",
str(batch_size),
)
txn.execute(sql)
user_result = cast(List[Tuple[str]], txn.fetchall())

if not user_result:
return None
Expand All @@ -378,85 +396,81 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
assert count_result is not None
progress["remaining"] = count_result[0]

return users_to_work_on

users_to_work_on = await self.db_pool.runInteraction(
"populate_user_directory_temp_read", _get_next_batch
)
if not users_to_work_on:
return None

# No more users -- complete the transaction.
if not users_to_work_on:
await self.db_pool.updates._end_background_update(
"populate_user_directory_process_users"
logger.debug(
"Processing the next %d users of %d remaining",
len(users_to_work_on),
progress["remaining"],
)
return 1

logger.debug(
"Processing the next %d users of %d remaining"
% (len(users_to_work_on), progress["remaining"])
)

# First filter down to users we want to insert into the user directory.
users_to_insert = [
user_id
for user_id in users_to_work_on
if await self.should_include_local_user_in_dir(user_id)
]
# First filter down to users we want to insert into the user directory.
users_to_insert = self._filter_local_users_for_dir_txn(
txn, users_to_work_on
)

# Next fetch their profiles. Note that the `user_id` here is the
# *localpart*, and that not all users have profiles.
profile_rows = await self.db_pool.simple_select_many_batch(
table="profiles",
column="user_id",
iterable=[get_localpart_from_id(u) for u in users_to_insert],
retcols=(
"user_id",
"displayname",
"avatar_url",
),
keyvalues={},
desc="populate_user_directory_process_users_get_profiles",
)
profiles = {
f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
f"@{row['user_id']}:{self.server_name}",
row["displayname"],
row["avatar_url"],
# Next fetch their profiles. Note that the `user_id` here is the
# *localpart*, and that not all users have profiles.
profile_rows = self.db_pool.simple_select_many_txn(
txn,
table="profiles",
column="user_id",
iterable=[get_localpart_from_id(u) for u in users_to_insert],
retcols=(
"user_id",
"displayname",
"avatar_url",
),
keyvalues={},
)
for row in profile_rows
}
profiles = {
f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
f"@{row['user_id']}:{self.server_name}",
row["displayname"],
row["avatar_url"],
)
for row in profile_rows
}

profiles_to_insert = [
profiles.get(user_id) or _UserDirProfile(user_id)
for user_id in users_to_insert
]
profiles_to_insert = [
profiles.get(user_id) or _UserDirProfile(user_id)
for user_id in users_to_insert
]

# Actually insert the users with their profiles into the directory.
self._update_profiles_in_user_dir_txn(txn, profiles_to_insert)

# We've finished processing the users. Delete it from the table, if
# we haven't already.
if not self.database_engine.supports_returning:
self.db_pool.simple_delete_many_txn(
txn,
table=TEMP_TABLE + "_users",
column="user_id",
values=users_to_work_on,
keyvalues={},
)

# Actually insert the users with their profiles into the directory.
await self.db_pool.runInteraction(
"populate_user_directory_process_users_insertion",
self._update_profiles_in_user_dir_txn,
profiles_to_insert,
)
# Update the remaining counter.
progress["remaining"] -= len(users_to_work_on)
self.db_pool.updates._background_update_progress_txn(
txn, "populate_user_directory_process_users", progress
)
return len(users_to_work_on)

# We've finished processing the users. Delete it from the table.
await self.db_pool.simple_delete_many(
table=TEMP_TABLE + "_users",
column="user_id",
iterable=users_to_work_on,
keyvalues={},
desc="populate_user_directory_process_users_delete",
processed_count = await self.db_pool.runInteraction(
"populate_user_directory_temp", _populate_user_directory_process_users_txn
)

# Update the remaining counter.
progress["remaining"] -= len(users_to_work_on)
await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_users",
progress,
)
# No more users -- complete the transaction.
if not processed_count:
await self.db_pool.updates._end_background_update(
"populate_user_directory_process_users"
)
return 1

return len(users_to_work_on)
return processed_count

async def should_include_local_user_in_dir(self, user: str) -> bool:
"""Certain classes of local user are omitted from the user directory.
Expand Down Expand Up @@ -494,6 +508,30 @@ async def should_include_local_user_in_dir(self, user: str) -> bool:

return True

def _filter_local_users_for_dir_txn(
    self, txn: LoggingTransaction, users: Collection[str]
) -> Collection[str]:
    """Batched counterpart of `should_include_local_user_in_dir`.

    Runs inside an existing transaction so a whole batch of users can be
    filtered with a single query against the `users` table.

    Args:
        txn: The database transaction used for the `users` table lookup.
        users: The user IDs to filter.

    Returns:
        The `name` of every looked-up row whose `user_type` is not
        `UserTypes.SUPPORT`. Users rejected by the app service checks are
        never looked up. The result follows the order of the rows returned
        by the lookup, not necessarily the order of `users`.
    """
    # Discard anything tied to an application service before touching the
    # database: users for which an app service is found by user ID, and
    # users that app services report an interest in.
    candidates = []
    for user_id in users:
        if self.get_app_service_by_user_id(user_id) is not None:  # type: ignore[attr-defined]
            continue
        if self.get_if_app_services_interested_in_user(user_id):  # type: ignore[attr-defined]
            continue
        candidates.append(user_id)

    # Fetch the remaining candidates in one go; the `deactivated = 0`
    # key/value restricts the lookup (presumably to rows of users that have
    # not been deactivated -- confirm against `simple_select_many_txn`).
    active_rows = self.db_pool.simple_select_many_txn(
        txn,
        table="users",
        column="name",
        iterable=candidates,
        keyvalues={"deactivated": 0},
        retcols=("name", "user_type"),
    )

    # Finally drop support users.
    included = []
    for row in active_rows:
        if row["user_type"] != UserTypes.SUPPORT:
            included.append(row["name"])
    return included

async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
"""Check if the room is either world_readable or publically joinable"""

Expand Down

0 comments on commit c7e9c1d

Please sign in to comment.