Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 55 additions & 4 deletions components/src/dynamo/planner/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,48 @@ def get_graph_deployment(self, graph_deployment_name: str) -> dict:
)
raise

def update_graph_replicas(
self, graph_deployment_name: str, component_name: str, replicas: int
def update_service_replicas(
    self, graph_deployment_name: str, service_name: str, replicas: int
) -> None:
    """
    Set the replica count of one service, preferring the DGDSA Scale subresource.

    The scaling adapter (DGDSA) belonging to the service is patched first.
    When the API answers 404 for that adapter, the replica count is written
    into the DynamoGraphDeployment itself via ``_update_dgd_replicas`` (the
    fallback kept for older operators). Any other API error is re-raised.

    Args:
        graph_deployment_name: Name of the DynamoGraphDeployment
        service_name: Name of the service in DGD.spec.services
        replicas: Desired number of replicas

    Raises:
        client.ApiException: for any scale-patch failure other than 404.
    """
    # Adapters are named <dgd-name>-<lowercase-service-name>
    adapter_name = f"{graph_deployment_name}-{service_name.lower()}"

    try:
        self.custom_api.patch_namespaced_custom_object_scale(
            group="nvidia.com",
            version="v1alpha1",
            namespace=self.current_namespace,
            plural="dynamographdeploymentscalingadapters",
            name=adapter_name,
            body={"spec": {"replicas": replicas}},
        )
    except client.ApiException as e:
        # Only a missing adapter triggers the fallback; everything else propagates.
        if e.status != 404:
            raise
        logger.info(f"DGDSA {adapter_name} not found, falling back to DGD update")
        self._update_dgd_replicas(graph_deployment_name, service_name, replicas)
    else:
        logger.info(f"Scaled DGDSA {adapter_name} to {replicas} replicas")

def _update_dgd_replicas(
self, graph_deployment_name: str, service_name: str, replicas: int
) -> None:
"""Update the replicas count for a component in a DynamoGraphDeployment"""
patch = {"spec": {"services": {component_name: {"replicas": replicas}}}}
"""Update replicas directly in DGD (fallback for old operators)"""
patch = {"spec": {"services": {service_name: {"replicas": replicas}}}}
self.custom_api.patch_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
Expand All @@ -91,6 +128,20 @@ def update_graph_replicas(
name=graph_deployment_name,
body=patch,
)
logger.info(
f"Updated DGD {graph_deployment_name} service {service_name} to {replicas} replicas"
)

def update_graph_replicas(
    self, graph_deployment_name: str, component_name: str, replicas: int
) -> None:
    """
    Backward-compatible alias for update_service_replicas().

    Deprecated: call update_service_replicas() directly. This wrapper only
    forwards its three arguments, with ``component_name`` passed as the
    service name, so it follows whatever scaling path that method takes.
    """
    self.update_service_replicas(
        graph_deployment_name,
        component_name,
        replicas,
    )

def is_deployment_ready(self, deployment: dict) -> bool:
"""Check if a graph deployment is ready"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ rules:
- apiGroups: ["nvidia.com"]
resources: ["dynamocomponentdeployments", "dynamographdeployments"]
verbs: ["get", "list", "create", "update", "patch"]
- apiGroups: ["nvidia.com"]
resources: ["dynamographdeploymentscalingadapters/scale"]
verbs: ["patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
Expand Down Expand Up @@ -68,4 +71,7 @@ rules:
- apiGroups: ["nvidia.com"]
resources: ["dynamocomponentdeployments", "dynamographdeployments"]
verbs: ["get", "list", "create", "update", "patch"]
{{- end }}
- apiGroups: ["nvidia.com"]
resources: ["dynamographdeploymentscalingadapters/scale"]
verbs: ["patch"]
{{- end }}
78 changes: 77 additions & 1 deletion tests/planner/unit/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,87 @@ def test_get_graph_deployment_from_name(k8s_api, mock_custom_api):
)


def test_update_graph_replicas(k8s_api, mock_custom_api):
def test_update_service_replicas_uses_dgdsa_scale(k8s_api, mock_custom_api):
    """update_service_replicas patches the DGDSA Scale subresource when that call succeeds."""
    mock_custom_api.patch_namespaced_custom_object_scale.return_value = None

    k8s_api.update_service_replicas("test-deployment", "Frontend", 3)

    # The adapter name must carry the service name lowercased ("Frontend" -> "frontend").
    expected_scale_call = {
        "group": "nvidia.com",
        "version": "v1alpha1",
        "namespace": k8s_api.current_namespace,
        "plural": "dynamographdeploymentscalingadapters",
        "name": "test-deployment-frontend",
        "body": {"spec": {"replicas": 3}},
    }
    mock_custom_api.patch_namespaced_custom_object_scale.assert_called_once_with(
        **expected_scale_call
    )
    # A successful scale call must not be followed by a direct DGD patch.
    mock_custom_api.patch_namespaced_custom_object.assert_not_called()


def test_update_service_replicas_fallback_to_dgd(k8s_api, mock_custom_api):
    """A 404 from the DGDSA scale call makes update_service_replicas patch the DGD instead."""
    # Simulate a missing DGDSA: the scale patch raises 404.
    adapter_missing = client.ApiException(status=404)
    mock_custom_api.patch_namespaced_custom_object_scale.side_effect = adapter_missing
    mock_custom_api.patch_namespaced_custom_object.return_value = None

    k8s_api.update_service_replicas("test-deployment", "test-component", 1)

    # The scale subresource is attempted exactly once before falling back.
    mock_custom_api.patch_namespaced_custom_object_scale.assert_called_once()

    # The fallback writes the replica count into DGD.spec.services.
    expected_dgd_call = {
        "group": "nvidia.com",
        "version": "v1alpha1",
        "namespace": k8s_api.current_namespace,
        "plural": "dynamographdeployments",
        "name": "test-deployment",
        "body": {"spec": {"services": {"test-component": {"replicas": 1}}}},
    }
    mock_custom_api.patch_namespaced_custom_object.assert_called_once_with(
        **expected_dgd_call
    )


def test_update_service_replicas_propagates_other_errors(k8s_api, mock_custom_api):
    """Errors other than 404 from the scale call are re-raised without any fallback."""
    server_error = client.ApiException(status=500, reason="Internal Server Error")
    mock_custom_api.patch_namespaced_custom_object_scale.side_effect = server_error

    with pytest.raises(client.ApiException) as exc_info:
        k8s_api.update_service_replicas("test-deployment", "test-component", 1)

    # Checked after the context manager exits; inside it these lines would never run.
    assert exc_info.value.status == 500
    # A non-404 failure must not trigger the direct DGD patch.
    mock_custom_api.patch_namespaced_custom_object.assert_not_called()


def test_update_graph_replicas_calls_update_service_replicas(k8s_api, mock_custom_api):
    """The deprecated update_graph_replicas ends up issuing the same DGDSA scale patch."""
    mock_custom_api.patch_namespaced_custom_object_scale.return_value = None

    # Go through the deprecated entry point.
    k8s_api.update_graph_replicas("test-deployment", "test-component", 1)

    # Delegation is observed through the Scale API call made by update_service_replicas.
    expected_scale_call = {
        "group": "nvidia.com",
        "version": "v1alpha1",
        "namespace": k8s_api.current_namespace,
        "plural": "dynamographdeploymentscalingadapters",
        "name": "test-deployment-test-component",
        "body": {"spec": {"replicas": 1}},
    }
    mock_custom_api.patch_namespaced_custom_object_scale.assert_called_once_with(
        **expected_scale_call
    )


def test_update_dgd_replicas_directly(k8s_api, mock_custom_api):
"""Test the internal _update_dgd_replicas method"""
mock_custom_api.patch_namespaced_custom_object.return_value = None

k8s_api._update_dgd_replicas("test-deployment", "test-component", 1)

mock_custom_api.patch_namespaced_custom_object.assert_called_once_with(
group="nvidia.com",
version="v1alpha1",
Expand Down
Loading