2 changes: 1 addition & 1 deletion orchestrator/modules/operators/orchestrate.py
@@ -97,7 +97,7 @@ def cleanup(self) -> None:
done, not_done = ray.wait(
ray_waitables=handles, num_returns=len(handles), timeout=60.0
)
print(f"cleaned {len(done)}, clean failed {len(not_done)}")
moduleLog.info(f"cleaned {len(done)}, clean failed {len(not_done)}")


def log_space_details(discovery_space: "DiscoverySpace"):
71 changes: 43 additions & 28 deletions orchestrator/utilities/run_experiment.py
@@ -7,6 +7,7 @@
import time
from collections.abc import Callable

import ray.exceptions
import requests
import typer
import yaml
@@ -15,6 +16,10 @@
from orchestrator.modules.actuators.base import ActuatorBase
from orchestrator.modules.actuators.measurement_queue import MeasurementQueue
from orchestrator.modules.actuators.registry import ActuatorRegistry
from orchestrator.modules.operators.orchestrate import (
graceful_operation_shutdown,
initialize_resource_cleaner,
)
from orchestrator.schema.entity import Entity
from orchestrator.schema.point import SpacePoint
from orchestrator.schema.reference import ExperimentReference
@@ -62,8 +67,6 @@ def local_execution_closure(
f"Loaded configuration {actuator_configuration_identifier} for actuator {actuator_configuration.actuatorIdentifier}"
)

print(actuator_configurations)

def execute_local(
reference: ExperimentReference, entity: Entity
) -> MeasurementRequest | None:
@@ -85,19 +88,24 @@ def execute_local(
actuator = actuators[reference.actuatorIdentifier]
# Submit the measurement request asynchronously, handle errors gracefully.
try:
actuator.submit.remote(
ray.get(actuator.ready.remote())
future = actuator.submit.remote(
entities=[entity],
experimentReference=reference,
requesterid="run_experiment",
requestIndex=0,
)
except Exception as e:
_ = ray.get(future)
except ray.exceptions.ActorDiedError as error:
print(
f"[ERROR] Failed to submit measurement request for {reference} to actuator '{reference.actuatorIdentifier}': {e}"
f"[ERROR] Failed to initialize actuator '{reference.actuatorIdentifier}': {error}"
)
return None
except ray.exceptions.RayTaskError as error:
e = error.as_instanceof_cause()
print(
f"[ERROR] Failed to submit measurement request for {reference} to actuator '{reference.actuatorIdentifier}':\n {e}"
)
import traceback

traceback.print_exc()
# Either skip, or return None, or propagate. Let's return None.
return None

@@ -239,27 +247,34 @@ def run(
else remote_execution_closure(remote, timeout=timeout)
)

for reference in point.experiments:
valid = True
if validate:
print("Validating entity ...")
experiment = registry.experimentForReference(reference)
valid = experiment.validate_entity(entity, verbose=True)
else:
print("Skipping validation")

if valid:
print(f"Executing: {reference}")
request = execute(reference, entity)
if request is None:
print(
"Measurement request failed unexpectedly. Skipping this experiment."
)
if not remote:
initialize_resource_cleaner()

try:
for reference in point.experiments:
valid = True
if validate:
print("Validating entity ...")
experiment = registry.experimentForReference(reference)
valid = experiment.validate_entity(entity, verbose=True)
else:
print("Result:")
print(f"{request.series_representation(output_format='target')}\n")
else:
print("Entity is not valid")
print("Skipping validation")

if valid:
print(f"Executing: {reference}")
request = execute(reference, entity)
if request is None:
print(
"Measurement request failed unexpectedly. Skipping this experiment."
)
else:
print("Result:")
print(f"{request.series_representation(output_format='target')}\n")
else:
print("Entity is not valid")
finally:
if not remote:
graceful_operation_shutdown()


def main():
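Taken together, the changes to `execute_local` amount to the following submission pattern: wait for the actuator actor to come up, submit, then block on the result so Ray failures surface here instead of being lost. The sketch below is a condensed illustration, not the full function; `actuator`, `entity`, and `reference` are the objects built earlier in the closure, and the construction of the returned `MeasurementRequest` on success is elided.

```python
import ray
import ray.exceptions


def submit_measurement(actuator, entity, reference):
    """Condensed sketch of the hardened submission path in execute_local."""
    try:
        # Block until the actuator actor reports it is ready; if the actor
        # died during initialization this raises ActorDiedError.
        ray.get(actuator.ready.remote())
        future = actuator.submit.remote(
            entities=[entity],
            experimentReference=reference,
            requesterid="run_experiment",
            requestIndex=0,
        )
        # Wait on the submission so task errors are raised in this process.
        _ = ray.get(future)
    except ray.exceptions.ActorDiedError as error:
        print(
            f"[ERROR] Failed to initialize actuator "
            f"'{reference.actuatorIdentifier}': {error}"
        )
        return None
    except ray.exceptions.RayTaskError as error:
        # Unwrap the exception originally raised inside the remote call.
        cause = error.as_instanceof_cause()
        print(
            f"[ERROR] Failed to submit measurement request for {reference} "
            f"to actuator '{reference.actuatorIdentifier}':\n {cause}"
        )
        return None
```

The `initialize_resource_cleaner()` / `graceful_operation_shutdown()` pair imported from `orchestrate` wraps the local execution loop in `run` in a `try`/`finally`, so registered actors are cleaned up even when a measurement fails.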
(next changed file in this pull request; filename not shown)
@@ -108,6 +108,10 @@ def __init__(
logger.warning(
f"Failed to register custom actors for clean up {e}. Make sure you clean it up"
)
else:
self.log.warning(
"No namespace set in acutator configuration - will not be able to create deployments"
)

# initialize local port
self.local_port = 10000
(next changed file in this pull request; filename not shown)
@@ -13,6 +13,7 @@
from ado_actuators.vllm_performance.k8s.yaml_support.build_components import (
ComponentsYaml,
)
from kubernetes.client import ApiException

logger = logging.getLogger(__name__)

@@ -112,8 +113,11 @@ def get_environment(
if env.in_use == 0:
available = True
start = time.time()
self.manager.delete_service(k8s_name=env.k8s_name)
self.manager.delete_deployment(k8s_name=env.k8s_name)
try:
self.manager.delete_service(k8s_name=env.k8s_name)
self.manager.delete_deployment(k8s_name=env.k8s_name)
except ApiException as e:
logger.error(f"Error deleting deployment or service {e}")
del self.environments[key]
print(
f"deleted environment {env.k8s_name} in {time.time() - start} sec. "
@@ -165,11 +169,18 @@ def cleanup(self) -> None:
Clean up environment
:return: None
"""
print("Cleaning environment manager")
logger.info("Cleaning environments")
for env in self.environments.values():
if env.state == EnvironmentState.READY:
try:
self.manager.delete_service(k8s_name=env.k8s_name)
except ApiException as e:
if e.reason != "Not Found":
raise e
try:
self.manager.delete_deployment(k8s_name=env.k8s_name)
except ApiException as e:
if e.reason != "Not Found":
raise e
# We only delete the PVC if it was created by this actuator
if self.manager.pvc_created:
logger.debug("Deleting PVC")
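Both `get_environment` and `cleanup` now treat Kubernetes deletion as best effort. In `cleanup`, a `Not Found` response is ignored so that an already-removed service or deployment does not abort the rest of the loop, while any other API error still propagates. A small helper capturing that pattern could look like the sketch below (the helper name is illustrative and not part of the diff):

```python
from kubernetes.client import ApiException


def delete_ignoring_not_found(delete_fn, k8s_name: str) -> None:
    """Call a delete_service/delete_deployment style method and treat an
    already-deleted resource as success, keeping cleanup idempotent."""
    try:
        delete_fn(k8s_name=k8s_name)
    except ApiException as e:
        if e.reason != "Not Found":
            raise
```

In `cleanup` this would be invoked once per environment for the service and once for the deployment, for example `delete_ignoring_not_found(self.manager.delete_service, env.k8s_name)`.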
(next changed file in this pull request; filename not shown)
@@ -15,10 +15,17 @@
VLLMDtype,
)
from kubernetes import client, config
from kubernetes.client import ApiException
from kubernetes.client import ApiException, V1Deployment

logger = logging.getLogger(__name__)

# These are the most common reasons for a container failure.
container_waiting_error_reasons = [
"CrashLoopBackOff",
"ImagePullBackOff",
"ErrImagePull",
]


class ComponentsManager:
"""
@@ -176,13 +183,10 @@ def delete_service(self, k8s_name: str) -> None:
:param k8s_name: kubernetes name
:return: boolean
"""
try:
self.kube_client_V1.delete_namespaced_service(
namespace=self.namespace,
name=k8s_name,
)
except ApiException as e:
logger.error(f"error deleting service {e}")
self.kube_client_V1.delete_namespaced_service(
namespace=self.namespace,
name=k8s_name,
)

def create_service(
self, k8s_name: str, template: str | None = None, reuse: bool = False
@@ -200,7 +204,10 @@ def create_service(
return
if exists and not reuse:
# delete it first
self.delete_service(k8s_name=k8s_name)
try:
self.delete_service(k8s_name=k8s_name)
except ApiException as e:
logger.error(f"Error deleting service {e}")
# make sure that deletion is completed
deleting = True
for _ in range(150):
@@ -244,16 +251,13 @@ def delete_deployment(self, k8s_name: str) -> None:
:param k8s_name: kubernetes name
:return: boolean
"""
try:
self.kube_client.delete_namespaced_deployment(
namespace=self.namespace,
name=k8s_name,
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
),
)
except ApiException as e:
logger.error(f"error deleting deployment {e}")
self.kube_client.delete_namespaced_deployment(
namespace=self.namespace,
name=k8s_name,
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
),
)

def create_deployment(
self,
@@ -304,7 +308,10 @@ def create_deployment(
return
if exists and not reuse:
# delete it first
self.delete_deployment(k8s_name=k8s_name)
try:
self.delete_deployment(k8s_name=k8s_name)
except ApiException as e:
logger.error(f"Error deleting deployment {e}")
# make sure that deletion is completed
deleting = True
for _ in range(150):
@@ -347,6 +354,36 @@ def create_deployment(
logger.error(f"error creating deployment {e}")
raise

def _is_pod_failed(self, deployment: V1Deployment) -> bool:
label_selector = ",".join(
[f"{k}={v}" for k, v in deployment.spec.selector.match_labels.items()]
)
pods = self.kube_client_V1.list_namespaced_pod(
self.namespace, label_selector=label_selector
)
# There's really only one pod in our deployments
for pod in pods.items:
pod_name = pod.metadata.name
pod_phase = pod.status.phase

# Check if pod phase is Failed
if pod_phase == "Failed":
logger.warning(f"Pod {pod_name} is Failed")
return True

# Check container statuses for errors
if pod.status.container_statuses:
for cs in pod.status.container_statuses:
if (
cs.state.waiting
and cs.state.waiting.reason in container_waiting_error_reasons
):
logger.warning(
f"Container {cs.name} in pod {pod_name} is in error ({cs.state.waiting.reason})"
)
return True
return False

def _deployment_ready(self, k8s_name: str) -> bool:
"""
Check whether deployment pod ready
@@ -361,6 +398,8 @@ def _deployment_ready(self, k8s_name: str) -> bool:
except ApiException as e:
logger.error(f"error getting deployment {e}")
return False
if self._is_pod_failed(deployment=deployment):
return False
if deployment.status.available_replicas is None:
return False
return deployment.status.available_replicas == 1
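`_is_pod_failed` lists the pods behind the deployment's label selector and reports terminal-looking states (a `Failed` phase, or containers waiting in `CrashLoopBackOff`, `ImagePullBackOff`, `ErrImagePull`), and `_deployment_ready` now returns `False` in those cases instead of relying on `available_replicas` alone. The loop that polls `_deployment_ready` is not part of this diff; a rough sketch of how a caller might drive it, with names and timings that are assumptions, is:

```python
import time


def wait_for_deployment(manager, k8s_name: str,
                        timeout_s: float = 600.0, poll_s: float = 5.0) -> bool:
    """Poll ComponentsManager._deployment_ready until the single replica is
    available. Pods stuck in CrashLoopBackOff or image-pull errors keep the
    deployment reported as not ready, so the timeout bounds the wait."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if manager._deployment_ready(k8s_name=k8s_name):
            return True
        time.sleep(poll_s)
    return False
```

A caller that wants to fail fast rather than wait out the timeout could also call `_is_pod_failed` directly on the deployment it reads.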
3 changes: 2 additions & 1 deletion website/docs/actuators/vllm_performance.md
@@ -104,13 +104,14 @@ Example `point.yaml`:

```yaml
entity:
model: ibm-granite/granite-3.3-8b-instruct
model: ibm-granite/granite-3.3-2b-instruct
n_cpus: 8
memory: 128Gi
gpu_type: NVIDIA-A100-80GB-PCIe
max_batch_tokens: 8192
max_num_seq: 32
n_gpus: 1
request_rate: 10
experiments:
- actuatorIdentifier: vllm_performance
experimentIdentifier: test-deployment-v1