Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support reactor timing on more reactors. #16532

Merged
merged 7 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16532.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support reactor tick timings on more types of event loops.
4 changes: 2 additions & 2 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ files =
build_rust.py

[mypy-synapse.metrics._reactor_metrics]
# This module imports select.epoll. That exists on Linux, but doesn't on macOS.
# See https://github.com/matrix-org/synapse/pull/11771.
# This module pokes at the internals of OS-specific classes, to appease mypy
# on different systems we add additional ignores.
warn_unused_ignores = False

[mypy-synapse.util.caches.treecache]
Expand Down
130 changes: 103 additions & 27 deletions synapse/metrics/_reactor_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,45 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import select
import logging
import time
from typing import Any, Iterable, List, Tuple
from selectors import SelectSelector, _PollLikeSelector # type: ignore[attr-defined]
from typing import Any, Callable, Iterable

from prometheus_client import Histogram, Metric
from prometheus_client.core import REGISTRY, GaugeMetricFamily

from twisted.internet import reactor
from twisted.internet import reactor, selectreactor
from twisted.internet.asyncioreactor import AsyncioSelectorReactor

from synapse.metrics._types import Collector

try:
from selectors import KqueueSelector
except ImportError:

class KqueueSelector: # type: ignore[no-redef]
pass


try:
from twisted.internet.epollreactor import EPollReactor
except ImportError:

class EPollReactor: # type: ignore[no-redef]
pass


try:
from twisted.internet.pollreactor import PollReactor
except ImportError:

class PollReactor: # type: ignore[no-redef]
pass


logger = logging.getLogger(__name__)

#
# Twisted reactor metrics
#
Expand All @@ -34,52 +62,100 @@
)


class EpollWrapper:
"""a wrapper for an epoll object which records the time between polls"""
class CallWrapper:
"""A wrapper for a callable which records the time between calls"""

def __init__(self, poller: "select.epoll"): # type: ignore[name-defined]
def __init__(self, wrapped: Callable[..., Any]):
self.last_polled = time.time()
self._poller = poller
self._wrapped = wrapped

def poll(self, *args, **kwargs) -> List[Tuple[int, int]]: # type: ignore[no-untyped-def]
# record the time since poll() was last called. This gives a good proxy for
def __call__(self, *args, **kwargs) -> Any: # type: ignore[no-untyped-def]
# record the time since this was last called. This gives a good proxy for
# how long it takes to run everything in the reactor - ie, how long anything
# waiting for the next tick will have to wait.
tick_time.observe(time.time() - self.last_polled)

ret = self._poller.poll(*args, **kwargs)
ret = self._wrapped(*args, **kwargs)

self.last_polled = time.time()
return ret


class ObjWrapper:
"""A wrapper for an object which wraps a specified method in CallWrapper.

Other methods/attributes are passed to the original object.

This is necessary when the wrapped object does not allow the attribute to be
overwritten.
"""

def __init__(self, wrapped: Any, method_name: str):
self._wrapped = wrapped
self._method_name = method_name
self._wrapped_method = CallWrapper(getattr(wrapped, method_name))

def __getattr__(self, item: str) -> Any:
return getattr(self._poller, item)
if item == self._method_name:
return self._wrapped_method

return getattr(self._wrapped, item)


class ReactorLastSeenMetric(Collector):
def __init__(self, epoll_wrapper: EpollWrapper):
self._epoll_wrapper = epoll_wrapper
def __init__(self, call_wrapper: CallWrapper):
self._call_wrapper = call_wrapper

def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
"Seconds since the Twisted reactor was last seen",
)
cm.add_metric([], time.time() - self._epoll_wrapper.last_polled)
cm.add_metric([], time.time() - self._call_wrapper.last_polled)
yield cm


# Twisted has already select a reasonable reactor for us, so assumptions can be
# made about the shape.
wrapper = None
try:
# if the reactor has a `_poller` attribute, which is an `epoll` object
# (ie, it's an EPollReactor), we wrap the `epoll` with a thing that will
# measure the time between ticks
from select import epoll # type: ignore[attr-defined]

poller = reactor._poller # type: ignore[attr-defined]
except (AttributeError, ImportError):
pass
else:
if isinstance(poller, epoll):
poller = EpollWrapper(poller)
reactor._poller = poller # type: ignore[attr-defined]
REGISTRY.register(ReactorLastSeenMetric(poller))
if isinstance(reactor, (PollReactor, EPollReactor)):
reactor._poller = ObjWrapper(reactor._poller, "poll") # type: ignore[attr-defined]
wrapper = reactor._poller._wrapped_method # type: ignore[attr-defined]

elif isinstance(reactor, selectreactor.SelectReactor):
# Twisted uses a module-level _select function.
wrapper = selectreactor._select = CallWrapper(selectreactor._select)

elif isinstance(reactor, AsyncioSelectorReactor):
# For asyncio look at the underlying asyncio event loop.
asyncio_loop = reactor._asyncioEventloop # A sub-class of BaseEventLoop,

# A sub-class of BaseSelector.
selector = asyncio_loop._selector # type: ignore[attr-defined]

if isinstance(selector, SelectSelector):
wrapper = selector._select = CallWrapper(selector._select) # type: ignore[attr-defined]

# poll, epoll, and /dev/poll.
elif isinstance(selector, _PollLikeSelector):
selector._selector = ObjWrapper(selector._selector, "poll") # type: ignore[attr-defined]
wrapper = selector._selector._wrapped_method # type: ignore[attr-defined]

elif isinstance(selector, KqueueSelector):
selector._selector = ObjWrapper(selector._selector, "control") # type: ignore[attr-defined]
wrapper = selector._selector._wrapped_method # type: ignore[attr-defined]

else:
# E.g. this does not support the (Windows-only) ProactorEventLoop.
logger.warning(
"Skipping configuring ReactorLastSeenMetric: unexpected asyncio loop selector: %r via %r",
selector,
asyncio_loop,
)
except Exception as e:
logger.warning("Configuring ReactorLastSeenMetric failed: %r", e)


if wrapper:
REGISTRY.register(ReactorLastSeenMetric(wrapper))
Loading