diff --git a/changelog.d/18601.misc b/changelog.d/18601.misc new file mode 100644 index 00000000000..c8893c2806b --- /dev/null +++ b/changelog.d/18601.misc @@ -0,0 +1 @@ +Refactor `Measure` block metrics to be homeserver-scoped. diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index b5c9fcff7cf..e309836a526 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -156,7 +156,9 @@ def _clear_queue(self) -> None: def _clear_queue_before_pos(self, position_to_delete: int) -> None: """Clear all the queues from before a given position""" - with Measure(self.clock, "send_queue._clear"): + with Measure( + self.clock, name="send_queue._clear", server_name=self.server_name + ): # Delete things out of presence maps keys = self.presence_destinations.keys() i = self.presence_destinations.bisect_left(position_to_delete) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 2eef7b707d7..8010cc62f3a 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -657,7 +657,11 @@ async def handle_room_events(events: List[EventBase]) -> None: logger.debug( "Handling %i events in room %s", len(events), events[0].room_id ) - with Measure(self.clock, "handle_room_events"): + with Measure( + self.clock, + name="handle_room_events", + server_name=self.server_name, + ): for event in events: await handle_event(event) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index d8a3eaa525a..21e2fed085d 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -58,7 +58,7 @@ class TransactionManager: """ def __init__(self, hs: "synapse.server.HomeServer"): - self._server_name = hs.hostname + self.server_name = hs.hostname # nb must be called this for @measure_func self.clock = hs.get_clock() # nb must be called this for @measure_func 
self._store = hs.get_datastores().main self._transaction_actions = TransactionActions(self._store) @@ -116,7 +116,7 @@ async def send_new_transaction( transaction = Transaction( origin_server_ts=int(self.clock.time_msec()), transaction_id=txn_id, - origin=self._server_name, + origin=self.server_name, destination=destination, pdus=[p.get_pdu_json() for p in pdus], edus=[edu.get_dict() for edu in edus], diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index f3bbdb5a05b..5aefc73abab 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -73,6 +73,7 @@ class ApplicationServicesHandler: def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname self.store = hs.get_datastores().main self.is_mine_id = hs.is_mine_id self.appservice_api = hs.get_application_service_api() @@ -120,7 +121,9 @@ def notify_interested_services(self, max_token: RoomStreamToken) -> None: @wrap_as_background_process("notify_interested_services") async def _notify_interested_services(self, max_token: RoomStreamToken) -> None: - with Measure(self.clock, "notify_interested_services"): + with Measure( + self.clock, name="notify_interested_services", server_name=self.server_name + ): self.is_processing = True try: upper_bound = -1 @@ -329,7 +332,11 @@ async def _notify_interested_services_ephemeral( users: Collection[Union[str, UserID]], ) -> None: logger.debug("Checking interested services for %s", stream_key) - with Measure(self.clock, "notify_interested_services_ephemeral"): + with Measure( + self.clock, + name="notify_interested_services_ephemeral", + server_name=self.server_name, + ): for service in services: if stream_key == StreamKeyType.TYPING: # Note that we don't persist the token (via set_appservice_stream_type_pos) diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index 80cb1cec9b0..beb0e819c2b 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -54,6 
+54,7 @@ class DelayedEventsHandler: def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname self._store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self._config = hs.config @@ -159,7 +160,9 @@ async def _unsafe_process_new_event(self) -> None: # Loop round handling deltas until we're up to date while True: - with Measure(self._clock, "delayed_events_delta"): + with Measure( + self._clock, name="delayed_events_delta", server_name=self.server_name + ): room_max_stream_ordering = self._store.get_room_max_stream_ordering() if self._event_pos == room_max_stream_ordering: return diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c6e44dae6a3..e825626558a 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -526,6 +526,8 @@ class DeviceHandler(DeviceWorkerHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) + self.server_name = hs.hostname # nb must be called this for @measure_func + self.clock = hs.get_clock() # nb must be called this for @measure_func self.federation_sender = hs.get_federation_sender() self._account_data_handler = hs.get_account_data_handler() self._storage_controllers = hs.get_storage_controllers() @@ -1215,7 +1217,8 @@ class DeviceListUpdater(DeviceListWorkerUpdater): def __init__(self, hs: "HomeServer", device_handler: DeviceHandler): self.store = hs.get_datastores().main self.federation = hs.get_federation_client() - self.clock = hs.get_clock() + self.server_name = hs.hostname # nb must be called this for @measure_func + self.clock = hs.get_clock() # nb must be called this for @measure_func self.device_handler = device_handler self._notifier = hs.get_notifier() diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5d6ee6996f7..cb2ccde01b3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -476,16 +476,16 @@ async def _expire_event(self, event_id: str) -> None: class 
EventCreationHandler: def __init__(self, hs: "HomeServer"): self.hs = hs + self.validator = EventValidator() + self.event_builder_factory = hs.get_event_builder_factory() + self.server_name = hs.hostname # nb must be called this for @measure_func + self.clock = hs.get_clock() # nb must be called this for @measure_func self.auth_blocking = hs.get_auth_blocking() self._event_auth_handler = hs.get_event_auth_handler() self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self.state = hs.get_state_handler() - self.clock = hs.get_clock() - self.validator = EventValidator() self.profile_handler = hs.get_profile_handler() - self.event_builder_factory = hs.get_event_builder_factory() - self.server_name = hs.hostname self.notifier = hs.get_notifier() self.config = hs.config self.require_membership_for_aliases = ( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fb5d691d656..c652e333a6d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -747,6 +747,7 @@ async def bump_presence_active_time( class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) + self.server_name = hs.hostname self.wheel_timer: WheelTimer[str] = WheelTimer() self.notifier = hs.get_notifier() @@ -941,7 +942,9 @@ async def _update_states( now = self.clock.time_msec() - with Measure(self.clock, "presence_update_states"): + with Measure( + self.clock, name="presence_update_states", server_name=self.server_name + ): # NOTE: We purposefully don't await between now and when we've # calculated what we want to do with the new states, to avoid races. 
@@ -1497,7 +1500,9 @@ async def _process_presence() -> None: async def _unsafe_process(self) -> None: # Loop round handling deltas until we're up to date while True: - with Measure(self.clock, "presence_delta"): + with Measure( + self.clock, name="presence_delta", server_name=self.server_name + ): room_max_stream_ordering = self.store.get_room_max_stream_ordering() if self._event_pos == room_max_stream_ordering: return @@ -1759,6 +1764,7 @@ def __init__(self, hs: "HomeServer"): # Same with get_presence_router: # # AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler + self.server_name = hs.hostname self.get_presence_handler = hs.get_presence_handler self.get_presence_router = hs.get_presence_router self.clock = hs.get_clock() @@ -1792,7 +1798,9 @@ async def get_new_events( user_id = user.to_string() stream_change_cache = self.store.presence_stream_cache - with Measure(self.clock, "presence.get_new_events"): + with Measure( + self.clock, name="presence.get_new_events", server_name=self.server_name + ): if from_key is not None: from_key = int(from_key) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a400e63fd56..7b99defac11 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -329,6 +329,7 @@ class E2eeSyncResult: class SyncHandler: def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname self.hs_config = hs.config self.store = hs.get_datastores().main self.notifier = hs.get_notifier() @@ -710,7 +711,9 @@ async def ephemeral_by_room( sync_config = sync_result_builder.sync_config - with Measure(self.clock, "ephemeral_by_room"): + with Measure( + self.clock, name="ephemeral_by_room", server_name=self.server_name + ): typing_key = since_token.typing_key if since_token else 0 room_ids = sync_result_builder.joined_room_ids @@ -783,7 +786,9 @@ async def _load_filtered_recents( and current token to send down to clients. 
newly_joined_room """ - with Measure(self.clock, "load_filtered_recents"): + with Measure( + self.clock, name="load_filtered_recents", server_name=self.server_name + ): timeline_limit = sync_config.filter_collection.timeline_limit() block_all_timeline = ( sync_config.filter_collection.blocks_all_room_timeline() @@ -1174,7 +1179,9 @@ async def compute_state_delta( # updates even if they occurred logically before the previous event. # TODO(mjark) Check for new redactions in the state events. - with Measure(self.clock, "compute_state_delta"): + with Measure( + self.clock, name="compute_state_delta", server_name=self.server_name + ): # The memberships needed for events in the timeline. # Only calculated when `lazy_load_members` is on. members_to_fetch: Optional[Set[str]] = None @@ -1791,7 +1798,9 @@ async def unread_notifs_for_room_id( # the DB. return RoomNotifCounts.empty() - with Measure(self.clock, "unread_notifs_for_room_id"): + with Measure( + self.clock, name="unread_notifs_for_room_id", server_name=self.server_name + ): return await self.store.get_unread_event_push_actions_by_room_for_user( room_id, sync_config.user.to_string(), diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 8d693fee304..bbef3a59a59 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -503,6 +503,7 @@ def process_replication_rows( class TypingNotificationEventSource(EventSource[int, JsonMapping]): def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname self._main_store = hs.get_datastores().main self.clock = hs.get_clock() # We can't call get_typing_handler here because there's a cycle: @@ -535,7 +536,9 @@ async def get_new_events_as( appservice may be interested in. * The latest known room serial. 
""" - with Measure(self.clock, "typing.get_new_events_as"): + with Measure( + self.clock, name="typing.get_new_events_as", server_name=self.server_name + ): handler = self.get_typing_handler() events = [] @@ -571,7 +574,9 @@ async def get_new_events( Find typing notifications for given rooms (> `from_token` and <= `to_token`) """ - with Measure(self.clock, "typing.get_new_events"): + with Measure( + self.clock, name="typing.get_new_events", server_name=self.server_name + ): from_key = int(from_key) handler = self.get_typing_handler() diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 1f692c79a02..5f9e96706a5 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -237,7 +237,9 @@ async def _unsafe_process(self) -> None: # Loop round handling deltas until we're up to date while True: - with Measure(self.clock, "user_dir_delta"): + with Measure( + self.clock, name="user_dir_delta", server_name=self.server_name + ): room_max_stream_ordering = self.store.get_room_max_stream_ordering() if self.pos == room_max_stream_ordering: return diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index a7742fcea85..4a47665abd3 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -92,6 +92,7 @@ class MatrixFederationAgent: def __init__( self, + server_name: str, reactor: ISynapseReactor, tls_client_options_factory: Optional[FederationPolicyForHTTPS], user_agent: bytes, @@ -100,6 +101,11 @@ def __init__( _srv_resolver: Optional[SrvResolver] = None, _well_known_resolver: Optional[WellKnownResolver] = None, ): + """ + Args: + server_name: Our homeserver name (used to label metrics) (`hs.hostname`). 
+ """ + # proxy_reactor is not blocklisting reactor proxy_reactor = reactor @@ -127,6 +133,7 @@ def __init__( if _well_known_resolver is None: _well_known_resolver = WellKnownResolver( + server_name, reactor, agent=BlocklistingAgentWrapper( ProxyAgent( diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index 9a6bac7281a..911cbac7eae 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -91,12 +91,19 @@ class WellKnownResolver: def __init__( self, + server_name: str, reactor: IReactorTime, agent: IAgent, user_agent: bytes, well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None, had_well_known_cache: Optional[TTLCache[bytes, bool]] = None, ): + """ + Args: + server_name: Our homeserver name (used to label metrics) (`hs.hostname`). + """ + + self.server_name = server_name self._reactor = reactor self._clock = Clock(reactor) @@ -134,7 +141,13 @@ async def get_well_known(self, server_name: bytes) -> WellKnownLookupResult: # TODO: should we linearise so that we don't end up doing two .well-known # requests for the same server in parallel? 
try: - with Measure(self._clock, "get_well_known"): + with Measure( + self._clock, + name="get_well_known", + # This should be our homeserver where the code is running (used to + # label metrics) + server_name=self.server_name, + ): result: Optional[bytes] cache_period: float diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 97a863a1189..67ea9cdb811 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -417,6 +417,7 @@ def __init__( if hs.get_instance_name() in outbound_federation_restricted_to: # Talk to federation directly federation_agent: IAgent = MatrixFederationAgent( + self.server_name, self.reactor, tls_client_options_factory, user_agent.encode("ascii"), @@ -697,7 +698,11 @@ async def _send_request( outgoing_requests_counter.labels(request.method).inc() try: - with Measure(self.clock, "outbound_request"): + with Measure( + self.clock, + name="outbound_request", + server_name=self.server_name, + ): # we don't want all the fancy cookie and redirect handling # that treq.request gives: just use the raw Agent. diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 86ac2c23959..7e508dba057 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -66,6 +66,21 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") +SERVER_NAME_LABEL = "server_name" +""" +The `server_name` label is used to identify the homeserver that the metrics correspond +to. Because we support multiple instances of Synapse running in the same process and all +metrics are in a single global `REGISTRY`, we need to manually label any metrics. + +In the case of a Synapse homeserver, this should be set to the homeserver name +(`hs.hostname`). + +We're purposely not using the `instance` label for this purpose as that should be "The +<host>:<port> part of the target's URL that was scraped.".
Also: "In Prometheus +terms, an endpoint you can scrape is called an *instance*, usually corresponding to a +single process." (source: https://prometheus.io/docs/concepts/jobs_instances/) +""" + class _RegistryProxy: @staticmethod @@ -192,7 +207,16 @@ def register( same key. Note that `callback` may be called on a separate thread. + + Args: + key: A tuple of label values, which must match the order of the + `labels` given to the constructor. + callback """ + assert len(key) == len(self.labels), ( + f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}" + ) + with self._lock: self._registrations.setdefault(key, set()).add(callback) @@ -201,7 +225,17 @@ def unregister( key: Tuple[str, ...], callback: Callable[[MetricsEntry], None], ) -> None: - """Registers that we've exited a block with labels `key`.""" + """ + Registers that we've exited a block with labels `key`. + + Args: + key: A tuple of label values, which must match the order of the + `labels` given to the constructor. 
+ callback + """ + assert len(key) == len(self.labels), ( + f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}" + ) with self._lock: self._registrations.setdefault(key, set()).discard(callback) @@ -225,7 +259,7 @@ def collect(self) -> Iterable[Metric]: with self._lock: callbacks = set(self._registrations[key]) - in_flight.add_metric(key, len(callbacks)) + in_flight.add_metric(labels=key, value=len(callbacks)) metrics = self._metrics_class() metrics_by_key[key] = metrics @@ -239,7 +273,7 @@ def collect(self) -> Iterable[Metric]: "_".join([self.name, name]), "", labels=self.labels ) for key, metrics in metrics_by_key.items(): - gauge.add_metric(key, getattr(metrics, name)) + gauge.add_metric(labels=key, value=getattr(metrics, name)) yield gauge def _register_with_collector(self) -> None: diff --git a/synapse/module_api/callbacks/media_repository_callbacks.py b/synapse/module_api/callbacks/media_repository_callbacks.py index 6fa80a8eab1..2ab65f9fd64 100644 --- a/synapse/module_api/callbacks/media_repository_callbacks.py +++ b/synapse/module_api/callbacks/media_repository_callbacks.py @@ -31,6 +31,7 @@ class MediaRepositoryModuleApiCallbacks: def __init__(self, hs: "HomeServer") -> None: + self.server_name = hs.hostname self.clock = hs.get_clock() self._get_media_config_for_user_callbacks: List[ GET_MEDIA_CONFIG_FOR_USER_CALLBACK @@ -57,7 +58,11 @@ def register_callbacks( async def get_media_config_for_user(self, user_id: str) -> Optional[JsonDict]: for callback in self._get_media_config_for_user_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res: Optional[JsonDict] = await delay_cancellation(callback(user_id)) if res: return res @@ -68,7 +73,11 @@ async def is_user_allowed_to_upload_media_of_size( self, user_id: str, size: int ) -> bool: for callback in 
self._is_user_allowed_to_upload_media_of_size_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res: bool = await delay_cancellation(callback(user_id, size)) if not res: return res diff --git a/synapse/module_api/callbacks/ratelimit_callbacks.py b/synapse/module_api/callbacks/ratelimit_callbacks.py index 64f9cc81e8a..a580ea7d7c4 100644 --- a/synapse/module_api/callbacks/ratelimit_callbacks.py +++ b/synapse/module_api/callbacks/ratelimit_callbacks.py @@ -43,6 +43,7 @@ class RatelimitOverride: class RatelimitModuleApiCallbacks: def __init__(self, hs: "HomeServer") -> None: + self.server_name = hs.hostname self.clock = hs.get_clock() self._get_ratelimit_override_for_user_callbacks: List[ GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK @@ -64,7 +65,11 @@ async def get_ratelimit_override_for_user( self, user_id: str, limiter_name: str ) -> Optional[RatelimitOverride]: for callback in self._get_ratelimit_override_for_user_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res: Optional[RatelimitOverride] = await delay_cancellation( callback(user_id, limiter_name) ) diff --git a/synapse/module_api/callbacks/spamchecker_callbacks.py b/synapse/module_api/callbacks/spamchecker_callbacks.py index c43824f2139..428e733979c 100644 --- a/synapse/module_api/callbacks/spamchecker_callbacks.py +++ b/synapse/module_api/callbacks/spamchecker_callbacks.py @@ -356,6 +356,7 @@ class SpamCheckerModuleApiCallbacks: NOT_SPAM: Literal["NOT_SPAM"] = "NOT_SPAM" def __init__(self, hs: "synapse.server.HomeServer") -> None: + self.server_name = hs.hostname self.clock = hs.get_clock() self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = [] @@ -490,7 +491,11 @@ 
async def check_event_for_spam( generally discouraged as it doesn't support internationalization. """ for callback in self._check_event_for_spam_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res = await delay_cancellation(callback(event)) if res is False or res == self.NOT_SPAM: # This spam-checker accepts the event. @@ -543,7 +548,11 @@ async def should_drop_federated_event( True if the event should be silently dropped """ for callback in self._should_drop_federated_event_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res: Union[bool, str] = await delay_cancellation(callback(event)) if res: return res @@ -565,7 +574,11 @@ async def user_may_join_room( NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise. """ for callback in self._user_may_join_room_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res = await delay_cancellation(callback(user_id, room_id, is_invited)) # Normalize return values to `Codes` or `"NOT_SPAM"`. if res is True or res is self.NOT_SPAM: @@ -604,7 +617,11 @@ async def user_may_invite( NOT_SPAM if the operation is permitted, Codes otherwise. 
""" for callback in self._user_may_invite_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res = await delay_cancellation( callback(inviter_userid, invitee_userid, room_id) ) @@ -643,7 +660,11 @@ async def federated_user_may_invite( NOT_SPAM if the operation is permitted, Codes otherwise. """ for callback in self._federated_user_may_invite_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res = await delay_cancellation(callback(event)) # Normalize return values to `Codes` or `"NOT_SPAM"`. if res is True or res is self.NOT_SPAM: @@ -686,7 +707,11 @@ async def user_may_send_3pid_invite( NOT_SPAM if the operation is permitted, Codes otherwise. """ for callback in self._user_may_send_3pid_invite_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res = await delay_cancellation( callback(inviter_userid, medium, address, room_id) ) @@ -722,7 +747,11 @@ async def user_may_create_room( room_config: The room creation configuration which is the body of the /createRoom request """ for callback in self._user_may_create_room_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): checker_args = inspect.signature(callback) # Also ensure backwards compatibility with spam checker callbacks # that don't expect the room_config argument. 
@@ -786,7 +815,11 @@ async def user_may_send_state_event( content: The content of the state event """ for callback in self._user_may_send_state_event_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): # We make a copy of the content to ensure that the spam checker cannot modify it. res = await delay_cancellation( callback(user_id, room_id, event_type, state_key, deepcopy(content)) @@ -814,7 +847,11 @@ async def user_may_create_room_alias( """ for callback in self._user_may_create_room_alias_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res = await delay_cancellation(callback(userid, room_alias)) if res is True or res is self.NOT_SPAM: continue @@ -847,7 +884,11 @@ async def user_may_publish_room( room_id: The ID of the room that would be published """ for callback in self._user_may_publish_room_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res = await delay_cancellation(callback(userid, room_id)) if res is True or res is self.NOT_SPAM: continue @@ -889,7 +930,11 @@ async def check_username_for_spam( True if the user is spammy. """ for callback in self._check_username_for_spam_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): checker_args = inspect.signature(callback) # Make a copy of the user profile object to ensure the spam checker cannot # modify it. 
@@ -938,7 +983,11 @@ async def check_registration_for_spam( """ for callback in self._check_registration_for_spam_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): behaviour = await delay_cancellation( callback(email_threepid, username, request_info, auth_provider_id) ) @@ -980,7 +1029,11 @@ async def check_media_file_for_spam( """ for callback in self._check_media_file_for_spam_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res = await delay_cancellation(callback(file_wrapper, file_info)) # Normalize return values to `Codes` or `"NOT_SPAM"`. if res is False or res is self.NOT_SPAM: @@ -1027,7 +1080,11 @@ async def check_login_for_spam( """ for callback in self._check_login_for_spam_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + name=f"{callback.__module__}.{callback.__qualname__}", + server_name=self.server_name, + ): res = await delay_cancellation( callback( user_id, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 1f4f5b90c3a..f20b98f73f7 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -129,7 +129,8 @@ class BulkPushRuleEvaluator: def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastores().main - self.clock = hs.get_clock() + self.server_name = hs.hostname # nb must be called this for @measure_func + self.clock = hs.get_clock() # nb must be called this for @measure_func self._event_auth_handler = hs.get_event_auth_handler() self.should_calculate_push_rules = self.hs.config.push.enable_push diff --git 
a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index f3f8ddfd657..c29ed8d1498 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -76,6 +76,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): def __init__(self, hs: "HomeServer"): super().__init__(hs) + self.server_name = hs.hostname self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self.clock = hs.get_clock() @@ -122,7 +123,9 @@ async def _serialize_payload( # type: ignore[override] async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict ) -> Tuple[int, JsonDict]: - with Measure(self.clock, "repl_fed_send_events_parse"): + with Measure( + self.clock, name="repl_fed_send_events_parse", server_name=self.server_name + ): room_id = content["room_id"] backfilled = content["backfilled"] diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 01952a8d592..edda419a035 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -76,6 +76,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): def __init__(self, hs: "HomeServer"): super().__init__(hs) + self.server_name = hs.hostname self.event_creation_handler = hs.get_event_creation_handler() self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() @@ -121,7 +122,9 @@ async def _serialize_payload( # type: ignore[override] async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict, event_id: str ) -> Tuple[int, JsonDict]: - with Measure(self.clock, "repl_send_event_parse"): + with Measure( + self.clock, name="repl_send_event_parse", server_name=self.server_name + ): event_dict = content["event"] room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]] internal_metadata = content["internal_metadata"] diff --git 
a/synapse/replication/http/send_events.py b/synapse/replication/http/send_events.py index d965ce54924..15e363b3eb6 100644 --- a/synapse/replication/http/send_events.py +++ b/synapse/replication/http/send_events.py @@ -77,6 +77,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint): def __init__(self, hs: "HomeServer"): super().__init__(hs) + self.server_name = hs.hostname self.event_creation_handler = hs.get_event_creation_handler() self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() @@ -122,7 +123,9 @@ async def _serialize_payload( # type: ignore[override] async def _handle_request( # type: ignore[override] self, request: Request, payload: JsonDict ) -> Tuple[int, JsonDict]: - with Measure(self.clock, "repl_send_events_parse"): + with Measure( + self.clock, name="repl_send_events_parse", server_name=self.server_name + ): events_and_context = [] events = payload["events"] rooms = set() diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 0bd5478cd35..e71588f3de8 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -75,6 +75,7 @@ class ReplicationDataHandler: """ def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname self.store = hs.get_datastores().main self.notifier = hs.get_notifier() self._reactor = hs.get_reactor() @@ -342,7 +343,11 @@ async def wait_for_stream_position( waiting_list.add((position, deferred)) # We measure here to get in flight counts and average waiting time. 
- with Measure(self._clock, "repl.wait_for_stream_position"): + with Measure( + self._clock, + name="repl.wait_for_stream_position", + server_name=self.server_name, + ): logger.info( "Waiting for repl stream %r to reach %s (%s); currently at: %s", stream_name, diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index d647a2b3326..0080a76f6f1 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -78,6 +78,7 @@ class ReplicationStreamer: """ def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname self.store = hs.get_datastores().main self.clock = hs.get_clock() self.notifier = hs.get_notifier() @@ -155,7 +156,11 @@ async def _run_notifier_loop(self) -> None: while self.pending_updates: self.pending_updates = False - with Measure(self.clock, "repl.stream.get_updates"): + with Measure( + self.clock, + name="repl.stream.get_updates", + server_name=self.server_name, + ): all_streams = self.streams if self._replication_torture_level is not None: diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 1c3e5d00a92..9c24525845f 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -189,7 +189,8 @@ class StateHandler: """ def __init__(self, hs: "HomeServer"): - self.clock = hs.get_clock() + self.server_name = hs.hostname # nb must be called this for @measure_func + self.clock = hs.get_clock() # nb must be called this for @measure_func self.store = hs.get_datastores().main self._state_storage_controller = hs.get_storage_controllers().state self.hs = hs @@ -631,6 +632,7 @@ class StateResolutionHandler: """ def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname self.clock = hs.get_clock() self.resolve_linearizer = Linearizer(name="state_resolve_lock") @@ -747,7 +749,9 @@ async def resolve_state_groups( # which will be used as a cache key for future resolutions, but # not get persisted. 
- with Measure(self.clock, "state.create_group_ids"): + with Measure( + self.clock, name="state.create_group_ids", server_name=self.server_name + ): cache = _make_state_cache_entry(new_state, state_groups_ids) self._state_cache[group_names] = cache @@ -785,7 +789,9 @@ async def resolve_events_with_store( a map from (type, state_key) to event_id. """ try: - with Measure(self.clock, "state._resolve_events") as m: + with Measure( + self.clock, name="state._resolve_events", server_name=self.server_name + ) as m: room_version_obj = KNOWN_ROOM_VERSIONS[room_version] if room_version_obj.state_res == StateResolutionVersions.V1: return await v1.resolve_events_with_store( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2309b1648e3..548e7df9305 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -55,6 +55,7 @@ def __init__( hs: "HomeServer", ): self.hs = hs + self.server_name = hs.hostname self._clock = hs.get_clock() self.database_engine = database.engine self.db_pool = database diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index f5131fe2915..9f54430a226 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -337,6 +337,7 @@ def __init__( assert stores.persist_events self.persist_events_store = stores.persist_events + self.server_name = hs.hostname self._clock = hs.get_clock() self._instance_name = hs.get_instance_name() self.is_mine_id = hs.is_mine_id @@ -616,7 +617,11 @@ async def _persist_event_batch( state_delta_for_room = None if not backfilled: - with Measure(self._clock, "_calculate_state_and_extrem"): + with Measure( + self._clock, + name="_calculate_state_and_extrem", + server_name=self.server_name, + ): # Work out the new "current state" for the room. # We do this by working out what the new extremities are and then # calculating the state from that. 
@@ -627,7 +632,11 @@ async def _persist_event_batch( room_id, chunk ) - with Measure(self._clock, "calculate_chain_cover_index_for_events"): + with Measure( + self._clock, + name="calculate_chain_cover_index_for_events", + server_name=self.server_name, + ): # We now calculate chain ID/sequence numbers for any state events we're # persisting. We ignore out of band memberships as we're not in the room # and won't have their auth chain (we'll fix it up later if we join the @@ -719,7 +728,11 @@ async def _calculate_new_forward_extremities_and_state_delta( break logger.debug("Calculating state delta for room %s", room_id) - with Measure(self._clock, "persist_events.get_new_state_after_events"): + with Measure( + self._clock, + name="persist_events.get_new_state_after_events", + server_name=self.server_name, + ): res = await self._get_new_state_after_events( room_id, ev_ctx_rm, @@ -746,7 +759,11 @@ async def _calculate_new_forward_extremities_and_state_delta( # removed keys entirely. delta = DeltaState([], delta_ids) elif current_state is not None: - with Measure(self._clock, "persist_events.calculate_state_delta"): + with Measure( + self._clock, + name="persist_events.calculate_state_delta", + server_name=self.server_name, + ): delta = await self._calculate_state_delta(room_id, current_state) if delta: diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index f28f5d7e039..d79791fed4f 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -68,6 +68,7 @@ class StateStorageController: """ def __init__(self, hs: "HomeServer", stores: "Databases"): + self.server_name = hs.hostname self._is_mine_id = hs.is_mine_id self._clock = hs.get_clock() self.stores = stores @@ -812,7 +813,9 @@ async def get_joined_hosts( state_group = object() assert state_group is not None - with Measure(self._clock, "get_joined_hosts"): + with Measure( + self._clock, name="get_joined_hosts", server_name=self.server_name + ): 
return await self._get_joined_hosts( room_id, state_group, state_entry=state_entry ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 9cc0493307c..2929b1d57a1 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1246,7 +1246,9 @@ def _fetch_event_list( to event row. Note that it may well contain additional events that were not part of this request. """ - with Measure(self._clock, "_fetch_event_list"): + with Measure( + self._clock, name="_fetch_event_list", server_name=self.server_name + ): try: events_to_fetch = { event_id for events, _ in event_list for event_id in events diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 7ca73abb835..ce77e0b0d64 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -983,7 +983,11 @@ async def get_joined_user_ids_from_state( `_get_user_ids_from_membership_event_ids` for any uncached events. """ - with Measure(self._clock, "get_joined_user_ids_from_state"): + with Measure( + self._clock, + name="get_joined_user_ids_from_state", + server_name=self.server_name, + ): users_in_room = set() member_event_ids = [ e_id for key, e_id in state.items() if key[0] == EventTypes.Member diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 6a389f7a7e8..608a4d48489 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -41,59 +41,100 @@ LoggingContext, current_context, ) -from synapse.metrics import InFlightGauge +from synapse.metrics import SERVER_NAME_LABEL, InFlightGauge from synapse.util import Clock logger = logging.getLogger(__name__) -block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"]) +# Metrics to see the number of and how much time is spent in various blocks of code.
+# +block_counter = Counter( + "synapse_util_metrics_block_count", + documentation="The number of times this block has been called.", + labelnames=["block_name", SERVER_NAME_LABEL], +) +"""The number of times this block has been called.""" -block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"]) +block_timer = Counter( + "synapse_util_metrics_block_time_seconds", + documentation="The cumulative time spent executing this block across all calls, in seconds.", + labelnames=["block_name", SERVER_NAME_LABEL], +) +"""The cumulative time spent executing this block across all calls, in seconds.""" block_ru_utime = Counter( - "synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"] + "synapse_util_metrics_block_ru_utime_seconds", + documentation="Resource usage: user CPU time in seconds used in this block", + labelnames=["block_name", SERVER_NAME_LABEL], ) +"""Resource usage: user CPU time in seconds used in this block""" block_ru_stime = Counter( - "synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"] + "synapse_util_metrics_block_ru_stime_seconds", + documentation="Resource usage: system CPU time in seconds used in this block", + labelnames=["block_name", SERVER_NAME_LABEL], ) +"""Resource usage: system CPU time in seconds used in this block""" block_db_txn_count = Counter( - "synapse_util_metrics_block_db_txn_count", "", ["block_name"] + "synapse_util_metrics_block_db_txn_count", + documentation="Number of database transactions completed in this block", + labelnames=["block_name", SERVER_NAME_LABEL], ) +"""Number of database transactions completed in this block""" # seconds spent waiting for db txns, excluding scheduling time, in this block block_db_txn_duration = Counter( - "synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"] + "synapse_util_metrics_block_db_txn_duration_seconds", + documentation="Seconds spent waiting for database txns, excluding scheduling time, in this block", + 
labelnames=["block_name", SERVER_NAME_LABEL], ) +"""Seconds spent waiting for database txns, excluding scheduling time, in this block""" # seconds spent waiting for a db connection, in this block block_db_sched_duration = Counter( - "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"] + "synapse_util_metrics_block_db_sched_duration_seconds", + documentation="Seconds spent waiting for a db connection, in this block", + labelnames=["block_name", SERVER_NAME_LABEL], ) +"""Seconds spent waiting for a db connection, in this block""" # This is dynamically created in InFlightGauge.__init__. -class _InFlightMetric(Protocol): +class _BlockInFlightMetric(Protocol): + """ + Sub-metrics used for the `InFlightGauge` for blocks. + """ + real_time_max: float + """The longest observed duration of any single execution of this block, in seconds.""" real_time_sum: float + """The cumulative time spent executing this block across all calls, in seconds.""" -# Tracks the number of blocks currently active -in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge( +in_flight: InFlightGauge[_BlockInFlightMetric] = InFlightGauge( "synapse_util_metrics_block_in_flight", - "", - labels=["block_name"], + desc="Tracks the number of blocks currently active", + labels=["block_name", SERVER_NAME_LABEL], + # Matches the fields in the `_BlockInFlightMetric` sub_metrics=["real_time_max", "real_time_sum"], ) - +"""Tracks the number of blocks currently active""" P = ParamSpec("P") R = TypeVar("R") -class HasClock(Protocol): +class HasClockAndServerName(Protocol): clock: Clock + """ + Used to measure functions + """ + server_name: str + """ + The homeserver name that this Measure is associated with (used to label the metric) + (`hs.hostname`). + """ def measure_func( @@ -101,8 +142,9 @@ def measure_func( ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: """Decorate an async method with a `Measure` context manager. 
- The Measure is created using `self.clock`; it should only be used to decorate - methods in classes defining an instance-level `clock` attribute. + The Measure is created using `self.clock` and `self.server_name`; it should only be + used to decorate methods in classes defining instance-level `clock` and + `server_name` attributes. Usage: @@ -116,16 +158,21 @@ async def foo(...): with Measure(...): ... + Args: + name: The name of the metric to report (the block name) (used to label the + metric). Defaults to the name of the decorated function. """ def wrapper( - func: Callable[Concatenate[HasClock, P], Awaitable[R]], + func: Callable[Concatenate[HasClockAndServerName, P], Awaitable[R]], ) -> Callable[P, Awaitable[R]]: block_name = func.__name__ if name is None else name @wraps(func) - async def measured_func(self: HasClock, *args: P.args, **kwargs: P.kwargs) -> R: - with Measure(self.clock, block_name): + async def measured_func( + self: HasClockAndServerName, *args: P.args, **kwargs: P.kwargs + ) -> R: + with Measure(self.clock, name=block_name, server_name=self.server_name): r = await func(self, *args, **kwargs) return r @@ -142,19 +189,24 @@ class Measure: __slots__ = [ "clock", "name", + "server_name", "_logging_context", "start", ] - def __init__(self, clock: Clock, name: str) -> None: + def __init__(self, clock: Clock, *, name: str, server_name: str) -> None: """ Args: clock: An object with a "time()" method, which returns the current time in seconds. - name: The name of the metric to report. + name: The name of the metric to report (the block name) (used to label the + metric). + server_name: The homeserver name that this Measure is associated with (used to + label the metric) (`hs.hostname`).
""" self.clock = clock self.name = name + self.server_name = server_name curr_context = current_context() if not curr_context: logger.warning( @@ -174,7 +226,7 @@ def __enter__(self) -> "Measure": self.start = self.clock.time() self._logging_context.__enter__() - in_flight.register((self.name,), self._update_in_flight) + in_flight.register((self.name, self.server_name), self._update_in_flight) logger.debug("Entering block %s", self.name) @@ -194,19 +246,20 @@ def __exit__( duration = self.clock.time() - self.start usage = self.get_resource_usage() - in_flight.unregister((self.name,), self._update_in_flight) + in_flight.unregister((self.name, self.server_name), self._update_in_flight) self._logging_context.__exit__(exc_type, exc_val, exc_tb) try: - block_counter.labels(self.name).inc() - block_timer.labels(self.name).inc(duration) - block_ru_utime.labels(self.name).inc(usage.ru_utime) - block_ru_stime.labels(self.name).inc(usage.ru_stime) - block_db_txn_count.labels(self.name).inc(usage.db_txn_count) - block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) - block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) - except ValueError: - logger.warning("Failed to save metrics! Usage: %s", usage) + labels = {"block_name": self.name, SERVER_NAME_LABEL: self.server_name} + block_counter.labels(**labels).inc() + block_timer.labels(**labels).inc(duration) + block_ru_utime.labels(**labels).inc(usage.ru_utime) + block_ru_stime.labels(**labels).inc(usage.ru_stime) + block_db_txn_count.labels(**labels).inc(usage.db_txn_count) + block_db_txn_duration.labels(**labels).inc(usage.db_txn_duration_sec) + block_db_sched_duration.labels(**labels).inc(usage.db_sched_duration_sec) + except ValueError as exc: + logger.warning("Failed to save metrics! 
Usage: %s Error: %s", usage, exc) def get_resource_usage(self) -> ContextResourceUsage: """Get the resources used within this Measure block @@ -215,7 +268,7 @@ def get_resource_usage(self) -> ContextResourceUsage: """ return self._logging_context.get_resource_usage() - def _update_in_flight(self, metrics: _InFlightMetric) -> None: + def _update_in_flight(self, metrics: _BlockInFlightMetric) -> None: """Gets called when processing in flight metrics""" assert self.start is not None duration = self.clock.time() - self.start diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 9d8960315fe..394315d2b0b 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -86,6 +86,7 @@ def make_homeserver( self.mock_federation_client = AsyncMock(spec=["put_json"]) self.mock_federation_client.put_json.return_value = (200, "OK") self.mock_federation_client.agent = MatrixFederationAgent( + "OUR_STUB_HOMESERVER_NAME", reactor, tls_client_options_factory=None, user_agent=b"SynapseInTrialTest/0.0.0", diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index 6b25e53c28e..eb859ca47a1 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -91,6 +91,7 @@ def setUp(self) -> None: "test_cache", timer=self.reactor.seconds ) self.well_known_resolver = WellKnownResolver( + "OUR_STUB_HOMESERVER_NAME", self.reactor, Agent(self.reactor, contextFactory=self.tls_factory), b"test-agent", @@ -269,6 +270,7 @@ def _make_agent(self) -> MatrixFederationAgent: because it is created too early during setUp """ return MatrixFederationAgent( + "OUR_STUB_HOMESERVER_NAME", reactor=cast(ISynapseReactor, self.reactor), tls_client_options_factory=self.tls_factory, user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided. 
@@ -1011,6 +1013,7 @@ def test_get_well_known_unsigned_cert(self) -> None: # Build a new agent and WellKnownResolver with a different tls factory tls_factory = FederationPolicyForHTTPS(config) agent = MatrixFederationAgent( + "OUR_STUB_HOMESERVER_NAME", reactor=self.reactor, tls_client_options_factory=tls_factory, user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below. @@ -1018,6 +1021,7 @@ def test_get_well_known_unsigned_cert(self) -> None: ip_blocklist=IPSet(), _srv_resolver=self.mock_resolver, _well_known_resolver=WellKnownResolver( + "OUR_STUB_HOMESERVER_NAME", cast(ISynapseReactor, self.reactor), Agent(self.reactor, contextFactory=tls_factory), b"test-agent", diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index 58a7a9dc721..6c4145f2c26 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -68,6 +68,7 @@ def setUp(self) -> None: reactor, _ = get_clock() self.matrix_federation_agent = MatrixFederationAgent( + "OUR_STUB_HOMESERVER_NAME", reactor, tls_client_options_factory=None, user_agent=b"SynapseInTrialTest/0.0.0",