Commit 5c93a82

Add lock around the update_allocation command
This prevents two allocation updates from running in parallel and causing race conditions.
1 parent: d51b57a

1 file changed: 83 additions, 77 deletions


src/aleph/vm/orchestrator/views/__init__.py

@@ -1,3 +1,4 @@
+import asyncio
 import binascii
 import logging
 from decimal import Decimal
@@ -370,6 +371,7 @@ def authenticate_api_request(request: web.Request) -> bool:
     return sha256(signature).hexdigest() == settings.ALLOCATION_TOKEN_HASH


+allocation_lock = None
 async def update_allocations(request: web.Request):
     """Main entry for the start of persistence VM and instance, called by the Scheduler,
@@ -380,6 +382,9 @@ async def update_allocations(request: web.Request):
     if not authenticate_api_request(request):
         return web.HTTPUnauthorized(text="Authentication token received is invalid")

+    global allocation_lock
+    if allocation_lock is None:
+        allocation_lock = asyncio.Lock()
     try:
         data = await request.json()
         allocation = Allocation.parse_obj(data)
@@ -389,85 +394,86 @@
     pubsub: PubSub = request.app["pubsub"]
     pool: VmPool = request.app["vm_pool"]

-    # First free resources from persistent programs and instances that are not scheduled anymore.
-    allocations = allocation.persistent_vms | allocation.instances
-    # Make a copy since the pool is modified
-    for execution in list(pool.get_persistent_executions()):
-        if execution.vm_hash not in allocations and execution.is_running and not execution.uses_payment_stream:
-            vm_type = "instance" if execution.is_instance else "persistent program"
-            logger.info("Stopping %s %s", vm_type, execution.vm_hash)
-            await pool.stop_vm(execution.vm_hash)
-            pool.forget_vm(execution.vm_hash)
-
-    # Second start persistent VMs and instances sequentially to limit resource usage.
-
-    # Exceptions that can be raised when starting a VM:
-    vm_creation_exceptions = (
-        UnknownHashError,
-        ResourceDownloadError,
-        FileTooLargeError,
-        VmSetupError,
-        MicroVMFailedInitError,
-        HostNotFoundError,
-        HTTPNotFound,
-    )
-
-    scheduling_errors: dict[ItemHash, Exception] = {}
-
-    # Schedule the start of persistent VMs:
-    for vm_hash in allocation.persistent_vms:
-        try:
-            logger.info(f"Starting long running VM '{vm_hash}'")
-            vm_hash = ItemHash(vm_hash)
-            await start_persistent_vm(vm_hash, pubsub, pool)
-        except vm_creation_exceptions as error:
-            logger.exception("Error while starting VM '%s': %s", vm_hash, error)
-            scheduling_errors[vm_hash] = error
-        except Exception as error:
-            # Handle unknown exception separately, to avoid leaking data
-            logger.exception("Unhandled Error while starting VM '%s': %s", vm_hash, error)
-            scheduling_errors[vm_hash] = Exception("Unhandled Error")
-
-    # Schedule the start of instances:
-    for instance_hash in allocation.instances:
-        logger.info(f"Starting instance '{instance_hash}'")
-        instance_item_hash = ItemHash(instance_hash)
-        try:
-            await start_persistent_vm(instance_item_hash, pubsub, pool)
-        except vm_creation_exceptions as error:
-            logger.exception("Error while starting VM '%s': %s", instance_hash, error)
-            scheduling_errors[instance_item_hash] = error
-        except Exception as error:
-            # Handle unknown exception separately, to avoid leaking data
-            logger.exception("Unhandled Error while starting VM '%s': %s", instance_hash, error)
-            scheduling_errors[instance_hash] = Exception("Unhandled Error")
-
-    # Log unsupported features
-    if allocation.on_demand_vms:
-        logger.warning("Not supported yet: 'allocation.on_demand_vms'")
-    if allocation.jobs:
-        logger.warning("Not supported yet: 'allocation.jobs'")
-
-    failing = set(scheduling_errors.keys())
-    successful = allocations - failing
+    async with allocation_lock:
+        # First free resources from persistent programs and instances that are not scheduled anymore.
+        allocations = allocation.persistent_vms | allocation.instances
+        # Make a copy since the pool is modified
+        for execution in list(pool.get_persistent_executions()):
+            if execution.vm_hash not in allocations and execution.is_running and not execution.uses_payment_stream:
+                vm_type = "instance" if execution.is_instance else "persistent program"
+                logger.info("Stopping %s %s", vm_type, execution.vm_hash)
+                await pool.stop_vm(execution.vm_hash)
+                pool.forget_vm(execution.vm_hash)
+
+        # Second start persistent VMs and instances sequentially to limit resource usage.
+
+        # Exceptions that can be raised when starting a VM:
+        vm_creation_exceptions = (
+            UnknownHashError,
+            ResourceDownloadError,
+            FileTooLargeError,
+            VmSetupError,
+            MicroVMFailedInitError,
+            HostNotFoundError,
+            HTTPNotFound,
+        )

-    status_code: int
-    if not failing:
-        status_code = 200  # OK
-    elif not successful:
-        status_code = 503  # Service Unavailable
-    else:
-        status_code = 207  # Multi-Status
+        scheduling_errors: dict[ItemHash, Exception] = {}

-    return web.json_response(
-        data={
-            "success": not failing,
-            "successful": list(successful),
-            "failing": list(failing),
-            "errors": {vm_hash: repr(error) for vm_hash, error in scheduling_errors.items()},
-        },
-        status=status_code,
-    )
+        # Schedule the start of persistent VMs:
+        for vm_hash in allocation.persistent_vms:
+            try:
+                logger.info(f"Starting long running VM '{vm_hash}'")
+                vm_hash = ItemHash(vm_hash)
+                await start_persistent_vm(vm_hash, pubsub, pool)
+            except vm_creation_exceptions as error:
+                logger.exception("Error while starting VM '%s': %s", vm_hash, error)
+                scheduling_errors[vm_hash] = error
+            except Exception as error:
+                # Handle unknown exception separately, to avoid leaking data
+                logger.exception("Unhandled Error while starting VM '%s': %s", vm_hash, error)
+                scheduling_errors[vm_hash] = Exception("Unhandled Error")
+
+        # Schedule the start of instances:
+        for instance_hash in allocation.instances:
+            logger.info(f"Starting instance '{instance_hash}'")
+            instance_item_hash = ItemHash(instance_hash)
+            try:
+                await start_persistent_vm(instance_item_hash, pubsub, pool)
+            except vm_creation_exceptions as error:
+                logger.exception("Error while starting VM '%s': %s", instance_hash, error)
+                scheduling_errors[instance_item_hash] = error
+            except Exception as error:
+                # Handle unknown exception separately, to avoid leaking data
+                logger.exception("Unhandled Error while starting VM '%s': %s", instance_hash, error)
+                scheduling_errors[instance_hash] = Exception("Unhandled Error")
+
+        # Log unsupported features
+        if allocation.on_demand_vms:
+            logger.warning("Not supported yet: 'allocation.on_demand_vms'")
+        if allocation.jobs:
+            logger.warning("Not supported yet: 'allocation.jobs'")
+
+        failing = set(scheduling_errors.keys())
+        successful = allocations - failing
+
+        status_code: int
+        if not failing:
+            status_code = 200  # OK
+        elif not successful:
+            status_code = 503  # Service Unavailable
+        else:
+            status_code = 207  # Multi-Status
+
+        return web.json_response(
+            data={
+                "success": not failing,
+                "successful": list(successful),
+                "failing": list(failing),
+                "errors": {vm_hash: repr(error) for vm_hash, error in scheduling_errors.items()},
+            },
+            status=status_code,
+        )


 @cors_allow_all
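
The whole allocation update now runs behind a single module-level asyncio.Lock, created lazily on the first request, so overlapping calls to the endpoint execute one after the other instead of interleaving. Below is a minimal, self-contained sketch of that pattern; it is not the project's code, and the route path, handler body and sleep are placeholder assumptions:

import asyncio

from aiohttp import web

# Created lazily inside the handler rather than at import time: on Python 3.9 and
# earlier, asyncio primitives captured the current event loop at construction, so
# building the lock up front could bind it to the wrong loop.
allocation_lock = None


async def update_allocations(request: web.Request) -> web.Response:
    global allocation_lock
    if allocation_lock is None:
        allocation_lock = asyncio.Lock()

    # Only one request at a time enters this block; concurrent requests wait here.
    async with allocation_lock:
        await asyncio.sleep(1)  # stand-in for stopping and starting VMs in the pool
        return web.json_response({"success": True})


app = web.Application()
app.router.add_post("/control/allocations", update_allocations)  # illustrative route path

if __name__ == "__main__":
    web.run_app(app)

With this in place, two POSTs sent at the same time both succeed, but the second only starts its critical section after the first has released the lock, which is the serialization the commit message describes.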
