Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Replace or_ignore in simple_insert with simple_upsert #10442

Merged
merged 6 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10442.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Replace usage of `or_ignore` in `simple_insert` with `simple_upsert` usage, to stop spamming postgres logs with spurious ERROR messages.
51 changes: 20 additions & 31 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,31 +832,16 @@ async def simple_insert(
self,
table: str,
values: Dict[str, Any],
or_ignore: bool = False,
desc: str = "simple_insert",
) -> bool:
) -> None:
"""Executes an INSERT query on the named table.

Args:
table: string giving the table name
values: dict of new column names and values for them
or_ignore: bool stating whether an exception should be raised
when a conflicting row already exists. If True, False will be
returned by the function instead
desc: description of the transaction, for logging and metrics

Returns:
Whether the row was inserted or not. Only useful when `or_ignore` is True
"""
try:
await self.runInteraction(desc, self.simple_insert_txn, table, values)
except self.engine.module.IntegrityError:
# We have to do or_ignore flag at this layer, since we can't reuse
# a cursor after we receive an error from the db.
if not or_ignore:
raise
return False
return True
await self.runInteraction(desc, self.simple_insert_txn, table, values)

@staticmethod
def simple_insert_txn(
Expand Down Expand Up @@ -930,7 +915,7 @@ async def simple_upsert(
insertion_values: Optional[Dict[str, Any]] = None,
desc: str = "simple_upsert",
lock: bool = True,
) -> Optional[bool]:
) -> bool:
"""

`lock` should generally be set to True (the default), but can be set
Expand All @@ -951,8 +936,8 @@ async def simple_upsert(
desc: description of the transaction, for logging and metrics
lock: True to lock the table when doing the upsert.
Returns:
Native upserts always return None. Emulated upserts return True if a
new entry was created, False if an existing one was updated.
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}

Expand Down Expand Up @@ -995,7 +980,7 @@ def simple_upsert_txn(
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
lock: bool = True,
) -> Optional[bool]:
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
Expand All @@ -1008,16 +993,15 @@ def simple_upsert_txn(
insertion_values: additional key/values to use only when inserting
lock: True to lock the table when doing the upsert.
Returns:
Native upserts always return None. Emulated upserts return True if a
new entry was created, False if an existing one was updated.
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}

if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
self.simple_upsert_txn_native_upsert(
return self.simple_upsert_txn_native_upsert(
txn, table, keyvalues, values, insertion_values=insertion_values
)
return None
else:
return self.simple_upsert_txn_emulated(
txn,
Expand Down Expand Up @@ -1045,8 +1029,8 @@ def simple_upsert_txn_emulated(
insertion_values: additional key/values to use only when inserting
lock: True to lock the table when doing the upsert.
Returns:
Returns True if a new entry was created, False if an existing
one was updated.
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}

Expand Down Expand Up @@ -1086,8 +1070,7 @@ def _getwhere(key):

txn.execute(sql, sqlargs)
if txn.rowcount > 0:
# successfully updated at least one row.
return False
return True

# We didn't find any existing rows, so insert a new one
allvalues: Dict[str, Any] = {}
Expand All @@ -1111,15 +1094,19 @@ def simple_upsert_txn_native_upsert(
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
) -> None:
) -> bool:
"""
Use the native UPSERT functionality in recent PostgreSQL versions.
Use the native UPSERT functionality in PostgreSQL.

Args:
table: The table to upsert into
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting

Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
allvalues: Dict[str, Any] = {}
allvalues.update(keyvalues)
Expand All @@ -1140,6 +1127,8 @@ def simple_upsert_txn_native_upsert(
)
txn.execute(sql, list(allvalues.values()))

return bool(txn.rowcount)

async def simple_upsert_many(
self,
table: str,
Expand Down
9 changes: 6 additions & 3 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1078,16 +1078,18 @@ async def store_device(
return False

try:
inserted = await self.db_pool.simple_insert(
inserted = await self.db_pool.simple_upsert(
"devices",
values={
keyvalues={
"user_id": user_id,
"device_id": device_id,
},
values={},
insertion_values={
"display_name": initial_device_display_name,
"hidden": False,
},
desc="store_device",
or_ignore=True,
)
if not inserted:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block will now never run for postgresql, this seems like a change in behavior, but I'm not sure if it matters or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should? The idea is that simple_upsert will return False if we didn't insert or update a new row, and since values={} we'll not update rows and so we'll hit this condition if we didn't insert a new entry?

Copy link
Member

@clokep clokep Jul 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking inserted would also be true, but yeah I see if values is {} then this isn't accurate.

# if the device already exists, check if it's a real device, or
Expand All @@ -1099,6 +1101,7 @@ async def store_device(
)
if hidden:
raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN)

self.device_id_exists_cache.set(key, True)
return inserted
except StoreError:
Expand Down
8 changes: 1 addition & 7 deletions synapse/storage/databases/main/monthly_active_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,17 +297,13 @@ def upsert_monthly_active_user_txn(self, txn, user_id):
Args:
txn (cursor):
user_id (str): user to add/update

Returns:
bool: True if a new entry was created, False if an
existing one was updated.
"""

# Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = self.db_pool.simple_upsert_txn(
self.db_pool.simple_upsert_txn(
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
txn,
table="monthly_active_users",
keyvalues={"user_id": user_id},
Expand All @@ -322,8 +318,6 @@ def upsert_monthly_active_user_txn(self, txn, user_id):
txn, self.user_last_seen_monthly_active, (user_id,)
)

return is_insert

async def populate_monthly_active_users(self, user_id):
"""Checks on the state of monthly active user limits and optionally
add the user to the monthly active tables
Expand Down
8 changes: 5 additions & 3 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,18 @@ async def set_received_txn_response(
response_dict: The response, to be encoded into JSON.
"""

await self.db_pool.simple_insert(
await self.db_pool.simple_upsert(
table="received_transactions",
values={
keyvalues={
"transaction_id": transaction_id,
"origin": origin,
},
values={},
insertion_values={
"response_code": code,
"response_json": db_binary_type(encode_canonical_json(response_dict)),
"ts": self._clock.time_msec(),
},
or_ignore=True,
desc="set_received_txn_response",
)

Expand Down
66 changes: 11 additions & 55 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ async def update_profile_in_user_dir(
avatar_url = None

def _update_profile_in_user_dir_txn(txn):
new_entry = self.db_pool.simple_upsert_txn(
self.db_pool.simple_upsert_txn(
txn,
table="user_directory",
keyvalues={"user_id": user_id},
Expand All @@ -388,67 +388,23 @@ def _update_profile_in_user_dir_txn(txn):
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
if self.database_engine.can_native_upsert:
sql = """
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute(
sql,
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
),
)
else:
# TODO: Remove this code after we've bumped the minimum version
# of postgres to always support upserts, so we can get rid of
# `new_entry` usage
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
if new_entry is True:
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
)
"""
txn.execute(
sql,
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
),
)
elif new_entry is False:
sql = """
UPDATE user_directory_search
SET vector = setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
WHERE user_id = ?
"""
txn.execute(
sql,
(
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
user_id,
),
)
else:
raise RuntimeError(
"upsert returned None when 'can_native_upsert' is False"
)
txn.execute(
sql,
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
),
)
elif isinstance(self.database_engine, Sqlite3Engine):
value = "%s %s" % (user_id, display_name) if display_name else user_id
self.db_pool.simple_upsert_txn(
Expand Down