From 2b7c180879e5d62145feed88375ba55f18fc2ae5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Oct 2020 09:30:19 +0000 Subject: [PATCH] Start fewer opentracing spans (#8640) #8567 started a span for every background process. This is good as it means all Synapse code that gets run should be in a span (unless in the sentinel logging context), but it means we generate about 15x the number of spans as we did previously. This PR attempts to reduce that number by a) not starting one for send commands to Redis, and b) deferring starting background processes until after we're sure they're necessary. I don't really know how much this will help. --- changelog.d/8640.misc | 1 + synapse/handlers/appservice.py | 50 ++++++++++++++++--- synapse/logging/opentracing.py | 10 ++-- synapse/metrics/background_process_metrics.py | 12 +++-- synapse/notifier.py | 34 ++++--------- synapse/push/pusherpool.py | 18 ++++++- synapse/replication/tcp/redis.py | 4 +- tests/handlers/test_appservice.py | 20 +++----- 8 files changed, 96 insertions(+), 53 deletions(-) create mode 100644 changelog.d/8640.misc diff --git a/changelog.d/8640.misc b/changelog.d/8640.misc new file mode 100644 index 000000000000..cf6023f7835c --- /dev/null +++ b/changelog.d/8640.misc @@ -0,0 +1 @@ +Reduce number of OpenTracing spans started. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 07240d3a14ba..7826387e5385 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union from prometheus_client import Counter @@ -30,7 +30,10 @@ event_processing_loop_counter, event_processing_loop_room_count, ) -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.types import Collection, JsonDict, RoomStreamToken, UserID from synapse.util.metrics import Measure @@ -53,7 +56,7 @@ def __init__(self, hs): self.current_max = 0 self.is_processing = False - async def notify_interested_services(self, max_token: RoomStreamToken): + def notify_interested_services(self, max_token: RoomStreamToken): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any @@ -72,6 +75,12 @@ async def notify_interested_services(self, max_token: RoomStreamToken): if self.is_processing: return + # We only start a new background process if necessary rather than + # optimistically (to cut down on overhead). + self._notify_interested_services(max_token) + + @wrap_as_background_process("notify_interested_services") + async def _notify_interested_services(self, max_token: RoomStreamToken): with Measure(self.clock, "notify_interested_services"): self.is_processing = True try: @@ -166,8 +175,11 @@ async def handle_room_events(events): finally: self.is_processing = False - async def notify_interested_services_ephemeral( - self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [], + def notify_interested_services_ephemeral( + self, + stream_key: str, + new_token: Optional[int], + users: Collection[Union[str, UserID]] = [], ): """This is called by the notifier in the background when a ephemeral event handled by the homeserver. @@ -183,13 +195,34 @@ async def notify_interested_services_ephemeral( new_token: The latest stream token users: The user(s) involved with the event. """ + if not self.notify_appservices: + return + + if stream_key not in ("typing_key", "receipt_key", "presence_key"): + return + services = [ service for service in self.store.get_app_services() if service.supports_ephemeral ] - if not services or not self.notify_appservices: + if not services: return + + # We only start a new background process if necessary rather than + # optimistically (to cut down on overhead). + self._notify_interested_services_ephemeral( + services, stream_key, new_token, users + ) + + @wrap_as_background_process("notify_interested_services_ephemeral") + async def _notify_interested_services_ephemeral( + self, + services: List[ApplicationService], + stream_key: str, + new_token: Optional[int], + users: Collection[Union[str, UserID]], + ): logger.info("Checking interested services for %s" % (stream_key)) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: @@ -237,7 +270,7 @@ async def _handle_receipts(self, service: ApplicationService): return receipts async def _handle_presence( - self, service: ApplicationService, users: Collection[UserID] + self, service: ApplicationService, users: Collection[Union[str, UserID]] ): events = [] # type: List[JsonDict] presence_source = self.event_sources.sources["presence"] @@ -245,6 +278,9 @@ async def _handle_presence( service, "presence" ) for user in users: + if isinstance(user, str): + user = UserID.from_string(user) + interested = await service.is_interested_in_presence(user, self.store) if not interested: continue diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index e58850faff86..ab586c318c03 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -317,7 +317,7 @@ def ensure_active_span_inner_2(*args, **kwargs): @contextlib.contextmanager -def _noop_context_manager(*args, **kwargs): +def noop_context_manager(*args, **kwargs): """Does exactly what it says on the tin""" yield @@ -413,7 +413,7 @@ def start_active_span( """ if opentracing is None: - return _noop_context_manager() + return noop_context_manager() return opentracing.tracer.start_active_span( operation_name, @@ -428,7 +428,7 @@ def start_active_span( def start_active_span_follows_from(operation_name, contexts): if opentracing is None: - return _noop_context_manager() + return noop_context_manager() references = [opentracing.follows_from(context) for context in contexts] scope = start_active_span(operation_name, references=references) @@ -459,7 +459,7 @@ def start_active_span_from_request( # Also, twisted uses byte arrays while opentracing expects strings. if opentracing is None: - return _noop_context_manager() + return noop_context_manager() header_dict = { k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() @@ -497,7 +497,7 @@ def start_active_span_from_edu( """ if opentracing is None: - return _noop_context_manager() + return noop_context_manager() carrier = json_decoder.decode(edu_content.get("context", "{}")).get( "opentracing", {} diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 08fbf78eeeca..658f6ecd72a3 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -24,7 +24,7 @@ from twisted.internet import defer from synapse.logging.context import LoggingContext, PreserveLoggingContext -from synapse.logging.opentracing import start_active_span +from synapse.logging.opentracing import noop_context_manager, start_active_span if TYPE_CHECKING: import resource @@ -167,7 +167,7 @@ def update_metrics(self): ) -def run_as_background_process(desc: str, func, *args, **kwargs): +def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs): """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -181,6 +181,9 @@ def run_as_background_process(desc: str, func, *args, **kwargs): Args: desc: a description for this background process type func: a function, which may return a Deferred or a coroutine + bg_start_span: Whether to start an opentracing span. Defaults to True. + Should only be disabled for processes that will not log to or tag + a span. args: positional args for func kwargs: keyword args for func @@ -199,7 +202,10 @@ async def run(): with BackgroundProcessLoggingContext(desc) as context: context.request = "%s-%i" % (desc, count) try: - with start_active_span(desc, tags={"request_id": context.request}): + ctx = noop_context_manager() + if bg_start_span: + ctx = start_active_span(desc, tags={"request_id": context.request}) + with ctx: result = func(*args, **kwargs) if inspect.isawaitable(result): diff --git a/synapse/notifier.py b/synapse/notifier.py index 858b487bec35..eb56b26f219c 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -40,7 +40,6 @@ from synapse.logging.context import PreserveLoggingContext from synapse.logging.utils import log_function from synapse.metrics import LaterGauge -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.streams.config import PaginationConfig from synapse.types import ( Collection, @@ -310,44 +309,37 @@ def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken): """ # poke any interested application service. - run_as_background_process( - "_notify_app_services", self._notify_app_services, max_room_stream_token - ) - - run_as_background_process( - "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token - ) + self._notify_app_services(max_room_stream_token) + self._notify_pusher_pool(max_room_stream_token) if self.federation_sender: self.federation_sender.notify_new_events(max_room_stream_token) - async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): + def _notify_app_services(self, max_room_stream_token: RoomStreamToken): try: - await self.appservice_handler.notify_interested_services( - max_room_stream_token - ) + self.appservice_handler.notify_interested_services(max_room_stream_token) except Exception: logger.exception("Error notifying application services of event") - async def _notify_app_services_ephemeral( + def _notify_app_services_ephemeral( self, stream_key: str, new_token: Union[int, RoomStreamToken], - users: Collection[UserID] = [], + users: Collection[Union[str, UserID]] = [], ): try: stream_token = None if isinstance(new_token, int): stream_token = new_token - await self.appservice_handler.notify_interested_services_ephemeral( + self.appservice_handler.notify_interested_services_ephemeral( stream_key, stream_token, users ) except Exception: logger.exception("Error notifying application services of event") - async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): + def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: - await self._pusher_pool.on_new_notifications(max_room_stream_token) + self._pusher_pool.on_new_notifications(max_room_stream_token) except Exception: logger.exception("Error pusher pool of event") @@ -384,12 +376,8 @@ def on_new_event( self.notify_replication() # Notify appservices - run_as_background_process( - "_notify_app_services_ephemeral", - self._notify_app_services_ephemeral, - stream_key, - new_token, - users, + self._notify_app_services_ephemeral( + stream_key, new_token, users, ) def on_new_replication_data(self) -> None: diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 0080c68ce28e..f32596498396 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -19,7 +19,10 @@ from prometheus_client import Gauge -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.push import PusherConfigException from synapse.push.emailpusher import EmailPusher from synapse.push.httppusher import HttpPusher @@ -187,7 +190,7 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens): ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, max_token: RoomStreamToken): + def on_new_notifications(self, max_token: RoomStreamToken): if not self.pushers: # nothing to do here. return @@ -201,6 +204,17 @@ async def on_new_notifications(self, max_token: RoomStreamToken): # Nothing to do return + # We only start a new background process if necessary rather than + # optimistically (to cut down on overhead). + self._on_new_notifications(max_token) + + @wrap_as_background_process("on_new_notifications") + async def _on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_id = max_token.stream + prev_stream_id = self._last_room_stream_id_seen self._last_room_stream_id_seen = max_stream_id diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index de19705c1f41..bc6ba709a785 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -166,7 +166,9 @@ def send_command(self, cmd: Command): Args: cmd (Command) """ - run_as_background_process("send-cmd", self._async_send_command, cmd) + run_as_background_process( + "send-cmd", self._async_send_command, cmd, bg_start_span=False + ) async def _async_send_command(self, cmd: Command): """Encode a replication command and send it over our outbound connection""" diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index ee4f3da31c83..53763cd0f989 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -42,7 +42,6 @@ def setUp(self): hs.get_clock.return_value = MockClock() self.handler = ApplicationServicesHandler(hs) - @defer.inlineCallbacks def test_notify_interested_services(self): interested_service = self._mkservice(is_interested=True) services = [ @@ -62,14 +61,12 @@ def test_notify_interested_services(self): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred( - self.handler.notify_interested_services(RoomStreamToken(None, 0)) - ) + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) - @defer.inlineCallbacks def test_query_user_exists_unknown_user(self): user_id = "@someone:anywhere" services = [self._mkservice(is_interested=True)] @@ -83,12 +80,11 @@ def test_query_user_exists_unknown_user(self): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred( - self.handler.notify_interested_services(RoomStreamToken(None, 0)) - ) + + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) - @defer.inlineCallbacks def test_query_user_exists_known_user(self): user_id = "@someone:anywhere" services = [self._mkservice(is_interested=True)] @@ -102,9 +98,9 @@ def test_query_user_exists_known_user(self): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred( - self.handler.notify_interested_services(RoomStreamToken(None, 0)) - ) + + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been.",