Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Remove option to skip locking of tables during emulated upserts
Browse files Browse the repository at this point in the history
To perform an emulated upsert into a table safely, we must either:
 * lock the table,
 * be the only writer upserting into the table,
 * or rely on another unique index being present.

When the 2nd or 3rd case was applicable, we previously avoided locking
the table as an optimization. However, as seen in #14406, it is easy to
slip up when adding new schema deltas and corrupt the database.

Since #13760, Synapse has required SQLite >= 3.27.0, which has support
for native upserts. This means that we now only perform emulated upserts
while waiting for background updates to add unique indexes.

Since emulated upserts are far less frequent now, let's remove the
option to skip locking tables, so that we don't shoot ourselves in the
foot again.

Signed-off-by: Sean Quah <[email protected]>
  • Loading branch information
Sean Quah committed Nov 16, 2022
1 parent a84744f commit 52c054f
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 74 deletions.
55 changes: 17 additions & 38 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,6 @@ async def simple_upsert(
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
desc: str = "simple_upsert",
lock: bool = True,
) -> bool:
"""Insert a row with values + insertion_values; on conflict, update with values.
Expand All @@ -1154,21 +1153,11 @@ async def simple_upsert(
requiring that a unique index exist on the column names used to detect a
conflict (i.e. `keyvalues.keys()`).
If there is no such index, we can "emulate" an upsert with a SELECT followed
by either an INSERT or an UPDATE. This is unsafe: we cannot make the same
atomicity guarantees that a native upsert can and are very vulnerable to races
and crashes. Therefore if we wish to upsert without an appropriate unique index,
we must either:
1. Acquire a table-level lock before the emulated upsert (`lock=True`), or
2. VERY CAREFULLY ensure that we are the only thread and worker which will be
writing to this table, in which case we can proceed without a lock
(`lock=False`).
Generally speaking, you should use `lock=True`. If the table in question has a
unique index[*], this class will use a native upsert (which is atomic and so can
ignore the `lock` argument). Otherwise this class will use an emulated upsert,
in which case we want the safer option unless we have been VERY CAREFUL.
If there is no such index yet[*], we can "emulate" an upsert with a SELECT
followed by either an INSERT or an UPDATE. This is unsafe: we cannot make the
same atomicity guarantees that a native upsert can and are very vulnerable to
races and crashes. Therefore if we wish to upsert without an appropriate unique
index, we must acquire a table-level lock before the emulated upsert.
[*]: Some tables have unique indices added to them in the background. Those
tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES,
Expand All @@ -1189,7 +1178,6 @@ async def simple_upsert(
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
desc: description of the transaction, for logging and metrics
lock: True to lock the table when doing the upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
Expand All @@ -1209,7 +1197,6 @@ async def simple_upsert(
keyvalues,
values,
insertion_values,
lock=lock,
db_autocommit=autocommit,
)
except self.engine.module.IntegrityError as e:
Expand All @@ -1232,7 +1219,6 @@ def simple_upsert_txn(
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
where_clause: Optional[str] = None,
lock: bool = True,
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
Expand All @@ -1245,8 +1231,6 @@ def simple_upsert_txn(
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
Expand All @@ -1270,7 +1254,6 @@ def simple_upsert_txn(
values,
insertion_values=insertion_values,
where_clause=where_clause,
lock=lock,
)

def simple_upsert_txn_emulated(
Expand All @@ -1291,14 +1274,15 @@ def simple_upsert_txn_emulated(
insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert.
lock: True to lock the table when doing the upsert.
Must not be False unless the table has already been locked.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}

# We need to lock the table :(, unless we're *really* careful
if lock:
# We need to lock the table :(
self.engine.lock_table(txn, table)

def _getwhere(key: str) -> str:
Expand Down Expand Up @@ -1406,7 +1390,6 @@ async def simple_upsert_many(
value_names: Collection[str],
value_values: Collection[Collection[Any]],
desc: str,
lock: bool = True,
) -> None:
"""
Upsert, many times.
Expand All @@ -1418,8 +1401,6 @@ async def simple_upsert_many(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
"""

# We can autocommit if it safe to upsert
Expand All @@ -1433,7 +1414,6 @@ async def simple_upsert_many(
key_values,
value_names,
value_values,
lock=lock,
db_autocommit=autocommit,
)

Expand All @@ -1445,7 +1425,6 @@ def simple_upsert_many_txn(
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
lock: bool = True,
) -> None:
"""
Upsert, many times.
Expand All @@ -1457,16 +1436,19 @@ def simple_upsert_many_txn(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
"""
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
else:
return self.simple_upsert_many_txn_emulated(
txn, table, key_names, key_values, value_names, value_values, lock=lock
txn,
table,
key_names,
key_values,
value_names,
value_values,
)

def simple_upsert_many_txn_emulated(
Expand All @@ -1477,7 +1459,6 @@ def simple_upsert_many_txn_emulated(
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
lock: bool = True,
) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
Expand All @@ -1489,18 +1470,16 @@ def simple_upsert_many_txn_emulated(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert.
"""
# No value columns, therefore make a blank list so that the following
# zip() works correctly.
if not value_names:
value_values = [() for x in range(len(key_values))]

if lock:
# Lock the table just once, to prevent it being done once per row.
# Note that, according to Postgres' documentation, once obtained,
# the lock is held for the remainder of the current transaction.
self.engine.lock_table(txn, "user_ips")
# Lock the table just once, to prevent it being done once per row.
# Note that, according to Postgres' documentation, once obtained,
# the lock is held for the remainder of the current transaction.
self.engine.lock_table(txn, "user_ips")

for keyv, valv in zip(key_values, value_values):
_keys = {x: y for x, y in zip(key_names, keyv)}
Expand Down
8 changes: 0 additions & 8 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,6 @@ async def add_account_data_to_room(
content_json = json_encoder.encode(content)

async with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as room_account_data has a unique constraint
# on (user_id, room_id, account_data_type) so simple_upsert will
# retry if there is a conflict.
await self.db_pool.simple_upsert(
desc="add_room_account_data",
table="room_account_data",
Expand All @@ -471,7 +468,6 @@ async def add_account_data_to_room(
"account_data_type": account_data_type,
},
values={"stream_id": next_id, "content": content_json},
lock=False,
)

self._account_data_stream_cache.entity_has_changed(user_id, next_id)
Expand Down Expand Up @@ -527,15 +523,11 @@ def _add_account_data_for_user(
) -> None:
content_json = json_encoder.encode(content)

# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
self.db_pool.simple_upsert_txn(
txn,
table="account_data",
keyvalues={"user_id": user_id, "account_data_type": account_data_type},
values={"stream_id": next_id, "content": content_json},
lock=False,
)

# Ignored users get denormalized into a separate table as an optimisation.
Expand Down
2 changes: 0 additions & 2 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,6 @@ async def set_appservice_stream_type_pos(
table="application_services_state",
keyvalues={"as_id": service.id},
values={f"{stream_type}_stream_id": pos},
# no need to lock when emulating upsert: as_id is a unique key
lock=False,
desc="set_appservice_stream_type_pos",
)

Expand Down
9 changes: 0 additions & 9 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1747,9 +1747,6 @@ def _update_remote_device_list_cache_entry_txn(
table="device_lists_remote_cache",
keyvalues={"user_id": user_id, "device_id": device_id},
values={"content": json_encoder.encode(content)},
# we don't need to lock, because we assume we are the only thread
# updating this user's devices.
lock=False,
)

txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
Expand All @@ -1763,9 +1760,6 @@ def _update_remote_device_list_cache_entry_txn(
table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id},
values={"stream_id": stream_id},
# again, we can assume we are the only thread updating this user's
# extremity.
lock=False,
)

async def update_remote_device_list_cache(
Expand Down Expand Up @@ -1818,9 +1812,6 @@ def _update_remote_device_list_cache_txn(
table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id},
values={"stream_id": stream_id},
# we don't need to lock, because we can assume we are the only thread
# updating this user's extremity.
lock=False,
)

async def add_device_change_to_streams(
Expand Down
1 change: 0 additions & 1 deletion synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1686,7 +1686,6 @@ async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
},
insertion_values={},
desc="insert_insertion_extremity",
lock=False,
)

async def insert_received_event_to_staging(
Expand Down
6 changes: 0 additions & 6 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,11 @@ async def get_throttle_params_by_room(
async def set_throttle_params(
self, pusher_id: str, room_id: str, params: ThrottleParams
) -> None:
# no need to lock because `pusher_throttle` has a primary key on
# (pusher, room_id) so simple_upsert will retry
await self.db_pool.simple_upsert(
"pusher_throttle",
{"pusher": pusher_id, "room_id": room_id},
{"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms},
desc="set_throttle_params",
lock=False,
)

async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
Expand Down Expand Up @@ -595,8 +592,6 @@ async def add_pusher(
device_id: Optional[str] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so simple_upsert will retry
await self.db_pool.simple_upsert(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
Expand All @@ -615,7 +610,6 @@ async def add_pusher(
"device_id": device_id,
},
desc="add_pusher",
lock=False,
)

user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(
Expand Down
6 changes: 0 additions & 6 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1843,9 +1843,6 @@ async def upsert_room_on_join(
"creator": room_creator,
"has_auth_chain_index": has_auth_chain_index,
},
# rooms has a unique constraint on room_id, so no need to lock when doing an
# emulated upsert.
lock=False,
)

async def store_partial_state_room(
Expand Down Expand Up @@ -1966,9 +1963,6 @@ async def maybe_store_room_on_outlier_membership(
"creator": "",
"has_auth_chain_index": has_auth_chain_index,
},
# rooms has a unique constraint on room_id, so no need to lock when doing an
# emulated upsert.
lock=False,
)

async def set_room_is_public(self, room_id: str, is_public: bool) -> None:
Expand Down
2 changes: 0 additions & 2 deletions synapse/storage/databases/main/room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,4 @@ async def store_state_group_id_for_event_id(
table="event_to_state_groups",
keyvalues={"event_id": event_id},
values={"state_group": state_group_id, "event_id": event_id},
# Unique constraint on event_id so we don't have to lock
lock=False,
)
2 changes: 0 additions & 2 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
table="user_directory",
keyvalues={"user_id": user_id},
values={"display_name": display_name, "avatar_url": avatar_url},
lock=False, # We're only inserter
)

if isinstance(self.database_engine, PostgresEngine):
Expand Down Expand Up @@ -511,7 +510,6 @@ def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
table="user_directory_search",
keyvalues={"user_id": user_id},
values={"value": value},
lock=False, # We're only inserter
)
else:
# This should be unreachable.
Expand Down

0 comments on commit 52c054f

Please sign in to comment.