diff --git a/src/aleph/vm/hypervisors/firecracker/microvm.py b/src/aleph/vm/hypervisors/firecracker/microvm.py index 7a8fe787e..d357fb6e0 100644 --- a/src/aleph/vm/hypervisors/firecracker/microvm.py +++ b/src/aleph/vm/hypervisors/firecracker/microvm.py @@ -13,7 +13,7 @@ from pathlib import Path from pwd import getpwnam from tempfile import NamedTemporaryFile -from typing import Any +from typing import Any, BinaryIO import msgpack from aleph_message.models import ItemHash @@ -93,6 +93,8 @@ class MicroVM: mounted_rootfs: Path | None = None _unix_socket: Server | None = None enable_log: bool + journal_stdout: BinaryIO | int | None = None + journal_stderr: BinaryIO | int | None = None def __repr__(self): return f"" @@ -219,19 +221,19 @@ async def start_firecracker(self, config_path: Path) -> asyncio.subprocess.Proce str(config_path), ) if self.enable_log: - journal_stdout = journal.stream(self._journal_stdout_name) - journal_stderr = journal.stream(self._journal_stderr_name) + self.journal_stdout = journal.stream(self._journal_stdout_name) + self.journal_stderr = journal.stream(self._journal_stderr_name) else: - journal_stdout = asyncio.subprocess.DEVNULL - journal_stderr = asyncio.subprocess.DEVNULL + self.journal_stdout = asyncio.subprocess.DEVNULL + self.journal_stderr = asyncio.subprocess.DEVNULL logger.debug(" ".join(options)) self.proc = await asyncio.create_subprocess_exec( *options, stdin=asyncio.subprocess.PIPE, - stdout=journal_stdout, - stderr=journal_stderr, + stdout=self.journal_stdout, + stderr=self.journal_stderr, ) return self.proc @@ -252,11 +254,11 @@ async def start_jailed_firecracker(self, config_path: Path) -> asyncio.subproces self.config_file_path = config_path if self.enable_log: - journal_stdout = journal.stream(self._journal_stdout_name) - journal_stderr = journal.stream(self._journal_stderr_name) + self.journal_stdout = journal.stream(self._journal_stdout_name) + self.journal_stderr = journal.stream(self._journal_stderr_name) else: - journal_stdout = asyncio.subprocess.DEVNULL - journal_stderr = asyncio.subprocess.DEVNULL + self.journal_stdout = asyncio.subprocess.DEVNULL + self.journal_stderr = asyncio.subprocess.DEVNULL options = ( str(self.jailer_bin_path), @@ -280,8 +282,8 @@ async def start_jailed_firecracker(self, config_path: Path) -> asyncio.subproces self.proc = await asyncio.create_subprocess_exec( *options, stdin=asyncio.subprocess.PIPE, - stdout=journal_stdout, - stderr=journal_stderr, + stdout=self.journal_stdout, + stderr=self.journal_stderr, ) return self.proc @@ -480,6 +482,19 @@ async def teardown(self): if self.stderr_task: self.stderr_task.cancel() + if ( + self.journal_stdout + and self.journal_stdout != asyncio.subprocess.DEVNULL + and hasattr(self.journal_stdout, "close") + ): + self.journal_stdout.close() + if ( + self.journal_stderr + and self.journal_stderr != asyncio.subprocess.DEVNULL + and hasattr(self.journal_stderr, "close") + ): + self.journal_stderr.close() + # Clean mounted block devices if self.mounted_rootfs: logger.debug("Waiting for one second for the VM to shutdown") diff --git a/src/aleph/vm/hypervisors/qemu/qemuvm.py b/src/aleph/vm/hypervisors/qemu/qemuvm.py index 1d707c2a5..5949fbdc4 100644 --- a/src/aleph/vm/hypervisors/qemu/qemuvm.py +++ b/src/aleph/vm/hypervisors/qemu/qemuvm.py @@ -2,7 +2,7 @@ from asyncio.subprocess import Process from dataclasses import dataclass from pathlib import Path -from typing import TextIO +from typing import BinaryIO, TextIO import qmp from systemd import journal @@ -28,6 +28,8 @@ class QemuVM: interface_name: str qemu_process: Process | None = None host_volumes: list[HostVolume] + journal_stdout: TextIO | None + journal_stderr: TextIO | None def __repr__(self) -> str: if self.qemu_process: @@ -72,8 +74,8 @@ async def start( # qemu-system-x86_64 -enable-kvm -m 2048 -net nic,model=virtio # -net tap,ifname=tap0,script=no,downscript=no -drive file=alpine.qcow2,media=disk,if=virtio -nographic - journal_stdout: TextIO = journal.stream(self._journal_stdout_name) - journal_stderr: TextIO = journal.stream(self._journal_stderr_name) + self.journal_stdout: BinaryIO = journal.stream(self._journal_stdout_name) + self.journal_stderr: BinaryIO = journal.stream(self._journal_stderr_name) # hardware_resources.published ports -> not implemented at the moment # hardware_resources.seconds -> only for microvm args = [ @@ -120,8 +122,8 @@ async def start( self.qemu_process = proc = await asyncio.create_subprocess_exec( *args, stdin=asyncio.subprocess.DEVNULL, - stdout=journal_stdout, - stderr=journal_stderr, + stdout=self.journal_stdout, + stderr=self.journal_stderr, ) print( @@ -149,3 +151,8 @@ def send_shutdown_message(self): async def stop(self): """Stop the VM.""" self.send_shutdown_message() + + if self.journal_stdout and self.journal_stdout != asyncio.subprocess.DEVNULL: + self.journal_stdout.close() + if self.journal_stderr and self.journal_stderr != asyncio.subprocess.DEVNULL: + self.journal_stderr.close()