diff --git a/benchmarks/profiler/README.md b/benchmarks/profiler/README.md new file mode 120000 index 0000000000..30861f2786 --- /dev/null +++ b/benchmarks/profiler/README.md @@ -0,0 +1 @@ +../../docs/architecture/pre_deployment_profiling.md \ No newline at end of file diff --git a/benchmarks/profiler/profile_sla.py b/benchmarks/profiler/profile_sla.py index 3ea2f0ceb9..cc4404956b 100644 --- a/benchmarks/profiler/profile_sla.py +++ b/benchmarks/profiler/profile_sla.py @@ -589,9 +589,9 @@ async def run_profile(args): parser.add_argument( "--backend", type=str, - default="vllm_v1", - choices=["vllm_v1"], - help="backend type, currently support [vllm_v1]", + default="vllm", + choices=["vllm"], + help="backend type, currently support [vllm]", ) parser.add_argument( "--config", diff --git a/benchmarks/profiler/utils/config.py b/benchmarks/profiler/utils/config.py index d188616738..17ec95c6cc 100644 --- a/benchmarks/profiler/utils/config.py +++ b/benchmarks/profiler/utils/config.py @@ -80,7 +80,7 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d config = deepcopy(config) # set metadata name - config["metadata"]["name"] = "vllm-v1-agg" + config["metadata"]["name"] = "vllm-agg" # disable planner if "Planner" in config["spec"]["services"]: @@ -89,16 +89,16 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d if target == "prefill": # convert prefill worker into decode worker config["spec"]["services"][ - WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker + WORKER_COMPONENT_NAMES["vllm"].decode_worker ] = config["spec"]["services"][ - WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker + WORKER_COMPONENT_NAMES["vllm"].prefill_worker ] del config["spec"]["services"][ - WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker + WORKER_COMPONENT_NAMES["vllm"].prefill_worker ] args = config["spec"]["services"][ - WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker + WORKER_COMPONENT_NAMES["vllm"].decode_worker ]["extraPodSpec"]["mainContainer"]["args"] args = break_arguments(args) @@ -112,18 +112,18 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d if "--no-enable-prefix-caching" not in args: args = append_argument(args, "--no-enable-prefix-caching") - config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][ + config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm"].decode_worker][ "extraPodSpec" ]["mainContainer"]["args"] = join_arguments(args) elif target == "decode": # delete prefill worker del config["spec"]["services"][ - WORKER_COMPONENT_NAMES["vllm_v1"].prefill_worker + WORKER_COMPONENT_NAMES["vllm"].prefill_worker ] args = config["spec"]["services"][ - WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker + WORKER_COMPONENT_NAMES["vllm"].decode_worker ]["extraPodSpec"]["mainContainer"]["args"] args = break_arguments(args) @@ -134,13 +134,13 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d if "--no-enable-prefix-caching" in args: args.remove("--no-enable-prefix-caching") - config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][ + config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm"].decode_worker][ "extraPodSpec" ]["mainContainer"]["args"] = join_arguments(args) # set num workers to 1 decode_worker_config = config["spec"]["services"][ - WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker + WORKER_COMPONENT_NAMES["vllm"].decode_worker ] decode_worker_config["replicas"] = 1 @@ -150,16 +150,16 @@ def convert_config(cls, config: dict, target: 
Literal["prefill", "decode"]) -> d def set_config_tp_size(cls, config: dict, tp_size: int): config = deepcopy(config) - config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][ + config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm"].decode_worker][ "resources" ]["requests"]["gpu"] = str(tp_size) - config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][ + config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm"].decode_worker][ "resources" ]["limits"]["gpu"] = str(tp_size) - args = config["spec"]["services"][ - WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker - ]["extraPodSpec"]["mainContainer"]["args"] + args = config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm"].decode_worker][ + "extraPodSpec" + ]["mainContainer"]["args"] args = break_arguments(args) @@ -169,7 +169,7 @@ def set_config_tp_size(cls, config: dict, tp_size: int): except ValueError: args = append_argument(args, ["--tensor-parallel-size", str(tp_size)]) - config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker][ + config["spec"]["services"][WORKER_COMPONENT_NAMES["vllm"].decode_worker][ "extraPodSpec" ]["mainContainer"]["args"] = join_arguments(args) @@ -177,7 +177,7 @@ def set_config_tp_size(cls, config: dict, tp_size: int): @classmethod def get_model_name(cls, config: dict) -> str: - worker_name = WORKER_COMPONENT_NAMES["vllm_v1"].decode_worker + worker_name = WORKER_COMPONENT_NAMES["vllm"].decode_worker args = config["spec"]["services"][worker_name]["extraPodSpec"]["mainContainer"][ "args" ] @@ -232,5 +232,5 @@ def get_kv_cache_size_from_dynamo_log(cls, dynamo_log_fn: str) -> int: CONFIG_MODIFIERS = { - "vllm_v1": VllmV1ConfigModifier, + "vllm": VllmV1ConfigModifier, } diff --git a/benchmarks/profiler/utils/dynamo_deployment.py b/benchmarks/profiler/utils/dynamo_deployment.py index bad7f861ce..16964a7337 100644 --- a/benchmarks/profiler/utils/dynamo_deployment.py +++ b/benchmarks/profiler/utils/dynamo_deployment.py @@ -17,9 +17,9 @@ import asyncio import time from pathlib import Path -from typing import Optional, Union +from typing import Any, Dict, List, Optional, Union -import aiofiles +import aiofiles # type: ignore[import-untyped] import httpx # added for HTTP requests import kubernetes_asyncio as kubernetes import yaml @@ -62,9 +62,9 @@ def __init__( self.deployment_name = deployment_name self.model_name = model_name self.service_name = service_name or f"{deployment_name}-frontend" - self.components: list[str] = [] # Will store component names from CR + self.components: List[str] = [] # Will store component names from CR self.deployment_spec: Optional[ - dict + Dict[str, Any] ] = None # Will store the full deployment spec self.base_log_dir = Path(base_log_dir) if base_log_dir else Path("logs") self.frontend_port = frontend_port diff --git a/components/backends/vllm/README.md b/components/backends/vllm/README.md index 525c8311f5..d93753087b 100644 --- a/components/backends/vllm/README.md +++ b/components/backends/vllm/README.md @@ -112,6 +112,7 @@ For Kubernetes deployment, YAML manifests are provided in the `deploy/` director - `agg_router.yaml` - Aggregated serving with KV routing - `disagg.yaml` - Disaggregated serving - `disagg_router.yaml` - Disaggregated serving with KV routing +- `disagg_planner.yaml` - Disaggregated serving with [SLA Planner](../../../docs/architecture/sla_planner.md). See [SLA Planner Deployment Guide](../../../docs/guides/dynamo_deploy/sla_planner_deployment.md) for more details. 
#### Prerequisites @@ -124,6 +125,8 @@ For Kubernetes deployment, YAML manifests are provided in the `deploy/` director # Update the image references in the YAML files ``` +- **Pre-Deployment Profiling (if Using SLA Planner)**: Follow the [pre-deployment profiling guide](../../../docs/architecture/pre_deployment_profiling.md) to run pre-deployment profiling. The results will be saved to the `profiling-pvc` PVC and queried by the SLA Planner. + - **Port Forwarding**: After deployment, forward the frontend service to access the API: ```bash kubectl port-forward deployment/vllm-v1-disagg-frontend- 8080:8000 diff --git a/components/backends/vllm/deploy/disagg_planner.yaml b/components/backends/vllm/deploy/disagg_planner.yaml index cfc9af022b..6d974ecc38 100644 --- a/components/backends/vllm/deploy/disagg_planner.yaml +++ b/components/backends/vllm/deploy/disagg_planner.yaml @@ -6,6 +6,13 @@ kind: DynamoGraphDeployment metadata: name: vllm-disagg-planner spec: + envs: + - name: DYNAMO_SERVICE_CONFIG + value: '{"Prometheus":{"global":{"scrape_interval":"5s"},"scrape_configs":[{"job_name":"prometheus","static_configs":[{"targets":["localhost:9090"]}]},{"job_name":"frontend","static_configs":[{"targets":["vllm-disagg-planner-frontend:8000"]}]}]}}' + - name: DYNAMO_PORT + value: "8000" + - name: DYNAMO_NAMESPACE + value: "vllm-disagg-planner" services: Frontend: dynamoNamespace: vllm-disagg-planner @@ -31,25 +38,114 @@ spec: failureThreshold: 10 resources: requests: - cpu: "1" + cpu: "32" + memory: "10Gi" + limits: + cpu: "32" + memory: "10Gi" + extraPodSpec: + mainContainer: + image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-253.17 + workingDir: /workspace/components/backends/vllm + command: + - /bin/sh + - -c + args: + - "python3 -m dynamo.frontend --http-port 8000" + Planner: + dynamoNamespace: vllm-disagg-planner + envFromSecret: hf-token-secret + componentType: planner + replicas: 1 + livenessProbe: + exec: + command: + - /bin/sh + - -c + - "exit 0" + periodSeconds: 60 + timeoutSeconds: 30 + failureThreshold: 10 + readinessProbe: + exec: + command: + - /bin/sh + - -c + - "exit 0" + initialDelaySeconds: 60 + periodSeconds: 60 + timeoutSeconds: 30 + failureThreshold: 10 + resources: + requests: + cpu: "2" memory: "2Gi" limits: - cpu: "1" + cpu: "2" memory: "2Gi" + pvc: + create: false + name: profiling-pvc # Must be pre-created before deployment and SLA profiler must have been run + mountPoint: /workspace/profiling_results extraPodSpec: mainContainer: - image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17 + image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-253.17 + workingDir: /workspace/components/planner/src/dynamo/planner + args: + - python + - -m + - planner_sla + - --environment=kubernetes + - --backend=vllm + - --adjustment-interval=60 + - --profile-results-dir=/workspace/profiling_results + Prometheus: + dynamoNamespace: vllm-disagg-planner + componentType: main + replicas: 1 + envs: + - name: PYTHONPATH + value: "/workspace/components/planner/src" + livenessProbe: + exec: + command: + - /bin/sh + - -c + - "exit 0" + periodSeconds: 60 + timeoutSeconds: 30 + failureThreshold: 10 + readinessProbe: + exec: + command: + - /bin/sh + - -c + - "exit 0" + initialDelaySeconds: 30 + periodSeconds: 60 + timeoutSeconds: 30 + failureThreshold: 10 + resources: + requests: + cpu: "2" + memory: "2Gi" + limits: + cpu: "2" + memory: "2Gi" + extraPodSpec: + mainContainer: + image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-253.17 workingDir: /workspace/components/backends/vllm command: - 
/bin/sh - -c args: - - "python3 -m dynamo.frontend --http-port 8000" - VllmDecodeWorker: + - "python3 -m dynamo.planner.prometheus" + backend: dynamoNamespace: vllm-disagg-planner envFromSecret: hf-token-secret componentType: worker - replicas: 1 + replicas: 2 livenessProbe: httpGet: path: /live @@ -66,12 +162,12 @@ spec: failureThreshold: 60 resources: requests: - cpu: "10" - memory: "20Gi" + cpu: "8" + memory: "16Gi" gpu: "1" limits: - cpu: "10" - memory: "20Gi" + cpu: "8" + memory: "16Gi" gpu: "1" envs: - name: DYN_SYSTEM_ENABLED @@ -88,18 +184,18 @@ spec: port: 9090 periodSeconds: 10 failureThreshold: 60 - image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17 + image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-253.17 workingDir: /workspace/components/backends/vllm command: - /bin/sh - -c args: - - "python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B 2>&1 | tee /tmp/vllm.log" - VllmPrefillWorker: + - "python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B 2>&1 | tee /tmp/vllm.log" + prefill: dynamoNamespace: vllm-disagg-planner envFromSecret: hf-token-secret componentType: worker - replicas: 1 + replicas: 2 livenessProbe: httpGet: path: /health @@ -116,12 +212,12 @@ spec: failureThreshold: 60 resources: requests: - cpu: "10" - memory: "20Gi" + cpu: "8" + memory: "16Gi" gpu: "1" limits: - cpu: "10" - memory: "20Gi" + cpu: "8" + memory: "16Gi" gpu: "1" envs: - name: DYN_SYSTEM_ENABLED @@ -138,10 +234,10 @@ spec: port: 9090 periodSeconds: 10 failureThreshold: 60 - image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-233.17 + image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:dep-253.17 workingDir: /workspace/components/backends/vllm command: - /bin/sh - -c args: - - python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --is-prefill-worker 2>&1 | tee /tmp/vllm.log + - python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --is-prefill-worker 2>&1 | tee /tmp/vllm.log diff --git a/components/backends/vllm/src/dynamo/vllm/args.py b/components/backends/vllm/src/dynamo/vllm/args.py index 79f5d8e4b7..d39131e5c8 100644 --- a/components/backends/vllm/src/dynamo/vllm/args.py +++ b/components/backends/vllm/src/dynamo/vllm/args.py @@ -55,7 +55,7 @@ def parse_args() -> Config: parser.add_argument( "--is-prefill-worker", action="store_true", - help="Enable prefill functionality for this worker. Currently overwrites the --endpoint to be a specially chosen dyn://dynamo.prefill.generate", + help="Enable prefill functionality for this worker. 
Uses the provided namespace to construct dyn://namespace.prefill.generate", ) parser = AsyncEngineArgs.add_cli_args(parser) @@ -79,8 +79,13 @@ def parse_args() -> Config: # This becomes an `Option` on the Rust side config.served_model_name = None + namespace = os.environ.get("DYNAMO_NAMESPACE", "dynamo") + if args.is_prefill_worker: - args.endpoint = "dyn://dynamo.prefill.generate" + args.endpoint = f"dyn://{namespace}.prefill.generate" + else: + # For decode workers, also use the provided namespace instead of hardcoded "dynamo" + args.endpoint = f"dyn://{namespace}.backend.generate" endpoint_str = args.endpoint.replace("dyn://", "", 1) endpoint_parts = endpoint_str.split(".") @@ -127,6 +132,14 @@ async def allocate_and_reserve_port( """ node_name = socket.gethostname() + try: + node_ip = socket.gethostbyname(node_name) + except socket.gaierror: + # If hostname cannot be resolved, fall back to localhost + logger.warning( + f"Hostname '{node_name}' cannot be resolved, falling back to '127.0.0.1'" + ) + node_ip = "127.0.0.1" for attempt in range(1, max_attempts + 1): # Hold socket open just long enough to reserve in ETCD @@ -136,7 +149,7 @@ async def allocate_and_reserve_port( port = sock.getsockname()[1] # Reserve in ETCD while holding the socket - key = f"dyn://{namespace}/ports/{node_name}/{port}" + key = f"dyn://{namespace}/ports/{node_ip}/{port}" value = { "worker_id": worker_id, "reason": reason, @@ -238,23 +251,41 @@ def overwrite_args(config): raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.") -def set_side_channel_host_and_port(config: Config, hostname: Optional[str] = None): - """vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors. - This sets the port number for the side channel. +def get_host_ip() -> str: + """Get the IP address of the host. + This is needed for the side channel to work in multi-node deployments. """ - if hostname is None: - hostname = socket.gethostname() - # Test if hostname is usable by attempting to bind to it + try: + host_name = socket.gethostname() + except socket.error as e: + logger.warning(f"Failed to get hostname: {e}, falling back to '127.0.0.1'") + return "127.0.0.1" + else: try: + # Get the IP address of the hostname - this is needed for the side channel to work in multi-node deployments + host_ip = socket.gethostbyname(host_name) + # Test if the IP is actually usable by binding to it with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket: - test_socket.bind((hostname, 0)) - except (socket.error, socket.gaierror): - # If hostname is not usable, fall back to localhost + test_socket.bind((host_ip, 0)) + return host_ip + except socket.gaierror as e: logger.warning( - f"Hostname '{hostname}' is not usable, falling back to '127.0.0.1'" + f"Hostname '{host_name}' cannot be resolved: {e}, falling back to '127.0.0.1'" ) - hostname = "127.0.0.1" + return "127.0.0.1" + except socket.error as e: + # If hostname is not usable for binding, fall back to localhost + logger.warning( + f"Hostname '{host_name}' is not usable for binding: {e}, falling back to '127.0.0.1'" + ) + return "127.0.0.1" + - os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = hostname +def set_side_channel_host_and_port(config: Config): + """vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors. + This sets the port number for the side channel. 
+ """ + host_ip = get_host_ip() + os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = host_ip os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(config.side_channel_port) - logger.debug(f"Set NIXL side channel to {hostname}:{config.side_channel_port}") + logger.debug(f"Set NIXL side channel to {host_ip}:{config.side_channel_port}") diff --git a/components/planner/README.md b/components/planner/README.md index f02737fdbc..f666881bb1 100644 --- a/components/planner/README.md +++ b/components/planner/README.md @@ -15,113 +15,4 @@ See the License for the specific language governing permissions and limitations under the License. --> -# Planner - -The planner is a component that monitors the state of the system and makes adjustments to the number of workers to ensure that the system is running efficiently. It can dynamically scale prefill/decode workers up and down based on a variety of KV metrics. You can find documentation and benchmarking examples in the [planner docs](../../docs/guides/planner.md). - -## Usage - -After you've deployed a dynamo graph, you can start the planner with the following command: - -```bash -PYTHONPATH=/workspace/examples/llm python components/planner.py --namespace -``` - -## Backends - -1. `local` - uses circus to start/stop worker subprocesses -2. `kubernetes` - uses the kubernetes API to adjust replicas of the DynamoGraphDeployment resource, which automatically scales the corresponding worker pods up or down - -## Local Backend (LocalPlanner) - -The LocalPlanner is built on top of circus, which is what we use to manage component subprocesses when running with the frontend and workers. LocalPlanner allows the planner component to scale workers up and down based on system metrics. - -**Current limitations** -1. Single node only -2. Workers must be using only a single GPU -3. Your initial deployment must be replicas=1 for both prefill and decode - -We are working on addressing these as fast as possible. - -### Under the Hood - -Circus has a concept of an arbiter and a watcher: -- **Arbiter**: The supervisor process that manages all watchers -- **Watcher**: A process that encodes environment variables, command, name, and other information needed to run a component - -When a service is started, each worker process is spun up as a watcher. For example, when starting a VllmWorker, a watcher is created that looks like: - -```json -{ - "dynamo_VllmWorker": { - "watcher_name": "dynamo_VllmWorker", - "cmd": "/opt/dynamo/venv/bin/python3 -m dynamo.sdk.cli.serve_dynamo graphs.agg_router:Frontend --service-name VllmWorker --worker-id $(CIRCUS.WID) --worker-env [{\"CUDA_VISIBLE_DEVICES\": \"0\"}]", - "resources": { - "allocated_gpus": [ - 0 - ] - }, - "lease": 7587886183172559418 - } -} -``` - -The arbiter exposes an endpoint allowing messages to add/remove/change watchers. The LocalPlanner leverages this functionality to dynamically adjust worker counts. - -### Implementation - -The planner architecture is designed to be simple and extensible: -- An abstract class supports basic add/remove component operations -- This is implemented in `local_connector.py` -- Circus interaction logic is in `circusd.py`, which reads the statefile, connects to the endpoint, and provides add/remove functionality -- Planner starts an instance of `LocalConnector` and uses it to modify the deployment topology - -### Statefile - -The statefile maintains the current state of all running workers and is used by the LocalPlanner to track and modify the deployment. 
It's stored at `~/.dynamo/state/{namespace}.json` (or in the directory specified by `DYN_LOCAL_STATE_DIR`). The statefile is automatically created when you run the frontend with workers and is cleaned up when the arbiter terminates. Each worker is identified as `{namespace}_{component_name}` with an optional numeric suffix for additional instances. - -#### Example: Adding and Removing Workers - -Starting with a single decode worker: -```json -{ - "dynamo_VllmWorker": {..., "resources":{...}} -} -``` - -After adding a worker: -```json -{ - "dynamo_VllmWorker": {..., "resources":{...}}, - "dynamo_VllmWorker_1": {..., "resources":{...}} -} -``` - -After removing a worker (removes the highest suffix): -```json -{ - "dynamo_VllmWorker": {..., "resources":{...}} -} -``` - -If scaled to zero, the initial entry is kept without resources to maintain configuration information: -```json -{ - "dynamo_VllmWorker": {...} -} -``` - -### Looking forward - -- Support for a multinode LocalPlanner -- Storing the statefile (and initial configurations) in ETCD using the the new `EtcdKvCache`. - -### Testing - -For manual testing, you can use the controller_test.py file to add/remove components after you've run a serve command on a Dynamo pipeline where the planner is linked. - -## Kubernetes Backend - -The Kubernetes backend works by updating the replicas count of the DynamoGraphDeployment custom resource. When the planner determines that workers need to be scaled up or down based on workload metrics, it uses the Kubernetes API to patch the DynamoGraphDeployment resource specification, changing the replicas count for the appropriate worker component. The Kubernetes operator then reconciles this change by creating or terminating the necessary pods. This provides a seamless autoscaling experience in Kubernetes environments without requiring manual intervention. - -The Kubernetes backend will automatically be used by Planner when your pipeline is deployed using a DynamoGraphDeployment CR. By default, the planner will run in no-op mode, which means it will monitor metrics but not take scaling actions. To enable actual scaling, you should also specify `--Planner.no-operation=false`. +Please refer to [planner docs](../../docs/architecture/planner_intro.rst) for planner documentation. diff --git a/components/planner/src/dynamo/planner/__init__.py b/components/planner/src/dynamo/planner/__init__.py index 09b52a2f15..5bddaa17a2 100644 --- a/components/planner/src/dynamo/planner/__init__.py +++ b/components/planner/src/dynamo/planner/__init__.py @@ -15,7 +15,6 @@ __all__ = [ "CircusController", - "LocalConnector", "PlannerConnector", "KubernetesConnector", "LoadPlannerDefaults", @@ -26,5 +25,4 @@ from dynamo.planner.circusd import CircusController from dynamo.planner.defaults import LoadPlannerDefaults, SLAPlannerDefaults from dynamo.planner.kubernetes_connector import KubernetesConnector -from dynamo.planner.local_connector import LocalConnector from dynamo.planner.planner_connector import PlannerConnector diff --git a/components/planner/src/dynamo/planner/defaults.py b/components/planner/src/dynamo/planner/defaults.py index 60a6216f02..2eae047167 100644 --- a/components/planner/src/dynamo/planner/defaults.py +++ b/components/planner/src/dynamo/planner/defaults.py @@ -13,12 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. 
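Reviewer note: the `defaults.py` hunk below derives the Prometheus endpoint from the `DYNAMO_PORT` and `DYNAMO_NAMESPACE` env vars that `disagg_planner.yaml` sets, combined with the in-cluster Kubernetes namespace. A sketch of that composition; the namespace value `my-team` is an illustrative assumption:

```python
import os

os.environ.setdefault("DYNAMO_PORT", "8000")
os.environ.setdefault("DYNAMO_NAMESPACE", "vllm-disagg-planner")


def prometheus_endpoint(k8s_namespace: str) -> str:
    port = os.environ["DYNAMO_PORT"]
    dynamo_ns = os.environ["DYNAMO_NAMESPACE"]
    if k8s_namespace and k8s_namespace != "default":
        # In-cluster DNS name of the planner-managed Prometheus service.
        return f"http://{dynamo_ns}-prometheus.{k8s_namespace}.svc.cluster.local:{port}"
    # Outside a properly namespaced cluster, only the bare service name is returned.
    return f"{dynamo_ns}-prometheus"


print(prometheus_endpoint("my-team"))
# http://vllm-disagg-planner-prometheus.my-team.svc.cluster.local:8000
```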
+import logging +import os + +from dynamo.planner.kube import get_current_k8s_namespace +from dynamo.runtime.logging import configure_dynamo_logging + +configure_dynamo_logging() +logger = logging.getLogger(__name__) + # Source of truth for planner defaults class BasePlannerDefaults: namespace = "dynamo" - environment = "local" - backend = "vllm_v0" + environment = "kubernetes" + backend = "vllm" no_operation = False log_dir = None adjustment_interval = 180 # in seconds @@ -36,8 +45,25 @@ class LoadPlannerDefaults(BasePlannerDefaults): prefill_queue_scale_down_threshold = 0.2 +def _get_default_prometheus_endpoint(port: str, namespace: str): + """Compute default prometheus endpoint using environment variables and Kubernetes service discovery""" + + k8s_namespace = get_current_k8s_namespace() + if k8s_namespace and k8s_namespace != "default": + prometheus_service = f"{namespace}-prometheus" + return f"http://{prometheus_service}.{k8s_namespace}.svc.cluster.local:{port}" + else: + logger.warning( + f"Cannot determine Prometheus endpoint. Running in namespace '{k8s_namespace}'. " + "Ensure the planner is deployed in a Kubernetes cluster with proper namespace configuration." + ) + return f"{namespace}-prometheus" + + class SLAPlannerDefaults(BasePlannerDefaults): - prometheus_endpoint = "http://localhost:9090" + port = os.environ.get("DYNAMO_PORT", "8000") + namespace = os.environ.get("DYNAMO_NAMESPACE", "vllm-disagg-planner") + prometheus_endpoint = _get_default_prometheus_endpoint(port, namespace) profile_results_dir = "profiling_results" isl = 3000 # in number of tokens osl = 150 # in number of tokens @@ -47,21 +73,13 @@ class SLAPlannerDefaults(BasePlannerDefaults): load_prediction_window_size = 50 # predict load using how many recent load samples -class VllmV0ComponentName: - prefill_worker = "PrefillWorker" - prefill_worker_endpoint = "mock" - decode_worker = "VllmWorker" - decode_worker_endpoint = "generate" - - -class VllmV1ComponentName: - prefill_worker = "VllmPrefillWorker" +class VllmComponentName: + prefill_worker = "prefill" prefill_worker_endpoint = "generate" - decode_worker = "VllmDecodeWorker" + decode_worker = "backend" decode_worker_endpoint = "generate" WORKER_COMPONENT_NAMES = { - "vllm_v0": VllmV0ComponentName, - "vllm_v1": VllmV1ComponentName, + "vllm": VllmComponentName, } diff --git a/components/planner/src/dynamo/planner/kube.py b/components/planner/src/dynamo/planner/kube.py index b368ffa20b..7b9e846bd5 100644 --- a/components/planner/src/dynamo/planner/kube.py +++ b/components/planner/src/dynamo/planner/kube.py @@ -17,26 +17,41 @@ from typing import Optional from kubernetes import client, config +from kubernetes.config.config_exception import ConfigException + + +def get_current_k8s_namespace() -> str: + """Get the current namespace if running inside a k8s cluster""" + try: + with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f: + return f.read().strip() + except FileNotFoundError: + # Fallback to 'default' if not running in k8s + return "default" class KubernetesAPI: - def __init__(self): + def __init__(self, k8s_namespace: Optional[str] = None): # Load kubernetes configuration - config.load_incluster_config() # for in-cluster deployment + try: + config.load_incluster_config() # for in-cluster deployment + except ConfigException: + config.load_kube_config() # for out-of-cluster deployment self.custom_api = client.CustomObjectsApi() - self.current_namespace = self._get_current_namespace() + self.current_namespace = k8s_namespace or 
get_current_k8s_namespace() - def _get_current_namespace(self) -> str: - """Get the current namespace if running inside a k8s cluster""" - try: - with open( - "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r" - ) as f: - return f.read().strip() - except FileNotFoundError: - # Fallback to 'default' if not running in k8s - return "default" + def _get_graph_deployment_from_name( + self, graph_deployment_name: str + ) -> Optional[dict]: + """Get the graph deployment from the dynamo graph deployment name""" + return self.custom_api.get_namespaced_custom_object( + group="nvidia.com", + version="v1alpha1", + namespace=self.current_namespace, + plural="dynamographdeployments", + name=graph_deployment_name, + ) async def get_graph_deployment( self, component_name: str, dynamo_namespace: str @@ -98,12 +113,8 @@ async def get_graph_deployment( if not graph_deployment_name: return None - graph_deployment = self.custom_api.get_namespaced_custom_object( - group="nvidia.com", - version="v1alpha1", - namespace=self.current_namespace, - plural="dynamographdeployments", - name=graph_deployment_name, + graph_deployment = self._get_graph_deployment_from_name( + graph_deployment_name ) return graph_deployment @@ -127,19 +138,36 @@ async def update_graph_replicas( body=patch, ) + async def is_deployment_ready(self, graph_deployment_name: str) -> bool: + """Check if a graph deployment is ready""" + + graph_deployment = self._get_graph_deployment_from_name(graph_deployment_name) + + if not graph_deployment: + raise ValueError(f"Graph deployment {graph_deployment_name} not found") + + conditions = graph_deployment.get("status", {}).get("conditions", []) + ready_condition = next( + (c for c in conditions if c.get("type") == "Ready"), None + ) + + return ready_condition is not None and ready_condition.get("status") == "True" + async def wait_for_graph_deployment_ready( self, graph_deployment_name: str, - max_attempts: int = 60, # default: 10 minutes total + max_attempts: int = 180, # default: 30 minutes total delay_seconds: int = 10, # default: check every 10 seconds ) -> None: """Wait for a graph deployment to be ready""" for attempt in range(max_attempts): await asyncio.sleep(delay_seconds) - graph_deployment = await self.get_graph_deployment( - graph_deployment_name, self.current_namespace + + graph_deployment = self._get_graph_deployment_from_name( + graph_deployment_name ) + if not graph_deployment: raise ValueError(f"Graph deployment {graph_deployment_name} not found") diff --git a/components/planner/src/dynamo/planner/kubernetes_connector.py b/components/planner/src/dynamo/planner/kubernetes_connector.py index e206f91d69..9a9933507b 100644 --- a/components/planner/src/dynamo/planner/kubernetes_connector.py +++ b/components/planner/src/dynamo/planner/kubernetes_connector.py @@ -13,24 +13,33 @@ # See the License for the specific language governing permissions and # limitations under the License. 
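Reviewer note on `is_deployment_ready` above: it reads the standard Kubernetes `status.conditions` list on the DynamoGraphDeployment and requires a `Ready` condition with status `"True"`. A self-contained sketch of that check, with an illustrative status payload:

```python
def deployment_is_ready(graph_deployment: dict) -> bool:
    conditions = graph_deployment.get("status", {}).get("conditions", [])
    ready = next((c for c in conditions if c.get("type") == "Ready"), None)
    return ready is not None and ready.get("status") == "True"


sample = {
    "status": {
        "conditions": [
            {"type": "Ready", "status": "True", "reason": "AllComponentsReady"}
        ]
    }
}
assert deployment_is_ready(sample)
```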
-from .kube import KubernetesAPI -from .planner_connector import PlannerConnector +import logging +from typing import Optional + +from dynamo.planner.kube import KubernetesAPI +from dynamo.planner.planner_connector import PlannerConnector +from dynamo.runtime.logging import configure_dynamo_logging + +configure_dynamo_logging() +logger = logging.getLogger(__name__) class KubernetesConnector(PlannerConnector): - def __init__(self, namespace: str): - self.kube_api = KubernetesAPI() - self.namespace = namespace + def __init__(self, dynamo_namespace: str, k8s_namespace: Optional[str] = None): + self.kube_api = KubernetesAPI(k8s_namespace) + self.dynamo_namespace = dynamo_namespace async def add_component(self, component_name: str, blocking: bool = True): """Add a component by increasing its replica count by 1""" + deployment = await self.kube_api.get_graph_deployment( - component_name, self.namespace + component_name, self.dynamo_namespace ) if deployment is None: raise ValueError( - f"Graph not found for component {component_name} in dynamo namespace {self.namespace}" + f"Graph not found for component {component_name} in dynamo namespace {self.dynamo_namespace}" ) + # get current replicas or 1 if not found current_replicas = self._get_current_replicas(deployment, component_name) await self.kube_api.update_graph_replicas( @@ -45,13 +54,15 @@ async def add_component(self, component_name: str, blocking: bool = True): async def remove_component(self, component_name: str, blocking: bool = True): """Remove a component by decreasing its replica count by 1""" + deployment = await self.kube_api.get_graph_deployment( - component_name, self.namespace + component_name, self.dynamo_namespace ) if deployment is None: raise ValueError( - f"Graph {component_name} not found for namespace {self.namespace}" + f"Graph {component_name} not found for namespace {self.dynamo_namespace}" ) + # get current replicas or 1 if not found current_replicas = self._get_current_replicas(deployment, component_name) if current_replicas > 0: @@ -65,6 +76,68 @@ async def remove_component(self, component_name: str, blocking: bool = True): self._get_graph_deployment_name(deployment) ) + async def _validate_components_same_deployment( + self, target_replicas: dict[str, int] + ) -> dict: + """ + Validate that all target components belong to the same DynamoGraphDeployment. + """ + if not target_replicas: + raise ValueError("target_replicas cannot be empty") + + # Get deployment for first component + first_component = next(iter(target_replicas)) + deployment = await self.kube_api.get_graph_deployment( + first_component, self.dynamo_namespace + ) + if deployment is None: + raise ValueError( + f"Component {first_component} not found in namespace {self.dynamo_namespace}" + ) + + # Validate that all components belong to the same DGD + graph_name = deployment["metadata"]["name"] + for component in target_replicas: + comp_deployment = await self.kube_api.get_graph_deployment( + component, self.dynamo_namespace + ) + if comp_deployment is None: + raise ValueError( + f"Component {component} not found in namespace {self.dynamo_namespace}" + ) + if comp_deployment["metadata"]["name"] != graph_name: + raise ValueError( + f"Component {component} belongs to graph '{comp_deployment['metadata']['name']}' " + f"but expected graph '{graph_name}'. All components must belong to the same GraphDeployment." 
+ ) + + return deployment + + async def set_component_replicas( + self, target_replicas: dict[str, int], blocking: bool = True + ): + """Set the replicas for multiple components at once""" + deployment = await self._validate_components_same_deployment(target_replicas) + if not await self.kube_api.is_deployment_ready( + self._get_graph_deployment_name(deployment) + ): + logger.warning( + f"Deployment {self._get_graph_deployment_name(deployment)} is not ready, ignoring this scaling" + ) + return + + for component_name, replicas in target_replicas.items(): + await self.kube_api.update_graph_replicas( + self._get_graph_deployment_name(deployment), + component_name, + replicas, + ) + + if blocking: + await self.kube_api.wait_for_graph_deployment_ready( + self._get_graph_deployment_name(deployment) + ) + def _get_current_replicas(self, deployment: dict, component_name: str) -> int: """Get the current replicas for a component in a graph deployment""" return ( @@ -84,12 +157,13 @@ def _get_graph_deployment_name(self, deployment: dict) -> str: import asyncio parser = argparse.ArgumentParser() - parser.add_argument("--namespace", type=str, default="dynamo") + parser.add_argument("--dynamo_namespace", type=str, default="dynamo") + parser.add_argument("--k8s_namespace", type=str, default="default") parser.add_argument("--action", type=str, choices=["add", "remove"]) parser.add_argument("--component", type=str, default="planner") parser.add_argument("--blocking", action="store_true") args = parser.parse_args() - connector = KubernetesConnector(args.namespace) + connector = KubernetesConnector(args.dynamo_namespace, args.k8s_namespace) if args.action == "add": task = connector.add_component(args.component, args.blocking) diff --git a/components/planner/src/dynamo/planner/local_connector.py b/components/planner/src/dynamo/planner/local_connector.py deleted file mode 100644 index e8654231ee..0000000000 --- a/components/planner/src/dynamo/planner/local_connector.py +++ /dev/null @@ -1,310 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import json -import logging -import os -from pathlib import Path -from typing import Any, Dict, List - -import filelock - -from dynamo.planner.circusd import CircusController -from dynamo.planner.planner_connector import PlannerConnector -from dynamo.runtime import DistributedRuntime -from dynamo.runtime.logging import configure_dynamo_logging - -configure_dynamo_logging() -logger = logging.getLogger(__name__) - - -class LocalConnector(PlannerConnector): - def __init__(self, namespace: str, runtime: DistributedRuntime): - """ - Initialize LocalConnector and connect to CircusController. 
- - Args: - namespace: The Dynamo namespace - runtime: Optional DistributedRuntime instance - """ - self.namespace = namespace - self.runtime = runtime - self.state_file = Path.home() / ".dynamo" / "state" / f"{namespace}.json" - self.circus = CircusController.from_state_file(namespace) - self._lockfile = self.state_file.with_suffix(".lock") - self._file_lock = filelock.FileLock(self._lockfile) - self.worker_client: Any | None = None - self.prefill_client: Any | None = None - self.etcd_client: Any | None = None - - async def _load_state(self) -> Dict[str, Any]: - """Load state from state file. - - Returns: - State dictionary - """ - if not self.state_file.exists(): - raise FileNotFoundError(f"State file not found: {self.state_file}") - - with self._file_lock: - with open(self.state_file, "r") as f: - return json.load(f) - - async def _save_state(self, state: Dict[str, Any]) -> bool: - """Save state to state file. - - Args: - state: State dictionary to save - - Returns: - True if successful - """ - try: - with self._file_lock: - with open(self.state_file, "w") as f: - json.dump(state, f, indent=2) - return True - except Exception as e: - logger.error(f"Failed to save state: {e}") - return False - - async def _get_available_gpus(self) -> List[str]: - """Get list of unallocated GPU IDs. - - Returns: - List of available GPU IDs - """ - state = await self._load_state() - system_resources = state.get("environment", {}).get("SYSTEM_RESOURCES", {}) - all_gpus = set(str(gpu) for gpu in system_resources.get("gpu_info", [])) - - allocated_gpus: set[str] = set() - for component_info in state.get("components", {}).values(): - resources = component_info.get("resources", {}) - gpu_list = resources.get("allocated_gpus", []) - allocated_gpus.update(str(gpu) for gpu in gpu_list) - - logger.info(f"Allocated GPUs: {allocated_gpus}") - available = sorted(list(all_gpus - allocated_gpus)) - logger.info(f"Available GPUs: {available}") - return available - - async def add_component(self, component_name: str, blocking: bool = True) -> bool: - """ - Add a component. The steps are as follows: - - 1. Load state - 2. Find max suffix to create unique watcher name - 3. Built environment and command for watcher - 4. 
Block until component is running - - Args: - component_name: Name of the component - - Returns: - True if successful - """ - state = await self._load_state() - # Find max suffix - max_suffix = 0 - for watcher_name in state["components"].keys(): - if watcher_name.startswith(f"{self.namespace}_{component_name}_"): - suffix = int( - watcher_name.replace(f"{self.namespace}_{component_name}_", "") - ) - max_suffix = max(max_suffix, suffix) - - watcher_name = f"{self.namespace}_{component_name}_{max_suffix + 1}" - - if component_name not in [ - c.replace(f"{self.namespace}_", "") for c in state["components"] - ]: - raise ValueError( - f"Component {component_name} not found in state configuration" - ) - - # Get base command and config - component_info = state["components"][f"{self.namespace}_{component_name}"] - base_cmd = component_info["cmd"].split("--worker-env")[0].strip() - service_config = state["environment"].get("DYNAMO_SERVICE_CONFIG") - - # Build environment - watcher_env = os.environ.copy() - if component_name in ["VllmWorker", "PrefillWorker"]: - available_gpus = await self._get_available_gpus() - if not available_gpus: - raise ValueError("No GPUs available for allocation") - gpu_id = available_gpus[0] - watcher_env["CUDA_VISIBLE_DEVICES"] = gpu_id - - watcher_env["DYNAMO_SERVICE_CONFIG"] = service_config - - # Build worker env list and command - worker_env_list = [watcher_env] - worker_env_arg = json.dumps(worker_env_list) - # We add a custom component name to ensure that the lease is attatched to this specific watcher - full_cmd = f"{base_cmd} --worker-env '{worker_env_arg}' --custom-component-name '{watcher_name}'" - - pre_add_endpoint_ids = await self._count_instance_ids(component_name) - logger.info(f"Pre-add endpoint IDs: {pre_add_endpoint_ids}") - - logger.info(f"Adding watcher {watcher_name}") - success = await self.circus.add_watcher( - name=watcher_name, cmd=full_cmd, env=watcher_env, singleton=True - ) - - if success: - resources = {} - if component_name in ["VllmWorker", "PrefillWorker"]: - resources["allocated_gpus"] = [gpu_id] - - state["components"][watcher_name] = { - "watcher_name": watcher_name, - "cmd": full_cmd, - "resources": resources, - } - await self._save_state(state) - logger.info( - f"Succesfully created {watcher_name}. Waiting for worker to start..." - ) - - if blocking: - required_endpoint_ids = pre_add_endpoint_ids + 1 - while True: - current_endpoint_ids = await self._count_instance_ids(component_name) - if current_endpoint_ids == required_endpoint_ids: - break - logger.info( - f"Waiting for {component_name} to start. Current endpoint IDs: {current_endpoint_ids}, Required endpoint IDs: {required_endpoint_ids}" - ) - await asyncio.sleep(5) - - return success - - async def remove_component( - self, component_name: str, blocking: bool = True - ) -> bool: - """ - Remove a component. The initial components are not numbered so we simply remove their resources - and lease but keep the entry in order to use the cmd. This allows us to re-add the component - without having to re-specify the cmd. 
For components that have been added, we remove their entry - entry - - Args: - component_name: Name of the component - - Returns: - True if successful - """ - logger.info(f"Attempting to remove component {component_name}") - state = await self._load_state() - matching_components = {} - - base_name = f"{self.namespace}_{component_name}" - base_name_with_underscore = f"{base_name}_" - - for watcher_name in state["components"].keys(): - if watcher_name == base_name: - matching_components[0] = watcher_name - elif watcher_name.startswith(base_name_with_underscore): - suffix = int(watcher_name.replace(base_name_with_underscore, "")) - matching_components[suffix] = watcher_name - - if not matching_components: - logger.error(f"No matching components found for {component_name}") - return False - - highest_suffix = max(matching_components.keys()) - target_watcher = matching_components[highest_suffix] - logger.info(f"Removing watcher {target_watcher}") - - success = await self.circus.remove_watcher( - name=target_watcher, blocking=blocking - ) - if not blocking: - logger.info( - f"Circus remove_watcher for {target_watcher} {'succeeded' if success else 'failed'}" - ) - - if success: - if highest_suffix > 0: # Numbered watcher - remove entire entry - if target_watcher in state["components"]: - del state["components"][target_watcher] - else: # Base watcher - just clear resources and lease - if target_watcher in state["components"]: - state["components"][target_watcher]["resources"] = {} - state["components"][target_watcher]["lease"] = None - await self._save_state(state) - - return success - - async def _count_instance_ids(self, component_name: str) -> int: - """ - Count the instance IDs for the 'generate' endpoint of given component. - - Args: - component_name: Name of the component - - Returns: - Number of endpoint IDs for a component - """ - if component_name == "VllmWorker": - if self.worker_client is None: - self.worker_client = ( - await self.runtime.namespace(self.namespace) - .component(component_name) - .endpoint("generate") - .client() - ) - worker_ids = self.worker_client.instance_ids() - return len(worker_ids) - elif component_name == "PrefillWorker": - if self.prefill_client is None: - self.prefill_client = ( - await self.runtime.namespace(self.namespace) - .component(component_name) - .endpoint("mock") - .client() - ) - prefill_ids = self.prefill_client.instance_ids() - return len(prefill_ids) - else: - raise ValueError(f"Component {component_name} not supported") - - async def _revoke_lease(self, lease_id: int) -> bool: - """ - Wrapper function around the etcd client to revoke a lease - - Args: - lease_id: Lease ID to revoke - - Returns: - True if successful - """ - if self.etcd_client is None: - self.etcd_client = self.runtime.etcd_client() # type: ignore - try: - await self.etcd_client.revoke_lease(lease_id) - logger.info(f"Revoked lease {lease_id}") - return True - except Exception as e: - logger.error(f"Failed to revoke lease {lease_id}: {e}") - return False - - def __del__(self): - """Cleanup circus controller connection on deletion.""" - if hasattr(self, "circus"): - self.circus.close() diff --git a/components/planner/src/dynamo/planner/planner_sla.py b/components/planner/src/dynamo/planner/planner_sla.py index c43ea52006..788c65862d 100644 --- a/components/planner/src/dynamo/planner/planner_sla.py +++ b/components/planner/src/dynamo/planner/planner_sla.py @@ -21,11 +21,7 @@ from dynamo.planner.defaults import SLAPlannerDefaults from dynamo.planner.utils.planner_core import 
start_sla_planner -from dynamo.runtime.logging import configure_dynamo_logging -from dynamo.sdk import async_on_start, dynamo_context, endpoint, service -from dynamo.sdk.core.protocol.interface import ComponentType -from dynamo.sdk.lib.config import ServiceConfig -from dynamo.sdk.lib.image import DYNAMO_IMAGE +from dynamo.runtime import DistributedRuntime, dynamo_worker logger = logging.getLogger(__name__) @@ -38,79 +34,107 @@ class RequestType(BaseModel): text: str -@service( - dynamo={ - "namespace": "dynamo", - "component_type": ComponentType.PLANNER, - }, - resources={"cpu": "10", "memory": "20Gi"}, - workers=1, - image=DYNAMO_IMAGE, -) -class Planner: - def __init__(self): - configure_dynamo_logging(service_name="Planner") - logger.info("Starting planner") - self.runtime = dynamo_context["runtime"] - - config = ServiceConfig.get_instance() - - # Get namespace directly from dynamo_context as it contains the active namespace - self.namespace = dynamo_context["namespace"] - config_instance = config.get("Planner", {}) - - self.args = argparse.Namespace( - namespace=self.namespace, - environment=config_instance.get( - "environment", SLAPlannerDefaults.environment - ), - backend=config_instance.get("backend", SLAPlannerDefaults.backend), - no_operation=config_instance.get( - "no-operation", SLAPlannerDefaults.no_operation - ), - log_dir=config_instance.get("log-dir", SLAPlannerDefaults.log_dir), - adjustment_interval=config_instance.get( - "adjustment-interval", SLAPlannerDefaults.adjustment_interval - ), - max_gpu_budget=config_instance.get( - "max-gpu-budget", SLAPlannerDefaults.max_gpu_budget - ), - min_endpoint=config_instance.get( - "min-endpoint", SLAPlannerDefaults.min_endpoint - ), - decode_engine_num_gpu=config_instance.get( - "decode-engine-num-gpu", SLAPlannerDefaults.decode_engine_num_gpu - ), - prefill_engine_num_gpu=config_instance.get( - "prefill-engine-num-gpu", SLAPlannerDefaults.prefill_engine_num_gpu - ), - prometheus_endpoint=config_instance.get( - "prometheus-endpoint", SLAPlannerDefaults.prometheus_endpoint - ), - profile_results_dir=config_instance.get( - "profile-results-dir", SLAPlannerDefaults.profile_results_dir - ), - isl=config_instance.get("isl", SLAPlannerDefaults.isl), - osl=config_instance.get("osl", SLAPlannerDefaults.osl), - ttft=config_instance.get("ttft", SLAPlannerDefaults.ttft), - itl=config_instance.get("itl", SLAPlannerDefaults.itl), - load_predictor=config_instance.get( - "load-predictor", SLAPlannerDefaults.load_predictor - ), - load_prediction_window_size=config_instance.get( - "load-prediction-window-size", - SLAPlannerDefaults.load_prediction_window_size, - ), - ) - - @async_on_start - async def async_init(self): - await asyncio.sleep(INIT_PLANNER_START_DELAY) - logger.info("Calling start_planner") - await start_sla_planner(self.runtime, self.args) - logger.info("Planner started") - - @endpoint() - async def generate(self, request: RequestType): +@dynamo_worker(static=False) +async def init_planner(runtime: DistributedRuntime, args): + await asyncio.sleep(INIT_PLANNER_START_DELAY) + + await start_sla_planner(runtime, args) + + component = runtime.namespace(SLAPlannerDefaults.namespace).component("Planner") + await component.create_service() + + async def generate(request: RequestType): """Dummy endpoint to satisfy that each component has an endpoint""" yield "mock endpoint" + + generate_endpoint = component.endpoint("generate") + await generate_endpoint.serve_endpoint(generate) + + +if __name__ == "__main__": + parser = 
argparse.ArgumentParser(description="SLA Planner") + parser.add_argument( + "--environment", + default=SLAPlannerDefaults.environment, + choices=["kubernetes"], + help="Environment type", + ) + parser.add_argument( + "--backend", + default=SLAPlannerDefaults.backend, + choices=["vllm"], + help="Backend type", + ) + parser.add_argument( + "--no-operation", + action="store_true", + default=SLAPlannerDefaults.no_operation, + help="Enable no-operation mode", + ) + parser.add_argument( + "--log-dir", default=SLAPlannerDefaults.log_dir, help="Log directory path" + ) + parser.add_argument( + "--adjustment-interval", + type=int, + default=SLAPlannerDefaults.adjustment_interval, + help="Adjustment interval in seconds", + ) + parser.add_argument( + "--max-gpu-budget", + type=int, + default=SLAPlannerDefaults.max_gpu_budget, + help="Maximum GPU budget", + ) + parser.add_argument( + "--min-endpoint", + type=int, + default=SLAPlannerDefaults.min_endpoint, + help="Minimum number of endpoints", + ) + parser.add_argument( + "--decode-engine-num-gpu", + type=int, + default=SLAPlannerDefaults.decode_engine_num_gpu, + help="Number of GPUs for decode engine", + ) + parser.add_argument( + "--prefill-engine-num-gpu", + type=int, + default=SLAPlannerDefaults.prefill_engine_num_gpu, + help="Number of GPUs for prefill engine", + ) + parser.add_argument( + "--profile-results-dir", + default=SLAPlannerDefaults.profile_results_dir, + help="Profile results directory", + ) + parser.add_argument( + "--isl", type=int, default=SLAPlannerDefaults.isl, help="Input sequence length" + ) + parser.add_argument( + "--osl", type=int, default=SLAPlannerDefaults.osl, help="Output sequence length" + ) + parser.add_argument( + "--ttft", + type=float, + default=SLAPlannerDefaults.ttft, + help="Time to first token", + ) + parser.add_argument( + "--itl", type=float, default=SLAPlannerDefaults.itl, help="Inter-token latency" + ) + parser.add_argument( + "--load-predictor", + default=SLAPlannerDefaults.load_predictor, + help="Load predictor type", + ) + parser.add_argument( + "--load-prediction-window-size", + type=int, + default=SLAPlannerDefaults.load_prediction_window_size, + help="Load prediction window size", + ) + + args = parser.parse_args() + asyncio.run(init_planner(args)) diff --git a/components/planner/src/dynamo/planner/prometheus.py b/components/planner/src/dynamo/planner/prometheus.py index fc0370e56e..dc0a5d96f7 100644 --- a/components/planner/src/dynamo/planner/prometheus.py +++ b/components/planner/src/dynamo/planner/prometheus.py @@ -13,55 +13,69 @@ # See the License for the specific language governing permissions and # limitations under the License. 
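Reviewer note: the `prometheus.py` rewrite below keeps the Prometheus child process under a polling supervision loop instead of fire-and-forget. A minimal standalone sketch of that loop, using `sleep 5` as a stand-in for the real `prometheus` binary:

```python
import asyncio
import subprocess
from typing import List


async def supervise(cmd: List[str]) -> int:
    process = subprocess.Popen(cmd)
    try:
        while True:
            await asyncio.sleep(1)
            code = process.poll()
            if code is not None:
                # The child exited on its own; surface its exit code.
                return code
    except asyncio.CancelledError:
        # Cooperative shutdown: terminate the child before re-raising.
        process.terminate()
        process.wait()
        raise


print(asyncio.run(supervise(["sleep", "5"])))  # prints 0 after ~5 seconds
```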
+import asyncio import logging import subprocess import tempfile import yaml -from dynamo.sdk import service +from dynamo.planner.defaults import SLAPlannerDefaults +from dynamo.runtime import DistributedRuntime, dynamo_worker from dynamo.sdk.lib.config import ServiceConfig -from dynamo.sdk.lib.image import DYNAMO_IMAGE logger = logging.getLogger(__name__) -@service( - dynamo={ - "namespace": "dynamo", - }, - workers=1, - image=DYNAMO_IMAGE, -) -class Prometheus: - def __init__(self): - """Initialize Frontend service with HTTP server and model configuration.""" - self.config = ServiceConfig.get_parsed_config("Prometheus") - self.process = None - - logger.info(f"Prometheus config: {self.config}") - - self.start_prometheus_server() - - def start_prometheus_server(self): - logger.info("Starting prometheus server...") - - self.temp_file = tempfile.NamedTemporaryFile( - mode="w", suffix=".yml", delete=False - ) - yaml.dump(self.config, self.temp_file) - self.temp_file.close() - config_path = self.temp_file.name - - cmd = [ - "prometheus", - f"--config.file={config_path}", - ] - - logger.info(f"Prometheus cmd: {cmd}") - - self.process = subprocess.Popen( - cmd, - stdout=None, - stderr=None, - ) +@dynamo_worker(static=False) +async def worker(runtime: DistributedRuntime): + """Initialize and run Prometheus server with Dynamo config.""" + config = ServiceConfig.get_parsed_config("Prometheus") + + logger.info(f"Prometheus config: {config}") + + await start_prometheus_server(config) + + +async def start_prometheus_server(config): + logger.info("Starting prometheus server...") + + temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".yml", delete=False) + yaml.dump(config, temp_file) + temp_file.close() + config_path = temp_file.name + + prometheus_port = SLAPlannerDefaults.port + cmd = [ + "prometheus", + f"--config.file={config_path}", + f"--web.listen-address=0.0.0.0:{prometheus_port}", + ] + + logger.info(f"Prometheus cmd: {cmd}") + + process = subprocess.Popen( + cmd, + stdout=None, + stderr=None, + ) + + # Keep the worker running + try: + while True: + await asyncio.sleep(1) + if process.poll() is not None: + logger.error("Prometheus process died") + break + except asyncio.CancelledError: + logger.info("Shutting down Prometheus...") + process.terminate() + process.wait() + raise + + +if __name__ == "__main__": + # The dynamo_worker decorator handles runtime setup + import asyncio + + asyncio.run(worker()) diff --git a/components/planner/src/dynamo/planner/utils/planner_core.py b/components/planner/src/dynamo/planner/utils/planner_core.py index f6d4de5e06..ffe532d216 100644 --- a/components/planner/src/dynamo/planner/utils/planner_core.py +++ b/components/planner/src/dynamo/planner/utils/planner_core.py @@ -21,7 +21,7 @@ from dataclasses import dataclass from typing import Optional -from dynamo.planner import KubernetesConnector, LocalConnector +from dynamo.planner import KubernetesConnector from dynamo.planner.defaults import WORKER_COMPONENT_NAMES, SLAPlannerDefaults from dynamo.planner.utils.load_predictor import LOAD_PREDICTORS from dynamo.planner.utils.perf_interpolation import ( @@ -47,22 +47,35 @@ class Metrics: p_load: Optional[float] = None d_load: Optional[float] = None + def is_valid(self) -> bool: + """Check if all metrics are valid (not None and not NaN).""" + return ( + self.ttft is not None + and self.itl is not None + and self.isl is not None + and self.osl is not None + and not math.isnan(self.ttft) + and not math.isnan(self.itl) + and not math.isnan(self.isl) + and not 
math.isnan(self.osl) + ) + class Planner: def __init__(self, runtime: DistributedRuntime, args: argparse.Namespace): self.runtime = runtime self.args = args - self.namespace = args.namespace + self.namespace = SLAPlannerDefaults.namespace if not args.no_operation: - if args.environment == "local": - self.connector = LocalConnector(args.namespace, runtime) - elif args.environment == "kubernetes": - self.connector = KubernetesConnector(args.namespace) + if args.environment == "kubernetes": + self.connector = KubernetesConnector(self.namespace) else: raise ValueError(f"Invalid environment: {args.environment}") - self.prometheus_api_client = PrometheusAPIClient(args.prometheus_endpoint) + self.prometheus_api_client = PrometheusAPIClient( + SLAPlannerDefaults.prometheus_endpoint + ) self.num_req_predictor = LOAD_PREDICTORS[args.load_predictor]( window_size=args.load_prediction_window_size, @@ -167,6 +180,13 @@ def observe_metrics(self): async def make_adjustments(self): try: + # Skip adjustment if no traffic + if not self.last_metrics.is_valid(): + logger.info( + "Metrics contain None or NaN values (no active requests), skipping adjustment" + ) + return + self.p_endpoints, self.d_endpoints = await self.get_workers_info() logger.info( f"Number of prefill workers: {len(self.p_endpoints)}, number of decode workers: {len(self.d_endpoints)}" @@ -224,7 +244,14 @@ async def make_adjustments(self): # compute how many replicas are needed for decode # 1. apply d_correction_factor to the ITL SLA - corrected_itl = self.args.itl / self.d_correction_factor + # Prevent divide by zero when d_correction_factor is 0 (no metrics yet) + if self.d_correction_factor <= 0: + logger.warning( + f"d_correction_factor is {self.d_correction_factor}, using default value of 1.0" + ) + corrected_itl = self.args.itl + else: + corrected_itl = self.args.itl / self.d_correction_factor # 2. 
reversely find out what is best throughput/gpu that can achieve corrected_itl under the predicted context length pred_decode_thpt_per_gpu = ( self.decode_interpolator.find_best_throughput_per_gpu( @@ -272,33 +299,11 @@ async def make_adjustments(self): return if not self.args.no_operation: - # scale up/down the number of prefill/decode non-blockingly - # TODO: add a check to avoid scaling before the previous scaling is completed - if next_num_p > len(self.p_endpoints): - for _ in range(next_num_p - len(self.p_endpoints)): - self.connector.add_component( - WORKER_COMPONENT_NAMES[self.args.backend].prefill_worker, - blocking=False, - ) - elif next_num_p < len(self.p_endpoints): - for _ in range(len(self.p_endpoints) - next_num_p): - self.connector.remove_component( - WORKER_COMPONENT_NAMES[self.args.backend].prefill_worker, - blocking=False, - ) - - if next_num_d > len(self.d_endpoints): - for _ in range(next_num_d - len(self.d_endpoints)): - self.connector.add_component( - WORKER_COMPONENT_NAMES[self.args.backend].decode_worker, - blocking=False, - ) - elif next_num_d < len(self.d_endpoints): - for _ in range(len(self.d_endpoints) - next_num_d): - self.connector.remove_component( - WORKER_COMPONENT_NAMES[self.args.backend].decode_worker, - blocking=False, - ) + target_replicas = { + WORKER_COMPONENT_NAMES[self.args.backend].prefill_worker: next_num_p, + WORKER_COMPONENT_NAMES[self.args.backend].decode_worker: next_num_d, + } + await self.connector.set_component_replicas(target_replicas, blocking=False) async def run(self): """Main loop for the planner""" @@ -329,12 +334,6 @@ async def start_sla_planner(runtime: DistributedRuntime, args: argparse.Namespac if __name__ == "__main__": parser = argparse.ArgumentParser() # Common planner arguments - parser.add_argument( - "--namespace", - type=str, - default=SLAPlannerDefaults.namespace, - help="Namespace planner will look at", - ) parser.add_argument( "--environment", type=str, diff --git a/components/planner/src/dynamo/planner/utils/prometheus.py b/components/planner/src/dynamo/planner/utils/prometheus.py index 6d63863bca..586c3efacc 100644 --- a/components/planner/src/dynamo/planner/utils/prometheus.py +++ b/components/planner/src/dynamo/planner/utils/prometheus.py @@ -27,40 +27,55 @@ class PrometheusAPIClient: def __init__(self, url: str): self.prom = PrometheusConnect(url=url, disable_ssl=True) - def get_avg_inter_token_latency(self, interval: str): + def _get_average_metric( + self, metric_name: str, interval: str, operation_name: str + ) -> float: + """ + Helper method to get average metrics using the pattern: + increase(metric_sum[interval])/increase(metric_count[interval]) + + Args: + metric_name: Base metric name (e.g., 'nv_llm_http_service_inter_token_latency_seconds') + interval: Time interval for the query (e.g., '60s') + operation_name: Human-readable name for error logging + + Returns: + Average metric value or 0 if no data/error + """ try: - return float( - self.prom.custom_query( - query=f"increase(nv_llm_http_service_inter_token_latency_seconds_sum[{interval}])/increase(nv_llm_http_service_inter_token_latency_seconds_count[{interval}])", - )[0]["value"][1] - ) + query = f"increase({metric_name}_sum[{interval}])/increase({metric_name}_count[{interval}])" + result = self.prom.custom_query(query=query) + if not result: + # No data available yet (no requests made) - return 0 silently + return 0 + return float(result[0]["value"][1]) except Exception as e: - logger.error(f"Error getting avg inter token latency: {e}") + 
logger.error(f"Error getting {operation_name}: {e}") return 0 + def get_avg_inter_token_latency(self, interval: str): + return self._get_average_metric( + "nv_llm_http_service_inter_token_latency_seconds", + interval, + "avg inter token latency", + ) + def get_avg_time_to_first_token(self, interval: str): - try: - return float( - self.prom.custom_query( - query=f"increase(nv_llm_http_service_time_to_first_token_seconds_sum[{interval}])/increase(nv_llm_http_service_time_to_first_token_seconds_count[{interval}])", - )[0]["value"][1] - ) - except Exception as e: - logger.error(f"Error getting avg time to first token: {e}") - return 0 + return self._get_average_metric( + "nv_llm_http_service_time_to_first_token_seconds", + interval, + "avg time to first token", + ) def get_avg_request_duration(self, interval: str): - try: - return float( - self.prom.custom_query( - query=f"increase(nv_llm_http_service_request_duration_seconds_sum[{interval}])/increase(nv_llm_http_service_request_duration_seconds_count[{interval}])", - )[0]["value"][1] - ) - except Exception as e: - logger.error(f"Error getting avg request duration: {e}") - return 0 + return self._get_average_metric( + "nv_llm_http_service_request_duration_seconds", + interval, + "avg request duration", + ) def get_avg_request_count(self, interval: str): + # This function follows a different query pattern than the other metrics try: raw_res = self.prom.custom_query( query=f"increase(nv_llm_http_service_requests_total[{interval}])" @@ -75,23 +90,15 @@ def get_avg_request_count(self, interval: str): return 0 def get_avg_input_sequence_tokens(self, interval: str): - try: - return float( - self.prom.custom_query( - query=f"increase(nv_llm_http_service_input_sequence_tokens_sum[{interval}])/increase(nv_llm_http_service_input_sequence_tokens_count[{interval}])", - )[0]["value"][1] - ) - except Exception as e: - logger.error(f"Error getting avg input sequence tokens: {e}") - return 0 + return self._get_average_metric( + "nv_llm_http_service_input_sequence_tokens", + interval, + "avg input sequence tokens", + ) def get_avg_output_sequence_tokens(self, interval: str): - try: - return float( - self.prom.custom_query( - query=f"increase(nv_llm_http_service_output_sequence_tokens_sum[{interval}])/increase(nv_llm_http_service_output_sequence_tokens_count[{interval}])", - )[0]["value"][1] - ) - except Exception as e: - logger.error(f"Error getting avg output sequence tokens: {e}") - return 0 + return self._get_average_metric( + "nv_llm_http_service_output_sequence_tokens", + interval, + "avg output sequence tokens", + ) diff --git a/components/planner/test/kube.py b/components/planner/test/kube.py index 2a2a124345..0f2f1bc04f 100644 --- a/components/planner/test/kube.py +++ b/components/planner/test/kube.py @@ -14,7 +14,7 @@ # limitations under the License. 
from typing import Any, Dict -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import MagicMock, patch import pytest @@ -39,9 +39,45 @@ def k8s_api(mock_custom_api, mock_config): return KubernetesAPI() +@pytest.fixture +def k8s_api_with_namespace(mock_custom_api, mock_config): + return KubernetesAPI(k8s_namespace="test-namespace") + + +def test_kubernetes_api_init_with_namespace(mock_custom_api, mock_config): + """Test KubernetesAPI initialization with custom namespace""" + api = KubernetesAPI(k8s_namespace="custom-namespace") + assert api.current_namespace == "custom-namespace" + + +def test_kubernetes_api_init_without_namespace(mock_custom_api, mock_config): + """Test KubernetesAPI initialization without custom namespace""" + api = KubernetesAPI() + # Should use the default namespace logic + assert api.current_namespace == "default" + + +def test_get_graph_deployment_from_name(k8s_api, mock_custom_api): + """Test _get_graph_deployment_from_name method""" + mock_deployment = {"metadata": {"name": "test-deployment"}} + mock_custom_api.get_namespaced_custom_object.return_value = mock_deployment + + result = k8s_api._get_graph_deployment_from_name("test-deployment") + + assert result == mock_deployment + mock_custom_api.get_namespaced_custom_object.assert_called_once_with( + group="nvidia.com", + version="v1alpha1", + namespace=k8s_api.current_namespace, + plural="dynamographdeployments", + name="test-deployment", + ) + + @pytest.mark.asyncio -async def test_wait_for_graph_deployment_ready_success(k8s_api, mock_custom_api): - # Mock the get_graph_deployment response +async def test_is_deployment_ready_true(k8s_api, mock_custom_api): + """Test is_deployment_ready method when deployment is ready""" + # Mock the _get_graph_deployment_from_name response mock_deployment: Dict[str, Any] = { "status": { "conditions": [ @@ -49,22 +85,18 @@ async def test_wait_for_graph_deployment_ready_success(k8s_api, mock_custom_api) ] } } - k8s_api.get_graph_deployment = AsyncMock(return_value=mock_deployment) - - # Test with minimal attempts and delay for faster testing - await k8s_api.wait_for_graph_deployment_ready( - "test-deployment", max_attempts=2, delay_seconds=0.1 - ) - # Verify get_graph_deployment was called - k8s_api.get_graph_deployment.assert_called_once_with( - "test-deployment", k8s_api.current_namespace - ) + # Mock the method on the instance + with patch.object( + k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment + ): + result = await k8s_api.is_deployment_ready("test-deployment") + assert result is True @pytest.mark.asyncio -async def test_wait_for_graph_deployment_ready_timeout(k8s_api, mock_custom_api): - # Mock the get_graph_deployment response with not ready status +async def test_is_deployment_ready_false(k8s_api, mock_custom_api): + """Test is_deployment_ready method when deployment is not ready""" mock_deployment: Dict[str, Any] = { "status": { "conditions": [ @@ -76,54 +108,115 @@ async def test_wait_for_graph_deployment_ready_timeout(k8s_api, mock_custom_api) ] } } - k8s_api.get_graph_deployment = AsyncMock(return_value=mock_deployment) - # Test with minimal attempts and delay for faster testing - with pytest.raises(TimeoutError) as exc_info: - await k8s_api.wait_for_graph_deployment_ready( - "test-deployment", max_attempts=2, delay_seconds=0.1 - ) + # Mock the method on the instance + with patch.object( + k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment + ): + result = await 
k8s_api.is_deployment_ready("test-deployment") + assert result is False - assert "is not ready after" in str(exc_info.value) - assert k8s_api.get_graph_deployment.call_count == 2 + +@pytest.mark.asyncio +async def test_is_deployment_ready_not_found(k8s_api, mock_custom_api): + """Test is_deployment_ready method when deployment is not found""" + # Mock the method on the instance + with patch.object(k8s_api, "_get_graph_deployment_from_name", return_value=None): + with pytest.raises(ValueError) as exc_info: + await k8s_api.is_deployment_ready("test-deployment") + + assert "not found" in str(exc_info.value) @pytest.mark.asyncio -async def test_wait_for_graph_deployment_not_found(k8s_api, mock_custom_api): - # Mock the get_graph_deployment response to return None - k8s_api.get_graph_deployment = AsyncMock(return_value=None) +async def test_wait_for_graph_deployment_ready_success(k8s_api, mock_custom_api): + """Test wait_for_graph_deployment_ready when deployment becomes ready""" + # Mock the _get_graph_deployment_from_name response + mock_deployment: Dict[str, Any] = { + "status": { + "conditions": [ + {"type": "Ready", "status": "True", "message": "Deployment is ready"} + ] + } + } - # Test with minimal attempts and delay for faster testing - with pytest.raises(ValueError) as exc_info: + # Mock the method on the instance + with patch.object( + k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment + ): + # Test with minimal attempts and delay for faster testing await k8s_api.wait_for_graph_deployment_ready( "test-deployment", max_attempts=2, delay_seconds=0.1 ) - assert "not found" in str(exc_info.value) - assert k8s_api.get_graph_deployment.call_count == 1 + +@pytest.mark.asyncio +async def test_wait_for_graph_deployment_ready_timeout(k8s_api, mock_custom_api): + """Test wait_for_graph_deployment_ready when deployment times out""" + # Mock the _get_graph_deployment_from_name response with not ready status + mock_deployment: Dict[str, Any] = { + "status": { + "conditions": [ + { + "type": "Ready", + "status": "False", + "message": "Deployment is not ready", + } + ] + } + } + + # Mock the method on the instance + with patch.object( + k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment + ): + # Test with minimal attempts and delay for faster testing + with pytest.raises(TimeoutError) as exc_info: + await k8s_api.wait_for_graph_deployment_ready( + "test-deployment", max_attempts=2, delay_seconds=0.1 + ) + + assert "is not ready after" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_wait_for_graph_deployment_not_found(k8s_api, mock_custom_api): + """Test wait_for_graph_deployment_ready when deployment is not found""" + # Mock the _get_graph_deployment_from_name response to return None + with patch.object(k8s_api, "_get_graph_deployment_from_name", return_value=None): + # Test with minimal attempts and delay for faster testing + with pytest.raises(ValueError) as exc_info: + await k8s_api.wait_for_graph_deployment_ready( + "test-deployment", max_attempts=2, delay_seconds=0.1 + ) + + assert "not found" in str(exc_info.value) @pytest.mark.asyncio async def test_wait_for_graph_deployment_no_conditions(k8s_api, mock_custom_api): - # Mock the get_graph_deployment response with no conditions + """Test wait_for_graph_deployment_ready when deployment has no conditions""" + # Mock the _get_graph_deployment_from_name response with no conditions mock_deployment: Dict[str, Any] = {"status": {}} - k8s_api.get_graph_deployment = 
AsyncMock(return_value=mock_deployment) - # Test with minimal attempts and delay for faster testing - with pytest.raises(TimeoutError) as exc_info: - await k8s_api.wait_for_graph_deployment_ready( - "test-deployment", max_attempts=2, delay_seconds=0.1 - ) + with patch.object( + k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment + ): + # Test with minimal attempts and delay for faster testing + with pytest.raises(TimeoutError) as exc_info: + await k8s_api.wait_for_graph_deployment_ready( + "test-deployment", max_attempts=2, delay_seconds=0.1 + ) - assert "is not ready after" in str(exc_info.value) - assert k8s_api.get_graph_deployment.call_count == 2 + assert "is not ready after" in str(exc_info.value) @pytest.mark.asyncio async def test_wait_for_graph_deployment_ready_on_second_attempt( k8s_api, mock_custom_api ): - # Mock the get_graph_deployment response to return not ready first, then ready + """Test wait_for_graph_deployment_ready when deployment becomes ready on second attempt""" + # Mock the _get_graph_deployment_from_name response to return not ready first, then ready mock_deployment_not_ready: Dict[str, Any] = { "status": { "conditions": [ @@ -142,13 +235,13 @@ async def test_wait_for_graph_deployment_ready_on_second_attempt( ] } } - k8s_api.get_graph_deployment = AsyncMock( - side_effect=[mock_deployment_not_ready, mock_deployment_ready] - ) - - # Test with minimal attempts and delay for faster testing - await k8s_api.wait_for_graph_deployment_ready( - "test-deployment", max_attempts=2, delay_seconds=0.1 - ) - assert k8s_api.get_graph_deployment.call_count == 2 + with patch.object( + k8s_api, + "_get_graph_deployment_from_name", + side_effect=[mock_deployment_not_ready, mock_deployment_ready], + ): + # Test with minimal attempts and delay for faster testing + await k8s_api.wait_for_graph_deployment_ready( + "test-deployment", max_attempts=2, delay_seconds=0.1 + ) diff --git a/components/planner/test/kubernetes_connector.py b/components/planner/test/kubernetes_connector.py index 318cb40954..c795248bc0 100644 --- a/components/planner/test/kubernetes_connector.py +++ b/components/planner/test/kubernetes_connector.py @@ -63,7 +63,7 @@ async def test_add_component_increases_replicas(kubernetes_connector, mock_kube_ # Assert mock_kube_api.get_graph_deployment.assert_called_once_with( - component_name, kubernetes_connector.namespace + component_name, kubernetes_connector.dynamo_namespace ) mock_kube_api.update_graph_replicas.assert_called_once_with( "test-graph", component_name, 2 diff --git a/docs/architecture/pre_deployment_profiling.md b/docs/architecture/pre_deployment_profiling.md index d0c745b442..ea2ddaab2c 100644 --- a/docs/architecture/pre_deployment_profiling.md +++ b/docs/architecture/pre_deployment_profiling.md @@ -29,7 +29,7 @@ The script will recommend the best TP size for prefill and decode, as well as th 2025-05-16 15:20:24 - __main__ - INFO - Suggested planner upper/lower bound for decode kv cache utilization: 0.20/0.10 ``` -After finding the best TP size for prefill and decode, the script will then interpolate the TTFT with ISL and ITL with active KV cache and decode context length. This is to provide a more accurate estimation of the performance when ISL and OSL changes and will be used in the sla-planner. The results will be saved to `/_tp_interpolation`. +After finding the best TP size for prefill and decode, the script will then interpolate the TTFT with ISL and ITL with active KV cache and decode context length. 
This is to provide a more accurate estimation of the performance when ISL and OSL change, and it is used by the SLA planner. The results will be saved to `/_tp_interpolation`.
+Please change the prefill and decode TP size in the config file to match the best TP sizes obtained from the profiling script.

 ### Prefill Interpolation Data

diff --git a/docs/architecture/sla_planner.md b/docs/architecture/sla_planner.md
index 1fc7137392..3c2fcf428c 100644
--- a/docs/architecture/sla_planner.md
+++ b/docs/architecture/sla_planner.md
@@ -8,7 +8,7 @@ The SLA (Service Level Agreement)-based planner is an intelligent autoscaling sy
 > Currently, SLA-based planner only supports disaggregated setup.

 > [!WARNING]
-> Bare metal deployment with local connector is deprecated. The only option to deploy SLA-based planner is via k8s. We will update the examples in this document soon.
+> Bare metal deployment with local connector is deprecated. Please deploy the SLA planner via Kubernetes.

 ## Features
@@ -106,11 +106,13 @@ Finally, SLA planner applies the change by scaling up/down the number of prefill

 ## Deploying

-To deploy SLA-planner, ensure etcd and NATS are running first, then use the frontend that reports metrics at `/metrics` HTTP endpoint. You can also use your own frontend, but it must report number of requests, ISL, OSL, TTFT, ITL in the same format.
+For detailed deployment instructions including setup, configuration, troubleshooting, and architecture overview, see the [SLA Planner Deployment Guide](../guides/dynamo_deploy/sla_planner_deployment.md).

-SLA-planner and prometheus server are provided as common components that can be directly imported from `dynamo` package. The following changes are needed:
-- Add `Planner` and `Prometheus` components' dependency in `Frontend`.
-- Link `Planner` and `Prometheus` in the graph.
-- Add `Planner` and `Prometheus` configurations in the config file.
+**Quick Start:**
+```bash
+cd components/backends/vllm/deploy
+kubectl apply -f disagg_planner.yaml -n ${NAMESPACE}
+```

-The SLA planner integration with the new frontend + worker architecture is currently a work in progress. This documentation will be updated with the new deployment patterns and code examples once the SLA planner component has been fully adapted to the new workflow.
\ No newline at end of file
+> [!NOTE]
+> The SLA planner requires a frontend that reports metrics at the `/metrics` HTTP endpoint with the number of requests, ISL, OSL, TTFT, and ITL in the correct format. The Dynamo frontend provides these metrics automatically.
diff --git a/docs/guides/dynamo_deploy/sla_planner_deployment.md b/docs/guides/dynamo_deploy/sla_planner_deployment.md
new file mode 100644
index 0000000000..04af6b1d51
--- /dev/null
+++ b/docs/guides/dynamo_deploy/sla_planner_deployment.md
@@ -0,0 +1,116 @@
+# SLA Planner Deployment Guide
+
+Quick deployment guide for the vLLM disaggregated planner with automatic scaling.
+
+> [!NOTE]
+> For high-level architecture and concepts, see [SLA-based Planner](../../architecture/sla_planner.md).
+
+## Architecture Overview
+
+**Components:**
+- **Frontend**: Serves requests and exposes `/metrics`
+- **Prometheus**: Scrapes frontend metrics every 5 seconds
+- **Planner**: Queries Prometheus and adjusts worker scaling every 60 seconds
+- **Workers**: Prefill and backend workers handle inference
+
+```mermaid
+flowchart LR
+    Frontend --"/metrics"--> Prometheus
+    Planner --"query API"--> Prometheus
+    Planner --"scaling decisions"--> Workers["prefill<br/>backend"]
+    Frontend -.->|"requests"| Workers
+```
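+The planner derives each average with the Prometheus `increase(..._sum)/increase(..._count)` pattern. Below is a minimal sketch of that query, mirroring the planner's `PrometheusAPIClient`; the Prometheus URL is an assumption for illustration and should match `SLAPlannerDefaults.prometheus_endpoint` in your deployment:
+
+```python
+from prometheus_api_client import PrometheusConnect  # same client the planner uses
+
+prom = PrometheusConnect(url="http://localhost:8000", disable_ssl=True)  # assumed URL
+
+def avg_metric(metric_name: str, interval: str = "60s") -> float:
+    """Average over `interval` via increase(sum)/increase(count); 0 if no data yet."""
+    query = f"increase({metric_name}_sum[{interval}])/increase({metric_name}_count[{interval}])"
+    result = prom.custom_query(query=query)
+    return float(result[0]["value"][1]) if result else 0
+
+# e.g., average TTFT over the last minute
+print(avg_metric("nv_llm_http_service_time_to_first_token_seconds"))
+```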
backend"] + Frontend -.->|"requests"| Workers +``` + +## Prerequisites +- Kubernetes cluster with GPU nodes +- `hf-token-secret` created in target namespace +- [Pre-Deployment Profiling](../../architecture/pre_deployment_profiling.md) results saved to `profiling-pvc` PVC. +- Prefill and decode worker uses the best parallelization mapping suggested by the pre-deployment profiling script. + +```bash +export NAMESPACE=your-namespace +``` + +## 1. Deploy the System + +```bash +# Apply the disaggregated planner deployment +kubectl apply -f components/backends/vllm/deploy/disagg_planner.yaml -n $NAMESPACE + +# Check deployment status +kubectl get pods -n $NAMESPACE +``` + +Expected pods (all should be `1/1 Running`): +``` +vllm-disagg-planner-frontend-* 1/1 Running +vllm-disagg-planner-prometheus-* 1/1 Running +vllm-disagg-planner-planner-* 1/1 Running +vllm-disagg-planner-backend-* 1/1 Running +vllm-disagg-planner-prefill-* 1/1 Running +``` + +## 2. Test the System + +**Important:** Streaming requests (`"stream": true`) are required for the planner to collect latency metrics and make scaling decisions. Non-streaming requests will produce successful inference outputs but won't provide the necessary telemetry for automatic scaling. + +```bash +# Port forward to frontend +kubectl port-forward -n $NAMESPACE deployment/vllm-disagg-planner-frontend 8000:8000 + +# Send a streaming request (required for full metrics) +curl http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "Qwen/Qwen3-0.6B", + "messages": [ + { + "role": "user", + "content": "In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden." + } + ], + "stream":true, + "max_tokens": 30 + }' +``` + +## 3. Monitor Scaling + +```bash +# Check planner logs for scaling decisions +kubectl logs -n $NAMESPACE deployment/vllm-disagg-planner-planner --tail=10 + +# Expected successful output (after streaming requests): + +# New adjustment interval started! +# Observed num_req: X.XXX isl: X.XXX osl: X.XXX +# Observed ttft: X.XXXs itl: X.XXXs +# Number of prefill workers: 1, number of decode workers: 1 +``` + +### Metrics Requirements +- **Basic metrics** (request count): Available with any request type +- **Latency metrics** (TTFT/ITL): Only available with `"stream": true` requests +- **Scaling decisions**: Require sufficient request volume and streaming requests + +## 4. 
+## 4. Troubleshooting
+
+**Connection Issues:**
+```bash
+# Verify Prometheus is accessible (runs on port 8000)
+kubectl port-forward -n $NAMESPACE deployment/vllm-disagg-planner-prometheus 8000:8000
+curl "http://localhost:8000/api/v1/query?query=up"
+```
+
+**Missing Metrics:**
+```bash
+# Check frontend metrics
+kubectl port-forward -n $NAMESPACE deployment/vllm-disagg-planner-frontend 8000:8000
+curl http://localhost:8000/metrics | grep nv_llm_http_service
+```
+
+**Worker Issues:**
+- Large models can take 10+ minutes to initialize
+- Check worker logs: `kubectl logs -n $NAMESPACE deployment/vllm-disagg-planner-backend`
+- Ensure GPU resources are available for workers
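+If the planner keeps skipping adjustments because metrics are None/NaN, you can check which of the required series the frontend is actually exporting. A minimal sketch, assuming the frontend port-forward above is active (the metric names match the series the planner consumes):
+
+```python
+import httpx
+
+REQUIRED_PREFIXES = [
+    "nv_llm_http_service_requests_total",               # request count
+    "nv_llm_http_service_input_sequence_tokens",        # ISL
+    "nv_llm_http_service_output_sequence_tokens",       # OSL
+    "nv_llm_http_service_time_to_first_token_seconds",  # TTFT
+    "nv_llm_http_service_inter_token_latency_seconds",  # ITL
+]
+
+# Fetch the raw Prometheus exposition text from the frontend.
+body = httpx.get("http://localhost:8000/metrics", timeout=10.0).text
+for prefix in REQUIRED_PREFIXES:
+    status = "present" if prefix in body else "MISSING"
+    print(f"{prefix}: {status}")
+```
+
+Latency series only appear after at least one streaming request has completed, so run the step 2 request first if everything reports MISSING.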