Skip to content

Commit 3fff50c

Browse files
committed
fix: take into account number of workers from config
1 parent ac53c0b commit 3fff50c

File tree

7 files changed

+1231
-677
lines changed

7 files changed

+1231
-677
lines changed

deploy/cloud/operator/api/v1alpha1/dynamocomponentdeployment_types.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"strings"
2424

2525
dynamoCommon "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
26+
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
2627
corev1 "k8s.io/api/core/v1"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
)
@@ -166,3 +167,25 @@ func (s *DynamoComponentDeployment) SetSpec(spec any) {
166167
func (s *DynamoComponentDeployment) IsMainComponent() bool {
167168
return strings.HasSuffix(s.Spec.DynamoTag, s.Spec.ServiceName)
168169
}
170+
171+
func (s *DynamoComponentDeployment) GetDynamoDeploymentConfig() []byte {
172+
for _, env := range s.Spec.Envs {
173+
if env.Name == commonconsts.DynamoDeploymentConfigEnvVar {
174+
return []byte(env.Value)
175+
}
176+
}
177+
return nil
178+
}
179+
180+
func (s *DynamoComponentDeployment) SetDynamoDeploymentConfig(config []byte) {
181+
for i, env := range s.Spec.Envs {
182+
if env.Name == commonconsts.DynamoDeploymentConfigEnvVar {
183+
s.Spec.Envs[i].Value = string(config)
184+
return
185+
}
186+
}
187+
s.Spec.Envs = append(s.Spec.Envs, corev1.EnvVar{
188+
Name: commonconsts.DynamoDeploymentConfigEnvVar,
189+
Value: string(config),
190+
})
191+
}

deploy/cloud/operator/api/v1alpha1/dynamocomponentdeployment_types_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
package v1alpha1
2121

2222
import (
23+
"reflect"
2324
"testing"
2425

26+
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
27+
corev1 "k8s.io/api/core/v1"
2528
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2629
)
2730

@@ -76,3 +79,134 @@ func TestDynamoComponentDeployment_IsMainComponent(t *testing.T) {
7679
})
7780
}
7881
}
82+
83+
func TestDynamoComponentDeployment_GetDynamoDeploymentConfig(t *testing.T) {
84+
type fields struct {
85+
TypeMeta metav1.TypeMeta
86+
ObjectMeta metav1.ObjectMeta
87+
Spec DynamoComponentDeploymentSpec
88+
Status DynamoComponentDeploymentStatus
89+
}
90+
tests := []struct {
91+
name string
92+
fields fields
93+
want []byte
94+
}{
95+
{
96+
name: "no config",
97+
fields: fields{
98+
Spec: DynamoComponentDeploymentSpec{
99+
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
100+
Envs: []corev1.EnvVar{},
101+
},
102+
},
103+
},
104+
want: nil,
105+
},
106+
{
107+
name: "with config",
108+
fields: fields{
109+
Spec: DynamoComponentDeploymentSpec{
110+
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
111+
Envs: []corev1.EnvVar{
112+
{
113+
Name: commonconsts.DynamoDeploymentConfigEnvVar,
114+
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
115+
},
116+
},
117+
},
118+
},
119+
},
120+
want: []byte(`{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`),
121+
},
122+
}
123+
for _, tt := range tests {
124+
t.Run(tt.name, func(t *testing.T) {
125+
s := &DynamoComponentDeployment{
126+
TypeMeta: tt.fields.TypeMeta,
127+
ObjectMeta: tt.fields.ObjectMeta,
128+
Spec: tt.fields.Spec,
129+
Status: tt.fields.Status,
130+
}
131+
if got := s.GetDynamoDeploymentConfig(); !reflect.DeepEqual(got, tt.want) {
132+
t.Errorf("DynamoComponentDeployment.GetDynamoDeploymentConfig() = %v, want %v", got, tt.want)
133+
}
134+
})
135+
}
136+
}
137+
138+
func TestDynamoComponentDeployment_SetDynamoDeploymentConfig(t *testing.T) {
139+
type fields struct {
140+
TypeMeta metav1.TypeMeta
141+
ObjectMeta metav1.ObjectMeta
142+
Spec DynamoComponentDeploymentSpec
143+
Status DynamoComponentDeploymentStatus
144+
}
145+
type args struct {
146+
config []byte
147+
}
148+
tests := []struct {
149+
name string
150+
fields fields
151+
args args
152+
want []corev1.EnvVar
153+
}{
154+
{
155+
name: "no config",
156+
fields: fields{
157+
Spec: DynamoComponentDeploymentSpec{
158+
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
159+
Envs: nil,
160+
},
161+
},
162+
},
163+
args: args{
164+
config: []byte(`{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`),
165+
},
166+
want: []corev1.EnvVar{
167+
{
168+
Name: commonconsts.DynamoDeploymentConfigEnvVar,
169+
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
170+
},
171+
},
172+
},
173+
{
174+
name: "with config",
175+
fields: fields{
176+
Spec: DynamoComponentDeploymentSpec{
177+
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
178+
Envs: []corev1.EnvVar{
179+
{
180+
Name: commonconsts.DynamoDeploymentConfigEnvVar,
181+
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
182+
},
183+
},
184+
},
185+
},
186+
},
187+
args: args{
188+
config: []byte(`{"Frontend":{"port":9000},"Planner":{"environment":"kubernetes"}}`),
189+
},
190+
want: []corev1.EnvVar{
191+
{
192+
Name: commonconsts.DynamoDeploymentConfigEnvVar,
193+
Value: `{"Frontend":{"port":9000},"Planner":{"environment":"kubernetes"}}`,
194+
},
195+
},
196+
},
197+
}
198+
for _, tt := range tests {
199+
t.Run(tt.name, func(t *testing.T) {
200+
s := &DynamoComponentDeployment{
201+
TypeMeta: tt.fields.TypeMeta,
202+
ObjectMeta: tt.fields.ObjectMeta,
203+
Spec: tt.fields.Spec,
204+
Status: tt.fields.Status,
205+
}
206+
s.SetDynamoDeploymentConfig(tt.args.config)
207+
if !reflect.DeepEqual(s.Spec.DynamoComponentDeploymentSharedSpec.Envs, tt.want) {
208+
t.Errorf("DynamoComponentDeployment.SetDynamoDeploymentConfig() = %v, want %v", s.Spec.DynamoComponentDeploymentSharedSpec.Envs, tt.want)
209+
}
210+
})
211+
}
212+
}

deploy/cloud/operator/internal/consts/consts.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,6 @@ const (
6767
KubeAnnotationDynamoComponentHash = "nvidia.com/dynamo-request-hash"
6868
KubeAnnotationDynamoComponentImageBuiderHash = "nvidia.com/dynamo-request-image-builder-hash"
6969
KubeAnnotationDynamoComponentStorageNS = "nvidia.com/dynamo-storage-namespace"
70+
71+
DynamoDeploymentConfigEnvVar = "DYN_DEPLOYMENT_CONFIG"
7072
)

deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go

Lines changed: 1 addition & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,8 @@ package controller
1919

2020
import (
2121
"context"
22-
"encoding/json"
2322
"fmt"
2423

25-
"dario.cat/mergo"
26-
corev1 "k8s.io/api/core/v1"
2724
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2825
"k8s.io/client-go/tools/record"
2926
ctrl "sigs.k8s.io/controller-runtime"
@@ -33,9 +30,7 @@ import (
3330
"sigs.k8s.io/controller-runtime/pkg/log"
3431
"sigs.k8s.io/controller-runtime/pkg/predicate"
3532

36-
dynamoCommon "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
3733
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
38-
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
3934
commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
4035
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
4136
)
@@ -44,8 +39,6 @@ const (
4439
FailedState = "failed"
4540
ReadyState = "successful"
4641
PendingState = "pending"
47-
48-
DYN_DEPLOYMENT_CONFIG_ENV_VAR = "DYN_DEPLOYMENT_CONFIG"
4942
)
5043

5144
type etcdStorage interface {
@@ -141,37 +134,12 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
141134
}
142135

143136
// merge the dynamoComponentsDeployments with the dynamoComponentsDeployments from the CRD
144-
for serviceName, deployment := range dynamoComponentsDeployments {
145-
if _, ok := dynamoDeployment.Spec.Services[serviceName]; ok {
146-
err := mergo.Merge(&deployment.Spec.DynamoComponentDeploymentSharedSpec, dynamoDeployment.Spec.Services[serviceName].DynamoComponentDeploymentSharedSpec, mergo.WithOverride)
147-
if err != nil {
148-
logger.Error(err, "failed to merge the DynamoComponentsDeployments")
149-
reason = "failed_to_merge_the_DynamoComponentsDeployments"
150-
return ctrl.Result{}, err
151-
}
152-
}
137+
for _, deployment := range dynamoComponentsDeployments {
153138
if deployment.Spec.Ingress.Enabled {
154139
dynamoDeployment.SetEndpointStatus(r.isEndpointSecured(), getIngressHost(deployment.Spec.Ingress))
155140
}
156141
}
157142

158-
// Set common env vars on each of the dynamoComponentsDeployments
159-
for _, deployment := range dynamoComponentsDeployments {
160-
if len(dynamoDeployment.Spec.Envs) > 0 {
161-
deployment.Spec.Envs = mergeEnvs(dynamoDeployment.Spec.Envs, deployment.Spec.Envs)
162-
}
163-
err := updateDynDeploymentConfig(deployment, consts.DynamoServicePort)
164-
if err != nil {
165-
logger.Error(err, fmt.Sprintf("Failed to update the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
166-
return ctrl.Result{}, err
167-
}
168-
err = overrideWithDynDeploymentConfig(ctx, deployment)
169-
if err != nil {
170-
logger.Error(err, fmt.Sprintf("Failed to override the component config with the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
171-
return ctrl.Result{}, err
172-
}
173-
}
174-
175143
// reconcile the dynamoComponent
176144
// for now we use the same component for all the services and we differentiate them by the service name when launching the component
177145
dynamoComponent := &nvidiacomv1alpha1.DynamoComponent{
@@ -260,121 +228,6 @@ func (r *DynamoGraphDeploymentReconciler) isEndpointSecured() bool {
260228
return r.IngressControllerTLSSecret != ""
261229
}
262230

263-
func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
264-
envMap := make(map[string]corev1.EnvVar)
265-
266-
// Add all common environment variables.
267-
for _, env := range common {
268-
envMap[env.Name] = env
269-
}
270-
271-
// Override or add with service-specific environment variables.
272-
for _, env := range specific {
273-
envMap[env.Name] = env
274-
}
275-
276-
// Convert the map back to a slice.
277-
merged := make([]corev1.EnvVar, 0, len(envMap))
278-
for _, env := range envMap {
279-
merged = append(merged, env)
280-
}
281-
return merged
282-
}
283-
284-
// updateDynDeploymentConfig updates the DYN_DEPLOYMENT_CONFIG env var for the given dynamoDeploymentComponent
285-
// It updates the port for the given service in the DYN_DEPLOYMENT_CONFIG env var (if it is the main component)
286-
func updateDynDeploymentConfig(dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment, newPort int) error {
287-
if dynamoDeploymentComponent.IsMainComponent() {
288-
for i, env := range dynamoDeploymentComponent.Spec.Envs {
289-
if env.Name == DYN_DEPLOYMENT_CONFIG_ENV_VAR {
290-
var config map[string]any
291-
if err := json.Unmarshal([]byte(env.Value), &config); err != nil {
292-
return fmt.Errorf("failed to unmarshal %v: %w", DYN_DEPLOYMENT_CONFIG_ENV_VAR, err)
293-
}
294-
295-
// Safely navigate and update the config
296-
if serviceConfig, ok := config[dynamoDeploymentComponent.Spec.ServiceName].(map[string]any); ok {
297-
if _, portExists := serviceConfig["port"]; portExists {
298-
serviceConfig["port"] = newPort
299-
}
300-
}
301-
302-
// Marshal back to JSON string
303-
updated, err := json.Marshal(config)
304-
if err != nil {
305-
return fmt.Errorf("failed to marshal updated config: %w", err)
306-
}
307-
308-
// Update env var
309-
dynamoDeploymentComponent.Spec.Envs[i].Value = string(updated)
310-
break
311-
}
312-
}
313-
}
314-
return nil
315-
}
316-
317-
func overrideWithDynDeploymentConfig(ctx context.Context, dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment) error {
318-
for _, env := range dynamoDeploymentComponent.Spec.Envs {
319-
if env.Name == DYN_DEPLOYMENT_CONFIG_ENV_VAR {
320-
dynDeploymentConfig, err := dynamo.ParseDynDeploymentConfig(ctx, []byte(env.Value))
321-
if err != nil {
322-
return fmt.Errorf("failed to parse %v: %w", DYN_DEPLOYMENT_CONFIG_ENV_VAR, err)
323-
}
324-
componentDynConfig := dynDeploymentConfig[dynamoDeploymentComponent.Spec.ServiceName]
325-
if componentDynConfig != nil {
326-
if componentDynConfig.ServiceArgs != nil && componentDynConfig.ServiceArgs.Workers != nil && dynamoDeploymentComponent.Spec.Replicas == nil {
327-
// we only override the replicas if it is not set in the CRD.
328-
// replicas, if set in the CRD set in the CRD must always be the source of truth.
329-
dynamoDeploymentComponent.Spec.Replicas = componentDynConfig.ServiceArgs.Workers
330-
}
331-
if componentDynConfig.ServiceArgs != nil && componentDynConfig.ServiceArgs.Resources != nil {
332-
requests := &dynamoCommon.ResourceItem{}
333-
limits := &dynamoCommon.ResourceItem{}
334-
if dynamoDeploymentComponent.Spec.Resources == nil {
335-
dynamoDeploymentComponent.Spec.Resources = &dynamoCommon.Resources{
336-
Requests: requests,
337-
Limits: limits,
338-
}
339-
} else {
340-
if dynamoDeploymentComponent.Spec.Resources.Requests != nil {
341-
requests = dynamoDeploymentComponent.Spec.Resources.Requests
342-
} else {
343-
dynamoDeploymentComponent.Spec.Resources.Requests = requests
344-
}
345-
if dynamoDeploymentComponent.Spec.Resources.Limits != nil {
346-
limits = dynamoDeploymentComponent.Spec.Resources.Limits
347-
} else {
348-
dynamoDeploymentComponent.Spec.Resources.Limits = limits
349-
}
350-
}
351-
if componentDynConfig.ServiceArgs.Resources.GPU != nil {
352-
requests.GPU = *componentDynConfig.ServiceArgs.Resources.GPU
353-
limits.GPU = *componentDynConfig.ServiceArgs.Resources.GPU
354-
}
355-
if componentDynConfig.ServiceArgs.Resources.CPU != nil {
356-
requests.CPU = *componentDynConfig.ServiceArgs.Resources.CPU
357-
limits.CPU = *componentDynConfig.ServiceArgs.Resources.CPU
358-
}
359-
if componentDynConfig.ServiceArgs.Resources.Memory != nil {
360-
requests.Memory = *componentDynConfig.ServiceArgs.Resources.Memory
361-
limits.Memory = *componentDynConfig.ServiceArgs.Resources.Memory
362-
}
363-
if componentDynConfig.ServiceArgs.Resources.Custom != nil {
364-
requests.Custom = componentDynConfig.ServiceArgs.Resources.Custom
365-
limits.Custom = componentDynConfig.ServiceArgs.Resources.Custom
366-
}
367-
if err := dynamo.SetLwsAnnotations(componentDynConfig.ServiceArgs, dynamoDeploymentComponent); err != nil {
368-
return err
369-
}
370-
}
371-
}
372-
break
373-
}
374-
}
375-
return nil
376-
}
377-
378231
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
379232
// for now doing nothing
380233
return nil

0 commit comments

Comments
 (0)