Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
82e83e4
Problem: The server doesn't have a directory to save the platform certi…
nesitor May 28, 2024
39bee9a
Problem: There isn't an endpoint to be able to get the confidential pl…
nesitor May 28, 2024
5f2ef49
Fix: Solved code quality issues.
nesitor May 28, 2024
1439096
Fix: Added 2 test cases for that endpoint.
nesitor May 28, 2024
3c29477
Fix: Added PR suggestions.
nesitor Jun 4, 2024
7c6c3ac
Fix: Modified test mock to let the tests work
nesitor Jun 4, 2024
d1454a4
Problem: Now it isn't possible as a VM operator to get the client sessio…
nesitor Jun 4, 2024
a6559c2
Fix: Remove useless aiofiles import
nesitor Jun 5, 2024
5d7044b
Fix: Solve test issues after code quality fixes
nesitor Jun 5, 2024
0987636
Merge branch 'refs/heads/andres-feature-endpoint_platform_certificate…
nesitor Jun 5, 2024
e37ba59
Fix: Solve code quality issues.
nesitor Jun 5, 2024
017115e
Fix: Solve code quality issues.
nesitor Jun 5, 2024
8fc195c
Fix: Write file in sync mode to avoid adding a new dependency. Files …
nesitor Jun 5, 2024
5d2eb5f
Fix: Solved PR comments and wrong conditionals.
nesitor Jun 5, 2024
68413be
Fix: Solved more PR comments.
nesitor Jun 5, 2024
1b3a7c7
Fix: Removed unexisting import
nesitor Jun 5, 2024
2f66b85
Merge branch 'refs/heads/andres-feature-endpoint_platform_certificate…
nesitor Jun 5, 2024
722db64
Merge branch 'refs/heads/main' into andres-feature-endpoint_platform_…
nesitor Jun 5, 2024
e88e6ea
Merge branch 'refs/heads/andres-feature-endpoint_platform_certificate…
nesitor Jun 5, 2024
e983c4e
Fix: Added useless command requested on the PR review.
nesitor Jun 5, 2024
ffd469e
Fix: Changed endpoint path and added automatic tests for that endpoint.
nesitor Jun 11, 2024
e1c0afa
Fix: Solved settings singleton issue with testing, adding an `initial…
nesitor Jun 12, 2024
e3259f7
Fix: Just disable the setting that is failing and remove previous met…
nesitor Jun 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ class Settings(BaseSettings):
description="Confidential Computing default directory. Default to EXECUTION_ROOT/confidential",
)

CONFIDENTIAL_SESSION_DIRECTORY: Path = Field(None, description="Default to EXECUTION_ROOT/sessions")

# Tests on programs

FAKE_DATA_PROGRAM: Optional[Path] = None
Expand Down Expand Up @@ -387,6 +389,7 @@ def check(self):
assert (
check_system_module("kvm_amd/parameters/sev_es") == "Y"
), "SEV-ES feature isn't enabled, enable it in BIOS"
assert is_command_available("sevctl"), "Command `sevctl` not found, run `cargo install sevctl`"

assert self.ENABLE_QEMU_SUPPORT, "Qemu Support is needed for confidential computing and it's disabled, "
"enable it setting the env variable `ENABLE_QEMU_SUPPORT=True` in configuration"
Expand Down Expand Up @@ -415,6 +418,7 @@ def setup(self):
os.makedirs(self.EXECUTION_LOG_DIRECTORY, exist_ok=True)
os.makedirs(self.PERSISTENT_VOLUMES_DIR, exist_ok=True)
os.makedirs(self.CONFIDENTIAL_DIRECTORY, exist_ok=True)
os.makedirs(self.CONFIDENTIAL_SESSION_DIRECTORY, exist_ok=True)

self.API_SERVER = self.API_SERVER.rstrip("/")

Expand Down Expand Up @@ -463,6 +467,8 @@ def __init__(
self.RUNTIME_CACHE = self.CACHE_ROOT / "runtime"
if not self.DATA_CACHE:
self.DATA_CACHE = self.CACHE_ROOT / "data"
if not self.CONFIDENTIAL_DIRECTORY:
self.CONFIDENTIAL_DIRECTORY = self.CACHE_ROOT / "confidential"
if not self.JAILER_BASE_DIRECTORY:
self.JAILER_BASE_DIRECTORY = self.EXECUTION_ROOT / "jailer"
if not self.PERSISTENT_VOLUMES_DIR:
Expand All @@ -473,8 +479,8 @@ def __init__(
self.EXECUTION_LOG_DIRECTORY = self.EXECUTION_ROOT / "executions"
if not self.JAILER_BASE_DIR:
self.JAILER_BASE_DIR = self.EXECUTION_ROOT / "jailer"
if not self.CONFIDENTIAL_DIRECTORY:
self.CONFIDENTIAL_DIRECTORY = self.CACHE_ROOT / "confidential"
if not self.CONFIDENTIAL_SESSION_DIRECTORY:
self.CONFIDENTIAL_SESSION_DIRECTORY = self.EXECUTION_ROOT / "sessions"

class Config:
env_prefix = "ALEPH_VM_"
Expand Down
4 changes: 4 additions & 0 deletions src/aleph/vm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ def is_program(self) -> bool:
def is_instance(self) -> bool:
return isinstance(self.message, InstanceContent)

@property
def is_confidential(self) -> bool:
    """Return whether this execution is treated as a confidential VM.

    For now the answer only mirrors ``uses_payment_stream``; the VM message itself is not
    inspected yet (see the TODO below).
    """
    return self.uses_payment_stream  # TODO: check also if the VM message is confidential

@property
def hypervisor(self) -> HypervisorType:
if self.is_program:
Expand Down
13 changes: 13 additions & 0 deletions src/aleph/vm/orchestrator/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pydantic import BaseModel, Field

from aleph.vm.conf import settings
from aleph.vm.sevclient import SevClient
from aleph.vm.utils import cors_allow_all


Expand Down Expand Up @@ -122,6 +123,18 @@ async def about_system_usage(_: web.Request):
return web.json_response(text=usage.json(exclude_none=True))


@cors_allow_all
async def about_certificates(request: web.Request):
    """Serve the platform certificates used for confidential computing.

    The certificates file is obtained from the ``SevClient`` stored on the application under the
    ``"sev_client"`` key. When ``settings.ENABLE_CONFIDENTIAL_COMPUTING`` is disabled, a
    400 Bad Request response is returned instead.
    """
    if settings.ENABLE_CONFIDENTIAL_COMPUTING:
        sev_client: SevClient = request.app["sev_client"]
        certificates_path = await sev_client.get_certificates()
        return web.FileResponse(certificates_path)

    return web.HTTPBadRequest(reason="Confidential computing setting not enabled on that server")


class Allocation(BaseModel):
"""An allocation is the set of resources that are currently allocated on this orchestrator.
It contains the item_hashes of all persistent VMs, instances, on-demand VMs and jobs.
Expand Down
12 changes: 11 additions & 1 deletion src/aleph/vm/orchestrator/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

from aleph.vm.conf import settings
from aleph.vm.pool import VmPool
from aleph.vm.sevclient import SevClient
from aleph.vm.version import __version__

from .metrics import create_tables, setup_engine
from .resources import about_system_usage
from .resources import about_certificates, about_system_usage
from .tasks import (
start_payment_monitoring_task,
start_watch_for_messages_task,
Expand All @@ -46,6 +47,7 @@
update_allocations,
)
from .views.operator import (
operate_confidential_initialize,
operate_erase,
operate_expire,
operate_reboot,
Expand Down Expand Up @@ -95,11 +97,13 @@ def setup_webapp():
web.get("/about/executions/details", about_executions),
web.get("/about/executions/records", about_execution_records),
web.get("/about/usage/system", about_system_usage),
web.get("/about/certificates", about_certificates),
web.get("/about/config", about_config),
# /control APIs are used to control the VMs and access their logs
web.post("/control/allocation/notify", notify_allocation),
web.get("/control/machine/{ref}/logs", stream_logs),
web.post("/control/machine/{ref}/expire", operate_expire),
web.post("/control/machine/{ref}/confidential/initialize", operate_confidential_initialize),
web.post("/control/machine/{ref}/stop", operate_stop),
web.post("/control/machine/{ref}/erase", operate_erase),
web.post("/control/machine/{ref}/reboot", operate_reboot),
Expand Down Expand Up @@ -159,6 +163,12 @@ def run():
app["secret_token"] = secret_token
app["vm_pool"] = pool

# Store sevctl app singleton only if confidential feature is enabled
if settings.ENABLE_CONFIDENTIAL_COMPUTING:
sev_client = SevClient(settings.CONFIDENTIAL_DIRECTORY)
app["sev_client"] = sev_client
# TODO: Review and check sevctl first initialization steps, like (sevctl generate and sevctl provision)

logger.debug(f"Login to /about pages {protocol}://{hostname}/about/login?token={secret_token}")

try:
Expand Down
45 changes: 45 additions & 0 deletions src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from aleph_message.models import ItemHash
from aleph_message.models.execution import BaseExecutableContent

from aleph.vm.conf import settings
from aleph.vm.models import VmExecution
from aleph.vm.orchestrator.run import create_vm_execution
from aleph.vm.orchestrator.views import authenticate_api_request
Expand Down Expand Up @@ -133,6 +134,50 @@ async def operate_expire(request: web.Request, authenticated_sender: str) -> web
return web.Response(status=200, body=f"Expiring VM with ref {vm_hash} in {timeout} seconds")


@cors_allow_all
@require_jwk_authentication
async def operate_confidential_initialize(request: web.Request, authenticated_sender: str) -> web.Response:
    """Store the client session files of a confidential VM and start it.

    The request is a form POST that must carry two uploaded files, ``session`` and ``godh``.
    Both are written under ``settings.CONFIDENTIAL_SESSION_DIRECTORY / <vm_hash>`` and the VM
    controller service is then enabled and started.

    Responds with 403 when the sender is not authorized for the VM, when the VM is already
    running, when the VM is not confidential, or when one of the two files is missing.
    Responds with 200 once the VM controller service has been started.
    """
    vm_hash = get_itemhash_or_400(request.match_info)

    pool: VmPool = request.app["vm_pool"]
    logger.debug(f"Iterating through running executions... {pool.executions}")
    execution = get_execution_or_404(vm_hash, pool=pool)

    if not is_sender_authorized(authenticated_sender, execution.message):
        return web.Response(status=403, body="Unauthorized sender")

    if execution.is_running:
        return web.Response(status=403, body=f"VM with ref {vm_hash} already running")

    if not execution.is_confidential:
        return web.Response(status=403, body=f"Operation not allowed for VM {vm_hash} because it isn't confidential")

    post = await request.post()

    # Validate both uploads before touching the filesystem, so a rejected request never leaves a
    # half-written session directory behind. A plain (non-file) form field is treated as missing
    # instead of crashing on the absent ``.file`` attribute.
    session_file_content = post.get("session")
    if not isinstance(session_file_content, web.FileField):
        return web.Response(status=403, body=f"Session file required for VM with ref {vm_hash}")

    godh_file_content = post.get("godh")
    if not isinstance(godh_file_content, web.FileField):
        return web.Response(status=403, body=f"GODH file required for VM with ref {vm_hash}")

    vm_session_path = settings.CONFIDENTIAL_SESSION_DIRECTORY / vm_hash
    vm_session_path.mkdir(parents=True, exist_ok=True)

    # Files are written synchronously on purpose: they are small and this avoids a new dependency.
    session_file_path = vm_session_path / "vm_session.b64"
    session_file_path.write_bytes(session_file_content.file.read())

    godh_file_path = vm_session_path / "vm_godh.b64"
    godh_file_path.write_bytes(godh_file_content.file.read())

    pool.systemd_manager.enable_and_start(execution.controller_service)

    return web.Response(status=200, body=f"Started VM with ref {vm_hash}")


@cors_allow_all
@require_jwk_authentication
async def operate_stop(request: web.Request, authenticated_sender: str) -> web.Response:
Expand Down
4 changes: 3 additions & 1 deletion src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ async def create_a_vm(
await execution.start()

# Start VM and snapshots automatically
if execution.persistent:
# If the execution is confidential, don't start it because we need to wait for the session certificate
# files, use the endpoint /control/machine/{ref}/confidential/initialize to get session files and start the VM
if execution.persistent and not execution.is_confidential:
self.systemd_manager.enable_and_start(execution.controller_service)
await execution.wait_for_init()
if execution.is_program and execution.vm:
Expand Down
23 changes: 23 additions & 0 deletions src/aleph/vm/sevclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from pathlib import Path

from aleph.vm.utils import run_in_subprocess


class SevClient:
    """Thin wrapper around the ``sevctl`` command line tool.

    On construction, the ``platform`` sub-directory of ``sev_dir`` is created (parents included)
    and is used to hold the exported platform certificates archive.
    """

    def __init__(self, sev_dir: Path):
        platform_dir = sev_dir / "platform"
        platform_dir.mkdir(parents=True, exist_ok=True)

        self.sev_dir = sev_dir
        self.certificates_dir = platform_dir
        self.certificates_archive = platform_dir / "certs_export.cert"

    async def sevctl_cmd(self, *args) -> bytes:
        """Run ``sevctl`` with the given arguments and return what ``run_in_subprocess`` returns."""
        command = ["sevctl", *args]
        return await run_in_subprocess(command, check=True)

    async def get_certificates(self) -> Path:
        """Return the path of the certificates archive, exporting it with ``sevctl`` if it is missing."""
        if self.certificates_archive.is_file():
            return self.certificates_archive

        await self.sevctl_cmd("export", str(self.certificates_archive))
        return self.certificates_archive
136 changes: 136 additions & 0 deletions tests/supervisor/test_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import io
import tempfile
from pathlib import Path
from unittest import mock
from unittest.mock import MagicMock

import aiohttp
import pytest
from aleph_message.models import ItemHash

from aleph.vm.conf import settings
from aleph.vm.orchestrator.supervisor import setup_webapp
from aleph.vm.storage import get_message


@pytest.mark.asyncio
async def test_operator_confidential_initialize_not_authorized(aiohttp_client):
    """Check that the confidential initialize endpoint answers 403 when the sender is not authorized."""

    settings.ENABLE_QEMU_SUPPORT = True
    settings.ENABLE_CONFIDENTIAL_COMPUTING = True
    settings.setup()

    class FakeExecution:
        message = None
        is_running: bool = True
        is_confidential: bool = False

    class FakeVmPool:
        executions: dict[ItemHash, FakeExecution] = {}

        def __init__(self):
            self.executions[settings.FAKE_INSTANCE_ID] = FakeExecution()

    # JWK authentication is bypassed and the sender authorization check is forced to fail.
    jwk_patch = mock.patch("aleph.vm.orchestrator.views.authentication.authenticate_jwk", return_value="")
    sender_patch = mock.patch("aleph.vm.orchestrator.views.operator.is_sender_authorized", return_value=False)

    with jwk_patch, sender_patch as is_sender_authorized_mock:
        app = setup_webapp()
        app["vm_pool"] = FakeVmPool()
        client = await aiohttp_client(app)
        response = await client.post(
            f"/control/machine/{settings.FAKE_INSTANCE_ID}/confidential/initialize",
        )
        assert response.status == 403
        assert await response.text() == "Unauthorized sender"
        is_sender_authorized_mock.assert_called_once()


@pytest.mark.asyncio
async def test_operator_confidential_initialize_already_running(aiohttp_client):
    """Check that the confidential initialize endpoint answers 403 when the VM is already running."""

    settings.ENABLE_QEMU_SUPPORT = True
    settings.ENABLE_CONFIDENTIAL_COMPUTING = True
    settings.setup()

    vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
    instance_message = await get_message(ref=vm_hash)

    class FakeExecution:
        message = instance_message.content
        is_running: bool = True
        is_confidential: bool = False

    class FakeVmPool:
        executions: dict[ItemHash, FakeExecution] = {}

        def __init__(self):
            self.executions[vm_hash] = FakeExecution()

    # Authenticate as the real sender of the instance message so only the "running" check rejects.
    jwk_patch = mock.patch(
        "aleph.vm.orchestrator.views.authentication.authenticate_jwk",
        return_value=instance_message.sender,
    )

    with jwk_patch:
        app = setup_webapp()
        app["vm_pool"] = FakeVmPool()
        client = await aiohttp_client(app)
        response = await client.post(
            f"/control/machine/{vm_hash}/confidential/initialize",
            json={"persistent_vms": []},
        )
        assert response.status == 403
        assert await response.text() == f"VM with ref {vm_hash} already running"


@pytest.mark.asyncio
async def test_operator_confidential_initialize(aiohttp_client):
    """Test that the confidential initialize endpoint accepts the session files and starts the VM. Auth needed"""

    settings.ENABLE_QEMU_SUPPORT = True
    settings.ENABLE_CONFIDENTIAL_COMPUTING = True
    settings.setup()

    vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
    instance_message = await get_message(ref=vm_hash)

    class FakeExecution:
        message = instance_message.content
        is_running: bool = False
        is_confidential: bool = True
        controller_service: str = ""

    class MockSystemDManager:
        enable_and_start = MagicMock(return_value=True)

    class FakeVmPool:
        executions: dict[ItemHash, FakeExecution] = {}

        def __init__(self):
            self.executions[vm_hash] = FakeExecution()
            self.systemd_manager = MockSystemDManager()

    with tempfile.NamedTemporaryFile() as temp_file:
        # Open the upload handles in context managers so they are closed even if the request
        # is never sent (e.g. when building the app fails).
        with open(temp_file.name, "rb") as session_file, open(temp_file.name, "rb") as godh_file:
            form_data = aiohttp.FormData()
            form_data.add_field("session", session_file, filename="session.b64")
            form_data.add_field("godh", godh_file, filename="godh.b64")

            with mock.patch(
                "aleph.vm.orchestrator.views.authentication.authenticate_jwk",
                return_value=instance_message.sender,
            ):
                app = setup_webapp()
                app["vm_pool"] = FakeVmPool()
                client = await aiohttp_client(app)
                response = await client.post(
                    f"/control/machine/{vm_hash}/confidential/initialize",
                    data=form_data,
                )
                assert response.status == 200
                assert await response.text() == f"Started VM with ref {vm_hash}"
                app["vm_pool"].systemd_manager.enable_and_start.assert_called_once()
2 changes: 2 additions & 0 deletions tests/supervisor/test_qemu_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async def test_create_qemu_instance():
settings.USE_FAKE_INSTANCE_BASE = True
settings.FAKE_INSTANCE_MESSAGE = settings.FAKE_INSTANCE_QEMU_MESSAGE
settings.FAKE_INSTANCE_BASE = settings.FAKE_QEMU_INSTANCE_BASE
settings.ENABLE_CONFIDENTIAL_COMPUTING = False
settings.ALLOW_VM_NETWORKING = False
settings.USE_JAILER = False

Expand Down Expand Up @@ -108,6 +109,7 @@ async def test_create_qemu_instance_online():
settings.USE_FAKE_INSTANCE_BASE = True
settings.FAKE_INSTANCE_MESSAGE = settings.FAKE_INSTANCE_QEMU_MESSAGE
settings.FAKE_INSTANCE_BASE = settings.FAKE_QEMU_INSTANCE_BASE
settings.ENABLE_CONFIDENTIAL_COMPUTING = False
settings.ALLOW_VM_NETWORKING = True
settings.USE_JAILER = False

Expand Down
Loading