Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Support reactor timing metric on more reactors. (#16532)
Browse files Browse the repository at this point in the history
Previously only Twisted's EPollReactor was compatible with the
reactor timing metric, notably not working when asyncio was used.

After this change, the following configurations support the reactor
timing metric:

* poll, epoll, or select reactors
* asyncio reactor with a poll, epoll, select, /dev/poll, or kqueue event loop.
  • Loading branch information
clokep authored Nov 6, 2023
1 parent 1a9b22a commit cc4fe68
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 29 deletions.
1 change: 1 addition & 0 deletions changelog.d/16532.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support reactor tick timings on more types of event loops.
4 changes: 2 additions & 2 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ files =
build_rust.py

[mypy-synapse.metrics._reactor_metrics]
# This module imports select.epoll. That exists on Linux, but doesn't on macOS.
# See https://github.com/matrix-org/synapse/pull/11771.
# This module pokes at the internals of OS-specific classes, to appease mypy
# on different systems we add additional ignores.
warn_unused_ignores = False

[mypy-synapse.util.caches.treecache]
Expand Down
130 changes: 103 additions & 27 deletions synapse/metrics/_reactor_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,45 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import select
import logging
import time
from typing import Any, Iterable, List, Tuple
from selectors import SelectSelector, _PollLikeSelector # type: ignore[attr-defined]
from typing import Any, Callable, Iterable

from prometheus_client import Histogram, Metric
from prometheus_client.core import REGISTRY, GaugeMetricFamily

from twisted.internet import reactor
from twisted.internet import reactor, selectreactor
from twisted.internet.asyncioreactor import AsyncioSelectorReactor

from synapse.metrics._types import Collector

try:
from selectors import KqueueSelector
except ImportError:

class KqueueSelector: # type: ignore[no-redef]
pass


try:
from twisted.internet.epollreactor import EPollReactor
except ImportError:

class EPollReactor: # type: ignore[no-redef]
pass


try:
from twisted.internet.pollreactor import PollReactor
except ImportError:

class PollReactor: # type: ignore[no-redef]
pass


logger = logging.getLogger(__name__)

#
# Twisted reactor metrics
#
Expand All @@ -34,52 +62,100 @@
)


class EpollWrapper:
"""a wrapper for an epoll object which records the time between polls"""
class CallWrapper:
"""A wrapper for a callable which records the time between calls"""

def __init__(self, poller: "select.epoll"): # type: ignore[name-defined]
def __init__(self, wrapped: Callable[..., Any]):
self.last_polled = time.time()
self._poller = poller
self._wrapped = wrapped

def poll(self, *args, **kwargs) -> List[Tuple[int, int]]: # type: ignore[no-untyped-def]
# record the time since poll() was last called. This gives a good proxy for
def __call__(self, *args, **kwargs) -> Any: # type: ignore[no-untyped-def]
# record the time since this was last called. This gives a good proxy for
# how long it takes to run everything in the reactor - ie, how long anything
# waiting for the next tick will have to wait.
tick_time.observe(time.time() - self.last_polled)

ret = self._poller.poll(*args, **kwargs)
ret = self._wrapped(*args, **kwargs)

self.last_polled = time.time()
return ret


class ObjWrapper:
"""A wrapper for an object which wraps a specified method in CallWrapper.
Other methods/attributes are passed to the original object.
This is necessary when the wrapped object does not allow the attribute to be
overwritten.
"""

def __init__(self, wrapped: Any, method_name: str):
self._wrapped = wrapped
self._method_name = method_name
self._wrapped_method = CallWrapper(getattr(wrapped, method_name))

def __getattr__(self, item: str) -> Any:
return getattr(self._poller, item)
if item == self._method_name:
return self._wrapped_method

return getattr(self._wrapped, item)


class ReactorLastSeenMetric(Collector):
def __init__(self, epoll_wrapper: EpollWrapper):
self._epoll_wrapper = epoll_wrapper
def __init__(self, call_wrapper: CallWrapper):
self._call_wrapper = call_wrapper

def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
"Seconds since the Twisted reactor was last seen",
)
cm.add_metric([], time.time() - self._epoll_wrapper.last_polled)
cm.add_metric([], time.time() - self._call_wrapper.last_polled)
yield cm


# Twisted has already select a reasonable reactor for us, so assumptions can be
# made about the shape.
wrapper = None
try:
# if the reactor has a `_poller` attribute, which is an `epoll` object
# (ie, it's an EPollReactor), we wrap the `epoll` with a thing that will
# measure the time between ticks
from select import epoll # type: ignore[attr-defined]

poller = reactor._poller # type: ignore[attr-defined]
except (AttributeError, ImportError):
pass
else:
if isinstance(poller, epoll):
poller = EpollWrapper(poller)
reactor._poller = poller # type: ignore[attr-defined]
REGISTRY.register(ReactorLastSeenMetric(poller))
if isinstance(reactor, (PollReactor, EPollReactor)):
reactor._poller = ObjWrapper(reactor._poller, "poll") # type: ignore[attr-defined]
wrapper = reactor._poller._wrapped_method # type: ignore[attr-defined]

elif isinstance(reactor, selectreactor.SelectReactor):
# Twisted uses a module-level _select function.
wrapper = selectreactor._select = CallWrapper(selectreactor._select)

elif isinstance(reactor, AsyncioSelectorReactor):
# For asyncio look at the underlying asyncio event loop.
asyncio_loop = reactor._asyncioEventloop # A sub-class of BaseEventLoop,

# A sub-class of BaseSelector.
selector = asyncio_loop._selector # type: ignore[attr-defined]

if isinstance(selector, SelectSelector):
wrapper = selector._select = CallWrapper(selector._select) # type: ignore[attr-defined]

# poll, epoll, and /dev/poll.
elif isinstance(selector, _PollLikeSelector):
selector._selector = ObjWrapper(selector._selector, "poll") # type: ignore[attr-defined]
wrapper = selector._selector._wrapped_method # type: ignore[attr-defined]

elif isinstance(selector, KqueueSelector):
selector._selector = ObjWrapper(selector._selector, "control") # type: ignore[attr-defined]
wrapper = selector._selector._wrapped_method # type: ignore[attr-defined]

else:
# E.g. this does not support the (Windows-only) ProactorEventLoop.
logger.warning(
"Skipping configuring ReactorLastSeenMetric: unexpected asyncio loop selector: %r via %r",
selector,
asyncio_loop,
)
except Exception as e:
logger.warning("Configuring ReactorLastSeenMetric failed: %r", e)


if wrapper:
REGISTRY.register(ReactorLastSeenMetric(wrapper))

0 comments on commit cc4fe68

Please sign in to comment.