Skip to content

Commit

Permalink
topotato: close-at-stop ctx manager, plug FD leaks
Browse files Browse the repository at this point in the history
Turns out when you look for FD leaks, you'll find some.

Add a context manager that allows cleaning up stuff on stopping a
virtual router, and use it for scapy & multicast subscriptions.

Signed-off-by: David Lamparter <[email protected]>
  • Loading branch information
eqvinox committed Oct 1, 2024
1 parent a3331b9 commit f4d21af
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 5 deletions.
2 changes: 1 addition & 1 deletion topotato/livescapy.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(self, ifname: str, sock: SuperSocket):
self._sock = sock

def fileno(self):
if self._sock is None:
if self._sock is None or self._sock.fileno() == -1:
return None
return self._sock.fileno()

Expand Down
7 changes: 5 additions & 2 deletions topotato/multicast.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ def __init__(self, rtr, iface):
def _get_sock_ifindex(self, router, af):
if not self._sock:
with router:
self._sock = socket.socket(af, socket.SOCK_DGRAM, 0)
self._ifindex = socket.if_nametoindex(self._iface.ifname)
with router.ctx_until_stop(
socket.socket(af, socket.SOCK_DGRAM, 0)
) as sock:
self._sock = sock
self._ifindex = socket.if_nametoindex(self._iface.ifname)

return self._sock, self._ifindex

Expand Down
154 changes: 154 additions & 0 deletions topotato/topobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@

from abc import ABC, abstractmethod
import os
import weakref
import warnings
import typing
from typing import (
cast,
Any,
Callable,
ContextManager,
Dict,
List,
Literal,
Expand All @@ -39,6 +43,83 @@
from .timeline import Timeline


class _ContextAtexit:
"""
Context Manager wrapper to hold things until a virtual system stops.
Used primarily for sockets in a virtual system. Refer to
:py:meth:`BaseNS.ctx_until_stop`.
"""

ns: "BaseNS"

hold: Callable[[], Optional[ContextManager]]
"""
Either a regular closure, or a weakref, to get the wrapped context manager.
Using a weakref allows the wrapped context object's lifetime to end earlier.
"""

exit_on_exception: bool

_strong_ref: Optional[ContextManager]
"""
Used only during the span between ``__init__`` and ``__exit__``.
"""

def __init__(
self,
ns: "BaseNS",
context: ContextManager,
*,
exit_on_exception=True,
weak=False,
):
self.ns = ns
self._strong_ref = context
self.exit_on_exception = exit_on_exception

if weak:
self.hold = weakref.ref(context)
else:
self.hold = lambda: context

def __enter__(self):
assert self._strong_ref is not None

return self._strong_ref.__enter__()

def __exit__(self, exc_type, exc_value, tb):
assert self._strong_ref is not None

if self.exit_on_exception and exc_value is not None:
self._strong_ref.__exit__(exc_type, exc_value, tb)
else:
self.ns.atexit(self)

# context manager may now drop if weak=True
self._strong_ref = None

def __call__(self):
assert self._strong_ref is None

context = self.hold()
if context is None:
# weakref gone away
return
context.__exit__(None, None, None)


class AtexitExceptionIgnoredWarning(UserWarning):
"""
:py:mod:`warnings` category for exceptions ignored during router atexit.
Exceptions from a router's :py:meth:`BaseNS.atexit` functions shouldn't
prevent other atexit functions from running (or really abort things
in general.) They're therefore converted to warnings of this category
(much like exceptions during normal ``atexit`` or ``__del__``.)
"""


class BaseNS:
"""
Common interface to a virtual host/router.
Expand All @@ -53,12 +134,14 @@ class BaseNS:
"""

instance: "NetworkInstance"
_atexit: List[Callable[[], None]]

def __init__(self, *, instance: "NetworkInstance", **kw) -> None:
self.instance = instance
self_or_kwarg(self, kw, "name")

super().__init__(**kw)
self._atexit = []

def tempfile(self, name: str) -> str:
"""
Expand Down Expand Up @@ -93,7 +176,78 @@ def end_prep(self) -> None:
def end(self) -> None:
"""
Stop this virtual system.
.. warning::
This should call :py:meth:`_do_atexit`, but currently doesn't.
Needs a cleanup pass on sequencing of things between
:py:meth:`end_prep`, :py:meth:`end`, and MRO/super() call order.
(The main concern is losing important logs/events on shutdown if
closing things too early.)
"""

def _do_atexit(self) -> None:
for atexit in self._atexit:
try:
atexit()
except Exception as e: # pylint: disable=broad-exception-caught
filename, lineno, module_globals = "<???>", 0, {}

tb = e.__traceback__
if tb:
while tb.tb_next:
tb = tb.tb_next

filename = tb.tb_frame.f_code.co_filename
lineno = tb.tb_frame.f_lineno
module_globals = tb.tb_frame.f_globals
del tb

warnings.warn_explicit(
f"Exception during atexit: {e!r}",
AtexitExceptionIgnoredWarning,
filename=filename,
lineno=lineno,
module_globals=module_globals,
)

def atexit(self, fn: Callable[[], None]):
"""
Call given function when this system stops.
Exceptions from the function are converted into warnings of category
:py:class:`AtexitExceptionIgnoredWarning`, i.e. won't crash out.
"""
self._atexit.insert(0, fn)

def ctx_until_stop(
self, context: ContextManager, *, exit_on_exception=True, weak=False
):
"""
Wrap a context manager and keep it alive until this system stops.
Use like::
with router.ctx_until_stop(socket(...)) as sock:
sock.connect(...)
The socket will then be closed when router is stopped, or will be
immediately closed if an exception occurs inside the ``with`` block.
:param ContextManager context: context manager to wrap.
:param bool exit_on_exception: if True (default), clean up the context
manager immediately if an exception happens in the ``with`` block.
If False, the context manager always lives until the router shuts
down.
:param bool weak: use a weak reference to hold the context manager.
May be useful to allow some object to go out of scope earlier.
:return: a context manager wrapping the original one, with the same
return value from ``__enter__``.
"""
return _ContextAtexit(
self, context, exit_on_exception=exit_on_exception, weak=weak
)


class SwitchyNS(BaseNS):
Expand Down
10 changes: 8 additions & 2 deletions topotato/topolinux.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,11 @@ def start(self):
for br in self.bridges:
args.extend(["-i", br])

self.scapys[br] = scapy.config.conf.L2socket(iface=br)
os.set_blocking(self.scapys[br].fileno(), False)
with self.switch_ns.ctx_until_stop(
scapy.config.conf.L2socket(iface=br)
) as sock:
self.scapys[br] = sock
os.set_blocking(sock.fileno(), False)

self.switch_ns.start_run()
for rns in self.routers.values():
Expand Down Expand Up @@ -528,7 +531,10 @@ def stop(self):
for rns in self.routers.values():
rns.end_prep()
for rns in self.routers.values():
rns._do_atexit()
rns.end()

self.switch_ns._do_atexit()
self.switch_ns.end()
self._gcov_collect()

Expand Down

0 comments on commit f4d21af

Please sign in to comment.