Skip to content

Commit 92526af

Browse files
committed
Add lock arround the update_allocation commands
This prevent two allocation from running in parallel and causing racing error
1 parent d51b57a commit 92526af

File tree

2 files changed

+85
-78
lines changed

2 files changed

+85
-78
lines changed

src/aleph/vm/hypervisors/firecracker/microvm.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,6 @@ async def teardown(self):
474474
await asyncio.sleep(1)
475475
await self.stop()
476476

477-
478477
if (
479478
self.journal_stdout
480479
and self.journal_stdout != asyncio.subprocess.DEVNULL

src/aleph/vm/orchestrator/views/__init__.py

Lines changed: 85 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import binascii
23
import logging
34
from decimal import Decimal
@@ -370,6 +371,9 @@ def authenticate_api_request(request: web.Request) -> bool:
370371
return sha256(signature).hexdigest() == settings.ALLOCATION_TOKEN_HASH
371372

372373

374+
allocation_lock = None
375+
376+
373377
async def update_allocations(request: web.Request):
374378
"""Main entry for the start of persistence VM and instance, called by the Scheduler,
375379
@@ -380,6 +384,9 @@ async def update_allocations(request: web.Request):
380384
if not authenticate_api_request(request):
381385
return web.HTTPUnauthorized(text="Authentication token received is invalid")
382386

387+
global allocation_lock
388+
if allocation_lock is None:
389+
allocation_lock = asyncio.Lock()
383390
try:
384391
data = await request.json()
385392
allocation = Allocation.parse_obj(data)
@@ -389,85 +396,86 @@ async def update_allocations(request: web.Request):
389396
pubsub: PubSub = request.app["pubsub"]
390397
pool: VmPool = request.app["vm_pool"]
391398

392-
# First free resources from persistent programs and instances that are not scheduled anymore.
393-
allocations = allocation.persistent_vms | allocation.instances
394-
# Make a copy since the pool is modified
395-
for execution in list(pool.get_persistent_executions()):
396-
if execution.vm_hash not in allocations and execution.is_running and not execution.uses_payment_stream:
397-
vm_type = "instance" if execution.is_instance else "persistent program"
398-
logger.info("Stopping %s %s", vm_type, execution.vm_hash)
399-
await pool.stop_vm(execution.vm_hash)
400-
pool.forget_vm(execution.vm_hash)
401-
402-
# Second start persistent VMs and instances sequentially to limit resource usage.
403-
404-
# Exceptions that can be raised when starting a VM:
405-
vm_creation_exceptions = (
406-
UnknownHashError,
407-
ResourceDownloadError,
408-
FileTooLargeError,
409-
VmSetupError,
410-
MicroVMFailedInitError,
411-
HostNotFoundError,
412-
HTTPNotFound,
413-
)
414-
415-
scheduling_errors: dict[ItemHash, Exception] = {}
416-
417-
# Schedule the start of persistent VMs:
418-
for vm_hash in allocation.persistent_vms:
419-
try:
420-
logger.info(f"Starting long running VM '{vm_hash}'")
421-
vm_hash = ItemHash(vm_hash)
422-
await start_persistent_vm(vm_hash, pubsub, pool)
423-
except vm_creation_exceptions as error:
424-
logger.exception("Error while starting VM '%s': %s", vm_hash, error)
425-
scheduling_errors[vm_hash] = error
426-
except Exception as error:
427-
# Handle unknown exception separately, to avoid leaking data
428-
logger.exception("Unhandled Error while starting VM '%s': %s", vm_hash, error)
429-
scheduling_errors[vm_hash] = Exception("Unhandled Error")
430-
431-
# Schedule the start of instances:
432-
for instance_hash in allocation.instances:
433-
logger.info(f"Starting instance '{instance_hash}'")
434-
instance_item_hash = ItemHash(instance_hash)
435-
try:
436-
await start_persistent_vm(instance_item_hash, pubsub, pool)
437-
except vm_creation_exceptions as error:
438-
logger.exception("Error while starting VM '%s': %s", instance_hash, error)
439-
scheduling_errors[instance_item_hash] = error
440-
except Exception as error:
441-
# Handle unknown exception separately, to avoid leaking data
442-
logger.exception("Unhandled Error while starting VM '%s': %s", instance_hash, error)
443-
scheduling_errors[instance_hash] = Exception("Unhandled Error")
444-
445-
# Log unsupported features
446-
if allocation.on_demand_vms:
447-
logger.warning("Not supported yet: 'allocation.on_demand_vms'")
448-
if allocation.jobs:
449-
logger.warning("Not supported yet: 'allocation.jobs'")
450-
451-
failing = set(scheduling_errors.keys())
452-
successful = allocations - failing
399+
async with allocation_lock:
400+
# First free resources from persistent programs and instances that are not scheduled anymore.
401+
allocations = allocation.persistent_vms | allocation.instances
402+
# Make a copy since the pool is modified
403+
for execution in list(pool.get_persistent_executions()):
404+
if execution.vm_hash not in allocations and execution.is_running and not execution.uses_payment_stream:
405+
vm_type = "instance" if execution.is_instance else "persistent program"
406+
logger.info("Stopping %s %s", vm_type, execution.vm_hash)
407+
await pool.stop_vm(execution.vm_hash)
408+
pool.forget_vm(execution.vm_hash)
409+
410+
# Second start persistent VMs and instances sequentially to limit resource usage.
411+
412+
# Exceptions that can be raised when starting a VM:
413+
vm_creation_exceptions = (
414+
UnknownHashError,
415+
ResourceDownloadError,
416+
FileTooLargeError,
417+
VmSetupError,
418+
MicroVMFailedInitError,
419+
HostNotFoundError,
420+
HTTPNotFound,
421+
)
453422

454-
status_code: int
455-
if not failing:
456-
status_code = 200 # OK
457-
elif not successful:
458-
status_code = 503 # Service Unavailable
459-
else:
460-
status_code = 207 # Multi-Status
423+
scheduling_errors: dict[ItemHash, Exception] = {}
461424

462-
return web.json_response(
463-
data={
464-
"success": not failing,
465-
"successful": list(successful),
466-
"failing": list(failing),
467-
"errors": {vm_hash: repr(error) for vm_hash, error in scheduling_errors.items()},
468-
},
469-
status=status_code,
470-
)
425+
# Schedule the start of persistent VMs:
426+
for vm_hash in allocation.persistent_vms:
427+
try:
428+
logger.info(f"Starting long running VM '{vm_hash}'")
429+
vm_hash = ItemHash(vm_hash)
430+
await start_persistent_vm(vm_hash, pubsub, pool)
431+
except vm_creation_exceptions as error:
432+
logger.exception("Error while starting VM '%s': %s", vm_hash, error)
433+
scheduling_errors[vm_hash] = error
434+
except Exception as error:
435+
# Handle unknown exception separately, to avoid leaking data
436+
logger.exception("Unhandled Error while starting VM '%s': %s", vm_hash, error)
437+
scheduling_errors[vm_hash] = Exception("Unhandled Error")
438+
439+
# Schedule the start of instances:
440+
for instance_hash in allocation.instances:
441+
logger.info(f"Starting instance '{instance_hash}'")
442+
instance_item_hash = ItemHash(instance_hash)
443+
try:
444+
await start_persistent_vm(instance_item_hash, pubsub, pool)
445+
except vm_creation_exceptions as error:
446+
logger.exception("Error while starting VM '%s': %s", instance_hash, error)
447+
scheduling_errors[instance_item_hash] = error
448+
except Exception as error:
449+
# Handle unknown exception separately, to avoid leaking data
450+
logger.exception("Unhandled Error while starting VM '%s': %s", instance_hash, error)
451+
scheduling_errors[instance_hash] = Exception("Unhandled Error")
452+
453+
# Log unsupported features
454+
if allocation.on_demand_vms:
455+
logger.warning("Not supported yet: 'allocation.on_demand_vms'")
456+
if allocation.jobs:
457+
logger.warning("Not supported yet: 'allocation.jobs'")
458+
459+
failing = set(scheduling_errors.keys())
460+
successful = allocations - failing
461+
462+
status_code: int
463+
if not failing:
464+
status_code = 200 # OK
465+
elif not successful:
466+
status_code = 503 # Service Unavailable
467+
else:
468+
status_code = 207 # Multi-Status
469+
470+
return web.json_response(
471+
data={
472+
"success": not failing,
473+
"successful": list(successful),
474+
"failing": list(failing),
475+
"errors": {vm_hash: repr(error) for vm_hash, error in scheduling_errors.items()},
476+
},
477+
status=status_code,
478+
)
471479

472480

473481
@cors_allow_all

0 commit comments

Comments
 (0)