Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Replace or_ignore in simple_insert with simple_upsert #10442

Merged
merged 6 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10442.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Replace usage of `or_ignore` in `simple_insert` with `simple_upsert` usage, to stop spamming postgres logs with spurious ERROR messages.
49 changes: 19 additions & 30 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,31 +832,16 @@ async def simple_insert(
self,
table: str,
values: Dict[str, Any],
or_ignore: bool = False,
desc: str = "simple_insert",
) -> bool:
):
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""Executes an INSERT query on the named table.

Args:
table: string giving the table name
values: dict of new column names and values for them
or_ignore: bool stating whether an exception should be raised
when a conflicting row already exists. If True, False will be
returned by the function instead
desc: description of the transaction, for logging and metrics

Returns:
Whether the row was inserted or not. Only useful when `or_ignore` is True
"""
try:
await self.runInteraction(desc, self.simple_insert_txn, table, values)
except self.engine.module.IntegrityError:
# We have to do or_ignore flag at this layer, since we can't reuse
# a cursor after we receive an error from the db.
if not or_ignore:
raise
return False
return True
await self.runInteraction(desc, self.simple_insert_txn, table, values)

@staticmethod
def simple_insert_txn(
Expand Down Expand Up @@ -930,7 +915,7 @@ async def simple_upsert(
insertion_values: Optional[Dict[str, Any]] = None,
desc: str = "simple_upsert",
lock: bool = True,
) -> Optional[bool]:
) -> bool:
"""

`lock` should generally be set to True (the default), but can be set
Expand All @@ -951,8 +936,8 @@ async def simple_upsert(
desc: description of the transaction, for logging and metrics
lock: True to lock the table when doing the upsert.
Returns:
Native upserts always return None. Emulated upserts return True if a
new entry was created, False if an existing one was updated.
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}

Expand Down Expand Up @@ -995,7 +980,7 @@ def simple_upsert_txn(
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
lock: bool = True,
) -> Optional[bool]:
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
Expand All @@ -1008,16 +993,15 @@ def simple_upsert_txn(
insertion_values: additional key/values to use only when inserting
lock: True to lock the table when doing the upsert.
Returns:
Native upserts always return None. Emulated upserts return True if a
new entry was created, False if an existing one was updated.
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}

if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
self.simple_upsert_txn_native_upsert(
return self.simple_upsert_txn_native_upsert(
txn, table, keyvalues, values, insertion_values=insertion_values
)
return None
else:
return self.simple_upsert_txn_emulated(
txn,
Expand Down Expand Up @@ -1045,8 +1029,8 @@ def simple_upsert_txn_emulated(
insertion_values: additional key/values to use only when inserting
lock: True to lock the table when doing the upsert.
Returns:
Returns True if a new entry was created, False if an existing
one was updated.
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}

Expand Down Expand Up @@ -1086,8 +1070,7 @@ def _getwhere(key):

txn.execute(sql, sqlargs)
if txn.rowcount > 0:
# successfully updated at least one row.
return False
return True

# We didn't find any existing rows, so insert a new one
allvalues: Dict[str, Any] = {}
Expand All @@ -1111,7 +1094,7 @@ def simple_upsert_txn_native_upsert(
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
) -> None:
) -> bool:
"""
Use the native UPSERT functionality in recent PostgreSQL versions.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -1120,6 +1103,10 @@ def simple_upsert_txn_native_upsert(
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting

Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
allvalues: Dict[str, Any] = {}
allvalues.update(keyvalues)
Expand All @@ -1140,6 +1127,8 @@ def simple_upsert_txn_native_upsert(
)
txn.execute(sql, list(allvalues.values()))

return bool(txn.rowcount)

async def simple_upsert_many(
self,
table: str,
Expand Down
9 changes: 6 additions & 3 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1078,16 +1078,18 @@ async def store_device(
return False

try:
inserted = await self.db_pool.simple_insert(
inserted = await self.db_pool.simple_upsert(
"devices",
values={
keyvalues={
"user_id": user_id,
"device_id": device_id,
},
values={},
insertion_values={
"display_name": initial_device_display_name,
"hidden": False,
},
desc="store_device",
or_ignore=True,
)
if not inserted:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block will now never run for postgresql, this seems like a change in behavior, but I'm not sure if it matters or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should? The idea is that simple_upsert will return False if we didn't insert or update a new row, and since values={} we'll not update rows and so we'll hit this condition if we didn't insert a new entry?

Copy link
Member

@clokep clokep Jul 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking inserted would also be true, but yeah I see if values is {} then this isn't accurate.

# if the device already exists, check if it's a real device, or
Expand All @@ -1099,6 +1101,7 @@ async def store_device(
)
if hidden:
raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN)

self.device_id_exists_cache.set(key, True)
return inserted
except StoreError:
Expand Down
4 changes: 1 addition & 3 deletions synapse/storage/databases/main/monthly_active_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def upsert_monthly_active_user_txn(self, txn, user_id):
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = self.db_pool.simple_upsert_txn(
self.db_pool.simple_upsert_txn(
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
txn,
table="monthly_active_users",
keyvalues={"user_id": user_id},
Expand All @@ -322,8 +322,6 @@ def upsert_monthly_active_user_txn(self, txn, user_id):
txn, self.user_last_seen_monthly_active, (user_id,)
)

return is_insert

async def populate_monthly_active_users(self, user_id):
"""Checks on the state of monthly active user limits and optionally
add the user to the monthly active tables
Expand Down
7 changes: 4 additions & 3 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,17 @@ async def set_received_txn_response(
response_dict: The response, to be encoded into JSON.
"""

await self.db_pool.simple_insert(
await self.db_pool.simple_upsert(
table="received_transactions",
values={
keyvalues={
"transaction_id": transaction_id,
"origin": origin,
},
values={
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"response_code": code,
"response_json": db_binary_type(encode_canonical_json(response_dict)),
"ts": self._clock.time_msec(),
},
or_ignore=True,
desc="set_received_txn_response",
)

Expand Down
66 changes: 11 additions & 55 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ async def update_profile_in_user_dir(
avatar_url = None

def _update_profile_in_user_dir_txn(txn):
new_entry = self.db_pool.simple_upsert_txn(
self.db_pool.simple_upsert_txn(
txn,
table="user_directory",
keyvalues={"user_id": user_id},
Expand All @@ -388,67 +388,23 @@ def _update_profile_in_user_dir_txn(txn):
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
if self.database_engine.can_native_upsert:
sql = """
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute(
sql,
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
),
)
else:
# TODO: Remove this code after we've bumped the minimum version
# of postgres to always support upserts, so we can get rid of
# `new_entry` usage
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
if new_entry is True:
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
)
"""
txn.execute(
sql,
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
),
)
elif new_entry is False:
sql = """
UPDATE user_directory_search
SET vector = setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
WHERE user_id = ?
"""
txn.execute(
sql,
(
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
user_id,
),
)
else:
raise RuntimeError(
"upsert returned None when 'can_native_upsert' is False"
)
txn.execute(
sql,
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
),
)
elif isinstance(self.database_engine, Sqlite3Engine):
value = "%s %s" % (user_id, display_name) if display_name else user_id
self.db_pool.simple_upsert_txn(
Expand Down