Skip to content

Commit a6e8e38

Browse files
committed
Usage system: remove reserved usage for volumes
of vm when calculate available disk space Jira: ALEPH-420 This will help the scheduler scheduler more properly the available resource on CRN Alternative approach to #780 as Sparse file were not properly created.
1 parent bd20595 commit a6e8e38

File tree

5 files changed

+66
-24
lines changed

5 files changed

+66
-24
lines changed

src/aleph/vm/controllers/firecracker/executable.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from aiohttp import ClientResponseError
1414
from aleph_message.models import ExecutableContent, ItemHash
1515
from aleph_message.models.execution.environment import MachineResources
16-
from aleph_message.models.execution.volume import PersistentVolume
16+
from aleph_message.models.execution.volume import MachineVolume, PersistentVolume
1717

1818
from aleph.vm.conf import settings
1919
from aleph.vm.controllers.configuration import (
@@ -71,6 +71,7 @@ class HostVolume:
7171
mount: str
7272
path_on_host: Path
7373
read_only: bool
74+
size_mib: int | None
7475

7576

7677
@dataclass
@@ -100,6 +101,35 @@ class AlephFirecrackerResources:
100101
volumes: list[HostVolume]
101102
namespace: str
102103

104+
def get_disk_usage_delta(self) -> int:
105+
"""Difference between the size requested and what is currently used on disk.
106+
107+
Count rootfs and volumes.
108+
Used to calculate an estimate of space resource available for use.
109+
Value in bytes and is negative"""
110+
111+
total_delta = 0
112+
# Root fs
113+
if hasattr(self.message_content, "rootfs"):
114+
volume = self.message_content.rootfs
115+
used_size = self.rootfs_path.stat().st_size if self.rootfs_path.exists() else 0
116+
requested_size = int(volume.size_mib * 1024 * 1024)
117+
size_delta = used_size - requested_size
118+
total_delta += size_delta
119+
120+
# Count each extra volume
121+
for volume in self.volumes:
122+
if not volume.size_mib:
123+
# planned size not set on immutable volume
124+
size_delta = 0
125+
else:
126+
used_size = volume.path_on_host.stat().st_size if volume.path_on_host.exists() else 0
127+
requested_size = int(volume.size_mib * 1024 * 1024)
128+
129+
size_delta = used_size - requested_size
130+
total_delta += size_delta
131+
return total_delta
132+
103133
def __init__(self, message_content: ExecutableContent, namespace: str):
104134
self.message_content = message_content
105135
self.namespace = namespace
@@ -115,8 +145,9 @@ async def download_kernel(self):
115145
async def download_volumes(self):
116146
volumes = []
117147
# TODO: Download in parallel and prevent duplicated volume names
148+
volume: MachineVolume
118149
for i, volume in enumerate(self.message_content.volumes):
119-
# only persistant volume has name and mount
150+
# only persistent volume has name and mount
120151
if isinstance(volume, PersistentVolume):
121152
if not volume.name:
122153
volume.name = f"unamed_volume_{i}"
@@ -127,6 +158,7 @@ async def download_volumes(self):
127158
mount=volume.mount,
128159
path_on_host=(await get_volume_path(volume=volume, namespace=self.namespace)),
129160
read_only=volume.is_read_only(),
161+
size_mib=getattr(volume, "size_mib", None),
130162
)
131163
)
132164
self.volumes = volumes

src/aleph/vm/controllers/qemu/instance.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ async def make_writable_volume(self, parent_image_path, volume: PersistentVolume
7272
raise VmSetupError(msg)
7373

7474
dest_path = settings.PERSISTENT_VOLUMES_DIR / self.namespace / f"{volume_name}.qcow2"
75-
# Do not override if user asked for host persistance.
75+
# Do not override if user asked for host persistence.
7676
if dest_path.exists() and volume.persistence == VolumePersistence.host:
7777
return dest_path
7878

src/aleph/vm/orchestrator/resources.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ async def about_system_usage(request: web.Request):
133133
"""Public endpoint to expose information about the system usage."""
134134
period_start = datetime.now(timezone.utc).replace(second=0, microsecond=0)
135135
machine_properties = get_machine_properties()
136+
pool = request.app["vm_pool"]
136137

137138
usage: MachineUsage = MachineUsage(
138139
cpu=CpuUsage(
@@ -146,7 +147,8 @@ async def about_system_usage(request: web.Request):
146147
),
147148
disk=DiskUsage(
148149
total_kB=psutil.disk_usage(str(settings.PERSISTENT_VOLUMES_DIR)).total // 1000,
149-
available_kB=psutil.disk_usage(str(settings.PERSISTENT_VOLUMES_DIR)).free // 1000,
150+
available_kB=pool.calculate_available_disk() // 1000,
151+
# available_kB=psutil.disk_usage(str(settings.PERSISTENT_VOLUMES_DIR)).free // 1000,
150152
),
151153
period=UsagePeriod(
152154
start_timestamp=period_start,

src/aleph/vm/pool.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import json
55
import logging
6+
import shutil
67
from collections.abc import Iterable
78
from datetime import datetime, timedelta, timezone
89
from typing import Any, cast
@@ -101,6 +102,29 @@ def teardown(self) -> None:
101102
# upon supervisor restart or upgrade.
102103
pass
103104

105+
def calculate_available_disk(self):
106+
"""Disk available for the creation of new VM.
107+
108+
This take into account the disk request (but not used) for Volume of executions in the pool"""
109+
free_space = shutil.disk_usage(str(settings.PERSISTENT_VOLUMES_DIR)).free // 1000
110+
# Free disk space reported by system
111+
112+
# Calculate the reservation
113+
total_delta = 0
114+
for execution in self.executions.values():
115+
if not execution.resources:
116+
continue
117+
delta = execution.resources.get_disk_usage_delta()
118+
logger.warning("Disk usage delta: %d for %s", delta, execution.vm_hash)
119+
total_delta += delta
120+
available_space = free_space - total_delta
121+
logger.info(
122+
"Disk: freespace : %.f Mb, available space (non reserved) %.f Mb",
123+
free_space / 1024**2,
124+
available_space / 1024**2,
125+
)
126+
return available_space
127+
104128
async def create_a_vm(
105129
self, vm_hash: ItemHash, message: ExecutableContent, original: ExecutableContent, persistent: bool
106130
) -> VmExecution:

tests/supervisor/test_views.py

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,10 @@ async def test_allocation_fails_on_invalid_item_hash(aiohttp_client):
3939

4040

4141
@pytest.mark.asyncio
42-
async def test_system_usage(aiohttp_client):
42+
async def test_system_usage(aiohttp_client, mocker, mock_app_with_pool):
4343
"""Test that the usage system endpoints responds. No auth needed"""
4444

45-
class FakeVmPool:
46-
gpus = []
47-
48-
def get_available_gpus(self):
49-
return []
50-
51-
app = setup_webapp()
52-
app["vm_pool"] = FakeVmPool()
53-
client = await aiohttp_client(app)
45+
client = await aiohttp_client(await mock_app_with_pool)
5446
response: web.Response = await client.get("/about/usage/system")
5547
assert response.status == 200
5648
# check if it is valid json
@@ -60,15 +52,9 @@ def get_available_gpus(self):
6052

6153

6254
@pytest.mark.asyncio
63-
async def test_system_usage_mock(aiohttp_client, mocker):
55+
async def test_system_usage_mock(aiohttp_client, mocker, mock_app_with_pool):
6456
"""Test that the usage system endpoints response value. No auth needed"""
6557

66-
class FakeVmPool:
67-
gpus = []
68-
69-
def get_available_gpus(self):
70-
return []
71-
7258
mocker.patch(
7359
"cpuinfo.cpuinfo.get_cpu_info",
7460
{
@@ -85,9 +71,7 @@ def get_available_gpus(self):
8571
lambda: 200,
8672
)
8773

88-
app = setup_webapp()
89-
app["vm_pool"] = FakeVmPool()
90-
client = await aiohttp_client(app)
74+
client = await aiohttp_client(await mock_app_with_pool)
9175
response: web.Response = await client.get("/about/usage/system")
9276
assert response.status == 200
9377
# check if it is valid json

0 commit comments

Comments
 (0)