diff --git a/examples/example_fastapi/main.py b/examples/example_fastapi/main.py index 9801dfb41..b676e661b 100644 --- a/examples/example_fastapi/main.py +++ b/examples/example_fastapi/main.py @@ -8,7 +8,7 @@ from datetime import datetime, timezone from os import listdir from pathlib import Path -from typing import Any, Optional +from typing import Any import aiohttp from aleph_message.models import ( @@ -128,8 +128,8 @@ async def read_aleph_messages() -> dict[str, MessagesResponse]: async def resolve_dns_hostname(): """Check if DNS resolution is working.""" hostname = "example.org" - ipv4: Optional[str] = None - ipv6: Optional[str] = None + ipv4: str | None = None + ipv6: str | None = None info = socket.getaddrinfo(hostname, 80, proto=socket.IPPROTO_TCP) if not info: @@ -170,7 +170,7 @@ async def connect_ipv4(): sock.settimeout(5) sock.connect((ipv4_host, 53)) return {"result": True} - except socket.timeout: + except TimeoutError: logger.warning(f"Socket connection for host {ipv4_host} failed") return {"result": False} diff --git a/examples/example_fastapi_1.py b/examples/example_fastapi_1.py index 863d0232a..4e6c1c3f2 100644 --- a/examples/example_fastapi_1.py +++ b/examples/example_fastapi_1.py @@ -1,5 +1,3 @@ -from typing import Optional - from fastapi import FastAPI app = FastAPI() @@ -11,10 +9,10 @@ def read_root(): @app.get("/run/{item_id}") -def read_item(item_id: str, q: Optional[str] = None): +def read_item(item_id: str, q: str | None = None): return {"item_id": item_id, "q": q} @app.post("/run/{item_id}") -def read_item_post(item_id: str, q: Optional[str] = None): +def read_item_post(item_id: str, q: str | None = None): return {"item_id_post": item_id, "q": q} diff --git a/runtimes/aleph-debian-12-python/init1.py b/runtimes/aleph-debian-12-python/init1.py index 80ab0fd84..905e12b3a 100644 --- a/runtimes/aleph-debian-12-python/init1.py +++ b/runtimes/aleph-debian-12-python/init1.py @@ -19,13 +19,11 @@ import sys import traceback from collections.abc import AsyncIterable -from contextlib import redirect_stdout from dataclasses import dataclass, field from enum import Enum -from io import StringIO from os import system from shutil import make_archive -from typing import Any, Literal, NewType, Optional, Union, cast +from typing import Any, Literal, NewType, cast import aiohttp import msgpack @@ -66,14 +64,14 @@ class ConfigurationPayload: code: bytes encoding: Encoding entrypoint: str - ip: Optional[str] = None - ipv6: Optional[str] = None - route: Optional[str] = None - ipv6_gateway: Optional[str] = None + ip: str | None = None + ipv6: str | None = None + route: str | None = None + ipv6_gateway: str | None = None dns_servers: list[str] = field(default_factory=list) volumes: list[Volume] = field(default_factory=list) - variables: Optional[dict[str, str]] = None - authorized_keys: Optional[list[str]] = None + variables: dict[str, str] | None = None + authorized_keys: list[str] | None = None @dataclass @@ -107,7 +105,7 @@ def setup_hostname(hostname: str): system(f"hostname {hostname}") -def setup_variables(variables: Optional[dict[str, str]]): +def setup_variables(variables: dict[str, str] | None): if variables is None: return for key, value in variables.items(): @@ -115,11 +113,11 @@ def setup_variables(variables: Optional[dict[str, str]]): def setup_network( - ipv4: Optional[str], - ipv6: Optional[str], - ipv4_gateway: Optional[str], - ipv6_gateway: Optional[str], - dns_servers: Optional[list[str]] = None, + ipv4: str | None, + ipv6: str | None, + ipv4_gateway: str | None, + ipv6_gateway: str | None, + dns_servers: list[str] | None = None, ): """Setup the system with info from the host.""" dns_servers = dns_servers or [] @@ -188,9 +186,7 @@ def setup_volumes(volumes: list[Volume]): system("mount") -async def wait_for_lifespan_event_completion( - application: ASGIApplication, event: Union[Literal["startup", "shutdown"]] -): +async def wait_for_lifespan_event_completion(application: ASGIApplication, event: Literal["startup", "shutdown"]): """ Send the startup lifespan signal to the ASGI app. Specification: https://asgi.readthedocs.io/en/latest/specs/lifespan.html @@ -295,7 +291,7 @@ async def setup_code( encoding: Encoding, entrypoint: str, interface: Interface, -) -> Union[ASGIApplication, subprocess.Popen]: +) -> ASGIApplication | subprocess.Popen: if interface == Interface.asgi: return await setup_code_asgi(code=code, encoding=encoding, entrypoint=entrypoint) elif interface == Interface.executable: @@ -304,7 +300,7 @@ async def setup_code( raise ValueError("Invalid interface. This should never happen.") -async def run_python_code_http(application: ASGIApplication, scope: dict) -> tuple[dict, dict, str, Optional[bytes]]: +async def run_python_code_http(application: ASGIApplication, scope: dict) -> tuple[dict, dict, str, bytes | None]: logger.debug("Running code") # Execute in the same process, saves ~20ms than a subprocess @@ -386,7 +382,7 @@ def show_loading(): return headers, body -async def run_executable_http(scope: dict) -> tuple[dict, dict, str, Optional[bytes]]: +async def run_executable_http(scope: dict) -> tuple[dict, dict, str, bytes | None]: logger.debug("Calling localhost") tries = 0 @@ -413,7 +409,7 @@ async def run_executable_http(scope: dict) -> tuple[dict, dict, str, Optional[by async def process_instruction( instruction: bytes, interface: Interface, - application: Union[ASGIApplication, subprocess.Popen], + application: ASGIApplication | subprocess.Popen, ) -> AsyncIterable[bytes]: if instruction == b"halt": logger.info("Received halt command") @@ -443,11 +439,11 @@ async def process_instruction( logger.debug("msgpack.loads )") payload = RunCodePayload(**msg_) - output: Optional[str] = None + output: str | None = None try: headers: dict body: dict - output_data: Optional[bytes] + output_data: bytes | None if interface == Interface.asgi: application = cast(ASGIApplication, application) @@ -540,7 +536,7 @@ async def main() -> None: setup_system(config) try: - app: Union[ASGIApplication, subprocess.Popen] = await setup_code( + app: ASGIApplication | subprocess.Popen = await setup_code( config.code, config.encoding, config.entrypoint, config.interface ) client.send(msgpack.dumps({"success": True})) diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py index 0c6d9cbec..f1f8d75f6 100644 --- a/src/aleph/vm/conf.py +++ b/src/aleph/vm/conf.py @@ -9,7 +9,7 @@ from os.path import abspath, exists, isdir, isfile, join from pathlib import Path from subprocess import CalledProcessError, check_output -from typing import Any, List, Literal, NewType, Optional +from typing import Any, Literal, NewType from aleph_message.models import Chain from aleph_message.models.execution.environment import HypervisorType @@ -188,21 +188,21 @@ class Settings(BaseSettings): CONNECTOR_URL = Url("http://localhost:4021") CACHE_ROOT: Path = Path("/var/cache/aleph/vm") - MESSAGE_CACHE: Optional[Path] = Field( + MESSAGE_CACHE: Path | None = Field( None, description="Default to CACHE_ROOT/message", ) - CODE_CACHE: Optional[Path] = Field(None, description="Default to CACHE_ROOT/code") - RUNTIME_CACHE: Optional[Path] = Field(None, description="Default to CACHE_ROOT/runtime") - DATA_CACHE: Optional[Path] = Field(None, description="Default to CACHE_ROOT/data") + CODE_CACHE: Path | None = Field(None, description="Default to CACHE_ROOT/code") + RUNTIME_CACHE: Path | None = Field(None, description="Default to CACHE_ROOT/runtime") + DATA_CACHE: Path | None = Field(None, description="Default to CACHE_ROOT/data") EXECUTION_ROOT: Path = Path("/var/lib/aleph/vm") - JAILER_BASE_DIRECTORY: Optional[Path] = Field(None, description="Default to EXECUTION_ROOT/jailer") - EXECUTION_DATABASE: Optional[Path] = Field( + JAILER_BASE_DIRECTORY: Path | None = Field(None, description="Default to EXECUTION_ROOT/jailer") + EXECUTION_DATABASE: Path | None = Field( None, description="Location of database file. Default to EXECUTION_ROOT/executions.sqlite3" ) EXECUTION_LOG_ENABLED: bool = False - EXECUTION_LOG_DIRECTORY: Optional[Path] = Field( + EXECUTION_LOG_DIRECTORY: Path | None = Field( None, description="Location of executions log. Default to EXECUTION_ROOT/executions/" ) @@ -267,12 +267,12 @@ class Settings(BaseSettings): "with SEV and SEV-ES", ) - CONFIDENTIAL_DIRECTORY: Optional[Path] = Field( + CONFIDENTIAL_DIRECTORY: Path | None = Field( None, description="Confidential Computing default directory. Default to EXECUTION_ROOT/confidential", ) - CONFIDENTIAL_SESSION_DIRECTORY: Optional[Path] = Field(None, description="Default to EXECUTION_ROOT/sessions") + CONFIDENTIAL_SESSION_DIRECTORY: Path | None = Field(None, description="Default to EXECUTION_ROOT/sessions") ENABLE_GPU_SUPPORT: bool = Field( default=False, diff --git a/src/aleph/vm/controllers/qemu/instance.py b/src/aleph/vm/controllers/qemu/instance.py index 259f84744..8252e5f0f 100644 --- a/src/aleph/vm/controllers/qemu/instance.py +++ b/src/aleph/vm/controllers/qemu/instance.py @@ -5,7 +5,7 @@ from asyncio import Task from asyncio.subprocess import Process from pathlib import Path -from typing import Generic, List, TypeVar +from typing import Generic, TypeVar import psutil from aleph_message.models import ItemHash @@ -38,7 +38,7 @@ class AlephQemuResources(AlephFirecrackerResources): - gpus: List[HostGPU] = [] + gpus: list[HostGPU] = [] async def download_runtime(self) -> None: volume = self.message_content.rootfs diff --git a/src/aleph/vm/garbage_collector.py b/src/aleph/vm/garbage_collector.py index 54d182775..1586a7a83 100644 --- a/src/aleph/vm/garbage_collector.py +++ b/src/aleph/vm/garbage_collector.py @@ -98,6 +98,7 @@ def check_api(item_hash): f"systemctl status aleph-vm-controller@{item_hash}.service --no-pager", shell=True, capture_output=True, + check=False, ) exit_code = proc_ret.returncode if exit_code == 0: diff --git a/src/aleph/vm/models.py b/src/aleph/vm/models.py index 7dd59091b..140228dc1 100644 --- a/src/aleph/vm/models.py +++ b/src/aleph/vm/models.py @@ -6,7 +6,6 @@ from collections.abc import Callable, Coroutine from dataclasses import dataclass from datetime import datetime, timezone -from typing import List from aleph_message.models import ( ExecutableContent, @@ -76,7 +75,7 @@ class VmExecution: AlephProgramResources | AlephInstanceResources | AlephQemuResources | AlephQemuConfidentialInstance | None ) = None vm: AlephFirecrackerExecutable | AlephQemuInstance | AlephQemuConfidentialInstance | None = None - gpus: List[HostGPU] = [] + gpus: list[HostGPU] = [] times: VmExecutionTimes @@ -223,7 +222,7 @@ async def prepare(self) -> None: self.times.prepared_at = datetime.now(tz=timezone.utc) self.resources = resources - def prepare_gpus(self, available_gpus: List[GpuDevice]) -> None: + def prepare_gpus(self, available_gpus: list[GpuDevice]) -> None: gpus = [] if self.message.requirements and self.message.requirements.gpu: for gpu in self.message.requirements.gpu: diff --git a/src/aleph/vm/orchestrator/resources.py b/src/aleph/vm/orchestrator/resources.py index b6d34a9f0..95704b79b 100644 --- a/src/aleph/vm/orchestrator/resources.py +++ b/src/aleph/vm/orchestrator/resources.py @@ -1,7 +1,6 @@ import math from datetime import datetime, timezone from functools import lru_cache -from typing import List, Optional import cpuinfo import psutil @@ -77,8 +76,8 @@ class MachineProperties(BaseModel): class GpuProperties(BaseModel): - devices: Optional[List[GpuDevice]] - available_devices: Optional[List[GpuDevice]] + devices: list[GpuDevice] | None + available_devices: list[GpuDevice] | None class MachineUsage(BaseModel): diff --git a/src/aleph/vm/pool.py b/src/aleph/vm/pool.py index edcccd43a..3b7d20eda 100644 --- a/src/aleph/vm/pool.py +++ b/src/aleph/vm/pool.py @@ -5,7 +5,6 @@ import logging from collections.abc import Iterable from datetime import datetime, timezone -from typing import List from aleph_message.models import ( Chain, @@ -45,7 +44,7 @@ class VmPool: snapshot_manager: SnapshotManager | None = None systemd_manager: SystemDManager creation_lock: asyncio.Lock - gpus: List[GpuDevice] = [] + gpus: list[GpuDevice] = [] def __init__(self, loop: asyncio.AbstractEventLoop): self.executions = {} @@ -249,7 +248,7 @@ async def load_persistent_executions(self): if execution.is_running: # TODO: Improve the way that we re-create running execution # Load existing GPUs assigned to VMs - execution.gpus = parse_raw_as(List[HostGPU], saved_execution.gpus) if saved_execution.gpus else [] + execution.gpus = parse_raw_as(list[HostGPU], saved_execution.gpus) if saved_execution.gpus else [] # Load and instantiate the rest of resources and already assigned GPUs await execution.prepare() if self.network: @@ -303,7 +302,7 @@ def get_instance_executions(self) -> Iterable[VmExecution]: ) return executions or [] - def get_available_gpus(self) -> List[GpuDevice]: + def get_available_gpus(self) -> list[GpuDevice]: available_gpus = [] for gpu in self.gpus: used = False diff --git a/src/aleph/vm/resources.py b/src/aleph/vm/resources.py index 4776c254c..caa8393a9 100644 --- a/src/aleph/vm/resources.py +++ b/src/aleph/vm/resources.py @@ -1,11 +1,9 @@ import subprocess from enum import Enum -from typing import List, Optional from aleph_message.models import HashableModel from pydantic import BaseModel, Extra, Field -from aleph.vm.conf import settings from aleph.vm.orchestrator.utils import get_compatible_gpus @@ -97,7 +95,7 @@ def is_kernel_enabled_gpu(pci_host: str) -> bool: return False -def parse_gpu_device_info(line: str) -> Optional[GpuDevice]: +def parse_gpu_device_info(line: str) -> GpuDevice | None: """Parse GPU device info from a line of lspci output.""" pci_host, device = line.split(' "', maxsplit=1) @@ -134,7 +132,7 @@ def parse_gpu_device_info(line: str) -> Optional[GpuDevice]: ) -def get_gpu_devices() -> Optional[List[GpuDevice]]: +def get_gpu_devices() -> list[GpuDevice] | None: """Get GPU info using lspci command.""" result = subprocess.run(["lspci", "-mmnnn"], capture_output=True, text=True, check=True) diff --git a/src/aleph/vm/storage.py b/src/aleph/vm/storage.py index 58d6f78c2..5d0ed3cd8 100644 --- a/src/aleph/vm/storage.py +++ b/src/aleph/vm/storage.py @@ -10,7 +10,6 @@ import logging import re import sys -import uuid from datetime import datetime, timezone from pathlib import Path from shutil import copy2, make_archive @@ -382,7 +381,7 @@ async def get_volume_path(volume: MachineVolume, namespace: str) -> Path: raise NotImplementedError(msg) if not re.match(r"^[\w\-_/]+$", volume_name): # Sanitize volume names - logger.debug(f"Invalid values for volume name: {repr(volume_name)} detected, sanitizing") + logger.debug(f"Invalid values for volume name: {volume_name!r} detected, sanitizing") volume_name = re.sub(r"[^\w\-_]", "_", volume_name) (Path(settings.PERSISTENT_VOLUMES_DIR) / namespace).mkdir(exist_ok=True) if volume.parent: diff --git a/tests/supervisor/test_execution.py b/tests/supervisor/test_execution.py index b064a084a..26f1ca9c3 100644 --- a/tests/supervisor/test_execution.py +++ b/tests/supervisor/test_execution.py @@ -1,5 +1,4 @@ import asyncio -import json import logging from typing import Any diff --git a/vm_connector/main.py b/vm_connector/main.py index 86494dd53..a24854ed6 100644 --- a/vm_connector/main.py +++ b/vm_connector/main.py @@ -1,6 +1,5 @@ import json import logging -from typing import Optional import aiohttp from aleph_client.asynchronous import create_post @@ -24,7 +23,7 @@ def read_root(): return {"Server": "Aleph.im VM Connector"} -async def get_latest_message_amend(ref: str, sender: str) -> Optional[dict]: +async def get_latest_message_amend(ref: str, sender: str) -> dict | None: async with aiohttp.ClientSession() as session: url = f"{settings.API_SERVER}/api/v0/messages.json?msgType=STORE&sort_order=-1&refs={ref}&addresses={sender}" resp = await session.get(url) @@ -36,7 +35,7 @@ async def get_latest_message_amend(ref: str, sender: str) -> Optional[dict]: return None -async def get_message(hash_: str) -> Optional[dict]: +async def get_message(hash_: str) -> dict | None: async with aiohttp.ClientSession() as session: url = f"{settings.API_SERVER}/api/v0/messages.json?hashes={hash_}" resp = await session.get(url)