Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"

dynamoCommon "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -166,3 +167,25 @@ func (s *DynamoComponentDeployment) SetSpec(spec any) {
func (s *DynamoComponentDeployment) IsMainComponent() bool {
return strings.HasSuffix(s.Spec.DynamoTag, s.Spec.ServiceName)
}

func (s *DynamoComponentDeployment) GetDynamoDeploymentConfig() []byte {
for _, env := range s.Spec.Envs {
if env.Name == commonconsts.DynamoDeploymentConfigEnvVar {
return []byte(env.Value)
}
}
return nil
}

func (s *DynamoComponentDeployment) SetDynamoDeploymentConfig(config []byte) {
for i, env := range s.Spec.Envs {
if env.Name == commonconsts.DynamoDeploymentConfigEnvVar {
s.Spec.Envs[i].Value = string(config)
return
}
}
s.Spec.Envs = append(s.Spec.Envs, corev1.EnvVar{
Name: commonconsts.DynamoDeploymentConfigEnvVar,
Value: string(config),
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
package v1alpha1

import (
"reflect"
"testing"

commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -76,3 +79,134 @@ func TestDynamoComponentDeployment_IsMainComponent(t *testing.T) {
})
}
}

func TestDynamoComponentDeployment_GetDynamoDeploymentConfig(t *testing.T) {
type fields struct {
TypeMeta metav1.TypeMeta
ObjectMeta metav1.ObjectMeta
Spec DynamoComponentDeploymentSpec
Status DynamoComponentDeploymentStatus
}
tests := []struct {
name string
fields fields
want []byte
}{
{
name: "no config",
fields: fields{
Spec: DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
Envs: []corev1.EnvVar{},
},
},
},
want: nil,
},
{
name: "with config",
fields: fields{
Spec: DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
Envs: []corev1.EnvVar{
{
Name: commonconsts.DynamoDeploymentConfigEnvVar,
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
},
},
},
},
},
want: []byte(`{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &DynamoComponentDeployment{
TypeMeta: tt.fields.TypeMeta,
ObjectMeta: tt.fields.ObjectMeta,
Spec: tt.fields.Spec,
Status: tt.fields.Status,
}
if got := s.GetDynamoDeploymentConfig(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("DynamoComponentDeployment.GetDynamoDeploymentConfig() = %v, want %v", got, tt.want)
}
})
}
}

func TestDynamoComponentDeployment_SetDynamoDeploymentConfig(t *testing.T) {
type fields struct {
TypeMeta metav1.TypeMeta
ObjectMeta metav1.ObjectMeta
Spec DynamoComponentDeploymentSpec
Status DynamoComponentDeploymentStatus
}
type args struct {
config []byte
}
tests := []struct {
name string
fields fields
args args
want []corev1.EnvVar
}{
{
name: "no config",
fields: fields{
Spec: DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
Envs: nil,
},
},
},
args: args{
config: []byte(`{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`),
},
want: []corev1.EnvVar{
{
Name: commonconsts.DynamoDeploymentConfigEnvVar,
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
},
},
},
{
name: "with config",
fields: fields{
Spec: DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
Envs: []corev1.EnvVar{
{
Name: commonconsts.DynamoDeploymentConfigEnvVar,
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
},
},
},
},
},
args: args{
config: []byte(`{"Frontend":{"port":9000},"Planner":{"environment":"kubernetes"}}`),
},
want: []corev1.EnvVar{
{
Name: commonconsts.DynamoDeploymentConfigEnvVar,
Value: `{"Frontend":{"port":9000},"Planner":{"environment":"kubernetes"}}`,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &DynamoComponentDeployment{
TypeMeta: tt.fields.TypeMeta,
ObjectMeta: tt.fields.ObjectMeta,
Spec: tt.fields.Spec,
Status: tt.fields.Status,
}
s.SetDynamoDeploymentConfig(tt.args.config)
if !reflect.DeepEqual(s.Spec.DynamoComponentDeploymentSharedSpec.Envs, tt.want) {
t.Errorf("DynamoComponentDeployment.SetDynamoDeploymentConfig() = %v, want %v", s.Spec.DynamoComponentDeploymentSharedSpec.Envs, tt.want)
}
})
}
}
2 changes: 2 additions & 0 deletions deploy/cloud/operator/internal/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,6 @@ const (
KubeAnnotationDynamoComponentHash = "nvidia.com/dynamo-request-hash"
KubeAnnotationDynamoComponentImageBuiderHash = "nvidia.com/dynamo-request-image-builder-hash"
KubeAnnotationDynamoComponentStorageNS = "nvidia.com/dynamo-storage-namespace"

DynamoDeploymentConfigEnvVar = "DYN_DEPLOYMENT_CONFIG"
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ package controller

import (
"context"
"encoding/json"
"fmt"

"dario.cat/mergo"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -33,9 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

dynamoCommon "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
)
Expand All @@ -44,8 +39,6 @@ const (
FailedState = "failed"
ReadyState = "successful"
PendingState = "pending"

DYN_DEPLOYMENT_CONFIG_ENV_VAR = "DYN_DEPLOYMENT_CONFIG"
)

type etcdStorage interface {
Expand Down Expand Up @@ -141,37 +134,12 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
}

// merge the dynamoComponentsDeployments with the dynamoComponentsDeployments from the CRD
for serviceName, deployment := range dynamoComponentsDeployments {
if _, ok := dynamoDeployment.Spec.Services[serviceName]; ok {
err := mergo.Merge(&deployment.Spec.DynamoComponentDeploymentSharedSpec, dynamoDeployment.Spec.Services[serviceName].DynamoComponentDeploymentSharedSpec, mergo.WithOverride)
if err != nil {
logger.Error(err, "failed to merge the DynamoComponentsDeployments")
reason = "failed_to_merge_the_DynamoComponentsDeployments"
return ctrl.Result{}, err
}
}
for _, deployment := range dynamoComponentsDeployments {
if deployment.Spec.Ingress.Enabled {
dynamoDeployment.SetEndpointStatus(r.isEndpointSecured(), getIngressHost(deployment.Spec.Ingress))
}
}

// Set common env vars on each of the dynamoComponentsDeployments
for _, deployment := range dynamoComponentsDeployments {
if len(dynamoDeployment.Spec.Envs) > 0 {
deployment.Spec.Envs = mergeEnvs(dynamoDeployment.Spec.Envs, deployment.Spec.Envs)
}
err := updateDynDeploymentConfig(deployment, consts.DynamoServicePort)
if err != nil {
logger.Error(err, fmt.Sprintf("Failed to update the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
return ctrl.Result{}, err
}
err = overrideWithDynDeploymentConfig(ctx, deployment)
if err != nil {
logger.Error(err, fmt.Sprintf("Failed to override the component config with the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
return ctrl.Result{}, err
}
}

// reconcile the dynamoComponent
// for now we use the same component for all the services and we differentiate them by the service name when launching the component
dynamoComponent := &nvidiacomv1alpha1.DynamoComponent{
Expand Down Expand Up @@ -260,121 +228,6 @@ func (r *DynamoGraphDeploymentReconciler) isEndpointSecured() bool {
return r.IngressControllerTLSSecret != ""
}

func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
envMap := make(map[string]corev1.EnvVar)

// Add all common environment variables.
for _, env := range common {
envMap[env.Name] = env
}

// Override or add with service-specific environment variables.
for _, env := range specific {
envMap[env.Name] = env
}

// Convert the map back to a slice.
merged := make([]corev1.EnvVar, 0, len(envMap))
for _, env := range envMap {
merged = append(merged, env)
}
return merged
}

// updateDynDeploymentConfig updates the DYN_DEPLOYMENT_CONFIG env var for the given dynamoDeploymentComponent
// It updates the port for the given service in the DYN_DEPLOYMENT_CONFIG env var (if it is the main component)
func updateDynDeploymentConfig(dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment, newPort int) error {
if dynamoDeploymentComponent.IsMainComponent() {
for i, env := range dynamoDeploymentComponent.Spec.Envs {
if env.Name == DYN_DEPLOYMENT_CONFIG_ENV_VAR {
var config map[string]any
if err := json.Unmarshal([]byte(env.Value), &config); err != nil {
return fmt.Errorf("failed to unmarshal %v: %w", DYN_DEPLOYMENT_CONFIG_ENV_VAR, err)
}

// Safely navigate and update the config
if serviceConfig, ok := config[dynamoDeploymentComponent.Spec.ServiceName].(map[string]any); ok {
if _, portExists := serviceConfig["port"]; portExists {
serviceConfig["port"] = newPort
}
}

// Marshal back to JSON string
updated, err := json.Marshal(config)
if err != nil {
return fmt.Errorf("failed to marshal updated config: %w", err)
}

// Update env var
dynamoDeploymentComponent.Spec.Envs[i].Value = string(updated)
break
}
}
}
return nil
}

func overrideWithDynDeploymentConfig(ctx context.Context, dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment) error {
for _, env := range dynamoDeploymentComponent.Spec.Envs {
if env.Name == DYN_DEPLOYMENT_CONFIG_ENV_VAR {
dynDeploymentConfig, err := dynamo.ParseDynDeploymentConfig(ctx, []byte(env.Value))
if err != nil {
return fmt.Errorf("failed to parse %v: %w", DYN_DEPLOYMENT_CONFIG_ENV_VAR, err)
}
componentDynConfig := dynDeploymentConfig[dynamoDeploymentComponent.Spec.ServiceName]
if componentDynConfig != nil {
if componentDynConfig.ServiceArgs != nil && componentDynConfig.ServiceArgs.Workers != nil && dynamoDeploymentComponent.Spec.Replicas == nil {
// we only override the replicas if it is not set in the CRD.
// replicas, if set in the CRD set in the CRD must always be the source of truth.
dynamoDeploymentComponent.Spec.Replicas = componentDynConfig.ServiceArgs.Workers
}
if componentDynConfig.ServiceArgs != nil && componentDynConfig.ServiceArgs.Resources != nil {
requests := &dynamoCommon.ResourceItem{}
limits := &dynamoCommon.ResourceItem{}
if dynamoDeploymentComponent.Spec.Resources == nil {
dynamoDeploymentComponent.Spec.Resources = &dynamoCommon.Resources{
Requests: requests,
Limits: limits,
}
} else {
if dynamoDeploymentComponent.Spec.Resources.Requests != nil {
requests = dynamoDeploymentComponent.Spec.Resources.Requests
} else {
dynamoDeploymentComponent.Spec.Resources.Requests = requests
}
if dynamoDeploymentComponent.Spec.Resources.Limits != nil {
limits = dynamoDeploymentComponent.Spec.Resources.Limits
} else {
dynamoDeploymentComponent.Spec.Resources.Limits = limits
}
}
if componentDynConfig.ServiceArgs.Resources.GPU != nil {
requests.GPU = *componentDynConfig.ServiceArgs.Resources.GPU
limits.GPU = *componentDynConfig.ServiceArgs.Resources.GPU
}
if componentDynConfig.ServiceArgs.Resources.CPU != nil {
requests.CPU = *componentDynConfig.ServiceArgs.Resources.CPU
limits.CPU = *componentDynConfig.ServiceArgs.Resources.CPU
}
if componentDynConfig.ServiceArgs.Resources.Memory != nil {
requests.Memory = *componentDynConfig.ServiceArgs.Resources.Memory
limits.Memory = *componentDynConfig.ServiceArgs.Resources.Memory
}
if componentDynConfig.ServiceArgs.Resources.Custom != nil {
requests.Custom = componentDynConfig.ServiceArgs.Resources.Custom
limits.Custom = componentDynConfig.ServiceArgs.Resources.Custom
}
if err := dynamo.SetLwsAnnotations(componentDynConfig.ServiceArgs, dynamoDeploymentComponent); err != nil {
return err
}
}
}
break
}
}
return nil
}

func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
// for now doing nothing
return nil
Expand Down
Loading
Loading