Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix overflows in /messages backfill calculation (#13936)
Browse files Browse the repository at this point in the history
* Reproduce bug
* Compute `least_function` first
* Substitute `least_function` with an f-string
* Bugfix: avoid overflow

Co-authored-by: Eric Eastwood <[email protected]>
  • Loading branch information
David Robertson and MadLittleMods authored Sep 30, 2022
1 parent 1cc2ca8 commit e8f30a7
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 41 deletions.
1 change: 1 addition & 0 deletions changelog.d/13936.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Exponentially backoff from backfilling the same event over and over.
82 changes: 53 additions & 29 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,30 @@

logger = logging.getLogger(__name__)

BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int(
datetime.timedelta(days=7).total_seconds()
)
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int(
datetime.timedelta(hours=1).total_seconds()
# Parameters controlling exponential backoff between backfill failures.
# After the first failure to backfill, we wait 2 hours before trying again. If the
# second attempt fails, we wait 4 hours before trying again. If the third attempt fails,
# we wait 8 hours before trying again, ... and so on.
#
# Each successive backoff period is twice as long as the last. However we cap this
# period at a maximum of 2^8 = 256 hours: a little over 10 days. (This is the smallest
# power of 2 which yields a maximum backoff period of at least 7 days---which was the
# original maximum backoff period.) Even when we hit this cap, we will continue to
# make backfill attempts once every 10 days.
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS = 8
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS = int(
datetime.timedelta(hours=1).total_seconds() * 1000
)

# We need a cap on the power of 2 or else the backoff period
# 2^N * (milliseconds per hour)
# will overflow when calcuated within the database. We ensure overflow does not occur
# by checking that the largest backoff period fits in a 32-bit signed integer.
_LONGEST_BACKOFF_PERIOD_MILLISECONDS = (
2**BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS
) * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
assert 0 < _LONGEST_BACKOFF_PERIOD_MILLISECONDS <= ((2**31) - 1)


# All the info we need while iterating the DAG while backfilling
@attr.s(frozen=True, slots=True, auto_attribs=True)
Expand Down Expand Up @@ -767,7 +784,15 @@ def get_backfill_points_in_room_txn(
# persisted in our database yet (meaning we don't know their depth
# specifically). So we need to look for the approximate depth from
# the events connected to the current backwards extremeties.
sql = """

if isinstance(self.database_engine, PostgresEngine):
least_function = "LEAST"
elif isinstance(self.database_engine, Sqlite3Engine):
least_function = "MIN"
else:
raise RuntimeError("Unknown database engine")

sql = f"""
SELECT backward_extrem.event_id, event.depth FROM events AS event
/**
* Get the edge connections from the event_edges table
Expand Down Expand Up @@ -825,7 +850,10 @@ def get_backfill_points_in_room_txn(
*/
AND (
failed_backfill_attempt_info.event_id IS NULL
OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
(1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
* ? /* step */
)
)
/**
* Sort from highest (closest to the `current_depth`) to the lowest depth
Expand All @@ -837,22 +865,15 @@ def get_backfill_points_in_room_txn(
LIMIT ?
"""

if isinstance(self.database_engine, PostgresEngine):
least_function = "least"
elif isinstance(self.database_engine, Sqlite3Engine):
least_function = "min"
else:
raise RuntimeError("Unknown database engine")

txn.execute(
sql % (least_function,),
sql,
(
room_id,
False,
current_depth,
self._clock.time_msec(),
1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
limit,
),
)
Expand Down Expand Up @@ -902,7 +923,14 @@ async def get_insertion_event_backward_extremities_in_room(
def get_insertion_event_backward_extremities_in_room_txn(
txn: LoggingTransaction, room_id: str
) -> List[Tuple[str, int]]:
sql = """
if isinstance(self.database_engine, PostgresEngine):
least_function = "LEAST"
elif isinstance(self.database_engine, Sqlite3Engine):
least_function = "MIN"
else:
raise RuntimeError("Unknown database engine")

sql = f"""
SELECT
insertion_event_extremity.event_id, event.depth
/* We only want insertion events that are also marked as backwards extremities */
Expand Down Expand Up @@ -942,7 +970,10 @@ def get_insertion_event_backward_extremities_in_room_txn(
*/
AND (
failed_backfill_attempt_info.event_id IS NULL
OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
(1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
* ? /* step */
)
)
/**
* Sort from highest (closest to the `current_depth`) to the lowest depth
Expand All @@ -954,21 +985,14 @@ def get_insertion_event_backward_extremities_in_room_txn(
LIMIT ?
"""

if isinstance(self.database_engine, PostgresEngine):
least_function = "least"
elif isinstance(self.database_engine, Sqlite3Engine):
least_function = "min"
else:
raise RuntimeError("Unknown database engine")

txn.execute(
sql % (least_function,),
sql,
(
room_id,
current_depth,
self._clock.time_msec(),
1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
limit,
),
)
Expand Down
61 changes: 49 additions & 12 deletions tests/storage/test_event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,9 +766,7 @@ def test_get_backfill_points_in_room(self):
self.store.get_backfill_points_in_room(room_id, depth_map["B"], limit=100)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(
backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
)
self.assertEqual(backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"])

# Try at "A"
backfill_points = self.get_success(
Expand Down Expand Up @@ -814,7 +812,7 @@ def test_get_backfill_points_in_room_excludes_events_we_have_attempted(
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
# Only the backfill points that we didn't record earlier exist here.
self.assertListEqual(backfill_event_ids, ["b6", "2", "b1"])
self.assertEqual(backfill_event_ids, ["b6", "2", "b1"])

def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duration(
self,
Expand Down Expand Up @@ -860,7 +858,7 @@ def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duratio
self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(backfill_event_ids, ["b3", "b2"])
self.assertEqual(backfill_event_ids, ["b3", "b2"])

# Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
# see if we can now backfill it
Expand All @@ -871,7 +869,48 @@ def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duratio
self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(backfill_event_ids, ["b3", "b2", "b1"])
self.assertEqual(backfill_event_ids, ["b3", "b2", "b1"])

def test_get_backfill_points_in_room_works_after_many_failed_pull_attempts_that_could_naively_overflow(
self,
) -> None:
"""
A test that reproduces #13929 (Postgres only).
Test to make sure we can still get backfill points after many failed pull
attempts that cause us to backoff to the limit. Even if the backoff formula
would tell us to wait for more seconds than can be expressed in a 32 bit
signed int.
"""
setup_info = self._setup_room_for_backfill_tests()
room_id = setup_info.room_id
depth_map = setup_info.depth_map

# Pretend that we have tried and failed 10 times to backfill event b1.
for _ in range(10):
self.get_success(
self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
)

# If the backoff periods grow without limit:
# After the first failed attempt, we would have backed off for 1 << 1 = 2 hours.
# After the second failed attempt we would have backed off for 1 << 2 = 4 hours,
# so after the 10th failed attempt we should backoff for 1 << 10 == 1024 hours.
# Wait 1100 hours just so we have a nice round number.
self.reactor.advance(datetime.timedelta(hours=1100).total_seconds())

# 1024 hours in milliseconds is 1024 * 3600000, which exceeds the largest 32 bit
# signed integer. The bug we're reproducing is that this overflow causes an
# error in postgres preventing us from fetching a set of backwards extremities
# to retry fetching.
backfill_points = self.get_success(
self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
)

# We should aim to fetch all backoff points: b1's latest backoff period has
# expired, and we haven't tried the rest.
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertEqual(backfill_event_ids, ["b3", "b2", "b1"])

def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo:
"""
Expand Down Expand Up @@ -965,9 +1004,7 @@ def test_get_insertion_event_backward_extremities_in_room(self):
)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(
backfill_event_ids, ["insertion_eventB", "insertion_eventA"]
)
self.assertEqual(backfill_event_ids, ["insertion_eventB", "insertion_eventA"])

# Try at "insertion_eventA"
backfill_points = self.get_success(
Expand Down Expand Up @@ -1011,7 +1048,7 @@ def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_hav
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
# Only the backfill points that we didn't record earlier exist here.
self.assertListEqual(backfill_event_ids, ["insertion_eventB"])
self.assertEqual(backfill_event_ids, ["insertion_eventB"])

def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_after_backoff_duration(
self,
Expand Down Expand Up @@ -1069,7 +1106,7 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_
)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(backfill_event_ids, [])
self.assertEqual(backfill_event_ids, [])

# Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
# see if we can now backfill it
Expand All @@ -1083,7 +1120,7 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_
)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(backfill_event_ids, ["insertion_eventA"])
self.assertEqual(backfill_event_ids, ["insertion_eventA"])


@attr.s
Expand Down

0 comments on commit e8f30a7

Please sign in to comment.