Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18591.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `Measure` block metrics to be homeserver-scoped.
3 changes: 2 additions & 1 deletion synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.is_mine_server_name = hs.is_mine_server_name
Expand Down Expand Up @@ -156,7 +157,7 @@ def _clear_queue(self) -> None:

def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
with Measure(self.clock, self.metrics_manager, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
Expand Down
5 changes: 4 additions & 1 deletion synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ def __init__(self, hs: "HomeServer"):
self._storage_controllers = hs.get_storage_controllers()

self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.is_mine_id = hs.is_mine_id
self.is_mine_server_name = hs.is_mine_server_name

Expand Down Expand Up @@ -657,7 +658,9 @@ async def handle_room_events(events: List[EventBase]) -> None:
logger.debug(
"Handling %i events in room %s", len(events), events[0].room_id
)
with Measure(self.clock, "handle_room_events"):
with Measure(
self.clock, self.metrics_manager, "handle_room_events"
):
for event in events:
await handle_event(event)

Expand Down
3 changes: 3 additions & 0 deletions synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class TransactionManager:
def __init__(self, hs: "synapse.server.HomeServer"):
self._server_name = hs.hostname
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_manager = (
hs.get_metrics_manager()
) # nb must be called this for @measure_func
self._store = hs.get_datastores().main
self._transaction_actions = TransactionActions(self._store)
self._transport_layer = hs.get_federation_transport_client()
Expand Down
7 changes: 5 additions & 2 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(self, hs: "HomeServer"):
self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.notify_appservices = hs.config.worker.should_notify_appservices
self.event_sources = hs.get_event_sources()
self._msc2409_to_device_messages_enabled = (
Expand Down Expand Up @@ -120,7 +121,7 @@ def notify_interested_services(self, max_token: RoomStreamToken) -> None:

@wrap_as_background_process("notify_interested_services")
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
with Measure(self.clock, "notify_interested_services"):
with Measure(self.clock, self.metrics_manager, "notify_interested_services"):
self.is_processing = True
try:
upper_bound = -1
Expand Down Expand Up @@ -329,7 +330,9 @@ async def _notify_interested_services_ephemeral(
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
with Measure(
self.clock, self.metrics_manager, "notify_interested_services_ephemeral"
):
for service in services:
if stream_key == StreamKeyType.TYPING:
# Note that we don't persist the token (via set_appservice_stream_type_pos)
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, hs: "HomeServer"):
self._storage_controllers = hs.get_storage_controllers()
self._config = hs.config
self._clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self._event_creation_handler = hs.get_event_creation_handler()
self._room_member_handler = hs.get_room_member_handler()

Expand Down Expand Up @@ -159,7 +160,7 @@ async def _unsafe_process_new_event(self) -> None:

# Loop round handling deltas until we're up to date
while True:
with Measure(self._clock, "delayed_events_delta"):
with Measure(self._clock, self.metrics_manager, "delayed_events_delta"):
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
return
Expand Down
11 changes: 10 additions & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,11 @@ class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_manager = (
hs.get_metrics_manager()
) # nb must be called this for @measure_func

self.federation_sender = hs.get_federation_sender()
self._account_data_handler = hs.get_account_data_handler()
self._storage_controllers = hs.get_storage_controllers()
Expand Down Expand Up @@ -1214,10 +1219,14 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
self.store = hs.get_datastores().main
self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
self.device_handler = device_handler
self._notifier = hs.get_notifier()

self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_manager = (
hs.get_metrics_manager()
) # nb must be called this for @measure_func

self._remote_edu_linearizer = Linearizer(name="remote_device_list")
self._resync_linearizer = Linearizer(name="remote_device_resync")

Expand Down
7 changes: 5 additions & 2 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,13 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
self.profile_handler = hs.get_profile_handler()
self.event_builder_factory = hs.get_event_builder_factory()
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_manager = (
hs.get_metrics_manager()
) # nb must be called this for @measure_func
self.profile_handler = hs.get_profile_handler()
self.server_name = hs.hostname
self.notifier = hs.get_notifier()
self.config = hs.config
Expand Down
8 changes: 5 additions & 3 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier()
self.metrics_manager = hs.get_metrics_manager()

federation_registry = hs.get_federation_registry()

Expand Down Expand Up @@ -941,7 +942,7 @@ async def _update_states(

now = self.clock.time_msec()

with Measure(self.clock, "presence_update_states"):
with Measure(self.clock, self.metrics_manager, "presence_update_states"):
# NOTE: We purposefully don't await between now and when we've
# calculated what we want to do with the new states, to avoid races.

Expand Down Expand Up @@ -1497,7 +1498,7 @@ async def _process_presence() -> None:
async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "presence_delta"):
with Measure(self.clock, self.metrics_manager, "presence_delta"):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
return
Expand Down Expand Up @@ -1762,6 +1763,7 @@ def __init__(self, hs: "HomeServer"):
self.get_presence_handler = hs.get_presence_handler
self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.store = hs.get_datastores().main

async def get_new_events(
Expand Down Expand Up @@ -1792,7 +1794,7 @@ async def get_new_events(
user_id = user.to_string()
stream_change_cache = self.store.presence_stream_cache

with Measure(self.clock, "presence.get_new_events"):
with Measure(self.clock, self.metrics_manager, "presence.get_new_events"):
if from_key is not None:
from_key = int(from_key)

Expand Down
9 changes: 5 additions & 4 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ def __init__(self, hs: "HomeServer"):
self._push_rules_handler = hs.get_push_rules_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.state = hs.get_state_handler()
self.auth_blocking = hs.get_auth_blocking()
self._storage_controllers = hs.get_storage_controllers()
Expand Down Expand Up @@ -710,7 +711,7 @@ async def ephemeral_by_room(

sync_config = sync_result_builder.sync_config

with Measure(self.clock, "ephemeral_by_room"):
with Measure(self.clock, self.metrics_manager, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else 0

room_ids = sync_result_builder.joined_room_ids
Expand Down Expand Up @@ -783,7 +784,7 @@ async def _load_filtered_recents(
and current token to send down to clients.
newly_joined_room
"""
with Measure(self.clock, "load_filtered_recents"):
with Measure(self.clock, self.metrics_manager, "load_filtered_recents"):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = (
sync_config.filter_collection.blocks_all_room_timeline()
Expand Down Expand Up @@ -1174,7 +1175,7 @@ async def compute_state_delta(
# updates even if they occurred logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.

with Measure(self.clock, "compute_state_delta"):
with Measure(self.clock, self.metrics_manager, "compute_state_delta"):
# The memberships needed for events in the timeline.
# Only calculated when `lazy_load_members` is on.
members_to_fetch: Optional[Set[str]] = None
Expand Down Expand Up @@ -1791,7 +1792,7 @@ async def unread_notifs_for_room_id(
# the DB.
return RoomNotifCounts.empty()

with Measure(self.clock, "unread_notifs_for_room_id"):
with Measure(self.clock, self.metrics_manager, "unread_notifs_for_room_id"):
return await self.store.get_unread_event_push_actions_by_room_for_user(
room_id,
sync_config.user.to_string(),
Expand Down
5 changes: 3 additions & 2 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
def __init__(self, hs: "HomeServer"):
self._main_store = hs.get_datastores().main
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
# We can't call get_typing_handler here because there's a cycle:
#
# Typing -> Notifier -> TypingNotificationEventSource -> Typing
Expand Down Expand Up @@ -535,7 +536,7 @@ async def get_new_events_as(
appservice may be interested in.
* The latest known room serial.
"""
with Measure(self.clock, "typing.get_new_events_as"):
with Measure(self.clock, self.metrics_manager, "typing.get_new_events_as"):
handler = self.get_typing_handler()

events = []
Expand Down Expand Up @@ -571,7 +572,7 @@ async def get_new_events(
Find typing notifications for given rooms (> `from_token` and <= `to_token`)
"""

with Measure(self.clock, "typing.get_new_events"):
with Measure(self.clock, self.metrics_manager, "typing.get_new_events"):
from_key = int(from_key)
handler = self.get_typing_handler()

Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self, hs: "HomeServer"):
self._storage_controllers = hs.get_storage_controllers()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.worker.should_update_user_directory
Expand Down Expand Up @@ -237,7 +238,7 @@ async def _unsafe_process(self) -> None:

# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "user_dir_delta"):
with Measure(self.clock, self.metrics_manager, "user_dir_delta"):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self.pos == room_max_stream_ordering:
return
Expand Down
3 changes: 3 additions & 0 deletions synapse/http/federation/matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from synapse.http.federation.well_known_resolver import WellKnownResolver
from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.homeserver_metrics_manager import HomeserverMetricsManager
from synapse.types import ISynapseReactor
from synapse.util import Clock

Expand Down Expand Up @@ -93,6 +94,7 @@ class MatrixFederationAgent:
def __init__(
self,
reactor: ISynapseReactor,
metrics_manager: HomeserverMetricsManager,
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
user_agent: bytes,
ip_allowlist: Optional[IPSet],
Expand Down Expand Up @@ -128,6 +130,7 @@ def __init__(
if _well_known_resolver is None:
_well_known_resolver = WellKnownResolver(
reactor,
metrics_manager,
agent=BlocklistingAgentWrapper(
ProxyAgent(
reactor,
Expand Down
5 changes: 4 additions & 1 deletion synapse/http/federation/well_known_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

from synapse.http.client import BodyExceededMaxSize, read_body_with_max_size
from synapse.logging.context import make_deferred_yieldable
from synapse.metrics.homeserver_metrics_manager import HomeserverMetricsManager
from synapse.util import Clock, json_decoder
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -92,13 +93,15 @@ class WellKnownResolver:
def __init__(
self,
reactor: IReactorTime,
metrics_manager: HomeserverMetricsManager,
agent: IAgent,
user_agent: bytes,
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
):
self._reactor = reactor
self._clock = Clock(reactor)
self.metrics_manager = metrics_manager

if well_known_cache is None:
well_known_cache = _well_known_cache
Expand Down Expand Up @@ -134,7 +137,7 @@ async def get_well_known(self, server_name: bytes) -> WellKnownLookupResult:
# TODO: should we linearise so that we don't end up doing two .well-known
# requests for the same server in parallel?
try:
with Measure(self._clock, "get_well_known"):
with Measure(self._clock, self.metrics_manager, "get_well_known"):
result: Optional[bytes]
cache_period: float

Expand Down
7 changes: 6 additions & 1 deletion synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ def __init__(
self.server_name = hs.hostname

self.reactor = hs.get_reactor()
self.metrics_manager = hs.get_metrics_manager()

user_agent = hs.version_string
if hs.config.server.user_agent_suffix:
Expand All @@ -418,6 +419,7 @@ def __init__(
# Talk to federation directly
federation_agent: IAgent = MatrixFederationAgent(
self.reactor,
self.metrics_manager,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
Expand Down Expand Up @@ -451,6 +453,7 @@ def __init__(
)

self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self._store = hs.get_datastores().main
self.version_string_bytes = hs.version_string.encode("ascii")
self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
Expand Down Expand Up @@ -697,7 +700,9 @@ async def _send_request(
outgoing_requests_counter.labels(request.method).inc()

try:
with Measure(self.clock, "outbound_request"):
with Measure(
self.clock, self.metrics_manager, "outbound_request"
):
# we don't want all the fancy cookie and redirect handling
# that treq.request gives: just use the raw Agent.

Expand Down
Loading
Loading