Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions examples/example_fastapi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from datetime import datetime, timezone
from os import listdir
from pathlib import Path
from typing import Any, Optional
from typing import Any

import aiohttp
from aleph_message.models import (
Expand Down Expand Up @@ -128,8 +128,8 @@ async def read_aleph_messages() -> dict[str, MessagesResponse]:
async def resolve_dns_hostname():
"""Check if DNS resolution is working."""
hostname = "example.org"
ipv4: Optional[str] = None
ipv6: Optional[str] = None
ipv4: str | None = None
ipv6: str | None = None

info = socket.getaddrinfo(hostname, 80, proto=socket.IPPROTO_TCP)
if not info:
Expand Down Expand Up @@ -170,7 +170,7 @@ async def connect_ipv4():
sock.settimeout(5)
sock.connect((ipv4_host, 53))
return {"result": True}
except socket.timeout:
except TimeoutError:
logger.warning(f"Socket connection for host {ipv4_host} failed")
return {"result": False}

Expand Down
6 changes: 2 additions & 4 deletions examples/example_fastapi_1.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from typing import Optional

from fastapi import FastAPI

app = FastAPI()
Expand All @@ -11,10 +9,10 @@ def read_root():


@app.get("/run/{item_id}")
def read_item(item_id: str, q: Optional[str] = None):
def read_item(item_id: str, q: str | None = None):
return {"item_id": item_id, "q": q}


@app.post("/run/{item_id}")
def read_item_post(item_id: str, q: Optional[str] = None):
def read_item_post(item_id: str, q: str | None = None):
return {"item_id_post": item_id, "q": q}
46 changes: 21 additions & 25 deletions runtimes/aleph-debian-12-python/init1.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@
import sys
import traceback
from collections.abc import AsyncIterable
from contextlib import redirect_stdout
from dataclasses import dataclass, field
from enum import Enum
from io import StringIO
from os import system
from shutil import make_archive
from typing import Any, Literal, NewType, Optional, Union, cast
from typing import Any, Literal, NewType, cast

import aiohttp
import msgpack
Expand Down Expand Up @@ -66,14 +64,14 @@ class ConfigurationPayload:
code: bytes
encoding: Encoding
entrypoint: str
ip: Optional[str] = None
ipv6: Optional[str] = None
route: Optional[str] = None
ipv6_gateway: Optional[str] = None
ip: str | None = None
ipv6: str | None = None
route: str | None = None
ipv6_gateway: str | None = None
dns_servers: list[str] = field(default_factory=list)
volumes: list[Volume] = field(default_factory=list)
variables: Optional[dict[str, str]] = None
authorized_keys: Optional[list[str]] = None
variables: dict[str, str] | None = None
authorized_keys: list[str] | None = None


@dataclass
Expand Down Expand Up @@ -107,19 +105,19 @@ def setup_hostname(hostname: str):
system(f"hostname {hostname}")


def setup_variables(variables: Optional[dict[str, str]]):
def setup_variables(variables: dict[str, str] | None):
if variables is None:
return
for key, value in variables.items():
os.environ[key] = value


def setup_network(
ipv4: Optional[str],
ipv6: Optional[str],
ipv4_gateway: Optional[str],
ipv6_gateway: Optional[str],
dns_servers: Optional[list[str]] = None,
ipv4: str | None,
ipv6: str | None,
ipv4_gateway: str | None,
ipv6_gateway: str | None,
dns_servers: list[str] | None = None,
):
"""Setup the system with info from the host."""
dns_servers = dns_servers or []
Expand Down Expand Up @@ -188,9 +186,7 @@ def setup_volumes(volumes: list[Volume]):
system("mount")


async def wait_for_lifespan_event_completion(
application: ASGIApplication, event: Union[Literal["startup", "shutdown"]]
):
async def wait_for_lifespan_event_completion(application: ASGIApplication, event: Literal["startup", "shutdown"]):
"""
Send the startup lifespan signal to the ASGI app.
Specification: https://asgi.readthedocs.io/en/latest/specs/lifespan.html
Expand Down Expand Up @@ -295,7 +291,7 @@ async def setup_code(
encoding: Encoding,
entrypoint: str,
interface: Interface,
) -> Union[ASGIApplication, subprocess.Popen]:
) -> ASGIApplication | subprocess.Popen:
if interface == Interface.asgi:
return await setup_code_asgi(code=code, encoding=encoding, entrypoint=entrypoint)
elif interface == Interface.executable:
Expand All @@ -304,7 +300,7 @@ async def setup_code(
raise ValueError("Invalid interface. This should never happen.")


async def run_python_code_http(application: ASGIApplication, scope: dict) -> tuple[dict, dict, str, Optional[bytes]]:
async def run_python_code_http(application: ASGIApplication, scope: dict) -> tuple[dict, dict, str, bytes | None]:
logger.debug("Running code")
# Execute in the same process, saves ~20ms than a subprocess

Expand Down Expand Up @@ -386,7 +382,7 @@ def show_loading():
return headers, body


async def run_executable_http(scope: dict) -> tuple[dict, dict, str, Optional[bytes]]:
async def run_executable_http(scope: dict) -> tuple[dict, dict, str, bytes | None]:
logger.debug("Calling localhost")

tries = 0
Expand All @@ -413,7 +409,7 @@ async def run_executable_http(scope: dict) -> tuple[dict, dict, str, Optional[by
async def process_instruction(
instruction: bytes,
interface: Interface,
application: Union[ASGIApplication, subprocess.Popen],
application: ASGIApplication | subprocess.Popen,
) -> AsyncIterable[bytes]:
if instruction == b"halt":
logger.info("Received halt command")
Expand Down Expand Up @@ -443,11 +439,11 @@ async def process_instruction(
logger.debug("msgpack.loads )")
payload = RunCodePayload(**msg_)

output: Optional[str] = None
output: str | None = None
try:
headers: dict
body: dict
output_data: Optional[bytes]
output_data: bytes | None

if interface == Interface.asgi:
application = cast(ASGIApplication, application)
Expand Down Expand Up @@ -540,7 +536,7 @@ async def main() -> None:
setup_system(config)

try:
app: Union[ASGIApplication, subprocess.Popen] = await setup_code(
app: ASGIApplication | subprocess.Popen = await setup_code(
config.code, config.encoding, config.entrypoint, config.interface
)
client.send(msgpack.dumps({"success": True}))
Expand Down
20 changes: 10 additions & 10 deletions src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from os.path import abspath, exists, isdir, isfile, join
from pathlib import Path
from subprocess import CalledProcessError, check_output
from typing import Any, List, Literal, NewType, Optional
from typing import Any, Literal, NewType

from aleph_message.models import Chain
from aleph_message.models.execution.environment import HypervisorType
Expand Down Expand Up @@ -188,21 +188,21 @@ class Settings(BaseSettings):
CONNECTOR_URL = Url("http://localhost:4021")

CACHE_ROOT: Path = Path("/var/cache/aleph/vm")
MESSAGE_CACHE: Optional[Path] = Field(
MESSAGE_CACHE: Path | None = Field(
None,
description="Default to CACHE_ROOT/message",
)
CODE_CACHE: Optional[Path] = Field(None, description="Default to CACHE_ROOT/code")
RUNTIME_CACHE: Optional[Path] = Field(None, description="Default to CACHE_ROOT/runtime")
DATA_CACHE: Optional[Path] = Field(None, description="Default to CACHE_ROOT/data")
CODE_CACHE: Path | None = Field(None, description="Default to CACHE_ROOT/code")
RUNTIME_CACHE: Path | None = Field(None, description="Default to CACHE_ROOT/runtime")
DATA_CACHE: Path | None = Field(None, description="Default to CACHE_ROOT/data")

EXECUTION_ROOT: Path = Path("/var/lib/aleph/vm")
JAILER_BASE_DIRECTORY: Optional[Path] = Field(None, description="Default to EXECUTION_ROOT/jailer")
EXECUTION_DATABASE: Optional[Path] = Field(
JAILER_BASE_DIRECTORY: Path | None = Field(None, description="Default to EXECUTION_ROOT/jailer")
EXECUTION_DATABASE: Path | None = Field(
None, description="Location of database file. Default to EXECUTION_ROOT/executions.sqlite3"
)
EXECUTION_LOG_ENABLED: bool = False
EXECUTION_LOG_DIRECTORY: Optional[Path] = Field(
EXECUTION_LOG_DIRECTORY: Path | None = Field(
None, description="Location of executions log. Default to EXECUTION_ROOT/executions/"
)

Expand Down Expand Up @@ -267,12 +267,12 @@ class Settings(BaseSettings):
"with SEV and SEV-ES",
)

CONFIDENTIAL_DIRECTORY: Optional[Path] = Field(
CONFIDENTIAL_DIRECTORY: Path | None = Field(
None,
description="Confidential Computing default directory. Default to EXECUTION_ROOT/confidential",
)

CONFIDENTIAL_SESSION_DIRECTORY: Optional[Path] = Field(None, description="Default to EXECUTION_ROOT/sessions")
CONFIDENTIAL_SESSION_DIRECTORY: Path | None = Field(None, description="Default to EXECUTION_ROOT/sessions")

ENABLE_GPU_SUPPORT: bool = Field(
default=False,
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/vm/controllers/qemu/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from asyncio import Task
from asyncio.subprocess import Process
from pathlib import Path
from typing import Generic, List, TypeVar
from typing import Generic, TypeVar

import psutil
from aleph_message.models import ItemHash
Expand Down Expand Up @@ -38,7 +38,7 @@


class AlephQemuResources(AlephFirecrackerResources):
gpus: List[HostGPU] = []
gpus: list[HostGPU] = []

async def download_runtime(self) -> None:
volume = self.message_content.rootfs
Expand Down
1 change: 1 addition & 0 deletions src/aleph/vm/garbage_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def check_api(item_hash):
f"systemctl status aleph-vm-controller@{item_hash}.service --no-pager",
shell=True,
capture_output=True,
check=False,
)
exit_code = proc_ret.returncode
if exit_code == 0:
Expand Down
5 changes: 2 additions & 3 deletions src/aleph/vm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from collections.abc import Callable, Coroutine
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List

from aleph_message.models import (
ExecutableContent,
Expand Down Expand Up @@ -76,7 +75,7 @@ class VmExecution:
AlephProgramResources | AlephInstanceResources | AlephQemuResources | AlephQemuConfidentialInstance | None
) = None
vm: AlephFirecrackerExecutable | AlephQemuInstance | AlephQemuConfidentialInstance | None = None
gpus: List[HostGPU] = []
gpus: list[HostGPU] = []

times: VmExecutionTimes

Expand Down Expand Up @@ -223,7 +222,7 @@ async def prepare(self) -> None:
self.times.prepared_at = datetime.now(tz=timezone.utc)
self.resources = resources

def prepare_gpus(self, available_gpus: List[GpuDevice]) -> None:
def prepare_gpus(self, available_gpus: list[GpuDevice]) -> None:
gpus = []
if self.message.requirements and self.message.requirements.gpu:
for gpu in self.message.requirements.gpu:
Expand Down
5 changes: 2 additions & 3 deletions src/aleph/vm/orchestrator/resources.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import math
from datetime import datetime, timezone
from functools import lru_cache
from typing import List, Optional

import cpuinfo
import psutil
Expand Down Expand Up @@ -77,8 +76,8 @@ class MachineProperties(BaseModel):


class GpuProperties(BaseModel):
devices: Optional[List[GpuDevice]]
available_devices: Optional[List[GpuDevice]]
devices: list[GpuDevice] | None
available_devices: list[GpuDevice] | None


class MachineUsage(BaseModel):
Expand Down
7 changes: 3 additions & 4 deletions src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
from collections.abc import Iterable
from datetime import datetime, timezone
from typing import List

from aleph_message.models import (
Chain,
Expand Down Expand Up @@ -45,7 +44,7 @@
snapshot_manager: SnapshotManager | None = None
systemd_manager: SystemDManager
creation_lock: asyncio.Lock
gpus: List[GpuDevice] = []
gpus: list[GpuDevice] = []

def __init__(self, loop: asyncio.AbstractEventLoop):
self.executions = {}
Expand Down Expand Up @@ -249,7 +248,7 @@
if execution.is_running:
# TODO: Improve the way that we re-create running execution
# Load existing GPUs assigned to VMs
execution.gpus = parse_raw_as(List[HostGPU], saved_execution.gpus) if saved_execution.gpus else []
execution.gpus = parse_raw_as(list[HostGPU], saved_execution.gpus) if saved_execution.gpus else []

Check warning on line 251 in src/aleph/vm/pool.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/pool.py#L251

Added line #L251 was not covered by tests
# Load and instantiate the rest of resources and already assigned GPUs
await execution.prepare()
if self.network:
Expand Down Expand Up @@ -303,7 +302,7 @@
)
return executions or []

def get_available_gpus(self) -> List[GpuDevice]:
def get_available_gpus(self) -> list[GpuDevice]:
available_gpus = []
for gpu in self.gpus:
used = False
Expand Down
6 changes: 2 additions & 4 deletions src/aleph/vm/resources.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import subprocess
from enum import Enum
from typing import List, Optional

from aleph_message.models import HashableModel
from pydantic import BaseModel, Extra, Field

from aleph.vm.conf import settings
from aleph.vm.orchestrator.utils import get_compatible_gpus


Expand Down Expand Up @@ -97,7 +95,7 @@ def is_kernel_enabled_gpu(pci_host: str) -> bool:
return False


def parse_gpu_device_info(line: str) -> Optional[GpuDevice]:
def parse_gpu_device_info(line: str) -> GpuDevice | None:
"""Parse GPU device info from a line of lspci output."""

pci_host, device = line.split(' "', maxsplit=1)
Expand Down Expand Up @@ -134,7 +132,7 @@ def parse_gpu_device_info(line: str) -> Optional[GpuDevice]:
)


def get_gpu_devices() -> Optional[List[GpuDevice]]:
def get_gpu_devices() -> list[GpuDevice] | None:
"""Get GPU info using lspci command."""

result = subprocess.run(["lspci", "-mmnnn"], capture_output=True, text=True, check=True)
Expand Down
3 changes: 1 addition & 2 deletions src/aleph/vm/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import logging
import re
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from shutil import copy2, make_archive
Expand Down Expand Up @@ -382,7 +381,7 @@
raise NotImplementedError(msg)
if not re.match(r"^[\w\-_/]+$", volume_name):
# Sanitize volume names
logger.debug(f"Invalid values for volume name: {repr(volume_name)} detected, sanitizing")
logger.debug(f"Invalid values for volume name: {volume_name!r} detected, sanitizing")

Check warning on line 384 in src/aleph/vm/storage.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/storage.py#L384

Added line #L384 was not covered by tests
volume_name = re.sub(r"[^\w\-_]", "_", volume_name)
(Path(settings.PERSISTENT_VOLUMES_DIR) / namespace).mkdir(exist_ok=True)
if volume.parent:
Expand Down
1 change: 0 additions & 1 deletion tests/supervisor/test_execution.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import json
import logging
from typing import Any

Expand Down
Loading