diff --git a/components/src/dynamo/planner/kube.py b/components/src/dynamo/planner/kube.py index 67599467374..aa0d7e45fd6 100644 --- a/components/src/dynamo/planner/kube.py +++ b/components/src/dynamo/planner/kube.py @@ -78,11 +78,48 @@ def get_graph_deployment(self, graph_deployment_name: str) -> dict: ) raise - def update_graph_replicas( - self, graph_deployment_name: str, component_name: str, replicas: int + def update_service_replicas( + self, graph_deployment_name: str, service_name: str, replicas: int + ) -> None: + """ + Update replicas for a service using Scale subresource when DGDSA exists. + Falls back to DGD patch for backward compatibility with older operators. + + Args: + graph_deployment_name: Name of the DynamoGraphDeployment + service_name: Name of the service in DGD.spec.services + replicas: Desired number of replicas + """ + # DGDSA naming convention: - + adapter_name = f"{graph_deployment_name}-{service_name.lower()}" + + try: + # Try to scale via DGDSA Scale subresource + self.custom_api.patch_namespaced_custom_object_scale( + group="nvidia.com", + version="v1alpha1", + namespace=self.current_namespace, + plural="dynamographdeploymentscalingadapters", + name=adapter_name, + body={"spec": {"replicas": replicas}}, + ) + logger.info(f"Scaled DGDSA {adapter_name} to {replicas} replicas") + + except client.ApiException as e: + if e.status == 404: + # DGDSA doesn't exist - fall back to DGD patch (old operator) + logger.info( + f"DGDSA {adapter_name} not found, falling back to DGD update" + ) + self._update_dgd_replicas(graph_deployment_name, service_name, replicas) + else: + raise + + def _update_dgd_replicas( + self, graph_deployment_name: str, service_name: str, replicas: int ) -> None: - """Update the replicas count for a component in a DynamoGraphDeployment""" - patch = {"spec": {"services": {component_name: {"replicas": replicas}}}} + """Update replicas directly in DGD (fallback for old operators)""" + patch = {"spec": {"services": {service_name: {"replicas": replicas}}}} self.custom_api.patch_namespaced_custom_object( group="nvidia.com", version="v1alpha1", @@ -91,6 +128,20 @@ def update_graph_replicas( name=graph_deployment_name, body=patch, ) + logger.info( + f"Updated DGD {graph_deployment_name} service {service_name} to {replicas} replicas" + ) + + def update_graph_replicas( + self, graph_deployment_name: str, component_name: str, replicas: int + ) -> None: + """ + Update replicas for a service. Now uses DGDSA when available. + + Deprecated: Use update_service_replicas() instead for clarity. + This method is kept for backward compatibility. + """ + self.update_service_replicas(graph_deployment_name, component_name, replicas) def is_deployment_ready(self, deployment: dict) -> bool: """Check if a graph deployment is ready""" diff --git a/deploy/cloud/helm/platform/components/operator/templates/planner.yaml b/deploy/cloud/helm/platform/components/operator/templates/planner.yaml index 11f60b5a483..a893a5afdf9 100644 --- a/deploy/cloud/helm/platform/components/operator/templates/planner.yaml +++ b/deploy/cloud/helm/platform/components/operator/templates/planner.yaml @@ -39,6 +39,9 @@ rules: - apiGroups: ["nvidia.com"] resources: ["dynamocomponentdeployments", "dynamographdeployments"] verbs: ["get", "list", "create", "update", "patch"] +- apiGroups: ["nvidia.com"] + resources: ["dynamographdeploymentscalingadapters/scale"] + verbs: ["patch"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding @@ -68,4 +71,7 @@ rules: - apiGroups: ["nvidia.com"] resources: ["dynamocomponentdeployments", "dynamographdeployments"] verbs: ["get", "list", "create", "update", "patch"] -{{- end }} \ No newline at end of file +- apiGroups: ["nvidia.com"] + resources: ["dynamographdeploymentscalingadapters/scale"] + verbs: ["patch"] +{{- end }} diff --git a/tests/planner/unit/kube.py b/tests/planner/unit/kube.py index 87a10713baa..b1ef79d741b 100644 --- a/tests/planner/unit/kube.py +++ b/tests/planner/unit/kube.py @@ -76,11 +76,87 @@ def test_get_graph_deployment_from_name(k8s_api, mock_custom_api): ) -def test_update_graph_replicas(k8s_api, mock_custom_api): +def test_update_service_replicas_uses_dgdsa_scale(k8s_api, mock_custom_api): + """Test that update_service_replicas uses DGDSA Scale API when available""" + mock_custom_api.patch_namespaced_custom_object_scale.return_value = None + + k8s_api.update_service_replicas("test-deployment", "Frontend", 3) + + # Should use Scale subresource with lowercase adapter name + mock_custom_api.patch_namespaced_custom_object_scale.assert_called_once_with( + group="nvidia.com", + version="v1alpha1", + namespace=k8s_api.current_namespace, + plural="dynamographdeploymentscalingadapters", + name="test-deployment-frontend", # lowercase service name + body={"spec": {"replicas": 3}}, + ) + # Should NOT fall back to DGD patch + mock_custom_api.patch_namespaced_custom_object.assert_not_called() + + +def test_update_service_replicas_fallback_to_dgd(k8s_api, mock_custom_api): + """Test that update_service_replicas falls back to DGD when DGDSA not found""" + # DGDSA doesn't exist (404) + mock_custom_api.patch_namespaced_custom_object_scale.side_effect = ( + client.ApiException(status=404) + ) mock_custom_api.patch_namespaced_custom_object.return_value = None + k8s_api.update_service_replicas("test-deployment", "test-component", 1) + + # Should have tried DGDSA first + mock_custom_api.patch_namespaced_custom_object_scale.assert_called_once() + + # Should fall back to DGD patch + mock_custom_api.patch_namespaced_custom_object.assert_called_once_with( + group="nvidia.com", + version="v1alpha1", + namespace=k8s_api.current_namespace, + plural="dynamographdeployments", + name="test-deployment", + body={"spec": {"services": {"test-component": {"replicas": 1}}}}, + ) + + +def test_update_service_replicas_propagates_other_errors(k8s_api, mock_custom_api): + """Test that update_service_replicas propagates non-404 errors""" + mock_custom_api.patch_namespaced_custom_object_scale.side_effect = ( + client.ApiException(status=500, reason="Internal Server Error") + ) + + with pytest.raises(client.ApiException) as exc_info: + k8s_api.update_service_replicas("test-deployment", "test-component", 1) + + assert exc_info.value.status == 500 + # Should NOT fall back to DGD + mock_custom_api.patch_namespaced_custom_object.assert_not_called() + + +def test_update_graph_replicas_calls_update_service_replicas(k8s_api, mock_custom_api): + """Test that deprecated update_graph_replicas calls update_service_replicas""" + mock_custom_api.patch_namespaced_custom_object_scale.return_value = None + + # Use the deprecated method k8s_api.update_graph_replicas("test-deployment", "test-component", 1) + # Should delegate to update_service_replicas which uses Scale API + mock_custom_api.patch_namespaced_custom_object_scale.assert_called_once_with( + group="nvidia.com", + version="v1alpha1", + namespace=k8s_api.current_namespace, + plural="dynamographdeploymentscalingadapters", + name="test-deployment-test-component", + body={"spec": {"replicas": 1}}, + ) + + +def test_update_dgd_replicas_directly(k8s_api, mock_custom_api): + """Test the internal _update_dgd_replicas method""" + mock_custom_api.patch_namespaced_custom_object.return_value = None + + k8s_api._update_dgd_replicas("test-deployment", "test-component", 1) + mock_custom_api.patch_namespaced_custom_object.assert_called_once_with( group="nvidia.com", version="v1alpha1",