Skip to content

Commit

Permalink
Handle federation inbound instances being killed more gracefully (mat…
Browse files Browse the repository at this point in the history
…rix-org#11262)

* Make lock better handle process being killed

If the process gets killed and restarted (so that it didn't have a
chance to drop its locks gracefully) then there may still be locks in
the DB that are for the same instance that haven't yet timed out but are
safe to delete.

We handle this case by a) checking if the current instance already has
taken out the lock, and b) if not then ignoring locks that are for the
same instance.

* Periodically check for old staged events

This is to protect against other instances dying and their locks timing
out.
  • Loading branch information
erikjohnston authored Nov 8, 2021
1 parent 9799c56 commit 98c8fc6
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 10 deletions.
1 change: 1 addition & 0 deletions changelog.d/11262.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug where if a remote event is being processed by a worker when it gets killed then it won't get processed on restart. Introduced in v1.37.1.
5 changes: 5 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ async def on_incoming_transaction(
self._started_handling_of_staged_events = True
self._handle_old_staged_events()

# Start a periodic check for old staged events. This is to handle
# the case where locks time out, e.g. if another process gets killed
# without dropping its locks.
self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)

# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = self._clock.time_msec()
Expand Down
31 changes: 21 additions & 10 deletions synapse/storage/databases/main/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import logging
from types import TracebackType
from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type
from weakref import WeakValueDictionary

from twisted.internet.interfaces import IReactorCore

Expand Down Expand Up @@ -61,7 +62,7 @@ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"

# A map from `(lock_name, lock_key)` to the token of any locks that we
# think we currently hold.
self._live_tokens: Dict[Tuple[str, str], str] = {}
self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary()

# When we shut down we want to remove the locks. Technically this can
# lead to a race, as we may drop the lock while we are still processing.
Expand All @@ -80,10 +81,10 @@ async def _on_shutdown(self) -> None:

# We need to take a copy of the tokens dict as dropping the locks will
# cause the dictionary to change.
tokens = dict(self._live_tokens)
locks = dict(self._live_tokens)

for (lock_name, lock_key), token in tokens.items():
await self._drop_lock(lock_name, lock_key, token)
for lock in locks.values():
await lock.release()

logger.info("Dropped locks due to shutdown")

Expand All @@ -93,14 +94,21 @@ async def try_acquire_lock(self, lock_name: str, lock_key: str) -> Optional["Loc
used (otherwise the lock will leak).
"""

# Check if this process has taken out a lock and if it's still valid.
lock = self._live_tokens.get((lock_name, lock_key))
if lock and await lock.is_still_valid():
return None

now = self._clock.time_msec()
token = random_string(6)

if self.db_pool.engine.can_native_upsert:

def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
# We take out the lock if either a) there is no row for the lock
# already or b) the existing row has timed out.
# already, b) the existing row has timed out, or c) the row is
# for this instance (which means the process got killed and
# restarted)
sql = """
INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?)
Expand All @@ -112,6 +120,7 @@ def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
last_renewed_ts = EXCLUDED.last_renewed_ts
WHERE
worker_locks.last_renewed_ts < ?
OR worker_locks.instance_name = EXCLUDED.instance_name
"""
txn.execute(
sql,
Expand Down Expand Up @@ -148,11 +157,11 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
WHERE
lock_name = ?
AND lock_key = ?
AND last_renewed_ts < ?
AND (last_renewed_ts < ? OR instance_name = ?)
"""
txn.execute(
sql,
(lock_name, lock_key, now - _LOCK_TIMEOUT_MS),
(lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
)

inserted = self.db_pool.simple_upsert_txn_emulated(
Expand All @@ -179,9 +188,7 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
if not did_lock:
return None

self._live_tokens[(lock_name, lock_key)] = token

return Lock(
lock = Lock(
self._reactor,
self._clock,
self,
Expand All @@ -190,6 +197,10 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
token=token,
)

self._live_tokens[(lock_name, lock_key)] = lock

return lock

async def _is_lock_still_valid(
self, lock_name: str, lock_key: str, token: str
) -> bool:
Expand Down

0 comments on commit 98c8fc6

Please sign in to comment.