diff --git a/nixos/lib/test-driver/default.nix b/nixos/lib/test-driver/default.nix index ed8eb2c8c7711..72aa6b11bfe5a 100644 --- a/nixos/lib/test-driver/default.nix +++ b/nixos/lib/test-driver/default.nix @@ -19,9 +19,12 @@ qemu_test, setuptools, socat, + systemd, tesseract4, + util-linux, vde2, + enableNspawn ? false, enableOCR ? false, extraPythonPackages ? (_: [ ]), }: @@ -51,8 +54,12 @@ buildPythonApplication { netpbm qemu_pkg socat + util-linux vde2 ] + ++ lib.optionals enableNspawn [ + systemd + ] ++ lib.optionals enableOCR [ imagemagick_light tesseract4 diff --git a/nixos/lib/test-driver/src/extract-docstrings.py b/nixos/lib/test-driver/src/extract-docstrings.py index 64850ca711f3b..030ed01897046 100644 --- a/nixos/lib/test-driver/src/extract-docstrings.py +++ b/nixos/lib/test-driver/src/extract-docstrings.py @@ -51,7 +51,7 @@ def main() -> None: class_definitions = (node for node in module.body if isinstance(node, ast.ClassDef)) - machine_class = next(filter(lambda x: x.name == "Machine", class_definitions)) + machine_class = next(filter(lambda x: x.name == "BaseMachine", class_definitions)) assert machine_class is not None function_definitions = [ diff --git a/nixos/lib/test-driver/src/test_driver/__init__.py b/nixos/lib/test-driver/src/test_driver/__init__.py index 422cf39172333..0f42f2842c77e 100755 --- a/nixos/lib/test-driver/src/test_driver/__init__.py +++ b/nixos/lib/test-driver/src/test_driver/__init__.py @@ -1,6 +1,8 @@ import argparse import os +import sys import time +import warnings from pathlib import Path import ptpython.ipython @@ -16,7 +18,7 @@ class EnvDefault(argparse.Action): - """An argpars Action that takes values from the specified + """An argparse Action that takes values from the specified environment variable as the flags default value. """ @@ -55,9 +57,15 @@ def writeable_dir(arg: str) -> Path: def main() -> None: arg_parser = argparse.ArgumentParser(prog="nixos-test-driver") arg_parser.add_argument( - "-K", "--keep-vm-state", - help="re-use a VM state coming from a previous run", + help=argparse.SUPPRESS, + dest="keep_machine_state", + action="store_true", + ) + arg_parser.add_argument( + "-K", + "--keep-machine-state", + help="re-use a machine state coming from a previous run", action="store_true", ) arg_parser.add_argument( @@ -71,13 +79,37 @@ def main() -> None: help="Enable interactive debugging breakpoints for sandboxed runs", ) arg_parser.add_argument( - "--start-scripts", - metavar="START-SCRIPT", + "--vm-names", + metavar="VM-NAME", action=EnvDefault, - envvar="startScripts", + envvar="vmNames", + nargs="*", + help="names of participating virtual machines", + ) + arg_parser.add_argument( + "--vm-start-scripts", + metavar="VM-START-SCRIPT", + action=EnvDefault, + envvar="vmStartScripts", nargs="*", help="start scripts for participating virtual machines", ) + arg_parser.add_argument( + "--container-names", + metavar="CONTAINER-NAME", + action=EnvDefault, + envvar="containerNames", + nargs="*", + help="names of participating containers", + ) + arg_parser.add_argument( + "--container-start-scripts", + metavar="CONTAINER-START-SCRIPT", + action=EnvDefault, + envvar="containerStartScripts", + nargs="*", + help="start scripts for participating containers", + ) arg_parser.add_argument( "--vlans", metavar="VLAN", @@ -97,8 +129,8 @@ def main() -> None: arg_parser.add_argument( "-o", "--output_directory", - help="""The path to the directory where outputs copied from the VM will be placed. - By e.g. Machine.copy_from_vm or Machine.screenshot""", + help="""The path to the directory where outputs copied from the machine will be placed. + By e.g. NspawnMachine.copy_from_machine or QemuMachine.screenshot""", default=Path.cwd(), type=writeable_dir, ) @@ -122,6 +154,12 @@ def main() -> None: args = arg_parser.parse_args() + if "--keep-vm-state" in sys.argv: + warnings.warn( + "The flag '--keep-vm-state' is deprecated. Use '--keep-machine-state' instead.", + DeprecationWarning, + ) + output_directory = args.output_directory.resolve() logger = CompositeLogger([TerminalLogger()]) @@ -131,21 +169,33 @@ def main() -> None: if args.junit_xml: logger.add_logger(JunitXMLLogger(output_directory / args.junit_xml)) - if not args.keep_vm_state: - logger.info("Machine state will be reset. To keep it, pass --keep-vm-state") + if not args.keep_machine_state: + logger.info( + "Machine state will be reset. To keep it, pass --keep-machine-state" + ) debugger: DebugAbstract = DebugNop() if args.debug_hook_attach is not None: debugger = Debug(logger, args.debug_hook_attach) + assert len(args.vm_names) == len(args.vm_start_scripts), ( + f"the number of vm names and vm start scripts must be the same: {args.vm_names} vs. {args.vm_start_scripts}" + ) + assert len(args.container_names) == len(args.container_start_scripts), ( + f"the number of container names and container start scripts must be the same: {args.container_names} vs. {args.container_start_scripts}" + ) + with Driver( - args.start_scripts, - args.vlans, - args.testscript.read_text(), - output_directory, - logger, - args.keep_vm_state, - args.global_timeout, + vm_names=args.vm_names, + vm_start_scripts=args.vm_start_scripts, + container_names=args.container_names, + container_start_scripts=args.container_start_scripts, + vlans=args.vlans, + tests=args.testscript.read_text(), + out_dir=output_directory, + logger=logger, + keep_machine_state=args.keep_machine_state, + global_timeout=args.global_timeout, debug=debugger, ) as driver: if offset := args.dump_vsocks: @@ -170,7 +220,16 @@ def generate_driver_symbols() -> None: in user's test scripts. That list is then used by pyflakes to lint those scripts. """ - d = Driver([], [], "", Path(), CompositeLogger([])) + d = Driver( + vm_names=[], + vm_start_scripts=[], + container_names=[], + container_start_scripts=[], + vlans=[], + tests="", + out_dir=Path(), + logger=CompositeLogger([]), + ) test_symbols = d.test_symbols() with open("driver-symbols", "w") as fp: fp.write(",".join(test_symbols.keys())) diff --git a/nixos/lib/test-driver/src/test_driver/driver.py b/nixos/lib/test-driver/src/test_driver/driver.py index c4f268404cbce..8cbcb052be49c 100644 --- a/nixos/lib/test-driver/src/test_driver/driver.py +++ b/nixos/lib/test-driver/src/test_driver/driver.py @@ -1,6 +1,7 @@ import os import re import signal +import subprocess import sys import tempfile import threading @@ -16,7 +17,12 @@ from test_driver.debug import DebugAbstract, DebugNop from test_driver.errors import MachineError, RequestedAssertionFailed from test_driver.logger import AbstractLogger -from test_driver.machine import Machine, NixStartScript, retry +from test_driver.machine import ( + BaseMachine, + NspawnMachine, + QemuMachine, + retry, +) from test_driver.polling_condition import PollingCondition from test_driver.vlan import VLan @@ -63,7 +69,8 @@ class Driver: tests: str vlans: list[VLan] - machines: list[Machine] + machines_qemu: list[QemuMachine] + machines_nspawn: list[NspawnMachine] polling_conditions: list[PollingCondition] global_timeout: int race_timer: threading.Timer @@ -72,12 +79,15 @@ class Driver: def __init__( self, - start_scripts: list[str], + vm_names: list[str], + vm_start_scripts: list[str], + container_names: list[str], + container_start_scripts: list[str], vlans: list[int], tests: str, out_dir: Path, logger: AbstractLogger, - keep_vm_state: bool = False, + keep_machine_state: bool = False, global_timeout: int = 24 * 60 * 60 * 7, debug: DebugAbstract = DebugNop(), ): @@ -94,25 +104,95 @@ def __init__( vlans = list(set(vlans)) self.vlans = [VLan(nr, tmp_dir, self.logger) for nr in vlans] - def cmd(scripts: list[str]) -> Iterator[NixStartScript]: - for s in scripts: - yield NixStartScript(s) - self.polling_conditions = [] - self.machines = [ - Machine( - start_command=cmd, - keep_vm_state=keep_vm_state, - name=cmd.machine_name, + self.machines_qemu = [ + QemuMachine( + name=name, + start_command=vm_start_script, + keep_machine_state=keep_machine_state, tmp_dir=tmp_dir, callbacks=[self.check_polling_conditions], out_dir=self.out_dir, logger=self.logger, ) - for cmd in cmd(start_scripts) + for name, vm_start_script in zip(vm_names, vm_start_scripts) ] + if len(container_start_scripts) > 0: + self._init_nspawn_environment() + + self.machines_nspawn = [ + NspawnMachine( + name=name, + start_command=container_start_script, + tmp_dir=tmp_dir, + logger=self.logger, + keep_machine_state=keep_machine_state, + callbacks=[self.check_polling_conditions], + out_dir=self.out_dir, + ) + for name, container_start_script in zip( + container_names, + container_start_scripts, + ) + ] + + def _init_nspawn_environment(self) -> None: + assert os.geteuid() == 0, ( + f"systemd-nspawn requires root to work. You are {os.geteuid()}" + ) + + # set up prerequisites for systemd-nspawn containers. + # these are not guaranteed to be set up in the Nix sandbox. + # if running interactively as root, these will already be set up. + + # check if /run is writable by root + if not os.access("/run", os.W_OK): + Path("/run").mkdir(parents=True, exist_ok=True) + subprocess.run(["mount", "-t", "tmpfs", "none", "/run"], check=True) + Path("/run/netns").mkdir(parents=True, exist_ok=True) + + # check if /var/run is a symlink to /run + if not (os.path.exists("/var/run") and os.path.samefile("/var/run", "/run")): + Path("/var").mkdir(parents=True, exist_ok=True) + subprocess.run(["ln", "-s", "/run", "/var/run"], check=True) + + # check if /sys/fs/cgroup is mounted as cgroup2 + with open("/proc/mounts", encoding="utf-8") as mounts: + for line in mounts: + parts = line.split() + if len(parts) >= 3 and parts[1] == "/sys/fs/cgroup": + if parts[2] == "cgroup2": + break + else: + Path("/sys/fs/cgroup").mkdir(parents=True, exist_ok=True) + subprocess.run( + ["mount", "-t", "cgroup2", "none", "/sys/fs/cgroup"], check=True + ) + + # systemd-nspawn requires that /etc/os-release exists + # It supports SYSTEMD_NSPAWN_CHECK_OS_RELEASE=0, but that + # would try to "fix" it by bind mounting, which is worse. + if not os.path.isfile("/etc/os-release"): + subprocess.run(["touch", "/etc/os-release"], check=True) + + # ensure /etc/machine-id exists and is non-empty + if ( + not os.path.isfile("/etc/machine-id") + or os.path.getsize("/etc/machine-id") == 0 + ): + subprocess.run( + ["systemd-machine-id-setup"], check=True + ) # set up /etc/machine-id + + @property + def machines(self) -> list[QemuMachine | NspawnMachine]: + machines = self.machines_qemu + self.machines_nspawn + # Sort the machines by name for consistency with `nodesAndContainers` in . + machines.sort(key=lambda machine: machine.name) + return machines + def __enter__(self) -> "Driver": return self @@ -148,7 +228,8 @@ def subtest(name: str) -> Iterator[None]: general_symbols = dict( start_all=self.start_all, test_script=self.test_script, - machines=self.machines, + machines_qemu=self.machines_qemu, + machines_nspawn=self.machines_nspawn, vlans=self.vlans, driver=self, log=self.logger, @@ -161,7 +242,7 @@ def subtest(name: str) -> Iterator[None]: serial_stdout_off=self.serial_stdout_off, serial_stdout_on=self.serial_stdout_on, polling_condition=self.polling_condition, - Machine=Machine, # for typing + BaseMachine=BaseMachine, # for typing t=AssertionTester(), debug=self.debug, ) @@ -186,14 +267,14 @@ def subtest(name: str) -> Iterator[None]: def dump_machine_ssh(self, offset: int) -> None: print("SSH backdoor enabled, the machines can be accessed like this:") print( - f"{Style.BRIGHT}Note:{Style.RESET_ALL} this requires {Style.BRIGHT}systemd-ssh-proxy(1){Style.RESET_ALL} to be enabled (default on NixOS 25.05 and newer)." + f"{Style.BRIGHT}Note:{Style.RESET_ALL} vsocks require {Style.BRIGHT}systemd-ssh-proxy(1){Style.RESET_ALL} to be enabled (default on NixOS 25.05 and newer)." ) - names = [machine.name for machine in self.machines] - longest_name = len(max(names, key=len)) - for num, name in enumerate(names, start=offset + 1): + longest_name = len(max((machine.name for machine in self.machines), key=len)) + for index, machine in enumerate(self.machines, start=offset + 1): + name = machine.name spaces = " " * (longest_name - len(name) + 2) print( - f" {name}:{spaces}{Style.BRIGHT}ssh -o User=root vsock/{num}{Style.RESET_ALL}" + f" {name}:{spaces}{Style.BRIGHT}{machine.ssh_backdoor_command(index)}{Style.RESET_ALL}" ) def test_script(self) -> None: @@ -252,8 +333,16 @@ def run_tests(self) -> None: def start_all(self) -> None: """Start all machines""" with self.logger.nested("start all VMs"): + threads = [] for machine in self.machines: - machine.start() + # Create a thread for each machine's start method + t = threading.Thread(target=machine.start, name=f"start-{machine.name}") + threads.append(t) + t.start() + + # Wait for all startup threads to complete before proceeding + for t in threads: + t.join() def join_all(self) -> None: """Wait for all machines to shut down""" @@ -279,19 +368,19 @@ def create_machine( start_command: str, *, name: str | None = None, - keep_vm_state: bool = False, - ) -> Machine: + keep_machine_state: bool = False, + ) -> BaseMachine: + """ + Create a `QemuMachine`. This currently only supports qemu "nodes", not containers. + """ tmp_dir = get_tmp_dir() - cmd = NixStartScript(start_command) - name = name or cmd.machine_name - - return Machine( + return QemuMachine( tmp_dir=tmp_dir, out_dir=self.out_dir, - start_command=cmd, + start_command=start_command, name=name, - keep_vm_state=keep_vm_state, + keep_machine_state=keep_machine_state, logger=self.logger, ) diff --git a/nixos/lib/test-driver/src/test_driver/machine/__init__.py b/nixos/lib/test-driver/src/test_driver/machine/__init__.py index f722d36ae40ef..104f8939212ff 100644 --- a/nixos/lib/test-driver/src/test_driver/machine/__init__.py +++ b/nixos/lib/test-driver/src/test_driver/machine/__init__.py @@ -13,8 +13,11 @@ import tempfile import threading import time +import warnings +from abc import ABC, abstractmethod from collections.abc import Callable, Generator from contextlib import _GeneratorContextManager, contextmanager, nullcontext +from functools import cached_property from pathlib import Path from queue import Queue from typing import Any @@ -114,15 +117,30 @@ def retry(fn: Callable, timeout_seconds: int = 900) -> None: ) -class StartCommand: - """The Base Start Command knows how to append the necessary +class QemuStartCommand: + """This class knows how to append the necessary runtime qemu options as determined by a particular test driver - run. Any such start command is expected to happily receive and - append additional qemu args. + run. """ _cmd: str + def __init__(self, script: str): + self._cmd = script + + @property + def machine_name(self) -> str: + """A start script from nixos/modules/virtualiation/qemu-vm.nix. + These Nix commands have the particular characteristic that the + machine name can be extracted out of them via a regex match. + (Admittedly a _very_ implicit contract, TODO fix this eventually.) + """ + match = re.search("run-(.+)-vm$", self._cmd) + name = "machine" + if match: + name = match.group(1) + return name + def cmd( self, monitor_socket_path: Path, @@ -198,103 +216,39 @@ def run( ) -class NixStartScript(StartCommand): - """A start script from nixos/modules/virtualiation/qemu-vm.nix. - These Nix commands have the particular characteristic that the - machine name can be extracted out of them via a regex match. - (Admittedly a _very_ implicit contract, evtl. TODO fix) - """ - - def __init__(self, script: str): - self._cmd = script - - @property - def machine_name(self) -> str: - match = re.search("run-(.+)-vm$", self._cmd) - name = "machine" - if match: - name = match.group(1) - return name - - -class Machine: - """A handle to the machine with this name, that also knows how to manage - the machine lifecycle with the help of a start script / command.""" - +class BaseMachine(ABC): name: str - out_dir: Path - tmp_dir: Path - shared_dir: Path - state_dir: Path - monitor_path: Path - qmp_path: Path - shell_path: Path - - start_command: StartCommand - keep_vm_state: bool - - process: subprocess.Popen | None - pid: int | None - monitor: socket.socket | None - qmp_client: QMPSession | None - shell: socket.socket | None - serial_thread: threading.Thread | None - - booted: bool - connected: bool - # Store last serial console lines for use - # of wait_for_console_text - last_lines: Queue = Queue() - # Store all console output for full log retrieval - full_console_log: list[str] callbacks: list[Callable] + tmp_dir: Path + keep_machine_state: bool def __repr__(self) -> str: - return f"" + return f"<{self.__class__.__name__} '{self.name}'>" def __init__( self, out_dir: Path, - tmp_dir: Path, - start_command: StartCommand, + name: str, logger: AbstractLogger, - name: str = "machine", - keep_vm_state: bool = False, - callbacks: list[Callable] | None = None, + tmp_dir: Path, + callbacks: list[Callable] | None, + keep_machine_state: bool, ) -> None: self.out_dir = out_dir - self.tmp_dir = tmp_dir - self.keep_vm_state = keep_vm_state self.name = name - self.start_command = start_command - self.callbacks = callbacks if callbacks is not None else [] self.logger = logger - self.full_console_log = [] + self.callbacks = callbacks if callbacks is not None else [] + self.tmp_dir = tmp_dir - # set up directories - self.shared_dir = self.tmp_dir / "shared-xchg" - self.shared_dir.mkdir(mode=0o700, exist_ok=True) + self.keep_machine_state = keep_machine_state self.state_dir = self.tmp_dir / f"vm-state-{self.name}" - self.monitor_path = self.state_dir / "monitor" - self.qmp_path = self.state_dir / "qmp" - self.shell_path = self.state_dir / "shell" - if (not self.keep_vm_state) and self.state_dir.exists(): + if (not self.keep_machine_state) and self.state_dir.exists(): self.cleanup_statedir() self.state_dir.mkdir(mode=0o700, exist_ok=True) - self.process = None - self.pid = None - self.monitor = None - self.qmp_client = None - self.shell = None - self.serial_thread = None - - self.booted = False - self.connected = False - - def is_up(self) -> bool: - return self.booted and self.connected + self.shared_dir = self.tmp_dir / "shared-xchg" + self.shared_dir.mkdir(mode=0o700, exist_ok=True) def log(self, msg: str) -> None: """ @@ -313,28 +267,37 @@ def nested(self, msg: str, attrs: dict[str, str] = {}) -> _GeneratorContextManag my_attrs.update(attrs) return self.logger.nested(msg, my_attrs) - def wait_for_monitor_prompt(self) -> str: - assert self.monitor is not None - answer = "" - while True: - undecoded_answer = self.monitor.recv(1024) - if not undecoded_answer: - break - answer += undecoded_answer.decode() - if answer.endswith("(qemu) "): - break - return answer + @abstractmethod + def is_up(self) -> bool: ... - def send_monitor_command(self, command: str) -> str: + @abstractmethod + def start(self) -> None: ... + + @abstractmethod + def wait_for_shutdown(self) -> None: ... + + def systemctl(self, q: str, user: str | None = None) -> tuple[int, str]: """ - Send a command to the QEMU monitor. This allows attaching - virtual USB disks to a running machine, among other things. + Runs `systemctl` commands with optional support for + `systemctl --user` + + ```py + # run `systemctl list-jobs --no-pager` + machine.systemctl("list-jobs --no-pager") + + # spawn a shell for `any-user` and run + # `systemctl --user list-jobs --no-pager` + machine.systemctl("list-jobs --no-pager", "any-user") + ``` """ - self.run_callbacks() - message = f"{command}\n".encode() - assert self.monitor is not None - self.monitor.send(message) - return self.wait_for_monitor_prompt() + if user is not None: + q = q.replace("'", "\\'") + return self.execute( + f"su -l {user} --shell /bin/sh -c " + "$'XDG_RUNTIME_DIR=/run/user/`id -u` " + f"systemctl --user {q}'" + ) + return self.execute(f"systemctl {q}") def wait_for_unit( self, unit: str, user: str | None = None, timeout: int = 900 @@ -424,29 +387,6 @@ def get_unit_property( assert match[1] == property, invalid_output_message return match[2] - def systemctl(self, q: str, user: str | None = None) -> tuple[int, str]: - """ - Runs `systemctl` commands with optional support for - `systemctl --user` - - ```py - # run `systemctl list-jobs --no-pager` - machine.systemctl("list-jobs --no-pager") - - # spawn a shell for `any-user` and run - # `systemctl --user list-jobs --no-pager` - machine.systemctl("list-jobs --no-pager", "any-user") - ``` - """ - if user is not None: - q = q.replace("'", "\\'") - return self.execute( - f"su -l {user} --shell /bin/sh -c " - "$'XDG_RUNTIME_DIR=/run/user/`id -u` " - f"systemctl --user {q}'" - ) - return self.execute(f"systemctl {q}") - def require_unit_state(self, unit: str, require_state: str = "active") -> None: """ Assert that the current state of a unit has a specific value. The default state is "active". @@ -462,64 +402,444 @@ def require_unit_state(self, unit: str, require_state: str = "active") -> None: f"'{require_state}' but it is in state '{state}'" ) - def _next_newline_closed_block_from_shell(self) -> str: - assert self.shell - output_buffer = [] - while True: - # This receives up to 4096 bytes from the socket - chunk = self.shell.recv(4096) - if not chunk: - # Probably a broken pipe, return the output we have - break - - decoded = chunk.decode() - output_buffer += [decoded] - if decoded[-1] == "\n": - break - return "".join(output_buffer) + def succeed(self, *commands: str, timeout: int | None = None) -> str: + """ + Execute a shell command, raising an exception if the exit status is + not zero, otherwise returning the standard output. Similar to `execute`, + except that the timeout is `None` by default. See `execute` for details on + command execution. + """ + output = "" + for command in commands: + with self.nested(f"must succeed: {command}"): + (status, out) = self.execute(command, timeout=timeout) + if status != 0: + self.log(f"output: {out}") + raise RequestedAssertionFailed( + f"command `{command}` failed (exit code {status})" + ) + output += out + return output - def execute( - self, - command: str, - check_return: bool = True, - check_output: bool = True, - timeout: int | None = 900, - ) -> tuple[int, str]: + def fail(self, *commands: str, timeout: int | None = None) -> str: """ - Execute a shell command, returning a list `(status, stdout)`. + Like `succeed`, but raising an exception if the command returns a zero + status. + """ + output = "" + for command in commands: + with self.nested(f"must fail: {command}"): + (status, out) = self.execute(command, timeout=timeout) + if status == 0: + raise RequestedAssertionFailed( + f"command `{command}` unexpectedly succeeded" + ) + output += out + return output - Commands are run with `set -euo pipefail` set: + def wait_until_succeeds(self, command: str, timeout: int = 900) -> str: + """ + Repeat a shell command with 1-second intervals until it succeeds. + Has a default timeout of 900 seconds which can be modified, e.g. + `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on + command execution. + Throws an exception on timeout. + """ + output = "" - - If several commands are separated by `;` and one fails, the - command as a whole will fail. + def check_success(_last_try: bool) -> bool: + nonlocal output + status, output = self.execute(command, timeout=timeout) + return status == 0 - - For pipelines, the last non-zero exit status will be returned - (if there is one; otherwise zero will be returned). + with self.nested(f"waiting for success: {command}"): + retry(check_success, timeout) + return output - - Dereferencing unset variables fails the command. + def wait_until_fails(self, command: str, timeout: int = 900) -> str: + """ + Like `wait_until_succeeds`, but repeating the command until it fails. + """ + output = "" - - It will wait for stdout to be closed. + def check_failure(_last_try: bool) -> bool: + nonlocal output + status, output = self.execute(command, timeout=timeout) + return status != 0 - If the command detaches, it must close stdout, as `execute` will wait - for this to consume all output reliably. This can be achieved by - redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or - a file. Examples of detaching commands are `sleep 365d &`, where the - shell forks a new process that can write to stdout and `xclip -i`, where - the `xclip` command itself forks without closing stdout. + with self.nested(f"waiting for failure: {command}"): + retry(check_failure, timeout) + return output - Takes an optional parameter `check_return` that defaults to `True`. - Setting this parameter to `False` will not check for the return code - and return -1 instead. This can be used for commands that shut down - the VM and would therefore break the pipe that would be used for - retrieving the return code. + def sleep(self, secs: int) -> None: + # We want to sleep in *guest* time, not *host* time. + self.succeed(f"sleep {secs}") - A timeout for the command can be specified (in seconds) using the optional - `timeout` parameter, e.g., `execute(cmd, timeout=10)` or - `execute(cmd, timeout=None)`. The default is 900 seconds. + def wait_for_file(self, filename: str, timeout: int = 900) -> None: """ - self.run_callbacks() - self.connect() - + Waits until the file exists in the machine's file system. + """ + + def check_file(_last_try: bool) -> bool: + status, _ = self.execute(f"test -e {filename}") + return status == 0 + + with self.nested(f"waiting for file '{filename}'"): + retry(check_file, timeout) + + def wait_for_open_port( + self, port: int, addr: str = "localhost", timeout: int = 900 + ) -> None: + """ + Wait until a process is listening on the given TCP port and IP address + (default `localhost`). + """ + + def port_is_open(_last_try: bool) -> bool: + status, _ = self.execute(f"nc -z {addr} {port}") + return status == 0 + + with self.nested(f"waiting for TCP port {port} on {addr}"): + retry(port_is_open, timeout) + + def wait_for_open_unix_socket( + self, addr: str, is_datagram: bool = False, timeout: int = 900 + ) -> None: + """ + Wait until a process is listening on the given UNIX-domain socket + (default to a UNIX-domain stream socket). + """ + + nc_flags = [ + "-z", + "-uU" if is_datagram else "-U", + ] + + def socket_is_open(_last_try: bool) -> bool: + status, _ = self.execute(f"nc {' '.join(nc_flags)} {addr}") + return status == 0 + + with self.nested( + f"waiting for UNIX-domain {'datagram' if is_datagram else 'stream'} on '{addr}'" + ): + retry(socket_is_open, timeout) + + def wait_for_closed_port( + self, port: int, addr: str = "localhost", timeout: int = 900 + ) -> None: + """ + Wait until nobody is listening on the given TCP port and IP address + (default `localhost`). + """ + + def port_is_closed(_last_try: bool) -> bool: + status, _ = self.execute(f"nc -z {addr} {port}") + return status != 0 + + with self.nested(f"waiting for TCP port {port} on {addr} to be closed"): + retry(port_is_closed, timeout) + + def start_job(self, jobname: str, user: str | None = None) -> tuple[int, str]: + """ + Start systemd service. + """ + return self.systemctl(f"start {jobname}", user) + + def stop_job(self, jobname: str, user: str | None = None) -> tuple[int, str]: + """ + Stop systemd service. + """ + return self.systemctl(f"stop {jobname}", user) + + def execute( + self, + command: str, + check_return: bool = True, + check_output: bool = True, + timeout: int | None = 900, + ) -> tuple[int, str]: + self.run_callbacks() + return self._execute( + command=command, + check_return=check_return, + check_output=check_output, + timeout=timeout, + ) + + @abstractmethod + def _execute( + self, + command: str, + check_return: bool = True, + check_output: bool = True, + timeout: int | None = 900, + ) -> tuple[int, str]: ... + + def run_callbacks(self) -> None: + for callback in self.callbacks: + callback() + + def cleanup_statedir(self) -> None: + shutil.rmtree(self.state_dir) + self.log(f"deleting machine state directory {self.state_dir}") + self.log("if you want to keep the machine state, pass --keep-machine-state") + + def copy_from_machine(self, source: str, target_dir: str = "") -> None: + """Copy a file from the machine (specified by an in-machine source path) to a path + relative to `$out`. The file is copied via the `shared_dir` shared among + all the machines (using a temporary directory). + """ + # Compute the source, target, and intermediate shared file names + vm_src = Path(source) + with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: + shared_temp = Path(shared_td) + vm_shared_temp = Path("/tmp/shared") / shared_temp.name + vm_intermediate = vm_shared_temp / vm_src.name + intermediate = shared_temp / vm_src.name + # Copy the file to the shared directory inside machines + self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) + self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate])) + abs_target = self.out_dir / target_dir / vm_src.name + abs_target.parent.mkdir(exist_ok=True, parents=True) + # Copy the file from the shared directory outside machines + if intermediate.is_dir(): + shutil.copytree(intermediate, abs_target) + else: + shutil.copy(intermediate, abs_target) + + @warnings.deprecated("Use copy_from_machine() instead") + def copy_from_vm(self, source: str, target_dir: str = "") -> None: + self.copy_from_machine(source, target_dir) + + def copy_from_host_via_shell(self, source: str, target: str) -> None: + """Copy a file from the host into the guest by piping it over the + shell into the destination file. Works without host-guest shared folder. + Prefer copy_from_host for whenever possible. + """ + with open(source, "rb") as fh: + content_b64 = base64.b64encode(fh.read()).decode() + self.succeed( + f"mkdir -p $(dirname {target})", + f"echo -n {content_b64} | base64 -d > {target}", + ) + + def copy_from_host(self, source: str, target: str) -> None: + """ + Copies a file from host to machine, e.g., + `copy_from_host("myfile", "/etc/my/important/file")`. + + The first argument is the file on the host. Note that the "host" refers + to the environment in which the test driver runs, which is typically the + Nix build sandbox. + + The second argument is the location of the file on the machine that will + be written to. + + The file is copied via the `shared_dir` directory which is shared among + all the machines (using a temporary directory). + The access rights bits will mimic the ones from the host file and + user:group will be root:root. + """ + host_src = Path(source) + vm_target = Path(target) + with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: + shared_temp = Path(shared_td) + host_intermediate = shared_temp / host_src.name + vm_shared_temp = Path("/tmp/shared") / shared_temp.name + vm_intermediate = vm_shared_temp / host_src.name + + self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) + if host_src.is_dir(): + shutil.copytree(host_src, host_intermediate) + else: + shutil.copy(host_src, host_intermediate) + self.succeed(make_command(["mkdir", "-p", vm_target.parent])) + self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target])) + + +class QemuMachine(BaseMachine): + """A handle to the machine with this name, that also knows how to manage + the machine lifecycle with the help of a start script / command.""" + + name: str + out_dir: Path + shared_dir: Path + state_dir: Path + monitor_path: Path + qmp_path: Path + shell_path: Path + + start_command: QemuStartCommand + + process: subprocess.Popen | None + pid: int | None + monitor: socket.socket | None + qmp_client: QMPSession | None + shell: socket.socket | None + serial_thread: threading.Thread | None + + booted: bool + connected: bool + # Store last serial console lines for use + # of wait_for_console_text + last_lines: Queue = Queue() + # Store all console output for full log retrieval + full_console_log: list[str] + + def __init__( + self, + out_dir: Path, + tmp_dir: Path, + start_command: str, + logger: AbstractLogger, + name: str | None = None, + keep_machine_state: bool = False, + callbacks: list[Callable] | None = None, + ) -> None: + self.start_command = QemuStartCommand(start_command) + super().__init__( + out_dir=out_dir, + name=name or self.start_command.machine_name, + logger=logger, + callbacks=callbacks, + tmp_dir=tmp_dir, + keep_machine_state=keep_machine_state, + ) + + self.full_console_log = [] + + # set up directories + self.monitor_path = self.state_dir / "monitor" + self.qmp_path = self.state_dir / "qmp" + self.shell_path = self.state_dir / "shell" + + self.process = None + self.pid = None + self.monitor = None + self.qmp_client = None + self.shell = None + self.serial_thread = None + + self.booted = False + self.connected = False + + def ssh_backdoor_command(self, index: int) -> str: + return f"ssh -o User=root vsock/{index}" + + def is_up(self) -> bool: + return self.booted and self.connected + + def wait_for_monitor_prompt(self) -> str: + assert self.monitor is not None + answer = "" + while True: + undecoded_answer = self.monitor.recv(1024) + if not undecoded_answer: + break + answer += undecoded_answer.decode() + if answer.endswith("(qemu) "): + break + return answer + + def send_monitor_command(self, command: str) -> str: + """ + Send a command to the QEMU monitor. This allows attaching + virtual USB disks to a running machine, among other things. + """ + self.run_callbacks() + message = f"{command}\n".encode() + assert self.monitor is not None + self.monitor.send(message) + return self.wait_for_monitor_prompt() + + def _next_newline_closed_block_from_shell(self) -> str: + assert self.shell + output_buffer = [] + while True: + # This receives up to 4096 bytes from the socket + chunk = self.shell.recv(4096) + if not chunk: + # Probably a broken pipe, return the output we have + break + + decoded = chunk.decode() + output_buffer += [decoded] + if decoded[-1] == "\n": + break + return "".join(output_buffer) + + def get_tty_text(self, tty: str) -> str: + """ + Get the output printed to a given TTY. + """ + status, output = self.execute( + f"fold -w$(stty -F /dev/tty{tty} size | awk '{{print $2}}') /dev/vcs{tty}" + ) + return output + + def wait_until_tty_matches(self, tty: str, regexp: str, timeout: int = 900) -> None: + """Wait until the visible output on the chosen TTY matches regular + expression. Throws an exception on timeout. + """ + matcher = re.compile(regexp) + + def tty_matches(last_try: bool) -> bool: + text = self.get_tty_text(tty) + if last_try: + self.log( + f"Last chance to match /{regexp}/ on TTY{tty}, " + f"which currently contains: {text}" + ) + return len(matcher.findall(text)) > 0 + + with self.nested(f"waiting for {regexp} to appear on tty {tty}"): + retry(tty_matches, timeout) + + def dump_tty_contents(self, tty: str) -> None: + """Debugging: Dump the contents of the TTY""" + self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat") + + def _execute( + self, + command: str, + check_return: bool = True, + check_output: bool = True, + timeout: int | None = 900, + ) -> tuple[int, str]: + """ + Execute a shell command, returning a list `(status, stdout)`. + + Commands are run with `set -euo pipefail` set: + + - If several commands are separated by `;` and one fails, the + command as a whole will fail. + + - For pipelines, the last non-zero exit status will be returned + (if there is one; otherwise zero will be returned). + + - Dereferencing unset variables fails the command. + + - It will wait for stdout to be closed. + + If the command detaches, it must close stdout, as `execute` will wait + for this to consume all output reliably. This can be achieved by + redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or + a file. Examples of detaching commands are `sleep 365d &`, where the + shell forks a new process that can write to stdout and `xclip -i`, where + the `xclip` command itself forks without closing stdout. + + Takes an optional parameter `check_return` that defaults to `True`. + Setting this parameter to `False` will not check for the return code + and return -1 instead. This can be used for commands that shut down + the machine and would therefore break the pipe that would be used for + retrieving the return code. + + A timeout for the command can be specified (in seconds) using the optional + `timeout` parameter, e.g., `execute(cmd, timeout=10)` or + `execute(cmd, timeout=None)`. The default is 900 seconds. + """ + self.connect() + # Always run command with shell opts command = f"set -euo pipefail; {command}" @@ -588,84 +908,15 @@ def console_interact(self) -> None: assert self.process assert self.process.stdin - while True: - try: - char = sys.stdin.buffer.read(1) - except KeyboardInterrupt: - break - if char == b"": # ctrl+d - self.log("Closing connection to the console") - break - self.send_console(char.decode()) - - def succeed(self, *commands: str, timeout: int | None = None) -> str: - """ - Execute a shell command, raising an exception if the exit status is - not zero, otherwise returning the standard output. Similar to `execute`, - except that the timeout is `None` by default. See `execute` for details on - command execution. - """ - output = "" - for command in commands: - with self.nested(f"must succeed: {command}"): - (status, out) = self.execute(command, timeout=timeout) - if status != 0: - self.log(f"output: {out}") - raise RequestedAssertionFailed( - f"command `{command}` failed (exit code {status})" - ) - output += out - return output - - def fail(self, *commands: str, timeout: int | None = None) -> str: - """ - Like `succeed`, but raising an exception if the command returns a zero - status. - """ - output = "" - for command in commands: - with self.nested(f"must fail: {command}"): - (status, out) = self.execute(command, timeout=timeout) - if status == 0: - raise RequestedAssertionFailed( - f"command `{command}` unexpectedly succeeded" - ) - output += out - return output - - def wait_until_succeeds(self, command: str, timeout: int = 900) -> str: - """ - Repeat a shell command with 1-second intervals until it succeeds. - Has a default timeout of 900 seconds which can be modified, e.g. - `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on - command execution. - Throws an exception on timeout. - """ - output = "" - - def check_success(_last_try: bool) -> bool: - nonlocal output - status, output = self.execute(command, timeout=timeout) - return status == 0 - - with self.nested(f"waiting for success: {command}"): - retry(check_success, timeout) - return output - - def wait_until_fails(self, command: str, timeout: int = 900) -> str: - """ - Like `wait_until_succeeds`, but repeating the command until it fails. - """ - output = "" - - def check_failure(_last_try: bool) -> bool: - nonlocal output - status, output = self.execute(command, timeout=timeout) - return status != 0 - - with self.nested(f"waiting for failure: {command}"): - retry(check_failure, timeout) - return output + while True: + try: + char = sys.stdin.buffer.read(1) + except KeyboardInterrupt: + break + if char == b"": # ctrl+d + self.log("Closing connection to the console") + break + self.send_console(char.decode()) def wait_for_shutdown(self) -> None: """ @@ -710,33 +961,6 @@ def wait_for_qmp_event( if elapsed >= timeout: raise TimeoutError - def get_tty_text(self, tty: str) -> str: - """ - Get the output printed to a given TTY. - """ - status, output = self.execute( - f"fold -w$(stty -F /dev/tty{tty} size | awk '{{print $2}}') /dev/vcs{tty}" - ) - return output - - def wait_until_tty_matches(self, tty: str, regexp: str, timeout: int = 900) -> None: - """Wait until the visible output on the chosen TTY matches regular - expression. Throws an exception on timeout. - """ - matcher = re.compile(regexp) - - def tty_matches(last_try: bool) -> bool: - text = self.get_tty_text(tty) - if last_try: - self.log( - f"Last chance to match /{regexp}/ on TTY{tty}, " - f"which currently contains: {text}" - ) - return len(matcher.findall(text)) > 0 - - with self.nested(f"waiting for {regexp} to appear on tty {tty}"): - retry(tty_matches, timeout) - def send_chars(self, chars: str, delay: float | None = 0.01) -> None: r""" Simulate typing a sequence of characters on the virtual keyboard, @@ -759,70 +983,6 @@ def check_file(_last_try: bool) -> bool: with self.nested(f"waiting for file '{filename}'"): retry(check_file, timeout) - def wait_for_open_port( - self, port: int, addr: str = "localhost", timeout: int = 900 - ) -> None: - """ - Wait until a process is listening on the given TCP port and IP address - (default `localhost`). - """ - - def port_is_open(_last_try: bool) -> bool: - status, _ = self.execute(f"nc -z {addr} {port}") - return status == 0 - - with self.nested(f"waiting for TCP port {port} on {addr}"): - retry(port_is_open, timeout) - - def wait_for_open_unix_socket( - self, addr: str, is_datagram: bool = False, timeout: int = 900 - ) -> None: - """ - Wait until a process is listening on the given UNIX-domain socket - (default to a UNIX-domain stream socket). - """ - - nc_flags = [ - "-z", - "-uU" if is_datagram else "-U", - ] - - def socket_is_open(_last_try: bool) -> bool: - status, _ = self.execute(f"nc {' '.join(nc_flags)} {addr}") - return status == 0 - - with self.nested( - f"waiting for UNIX-domain {'datagram' if is_datagram else 'stream'} on '{addr}'" - ): - retry(socket_is_open, timeout) - - def wait_for_closed_port( - self, port: int, addr: str = "localhost", timeout: int = 900 - ) -> None: - """ - Wait until nobody is listening on the given TCP port and IP address - (default `localhost`). - """ - - def port_is_closed(_last_try: bool) -> bool: - status, _ = self.execute(f"nc -z {addr} {port}") - return status != 0 - - with self.nested(f"waiting for TCP port {port} on {addr} to be closed"): - retry(port_is_closed, timeout) - - def start_job(self, jobname: str, user: str | None = None) -> tuple[int, str]: - """ - Start systemd service. - """ - return self.systemctl(f"start {jobname}", user) - - def stop_job(self, jobname: str, user: str | None = None) -> tuple[int, str]: - """ - Stop systemd service. - """ - return self.systemctl(f"stop {jobname}", user) - def connect(self) -> None: """ Wait for a connection to the guest root shell @@ -902,78 +1062,6 @@ def screenshot(self, filename: str) -> None: f"Cannot convert screenshot (pnmtopng returned code {ret.returncode})" ) - def copy_from_host_via_shell(self, source: str, target: str) -> None: - """Copy a file from the host into the guest by piping it over the - shell into the destination file. Works without host-guest shared folder. - Prefer copy_from_host for whenever possible. - """ - with open(source, "rb") as fh: - content_b64 = base64.b64encode(fh.read()).decode() - self.succeed( - f"mkdir -p $(dirname {target})", - f"echo -n {content_b64} | base64 -d > {target}", - ) - - def copy_from_host(self, source: str, target: str) -> None: - """ - Copies a file from host to machine, e.g., - `copy_from_host("myfile", "/etc/my/important/file")`. - - The first argument is the file on the host. Note that the "host" refers - to the environment in which the test driver runs, which is typically the - Nix build sandbox. - - The second argument is the location of the file on the machine that will - be written to. - - The file is copied via the `shared_dir` directory which is shared among - all the VMs (using a temporary directory). - The access rights bits will mimic the ones from the host file and - user:group will be root:root. - """ - host_src = Path(source) - vm_target = Path(target) - with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: - shared_temp = Path(shared_td) - host_intermediate = shared_temp / host_src.name - vm_shared_temp = Path("/tmp/shared") / shared_temp.name - vm_intermediate = vm_shared_temp / host_src.name - - self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) - if host_src.is_dir(): - shutil.copytree(host_src, host_intermediate) - else: - shutil.copy(host_src, host_intermediate) - self.succeed(make_command(["mkdir", "-p", vm_target.parent])) - self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target])) - - def copy_from_vm(self, source: str, target_dir: str = "") -> None: - """Copy a file from the VM (specified by an in-VM source path) to a path - relative to `$out`. The file is copied via the `shared_dir` shared among - all the VMs (using a temporary directory). - """ - # Compute the source, target, and intermediate shared file names - vm_src = Path(source) - with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: - shared_temp = Path(shared_td) - vm_shared_temp = Path("/tmp/shared") / shared_temp.name - vm_intermediate = vm_shared_temp / vm_src.name - intermediate = shared_temp / vm_src.name - # Copy the file to the shared directory inside VM - self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) - self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate])) - abs_target = self.out_dir / target_dir / vm_src.name - abs_target.parent.mkdir(exist_ok=True, parents=True) - # Copy the file from the shared directory outside VM - if intermediate.is_dir(): - shutil.copytree(intermediate, abs_target) - else: - shutil.copy(intermediate, abs_target) - - def dump_tty_contents(self, tty: str) -> None: - """Debugging: Dump the contents of the TTY""" - self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat") - def get_screen_text_variants(self) -> list[str]: """ Return a list of different interpretations of what is currently @@ -1154,11 +1242,6 @@ def process_serial_output() -> None: self.log(f"QEMU running (pid {self.pid})") - def cleanup_statedir(self) -> None: - shutil.rmtree(self.state_dir) - self.logger.log(f"deleting VM state directory {self.state_dir}") - self.logger.log("if you want to keep the VM state, pass --keep-vm-state") - def shutdown(self) -> None: """ Shut down the machine, waiting for the VM to exit. @@ -1234,10 +1317,6 @@ def window_is_visible(last_try: bool) -> bool: with self.nested("waiting for a window to appear"): retry(window_is_visible, timeout) - def sleep(self, secs: int) -> None: - # We want to sleep in *guest* time, not *host* time. - self.succeed(f"sleep {secs}") - def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None: """ Forward a TCP port on the host to a TCP port on the guest. @@ -1264,7 +1343,7 @@ def unblock(self) -> None: def release(self) -> None: if self.pid is None: return - self.logger.info(f"kill machine (pid {self.pid})") + self.logger.info(f"kill QemuMachine (pid {self.pid})") assert self.process assert self.shell assert self.monitor @@ -1278,10 +1357,6 @@ def release(self) -> None: if self.qmp_client: self.qmp_client.close() - def run_callbacks(self) -> None: - for callback in self.callbacks: - callback() - def switch_root(self) -> None: """ Transition from stage 1 to stage 2. This requires the @@ -1296,3 +1371,266 @@ def switch_root(self) -> None: ) self.connected = False self.connect() + + +class NspawnMachine(BaseMachine): + """ + A handle to a systemd-nspawn container machine with this name, that also + knows how to manage the machine lifecycle with the help of a start script / command. + """ + + start_command: str + tmp_dir: Path + process: subprocess.Popen | None + pid: int | None + + machine_sock_path: Path + machine_sock: socket.socket | None + + @staticmethod + def machine_name_from_start_command(start_command: str) -> str: + match = re.search("run-(.+)-nspawn", os.path.basename(start_command)) + assert match is not None, f"Could not extract node name from {start_command}" + return match.group(1) + + def __init__( + self, + out_dir: Path, + name: str | None, + start_command: str, + tmp_dir: Path, + logger: AbstractLogger, + callbacks: list[Callable] | None = None, + keep_machine_state: bool = False, + ): + # TODO: don't compute `name` from `start_command` path, instead thread it down explicitly. + # See analogous TODO in `QemuStartCommand::machine_name`. + super().__init__( + out_dir=out_dir, + name=name or self.machine_name_from_start_command(start_command), + logger=logger, + callbacks=callbacks, + tmp_dir=tmp_dir, + keep_machine_state=keep_machine_state, + ) + + self.start_command = start_command + self.process = None + self.pid = None + + self.machine_sock_path = self.tmp_dir / f"{self.name}-nspawn.sock" + + def ssh_backdoor_command(self, index: int) -> str: + # documented in systemd-ssh-generator(8) and https://systemd.io/CONTAINER_INTERFACE/ + socket_path = f"/run/systemd/nspawn/unix-export/{self.name}/ssh" + proxy_cmd = f"socat - UNIX-CLIENT:{socket_path}" + return f'ssh -o User=root -o ProxyCommand="{proxy_cmd}" bash' + + def release(self) -> None: + if self.pid is None: + return + + if self.machine_sock: + self.machine_sock.close() + + self.logger.info(f"kill NspawnMachine (pid {self.pid})") + assert self.process is not None + self.process.terminate() + self.process = None + + def is_up(self) -> bool: + return self.process is not None + + def _poll_socket(self) -> tuple[bool, int | None]: + """Non-blocking check of container status via socket. + Returns (is_ready, leader_pid). + """ + assert self.machine_sock is not None + ready = False + leader_pid = None + try: + data, _ = self.machine_sock.recvfrom(4096) + msg = data.decode() + for line in msg.splitlines(): + if line == "READY=1": + ready = True + if line.startswith("X_NSPAWN_LEADER_PID="): + leader_pid = int(line.split("=")[1]) + except OSError: + pass + return ready, leader_pid + + @cached_property + def get_systemd_process(self) -> int: + """Block until startup is complete and return the PID of the container's systemd process.""" + assert self.process is not None + + container_pid: int | None = None + is_ready = False + + start_time = time.monotonic() + last_warning = start_time + delay = 0.01 + max_delay = 0.5 + + while not is_ready or container_pid is None: + # Poll the socket until we have the container leader PID + if self.process.poll() is not None: + raise MachineError("systemd-nspawn process exited unexpectedly") + + # Print periodic warnings every 10s so the user knows we aren't deadlocked + now = time.monotonic() + if now - last_warning > 10.0: + self.log( + f"still waiting for container '{self.name}' to reach ready state..." + ) + last_warning = now + + # Poll and update our local tracking variables + ready_now, pid_now = self._poll_socket() + if ready_now: + is_ready = True + if pid_now: + container_pid = pid_now + + if not (is_ready and container_pid): + time.sleep(delay) + delay = min(delay * 2, max_delay) + + return container_pid + + def _execute( + self, + command: str, + check_return: bool = True, + check_output: bool = True, + timeout: int | None = 900, + ) -> tuple[int, str]: + self.start() + + container_pid = self.get_systemd_process + nsenter = shutil.which("nsenter") + assert nsenter is not None + + # Sourcing /etc/profile on every call of `_execute` ensures a correct shell + # environment (correct PATH, etc.). This is slower than the QEMU version. + # + # NOTE If the test calls switch-to-configuration (with a differently configured specialization) + # this will use the /etc/profile of the new specialisation while `QemuMachine` nodes + # will continue to use the original /etc/profile. + command = f"set -eo pipefail; source /etc/profile; set -u; {command}" + + cp = subprocess.run( + [ + nsenter, + "--target", + str(container_pid), + "--mount", + "--uts", + "--ipc", + "--net", + "--pid", + "--cgroup", + "/bin/sh", + "-c", + command, + ], + env={}, + timeout=timeout, + stdout=subprocess.PIPE, + text=True, + ) + return (cp.returncode, cp.stdout) + + def _stream_journal(self) -> None: + assert self.process is not None, "Container not started" + journal_path = self.state_dir / "var/log/journal" + + # Grab a reference to the process here so we can continue polling + # the container process to see if it has exited. + proc = self.process + + # 1. Wait for the directory to actually be created by the container + self.log(f"Waiting for journal at {journal_path}...") + max_attempts = 10 + attempts = 0 + while not journal_path.exists() and attempts < max_attempts: + time.sleep(1) + attempts += 1 + + if not journal_path.exists(): + self.log(f"Error: Journal directory {journal_path} never appeared.") + return + + # 2. Start the journalctl process + # Using a loop here handles cases where journalctl might exit unexpectedly + while proc.poll() is None: # While the container is still running + with subprocess.Popen( + [ + "journalctl", + "--follow", + f"--directory={journal_path}", + "--lines=all", + "--output=short-monotonic", + ], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, # Line buffered. + ) as log_proc: + assert log_proc.stdout is not None, ( + "Failed to capture journalctl output" + ) + try: + for line in iter(log_proc.stdout.readline, ""): + if line: + self.log_serial(line.rstrip()) + if proc.poll() is not None: + break + except Exception as e: + self.log(f"Error while reading journalctl output: {e}") + finally: + log_proc.terminate() + log_proc.wait() + + # If we reach here, journalctl stopped while the container is still running. + # Wait a moment before retrying to avoid CPU pegging if something is wrong. + if proc.poll() is None: + time.sleep(1) + + def start(self) -> None: + if self.process is not None: + return + + if self.machine_sock_path is not None and self.machine_sock_path.exists(): + self.machine_sock_path.unlink() + + self.machine_sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM) + self.machine_sock.bind(str(self.machine_sock_path)) + self.machine_sock.setblocking(False) + + self.process = subprocess.Popen( + [self.start_command], + env={ + "RUN_NSPAWN_ROOT_DIR": str(self.state_dir), + "RUN_NSPAWN_SHARED_DIR": str(self.shared_dir), + "NOTIFY_SOCKET": self.machine_sock_path.as_posix(), + }, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + self.pid = self.process.pid + + self.log(f"systemd-nspawn running (pid {self.pid})") + + journal_thread = threading.Thread(target=self._stream_journal, daemon=True) + journal_thread.start() + + def wait_for_shutdown(self) -> None: + if self.process is None: + return + + with self.nested("waiting for the container to power off"): + self.process.wait() + self.process = None diff --git a/nixos/lib/test-driver/src/test_driver/vlan.py b/nixos/lib/test-driver/src/test_driver/vlan.py index 89ca33165b4db..eed66dcd13cb8 100644 --- a/nixos/lib/test-driver/src/test_driver/vlan.py +++ b/nixos/lib/test-driver/src/test_driver/vlan.py @@ -50,6 +50,8 @@ class VLan: pid: int fd: io.TextIOBase + plug_process: subprocess.Popen + logger: AbstractLogger def __repr__(self) -> str: @@ -58,6 +60,7 @@ def __repr__(self) -> str: def __init__(self, nr: int, tmp_dir: Path, logger: AbstractLogger): self.nr = nr self.socket_dir = tmp_dir / f"vde{self.nr}.ctl" + self.tap_name = f"vde-tap{self.nr}" self.logger = logger # TODO: don't side-effect environment here @@ -114,6 +117,13 @@ def __init__(self, nr: int, tmp_dir: Path, logger: AbstractLogger): if "1000 Success" in line: break + # This is needed to allow systemd-nspawn containers to communicate + # with VMs connected to the VLAN. + self.logger.info(f"creating tap interface {self.tap_name}") + self.plug_process = subprocess.Popen( + ["vde_plug2tap", "-s", self.socket_dir, self.tap_name], + ) + assert (self.socket_dir / "ctl").exists(), "cannot start vde_switch" self.logger.info(f"running vlan (pid {self.pid}; ctl {self.socket_dir})") @@ -122,4 +132,7 @@ def stop(self) -> None: self.logger.info(f"kill vlan (pid {self.pid})") assert self.process.stdin is not None self.process.stdin.close() + if self.plug_process: + self.plug_process.terminate() + self.plug_process.wait() self.process.terminate() diff --git a/nixos/lib/test-script-prepend.py b/nixos/lib/test-script-prepend.py index 6be20270c6ccb..b9a12d5f2f546 100644 --- a/nixos/lib/test-script-prepend.py +++ b/nixos/lib/test-script-prepend.py @@ -4,7 +4,7 @@ from test_driver.debug import DebugAbstract from test_driver.driver import Driver from test_driver.vlan import VLan -from test_driver.machine import Machine +from test_driver.machine import BaseMachine, NspawnMachine, QemuMachine from test_driver.logger import AbstractLogger from typing import Callable, Iterator, ContextManager, Optional, List, Dict, Any, Union from typing_extensions import Protocol @@ -34,8 +34,9 @@ def __call__( start_command: str | dict, *, name: Optional[str] = None, - keep_vm_state: bool = False, - ) -> Machine: + keep_machine_state: bool = False, + **kwargs: Any, # to allow usage of deprecated keep_vm_state + ) -> BaseMachine: raise Exception("This is just type information for the Nix test driver") @@ -43,7 +44,7 @@ def __call__( subtest: Callable[[str], ContextManager[None]] retry: RetryProtocol test_script: Callable[[], None] -machines: List[Machine] +machines: List[BaseMachine] vlans: List[VLan] driver: Driver log: AbstractLogger diff --git a/nixos/lib/testing-python.nix b/nixos/lib/testing-python.nix index 878f9669321a8..1d2777ecb0335 100644 --- a/nixos/lib/testing-python.nix +++ b/nixos/lib/testing-python.nix @@ -56,6 +56,7 @@ pkgs.lib.throwIf (args ? specialArgs) { machine ? null, nodes ? { }, + containers ? { }, testScript, enableOCR ? false, globalTimeout ? (60 * 60), diff --git a/nixos/lib/testing/driver.nix b/nixos/lib/testing/driver.nix index 5845ebe2695ac..63ad51147743a 100644 --- a/nixos/lib/testing/driver.nix +++ b/nixos/lib/testing/driver.nix @@ -14,18 +14,17 @@ let qemu_pkg = config.qemu.package; imagemagick_light = hostPkgs.imagemagick_light.override { inherit (hostPkgs) libtiff; }; tesseract4 = hostPkgs.tesseract4.override { enableLanguages = [ "eng" ]; }; + + enableNspawn = config.containers != { }; + # We want `pkgs.systemd`, *not* `python3Packages.system`. + systemd = hostPkgs.systemd; }; vlans = map ( m: (m.virtualisation.vlans ++ (lib.mapAttrsToList (_: v: v.vlan) m.virtualisation.interfaces)) - ) (lib.attrValues config.nodes); + ) ((lib.attrValues config.nodes) ++ (lib.attrValues config.containers)); vms = map (m: m.system.build.vm) (lib.attrValues config.nodes); - - nodeHostNames = - let - nodesList = map (c: c.system.name) (lib.attrValues config.nodes); - in - nodesList ++ lib.optional (lib.length nodesList == 1 && !lib.elem "machine" nodesList) "machine"; + containers = map (m: m.system.build.nspawn) (lib.attrValues config.containers); pythonizeName = name: @@ -38,8 +37,22 @@ let uniqueVlans = lib.unique (builtins.concatLists vlans); vlanNames = map (i: "vlan${toString i}: VLan;") uniqueVlans; - pythonizedNames = map pythonizeName nodeHostNames; - machineNames = map (name: "${name}: Machine;") pythonizedNames; + + vmMachineNames = map (c: c.system.name) (lib.attrValues config.nodes); + containerMachineNames = map (c: c.system.name) (lib.attrValues config.containers); + + theOnlyMachine = + let + exactlyOneMachine = lib.length (lib.attrValues config.nodes) == 1; + allMachineNames = map (c: c.system.name) (lib.attrValues config.allMachines); + in + lib.optional (exactlyOneMachine && !lib.elem "machine" allMachineNames) "machine"; + + pythonizedVmNames = map pythonizeName (vmMachineNames ++ theOnlyMachine); + vmMachineTypeHints = map (name: "${name}: QemuMachine;") pythonizedVmNames; + + pythonizedContainerNames = map pythonizeName containerMachineNames; + containerMachineTypeHints = map (name: "${name}: NspawnMachine;") pythonizedContainerNames; withChecks = lib.warnIf config.skipLint "Linting is disabled"; @@ -62,12 +75,16 @@ let '' mkdir -p $out/bin - vmStartScripts=($(for i in ${toString vms}; do echo $i/bin/run-*-vm; done)) + vmNames=(${lib.escapeShellArgs vmMachineNames}) + vmStartScripts=(${lib.escapeShellArgs (map lib.getExe vms)}) + containerNames=(${lib.escapeShellArgs containerMachineNames}) + containerStartScripts=(${lib.escapeShellArgs (map lib.getExe containers)}) ${lib.optionalString (!config.skipTypeCheck) '' # prepend type hints so the test script can be type checked with mypy cat "${../test-script-prepend.py}" >> testScriptWithTypes - echo "${toString machineNames}" >> testScriptWithTypes + echo "${toString vmMachineTypeHints}" >> testScriptWithTypes + echo "${toString containerMachineTypeHints}" >> testScriptWithTypes echo "${toString vlanNames}" >> testScriptWithTypes echo -n "$testScript" >> testScriptWithTypes @@ -90,7 +107,9 @@ let echo "See https://nixos.org/manual/nixos/stable/#test-opt-skipLint" PYFLAKES_BUILTINS="$( - echo -n ${lib.escapeShellArg (lib.concatStringsSep "," pythonizedNames)}, + echo -n ${ + lib.escapeShellArg (lib.concatStringsSep "," (pythonizedVmNames ++ pythonizedContainerNames)) + }, cat ${lib.escapeShellArg "driver-symbols"} )" ${hostPkgs.python3Packages.pyflakes}/bin/pyflakes $out/test-script ''} @@ -98,7 +117,10 @@ let # set defaults through environment # see: ./test-driver/test-driver.py argparse implementation wrapProgram $out/bin/nixos-test-driver \ - --set startScripts "''${vmStartScripts[*]}" \ + --set vmStartScripts "''${vmStartScripts[*]}" \ + --set vmNames "''${vmNames[*]}" \ + --set containerStartScripts "''${containerStartScripts[*]}" \ + --set containerNames "''${containerNames[*]}" \ --set testScript "$out/test-script" \ --set globalTimeout "${toString config.globalTimeout}" \ --set vlans '${toString vlans}' \ diff --git a/nixos/lib/testing/network.nix b/nixos/lib/testing/network.nix index 9a5facfc2433d..570b854d8e28a 100644 --- a/nixos/lib/testing/network.nix +++ b/nixos/lib/testing/network.nix @@ -1,11 +1,13 @@ -{ lib, nodes, ... }: +testModuleArgs@{ + lib, + ... +}: let inherit (lib) attrNames - concatMap + concatMapAttrsStringSep concatMapStrings - flip forEach head listToAttrs @@ -20,22 +22,15 @@ let zipLists ; - nodeNumbers = listToAttrs (zipListsWith nameValuePair (attrNames nodes) (range 1 254)); + nodeNumbers = listToAttrs ( + zipListsWith nameValuePair (attrNames testModuleArgs.config.allMachines) (range 1 254) + ); networkModule = - { - config, - nodes, - pkgs, - ... - }: + { config, ... }: let - qemu-common = import ../qemu-common.nix { inherit (pkgs) lib stdenv; }; - interfaces = lib.attrValues config.virtualisation.allInterfaces; - interfacesNumbered = zipLists interfaces (range 1 255); - # Automatically assign IP addresses to requested interfaces. assignIPs = lib.filter (i: i.assignIP) interfaces; ipInterfaces = forEach assignIPs ( @@ -56,17 +51,6 @@ let } ); - qemuOptions = lib.flatten ( - forEach interfacesNumbered ( - { fst, snd }: qemu-common.qemuNICFlags snd fst.vlan config.virtualisation.test.nodeNumber - ) - ); - udevRules = forEach interfaces ( - interface: - # MAC Addresses for QEMU network devices are lowercase, and udev string comparison is case-sensitive. - ''SUBSYSTEM=="net",ACTION=="add",ATTR{address}=="${toLower (qemu-common.qemuNicMac interface.vlan config.virtualisation.test.nodeNumber)}",NAME="${interface.name}"'' - ); - networkConfig = { networking.hostName = mkDefault config.virtualisation.test.nodeName; @@ -80,33 +64,51 @@ let optionalString (ipInterfaces != [ ]) (head (head ipInterfaces).value.ipv6.addresses).address; - # Put the IP addresses of all VMs in this machine's - # /etc/hosts file. If a machine has multiple - # interfaces, use the IP address corresponding to - # the first interface (i.e. the first network in its - # virtualisation.vlans option). - networking.extraHosts = flip concatMapStrings (attrNames nodes) ( - m': + # Generate /etc/hosts including every remote's primary IP addresses + # (whichever VLAN they may belong to) as well as all IP addresses from + # VLANs that both the local machine and the remote machine share. + networking.extraHosts = let - config = nodes.${m'}; - hostnames = - optionalString ( - config.networking.domain != null - ) "${config.networking.hostName}.${config.networking.domain} " - + "${config.networking.hostName}\n"; + localVlans = config.virtualisation.vlans; in - optionalString ( - config.networking.primaryIPAddress != "" - ) "${config.networking.primaryIPAddress} ${hostnames}" - + optionalString ( - config.networking.primaryIPv6Address != "" - ) "${config.networking.primaryIPv6Address} ${hostnames}" - ); - - virtualisation.qemu.options = qemuOptions; - boot.initrd.services.udev.rules = concatMapStrings (x: x + "\n") udevRules; + concatMapAttrsStringSep "" ( + mName: remoteConfig: + let + remoteInterfaces = remoteConfig.networking.interfaces; + sharedIps = lib.flatten ( + lib.mapAttrsToList ( + ifaceName: ifaceCfg: + let + remoteIfaceMeta = remoteConfig.virtualisation.allInterfaces."${ifaceName}" or { }; + vlanId = remoteIfaceMeta.vlan or null; + in + if vlanId != null && builtins.elem vlanId localVlans then + builtins.map (addr: addr.address) ifaceCfg.ipv4.addresses + ++ builtins.map (addr: addr.address) ifaceCfg.ipv6.addresses + else + [ ] + ) remoteInterfaces + ); + + # We also want to test router protocols that enable connections + # between nodes even if they don't share a VLAN, so we include + # the primary IPs of all machines in the hosts file. + primaryIPs = [ + remoteConfig.networking.primaryIPAddress + remoteConfig.networking.primaryIPv6Address + ]; + + allReachableIps = lib.lists.uniqueStrings (sharedIps ++ primaryIPs); + + hostnames = + optionalString ( + remoteConfig.networking.domain != null + ) "${remoteConfig.networking.hostName}.${remoteConfig.networking.domain} " + + "${remoteConfig.networking.hostName}\n"; + in + builtins.concatStringsSep "" (map (ip: "${ip} ${hostnames}") allReachableIps) + ) testModuleArgs.config.allMachines; }; - in { key = "network-interfaces"; @@ -117,6 +119,31 @@ let }; }; + qemuNetworkModule = + { config, pkgs, ... }: + let + qemu-common = import ../qemu-common.nix { inherit (pkgs) lib stdenv; }; + + interfaces = lib.attrValues config.virtualisation.allInterfaces; + + interfacesNumbered = zipLists interfaces (range 1 255); + + qemuOptions = lib.flatten ( + forEach interfacesNumbered ( + { fst, snd }: qemu-common.qemuNICFlags snd fst.vlan config.virtualisation.test.nodeNumber + ) + ); + udevRules = map ( + interface: + # MAC Addresses for QEMU network devices are lowercase, and udev string comparison is case-sensitive. + ''SUBSYSTEM=="net",ACTION=="add",ATTR{address}=="${toLower (qemu-common.qemuNicMac interface.vlan config.virtualisation.test.nodeNumber)}",NAME="${interface.name}"'' + ) interfaces; + in + { + virtualisation.qemu.options = qemuOptions; + boot.initrd.services.udev.rules = concatMapStrings (x: x + "\n") udevRules; + }; + nodeNumberModule = ( regular@{ config, name, ... }: { @@ -127,7 +154,7 @@ let # We need to force this in specialisations, otherwise it'd be # readOnly = true; description = '' - The `name` in `nodes.`; stable across `specialisations`. + The `name` in `nodes.` and `containers.`; stable across `specialisations`. ''; }; virtualisation.test.nodeNumber = mkOption { @@ -136,7 +163,7 @@ let readOnly = true; default = nodeNumbers.${config.virtualisation.test.nodeName}; description = '' - A unique number assigned for each node in `nodes`. + A unique number assigned for each machine in `nodes` and `containers`. ''; }; @@ -172,5 +199,10 @@ in nodeNumberModule ]; }; + extraBaseNodeModules = { + imports = [ + qemuNetworkModule + ]; + }; }; } diff --git a/nixos/lib/testing/nixos-test-base.nix b/nixos/lib/testing/nixos-test-base.nix index 23358f2185b1f..6b518e39ac116 100644 --- a/nixos/lib/testing/nixos-test-base.nix +++ b/nixos/lib/testing/nixos-test-base.nix @@ -7,7 +7,6 @@ let in { imports = [ - ../../modules/virtualisation/qemu-vm.nix ../../modules/testing/test-instrumentation.nix # !!! should only get added for automated test runs { key = "no-manual"; @@ -32,7 +31,9 @@ in # This is mostly a Hydra optimization, so we don't rebuild all the tests every time switch-to-configuration-ng changes. key = "no-switch-to-configuration"; system.switch.enable = mkDefault ( - config.isSpecialisation || config.specialisation != { } || config.virtualisation.installBootLoader + config.isSpecialisation + || config.specialisation != { } + || (!config.boot.isContainer && config.virtualisation.installBootLoader) ); } ) diff --git a/nixos/lib/testing/nodes.nix b/nixos/lib/testing/nodes.nix index 721a3c88b3692..795838e77983e 100644 --- a/nixos/lib/testing/nodes.nix +++ b/nixos/lib/testing/nodes.nix @@ -2,7 +2,6 @@ testModuleArgs@{ config, lib, hostPkgs, - nodes, options, ... }: @@ -12,12 +11,9 @@ let literalExpression literalMD mapAttrs - mkDefault mkIf mkMerge mkOption - mkForce - optional optionalAttrs types ; @@ -49,15 +45,11 @@ let ./nixos-test-base.nix { key = "nodes"; - _module.args.nodes = config.nodesCompat; + _module.args = { + inherit (config) containers; + nodes = config.nodesCompat; + }; } - ( - { config, ... }: - { - virtualisation.qemu.package = testModuleArgs.config.qemu.package; - virtualisation.host.pkgs = hostPkgs; - } - ) ( { options, ... }: { @@ -73,6 +65,62 @@ let testModuleArgs.config.extraBaseModules ]; }; + baseQemuOS = baseOS.extendModules { + modules = [ + ../../modules/virtualisation/qemu-vm.nix + config.nodeDefaults + { + key = "base-qemu"; + virtualisation.qemu.package = testModuleArgs.config.qemu.package; + virtualisation.host.pkgs = hostPkgs; + } + testModuleArgs.config.extraBaseNodeModules + ]; + }; + baseNspawnOS = baseOS.extendModules { + modules = [ + ../../modules/virtualisation/nspawn-container + config.containerDefaults + ( + { pkgs, ... }: + { + key = "base-nspawn"; + + # PAM requires setuid and doesn't work in the build sandbox. + # https://github.com/NixOS/nix/blob/959c244a1265f4048390f3ad21679219d7b27a99/src/libstore/unix/build/linux-derivation-builder.cc#L63 + services.openssh.settings.UsePAM = false; + + # Networking for tests is statically configured by default. + # dhcpcd times out after blocking for a long time, which slows down tests. + # See https://github.com/NixOS/nixpkgs/pull/478109#discussion_r2867570799 + networking.useDHCP = lib.mkDefault false; + + # Disable Info manual directory generation to prevent build failures. + # + # Context: 'install-info' (from texinfo) is triggered during system-path + # generation to index manuals, but it requires 'gzip' in the $PATH to + # decompress them. + # When 'networking.useDHCP' is set to false, transitive dependencies + # (like dhcpcd or other network tools) that normally pull 'gzip' into + # the system environment are removed. This leaves 'install-info' + # stranded without 'gzip', causing the 'system-path' derivation to fail. + # Since nspawn containers are typically minimal, disabling 'info' + # is a cleaner fix than explicitly adding 'gzip' to systemPackages. + documentation.info.enable = lib.mkDefault false; + + # Gross, insecure hack to make login work. See above. + security.pam.services.login = { + text = '' + auth sufficient ${pkgs.linux-pam}/lib/security/pam_permit.so + account sufficient ${pkgs.linux-pam}/lib/security/pam_permit.so + password sufficient ${pkgs.linux-pam}/lib/security/pam_permit.so + session sufficient ${pkgs.linux-pam}/lib/security/pam_permit.so + ''; + }; + } + ) + ]; + }; # TODO (lib): Dedup with run.nix, add to lib/options.nix mkOneUp = opt: f: lib.mkOverride (opt.highestPrio - 1) (f opt.value); @@ -109,15 +157,37 @@ in node.type = mkOption { type = types.raw; - default = baseOS.type; + default = baseQemuOS.type; internal = true; }; nodes = mkOption { type = types.lazyAttrsOf config.node.type; + default = { }; + visible = "shallow"; + description = '' + An attribute set of NixOS configuration modules representing QEMU vms that can be started during a test. + + The configurations are augmented by the [`defaults`](#test-opt-defaults) option. + + They are assigned network addresses according to the `nixos/lib/testing/network.nix` module. + + A few special options are available, that aren't in a plain NixOS configuration. See [Configuring the nodes](#sec-nixos-test-nodes) + ''; + }; + + container.type = mkOption { + type = types.raw; + default = baseNspawnOS.type; + internal = true; + }; + + containers = mkOption { + type = types.lazyAttrsOf config.container.type; + default = { }; visible = "shallow"; description = '' - An attribute set of NixOS configuration modules. + An attribute set of NixOS configuration modules representing systemd-nspawn containers that can be started during a test. The configurations are augmented by the [`defaults`](#test-opt-defaults) option. @@ -127,7 +197,34 @@ in ''; }; + allMachines = mkOption { + readOnly = true; + internal = true; + description = '' + Basically a merge of [{option}`nodes`](#test-opt-nodes) and [{option}`containers`](#test-opt-containers). + + This ensures that there are no name collisions between nodes and containers. + ''; + default = + let + overlappingNames = lib.intersectLists (lib.attrNames config.nodes) ( + lib.attrNames config.containers + ); + in + lib.throwIfNot (overlappingNames == [ ]) + "The following names are used in both `nodes` and `containers`: ${lib.concatStringsSep ", " overlappingNames}" + (config.nodes // config.containers); + }; + defaults = mkOption { + description = '' + NixOS configuration that is applied to all [{option}`nodes`](#test-opt-nodes) and [{option}`containers`](#test-opt-containers). + ''; + type = types.deferredModule; + default = { }; + }; + + nodeDefaults = mkOption { description = '' NixOS configuration that is applied to all [{option}`nodes`](#test-opt-nodes). ''; @@ -135,7 +232,23 @@ in default = { }; }; + containerDefaults = mkOption { + description = '' + NixOS configuration that is applied to all [{option}`containers`](#test-opt-containers). + ''; + type = types.deferredModule; + default = { }; + }; + extraBaseModules = mkOption { + description = '' + NixOS configuration that, like [{option}`defaults`](#test-opt-defaults), is applied to all [{option}`nodes`](#test-opt-nodes) and [{option}`containers`](#test-opt-containers) and can not be undone with [`specialisation..inheritParentConfig`](https://search.nixos.org/options?show=specialisation.%3Cname%3E.inheritParentConfig&from=0&size=50&sort=relevance&type=packages&query=specialisation). + ''; + type = types.deferredModule; + default = { }; + }; + + extraBaseNodeModules = mkOption { description = '' NixOS configuration that, like [{option}`defaults`](#test-opt-defaults), is applied to all [{option}`nodes`](#test-opt-nodes) and can not be undone with [`specialisation..inheritParentConfig`](https://search.nixos.org/options?show=specialisation.%3Cname%3E.inheritParentConfig&from=0&size=50&sort=relevance&type=packages&query=specialisation). ''; @@ -145,7 +258,7 @@ in node.pkgs = mkOption { description = '' - The Nixpkgs to use for the nodes. + The Nixpkgs to use for the nodes and containers. Setting this will make the `nixpkgs.*` options read-only, to avoid mistakenly testing with a Nixpkgs configuration that diverges from regular use. ''; @@ -160,7 +273,7 @@ in description = '' Whether to make the `nixpkgs.*` options read-only. This is only relevant when [`node.pkgs`](#test-opt-node.pkgs) is set. - Set this to `false` when any of the [`nodes`](#test-opt-nodes) needs to configure any of the `nixpkgs.*` options. This will slow down evaluation of your test a bit. + Set this to `false` when any of the [`nodes`](#test-opt-nodes) or [{option}`containers`](#test-opt-containers) need to configure any of the `nixpkgs.*` options. This will slow down evaluation of your test a bit. ''; type = types.bool; default = config.node.pkgs != null; @@ -188,6 +301,7 @@ in }; config = { + _module.args.containers = config.containers; _module.args.nodes = config.nodesCompat; nodesCompat = mapAttrs ( name: config: @@ -201,6 +315,7 @@ in ) config.nodes; passthru.nodes = config.nodesCompat; + passthru.containers = config.containers; extraDriverArgs = mkIf config.sshBackdoor.enable [ "--dump-vsocks=${toString config.sshBackdoor.vsockOffset}" @@ -211,33 +326,35 @@ in nixpkgs.pkgs = config.node.pkgs; imports = [ ../../modules/misc/nixpkgs/read-only.nix ]; }) - (mkIf config.sshBackdoor.enable ( - let - inherit (config.sshBackdoor) vsockOffset; - in - { config, ... }: - { - services.openssh = { - enable = true; - settings = { - PermitRootLogin = "yes"; - PermitEmptyPasswords = "yes"; - }; - }; - - security.pam.services.sshd = { - allowNullPassword = true; + (mkIf config.sshBackdoor.enable { + services.openssh = { + enable = true; + settings = { + PermitRootLogin = "yes"; + PermitEmptyPasswords = "yes"; }; + }; - virtualisation.qemu.options = [ - "-device vhost-vsock-pci,guest-cid=${ - toString (config.virtualisation.test.nodeNumber + vsockOffset) - }" - ]; - } - )) + security.pam.services.sshd = { + allowNullPassword = true; + }; + }) ]; + nodeDefaults = mkIf config.sshBackdoor.enable ( + let + inherit (config.sshBackdoor) vsockOffset; + in + { config, ... }: + { + virtualisation.qemu.options = [ + "-device vhost-vsock-pci,guest-cid=${ + toString (config.virtualisation.test.nodeNumber + vsockOffset) + }" + ]; + } + ); + # Docs: nixos/doc/manual/development/writing-nixos-tests.section.md /** See https://nixos.org/manual/nixos/unstable#sec-override-nixos-test diff --git a/nixos/lib/testing/run.nix b/nixos/lib/testing/run.nix index e34e585241bec..646832f71e624 100644 --- a/nixos/lib/testing/run.nix +++ b/nixos/lib/testing/run.nix @@ -2,6 +2,7 @@ config, hostPkgs, lib, + containers, options, ... }: @@ -96,12 +97,15 @@ in requiredSystemFeatures = [ "nixos-test" ] + # Containers use systemd-nspawn, which requires pid 0 inside of the sandbox. + ++ lib.optional (builtins.length (lib.attrNames containers) > 0) "uid-range" ++ lib.optional isLinux "kvm" ++ lib.optional isDarwin "apple-virt"; nativeBuildInputs = lib.optionals config.enableDebugHook [ hostPkgs.openssh hostPkgs.inetutils + hostPkgs.socat # to allow SSH backdoor connections for systemd-nspawn containers ]; buildCommand = '' diff --git a/nixos/lib/testing/testScript.nix b/nixos/lib/testing/testScript.nix index bde7b78607b47..4ce368c0b8dbb 100644 --- a/nixos/lib/testing/testScript.nix +++ b/nixos/lib/testing/testScript.nix @@ -56,11 +56,12 @@ in # reuse memoized config v ) config.nodesCompat; + containers = config.containers; } else config.testScript; - defaults = + nodeDefaults = { config, name, ... }: { # Make sure all derivations referenced by the test diff --git a/nixos/modules/testing/test-instrumentation.nix b/nixos/modules/testing/test-instrumentation.nix index dce59a0a93b74..81cb578bf0174 100644 --- a/nixos/modules/testing/test-instrumentation.nix +++ b/nixos/modules/testing/test-instrumentation.nix @@ -86,7 +86,8 @@ in options.testing = { backdoor = lib.mkEnableOption "backdoor service in stage 2" // { - default = true; + # See assertion below for why the backdoor doesn't work with containers. + default = !config.boot.isContainer; }; initrdBackdoor = lib.mkEnableOption '' @@ -105,7 +106,20 @@ in { assertion = cfg.initrdBackdoor -> config.boot.initrd.systemd.enable; message = '' - testing.initrdBackdoor requires boot.initrd.systemd.enable to be enabled. + `testing.initrdBackdoor` requires `boot.initrd.systemd.enable` to be enabled. + ''; + } + { + assertion = config.boot.isContainer -> !cfg.backdoor; + message = '' + `testing.backdoor` uses virtio console, which does not work with + containers (we use `nsenter` instead). + ''; + } + { + assertion = config.boot.isContainer -> !cfg.initrdBackdoor; + message = '' + `testing.initrdBackdoor` does not work with containers as there is no initrd. ''; } ]; diff --git a/nixos/modules/virtualisation/guest-networking-options.nix b/nixos/modules/virtualisation/guest-networking-options.nix index 817ccc4e6370c..39d9eea24f3dc 100644 --- a/nixos/modules/virtualisation/guest-networking-options.nix +++ b/nixos/modules/virtualisation/guest-networking-options.nix @@ -71,7 +71,7 @@ in virtualisation.vlans = lib.mkOption { type = types.listOf types.ints.unsigned; default = if cfg.interfaces == { } then [ 1 ] else [ ]; - defaultText = lib.literalExpression "if cfg.interfaces == {} then [ 1 ] else [ ]"; + defaultText = lib.literalExpression "if config.virtualisation.interfaces == {} then [ 1 ] else [ ]"; example = [ 1 2 diff --git a/nixos/modules/virtualisation/nspawn-container/default.nix b/nixos/modules/virtualisation/nspawn-container/default.nix index 6f303eec16df1..f56167ca0b079 100644 --- a/nixos/modules/virtualisation/nspawn-container/default.nix +++ b/nixos/modules/virtualisation/nspawn-container/default.nix @@ -70,6 +70,45 @@ in config = { boot.isNspawnContainer = true; + assertions = [ + { + assertion = config.specialisation == { }; + message = '' + Setting 'specialisation' is disallowed for systemd-nspawn container configurations. + Activating a specialisation requires creating SUID wrappers (e.g., for 'sudo'), + which is prohibited within the Nix build sandbox where the test is run. + ''; + } + { + # Check every interface defined in allInterfaces. + # Containers try to create a bridge "${config.system.name}-${interfaceName}" + assertion = lib.all ( + iface: + let + hostName = "${config.system.name}-${iface.name}"; + in + lib.stringLength hostName <= 15 + ) (lib.attrValues cfg.allInterfaces); + + message = + let + offendingInterfaces = lib.filter ( + iface: lib.stringLength "${config.system.name}-${iface.name}" > 15 + ) (lib.attrValues cfg.allInterfaces); + offenderList = map ( + i: + "${config.system.name}-${i.name} (${toString (lib.stringLength "${config.system.name}-${i.name}")} chars)" + ) offendingInterfaces; + in + '' + The following generated host interface names exceed the Linux 15-character limit: + ${lib.concatStringsSep "\n " offenderList} + + Please shorten 'config.system.name' or the interface names in 'virtualisation.interfaces'. + ''; + } + ]; + # TODO(arianvp): Remove after https://github.com/NixOS/nixpkgs/pull/480686 is merged console.enable = true; @@ -94,6 +133,9 @@ in # > kind of unit allocation or registration with systemd-machined. "--keep-unit" "--register=no" + + # Send a READY=1 notification to a socket when the container is fully booted. + "--notify-ready=yes" ]; system.build.nspawn = diff --git a/nixos/modules/virtualisation/nspawn-container/run-nspawn/src/run_nspawn/__init__.py b/nixos/modules/virtualisation/nspawn-container/run-nspawn/src/run_nspawn/__init__.py index 99f50038fd7c7..3eb622cbbfe84 100644 --- a/nixos/modules/virtualisation/nspawn-container/run-nspawn/src/run_nspawn/__init__.py +++ b/nixos/modules/virtualisation/nspawn-container/run-nspawn/src/run_nspawn/__init__.py @@ -68,7 +68,9 @@ def ensure_vlan_bridge(vlan: int) -> typing.Generator[str, None, None]: ipv6_addr = f"2001:db8:{vlan}::fe/64" bridge_name = f"br{vlan}" + tap_name = f"vde-tap{vlan}" bridge_path = Path("/sys/class/net") / bridge_name + tap_path = Path("/sys/class/net") / tap_name try: # To avoid racing against other nspawn containers that also # need this vlan, grab an exclusive lock. @@ -80,6 +82,19 @@ def ensure_vlan_bridge(vlan: int) -> typing.Generator[str, None, None]: run_ip("addr", "add", ipv4_addr, "dev", bridge_name) run_ip("addr", "add", ipv6_addr, "dev", bridge_name) + if tap_path.exists(): + logger.info(f"attaching {tap_name} to {bridge_name}") + run_ip("link", "set", tap_name, "master", bridge_name) + run_ip("link", "set", tap_name, "up") + else: + logger.warning( + f"TAP {tap_name} not found; container will be isolated from VDE" + ) + if not Path("/dev/net").exists(): + logger.warning( + "A common reason for this is that /dev/net is not available in the Nix sandbox. Try adding /dev/net to extra-sandbox-paths." + ) + yield bridge_name finally: # To avoid racing against other nspawn containers that also @@ -126,6 +141,7 @@ def mk_veth( def run( container_name: str, root_dir_str: str, + shared_dir_str: typing.Optional[str], interfaces: dict, nspawn_options: list[str], init: str, @@ -166,12 +182,19 @@ def print_pid() -> None: flush=True, ) + shared_dir = Path(shared_dir_str) if shared_dir_str else None + cp = subprocess.Popen( [ "@systemd-nspawn@", *nspawn_options, f"--directory={root_dir}", f"--network-namespace-path={netns.path}", + *( + [f"--bind={shared_dir}:/tmp/shared"] + if shared_dir is not None + else [] + ), init, *cmdline, ], @@ -218,6 +241,11 @@ def main(): required=True, help="Path to container root directory (overridable with RUN_NSPAWN_ROOT_DIR)", ) + arg_parser.add_argument( + "--shared-dir", + required=False, + help="Path to a shared directory to bind-mount into the container at /tmp/shared (overridable with RUN_NSPAWN_SHARED_DIR)", + ) arg_parser.add_argument( "--interfaces-json", dest="interfaces", @@ -239,6 +267,7 @@ def main(): run( container_name=args.container_name, root_dir_str=os.getenv("RUN_NSPAWN_ROOT_DIR", default=args.root_dir), + shared_dir_str=os.getenv("RUN_NSPAWN_SHARED_DIR", default=args.shared_dir), interfaces=args.interfaces, nspawn_options=nspawn_options, init=args.init, diff --git a/nixos/tests/all-tests.nix b/nixos/tests/all-tests.nix index 39542f1e683a7..f96dc0fb88541 100644 --- a/nixos/tests/all-tests.nix +++ b/nixos/tests/all-tests.nix @@ -168,6 +168,7 @@ in node-name = runTest ./nixos-test-driver/node-name.nix; busybox = runTest ./nixos-test-driver/busybox.nix; console-log = runTest ./nixos-test-driver/console-log.nix; + containers = runTest ./nixos-test-driver/containers.nix; driver-timeout = pkgs.runCommand "ensure-timeout-induced-failure" { @@ -1592,6 +1593,7 @@ in teleports = runTest ./teleports.nix; temporal = runTest ./temporal.nix; terminal-emulators = handleTest ./terminal-emulators.nix { }; + test-containers-bittorrent = runTest ./test-containers-bittorrent.nix; thanos = runTest ./thanos.nix; thelounge = handleTest ./thelounge.nix { }; tiddlywiki = runTest ./tiddlywiki.nix; diff --git a/nixos/tests/nixos-test-driver/containers.nix b/nixos/tests/nixos-test-driver/containers.nix new file mode 100644 index 0000000000000..073e2de28bd6c --- /dev/null +++ b/nixos/tests/nixos-test-driver/containers.nix @@ -0,0 +1,77 @@ +{ pkgs, ... }: +{ + name = "containers"; + meta.maintainers = with pkgs.lib.maintainers; [ jfly ]; + + nodes = { + n1 = { + virtualisation.vlans = [ 1 ]; + }; + n2 = { + virtualisation.vlans = [ + 2 + ]; + }; + }; + + containers = { + c1 = { + virtualisation.vlans = [ 1 ]; + }; + c2 = { + virtualisation.vlans = [ 2 ]; + }; + c12 = { + virtualisation.vlans = [ + 1 + 2 + ]; + }; + }; + + testScript = /* python */ '' + c1.start() + c2.start() + c12.start() + + c1.succeed("echo hello > /hello.txt") + c1.copy_from_machine("/hello.txt") + + c1.systemctl("start network-online.target") + c2.systemctl("start network-online.target") + c12.systemctl("start network-online.target") + c1.wait_for_unit("network-online.target") + c2.wait_for_unit("network-online.target") + c12.wait_for_unit("network-online.target") + + # Confirm containers in vlan 1 can talk to each other. + c1.succeed("ping -c 1 c12") + c12.succeed("ping -c 1 c1") + + # Confirm containers in vlan 2 can talk to each other. + c2.succeed("ping -c 1 c12") + c12.succeed("ping -c 1 c2") + + # Confirm containers in separate vlans cannot talk to each other. + c1.fail("ping -c 1 -W 1 c2") + + n1.start() + n2.start() + n1.systemctl("start network-online.target") + n2.systemctl("start network-online.target") + n1.wait_for_unit("network-online.target") + n2.wait_for_unit("network-online.target") + + # Confirm containers and nodes in the same vlan can talk to each other. + c1.succeed("ping -c 1 n1") + n1.succeed("ping -c 1 c1") + c2.succeed("ping -c 1 n2") + n2.succeed("ping -c 1 c2") + + # Confirm containers and nodes in different vlans cannot talk to each other. + c1.fail("ping -c 1 -W 1 n2") + n1.fail("ping -c 1 -W 1 c2") + c2.fail("ping -c 1 -W 1 n1") + n2.fail("ping -c 1 -W 1 c1") + ''; +} diff --git a/nixos/tests/test-containers-bittorrent.nix b/nixos/tests/test-containers-bittorrent.nix new file mode 100644 index 0000000000000..aeac191cdc2e0 --- /dev/null +++ b/nixos/tests/test-containers-bittorrent.nix @@ -0,0 +1,215 @@ +# This test runs a Bittorrent tracker on one machine, and verifies +# that two client machines can download the torrent using +# `aria2c'. The first client (behind a NAT router) downloads +# from the initial seeder running on the tracker. Then we kill the +# initial seeder. The second client downloads from the first client, +# which only works if the first client successfully uses the UPnP-IGD +# protocol to poke a hole in the NAT. + +# We use aria2 as the initial seeder because transmission +# fails in the sandbox because of systemd hardening settings, +# namely MountAPIVFS=yes, so we get the following error: + +# $ journalctl --unit transmission.service +# (n-daemon)[417]: transmission.service: Failed to create destination mount point node '/run/transmission/run/host/.os-release-stage/', ignoring: Read-only file system +# (n-daemon)[417]: transmission.service: Failed to mount /run/systemd/propagate/.os-release-stage to /run/transmission/run/host/.os-release-stage/: No such file or directory +# (n-daemon)[417]: transmission.service: Failed to set up mount namespacing: /run/host/.os-release-stage/: No such file or directory +# (n-daemon)[417]: transmission.service: Failed at step NAMESPACE spawning /nix/store/zfksw9bllp95pl45d1nxmpd2lks42bkj-transmission-4.0.6/bin/transmission-daemon: No such file or directory +# systemd[1]: transmission.service: Main process exited, code=exited, status=226/NAMESPACE + +{ lib, hostPkgs, ... }: + +let + + # Some random file to serve. + file = hostPkgs.hello.src; + + internalRouterAddress = "192.168.3.1"; + internalClient1Address = "192.168.3.2"; + + # cannot use documentation networks (198.51.100.0/24 or 192.0.2.0/24) here + # because miniupnpd recognizes them as such and refuses to work with them + # https://github.com/miniupnp/miniupnp/blob/2a74cb2f27cacf06d2b50c187e8f90aa1f5c2528/miniupnpd/miniupnpd.c#L998 + externalRouterAddress = "80.100.100.1"; + externalClient2Address = "80.100.100.2"; + externalTrackerAddress = "80.100.100.3"; + + download-dir = "/tmp/aria2-downloads"; + peerConfig = + { pkgs, ... }: + { + environment.systemPackages = [ + pkgs.aria2 + pkgs.transmission_4 # only needed for transmission-create + ]; + }; +in + +{ + name = "bittorrent"; + meta = { + maintainers = [ + lib.maintainers.kmein + ]; + }; + + containers = { + tracker = + { pkgs, ... }: + { + imports = [ peerConfig ]; + + virtualisation.vlans = [ 1 ]; + networking.firewall.enable = false; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = externalTrackerAddress; + prefixLength = 24; + } + ]; + + # We need Apache on the tracker to serve the torrents. + services.httpd = { + enable = true; + virtualHosts = { + "torrentserver.org" = { + adminAddr = "foo@example.org"; + documentRoot = "/tmp"; + }; + }; + }; + services.opentracker.enable = true; + }; + + router = + { pkgs, containers, ... }: + { + virtualisation.vlans = [ + 1 + 2 + ]; + networking.nat.enable = true; + networking.nat.internalInterfaces = [ "eth2" ]; + networking.nat.externalInterface = "eth1"; + networking.firewall.enable = true; + networking.firewall.trustedInterfaces = [ "eth2" ]; + networking.interfaces.eth0.ipv4.addresses = [ ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = externalRouterAddress; + prefixLength = 24; + } + ]; + networking.interfaces.eth2.ipv4.addresses = [ + { + address = internalRouterAddress; + prefixLength = 24; + } + ]; + networking.nftables.enable = true; + services.miniupnpd = { + enable = true; + externalInterface = "eth1"; + internalIPs = [ "eth2" ]; + appendConfig = '' + ext_ip=${externalRouterAddress} + ''; + }; + }; + + client1 = + { pkgs, containers, ... }: + { + imports = [ peerConfig ]; + environment.systemPackages = [ pkgs.miniupnpc ]; + + virtualisation.vlans = [ 2 ]; + networking.interfaces.eth0.ipv4.addresses = [ ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = internalClient1Address; + prefixLength = 24; + } + ]; + networking.defaultGateway = internalRouterAddress; + networking.firewall.enable = false; + }; + + client2 = + { pkgs, ... }: + { + imports = [ peerConfig ]; + + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth0.ipv4.addresses = [ ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = externalClient2Address; + prefixLength = 24; + } + ]; + networking.firewall.enable = false; + }; + }; + + testScript = + { containers, ... }: + '' + start_all() + + # Wait for network and miniupnpd. + router.systemctl("start network-online.target") + router.wait_for_unit("network-online.target") + router.wait_for_unit("miniupnpd") + + # Create the torrent. + tracker.succeed("mkdir -p ${download-dir}") + tracker.succeed( + "cp ${file} ${download-dir}/test.tar.bz2" + ) + tracker.succeed( + "transmission-create ${download-dir}/test.tar.bz2 --private --tracker http://${externalTrackerAddress}:6969/announce --outfile /tmp/test.torrent" + ) + tracker.succeed("chmod 644 /tmp/test.torrent") + + # Start the tracker + tracker.systemctl("start network-online.target") + tracker.wait_for_unit("network-online.target") + tracker.wait_for_unit("opentracker.service") + tracker.wait_for_open_port(6969) + + # --- Start the initial seeder using aria2 --- + # https://stackoverflow.com/a/44528978 + tracker.execute( + "aria2c --enable-dht=false --seed-time=999 --dir=${download-dir} " + "-V --seed-ratio=0.0 " + "/tmp/test.torrent >/dev/null &" + ) + + # --- Wait until the tracker shows we are seeding --- + tracker.wait_until_succeeds("curl -s http://localhost:6969/stats | grep -q 'serving 1 torrents'") + + # Now we should be able to download from the client behind the NAT. + tracker.wait_for_unit("httpd") + + def connect_from(machine): + machine.systemctl("start network-online.target") + machine.wait_for_unit("network-online.target") + machine.execute( + "aria2c --enable-dht=false --seed-time=999 --dir=${download-dir} " + "http://${externalTrackerAddress}/test.torrent >/dev/null &" + ) + machine.wait_until_succeeds( + "cmp ${download-dir}/test.tar.bz2 ${file}" + ) # Wait for download to finish and verify + + connect_from(client1) + + # --- Bring down the initial seeder --- + tracker.succeed("pkill aria2c") + + # Now download from the second client. This can only succeed if + # the first client created a NAT hole in the router. + connect_from(client2) + ''; +}