Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19037.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move unique snowflake homeserver background tasks to `start_background_tasks` (the standard pattern for this kind of thing).
10 changes: 0 additions & 10 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import synapse.util.caches
from synapse.api.constants import MAX_PDU_SIZE
from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config import ConfigError
from synapse.config._base import format_config_error
from synapse.config.homeserver import HomeServerConfig
Expand Down Expand Up @@ -683,15 +682,6 @@ def log_shutdown() -> None:
if hs.config.worker.run_background_tasks:
hs.start_background_tasks()

# TODO: This should be moved to same pattern we use for other background tasks:
# Add to `REQUIRED_ON_BACKGROUND_TASK_STARTUP` and rely on
# `start_background_tasks` to start it.
await hs.get_common_usage_metrics_manager().setup()

# TODO: This feels like another pattern that should refactored as one of the
# `REQUIRED_ON_BACKGROUND_TASK_STARTUP`
start_phone_stats_home(hs)

if freeze:
# We now freeze all allocated objects in the hopes that (almost)
# everything currently allocated are things that will be used for the
Expand Down
2 changes: 1 addition & 1 deletion synapse/metrics/common_usage_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def get_metrics(self) -> CommonUsageMetrics:
"""
return await self._collect()

async def setup(self) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed `async` as nothing was using `await` here, and this fits the pattern better for `start_background_tasks`

def setup(self) -> None:
"""Keep the gauges for common usage metrics up to date."""
self._hs.run_as_background_process(
desc="common_usage_metrics_update_gauges",
Expand Down
3 changes: 3 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from synapse.api.filtering import Filtering
from synapse.api.ratelimiting import Ratelimiter, RequestRatelimiter
from synapse.app._base import unregister_sighups
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.appservice.api import ApplicationServiceApi
from synapse.appservice.scheduler import ApplicationServiceScheduler
from synapse.config.homeserver import HomeServerConfig
Expand Down Expand Up @@ -643,6 +644,8 @@ def start_background_tasks(self) -> None:
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
getattr(self, "get_" + i + "_handler")()
self.get_task_scheduler()
self.get_common_usage_metrics_manager().setup()
start_phone_stats_home(self)
Comment on lines +647 to +648
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to HomeServer.start_background_tasks() (base class for main and worker instances) as these were previously part of base.start(), which also applies to main and worker instances.


def get_reactor(self) -> ISynapseReactor:
"""
Expand Down
7 changes: 6 additions & 1 deletion tests/replication/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,12 @@ def request_factory(*args: Any, **kwargs: Any) -> SynapseRequest:
client_to_server_transport.loseConnection()

# there should have been exactly one request
self.assertEqual(len(requests), 1)
self.assertEqual(
len(requests),
1,
"Expected to handle exactly one HTTP replication request but saw %d - requests=%s"
% (len(requests), requests),
)

return requests[0]

Expand Down
49 changes: 33 additions & 16 deletions tests/replication/tcp/streams/test_account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,39 @@ def test_update_function_room_account_data_limit(self) -> None:

# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual([], self.test_handler.received_rdata_rows)
received_account_data_rows = [
row
for row in self.test_handler.received_rdata_rows
if row[0] == AccountDataStream.NAME
]
self.assertEqual([], received_account_data_rows)
Comment on lines +49 to +54
Copy link
Contributor Author

@MadLittleMods MadLittleMods Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to filter the rdata rows by relevant type. We already had this pattern for some tests. See

received_rows = [
upd
for upd in self.test_handler.received_rdata_rows
if upd[0] == ThreadSubscriptionsStream.NAME
]

Not all of these tests were failing but some tests were failing because we had some caches rows flying around because we moved start_phone_stats_home() to a place that also runs in the tests now. ex. ('caches', 2, CachesStream.CachesStreamRow(cache_func='user_last_seen_monthly_active', keys=None, invalidation_ts=0))

While we don't care about phone home stats in tests, it's at-least a good sign that our homeserver is more closely running to how it would in the real-life case.


# now reconnect to pull the updates
self.reconnect()
self.replicate()

# we should have received all the expected rows in the right order
received_rows = self.test_handler.received_rdata_rows
# We should have received all the expected rows in the right order
#
# Filter the updates to only include account data changes
received_account_data_rows = [
row
for row in self.test_handler.received_rdata_rows
if row[0] == AccountDataStream.NAME
]

for t in updates:
(stream_name, token, row) = received_rows.pop(0)
(stream_name, token, row) = received_account_data_rows.pop(0)
self.assertEqual(stream_name, AccountDataStream.NAME)
self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
self.assertEqual(row.data_type, t)
self.assertEqual(row.room_id, "test_room")

(stream_name, token, row) = received_rows.pop(0)
(stream_name, token, row) = received_account_data_rows.pop(0)
self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
self.assertEqual(row.data_type, "m.global")
self.assertIsNone(row.room_id)

self.assertEqual([], received_rows)
self.assertEqual([], received_account_data_rows)

def test_update_function_global_account_data_limit(self) -> None:
"""Test replication with many global account data updates"""
Expand All @@ -85,32 +96,38 @@ def test_update_function_global_account_data_limit(self) -> None:
store.add_account_data_to_room("test_user", "test_room", "m.per_room", {})
)

# tell the notifier to catch up to avoid duplicate rows.
# workaround for https://github.com/matrix-org/synapse/issues/7360
# FIXME remove this when the above is fixed
self.replicate()

# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual([], self.test_handler.received_rdata_rows)
received_account_data_rows = [
row
for row in self.test_handler.received_rdata_rows
if row[0] == AccountDataStream.NAME
]
self.assertEqual([], received_account_data_rows)

# now reconnect to pull the updates
self.reconnect()
self.replicate()

# we should have received all the expected rows in the right order
received_rows = self.test_handler.received_rdata_rows
#
# Filter the updates to only include account data changes
received_account_data_rows = [
row
for row in self.test_handler.received_rdata_rows
if row[0] == AccountDataStream.NAME
]

for t in updates:
(stream_name, token, row) = received_rows.pop(0)
(stream_name, token, row) = received_account_data_rows.pop(0)
self.assertEqual(stream_name, AccountDataStream.NAME)
self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
self.assertEqual(row.data_type, t)
self.assertIsNone(row.room_id)

(stream_name, token, row) = received_rows.pop(0)
(stream_name, token, row) = received_account_data_rows.pop(0)
self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
self.assertEqual(row.data_type, "m.per_room")
self.assertEqual(row.room_id, "test_room")

self.assertEqual([], received_rows)
self.assertEqual([], received_account_data_rows)
Loading
Loading