diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index 98b89634fe1..4a58b7bd863 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -493,6 +493,9 @@ func BuildPod(ctx context.Context, podTemplateSpec corev1.PodTemplateSpec, rayNo initLivenessAndReadinessProbe(&pod.Spec.Containers[utils.RayContainerIndex], rayNodeType, creatorCRDType) } + // Add downward API environment variables for Ray's default node labels for Ray label-based scheduling. + addDefaultRayNodeLabels(&pod) + return pod } @@ -681,6 +684,7 @@ func setContainerEnvVars(pod *corev1.Pod, rayNodeType rayv1.RayNodeType, fqdnRay container.Env = append(container.Env, rayCloudInstanceID) // RAY_NODE_TYPE_NAME is used by Ray Autoscaler V2 (alpha). See https://github.com/ray-project/kuberay/issues/1965 for more details. + // This value can be used to set the ray.io/node-group default Ray node label. nodeGroupNameEnv := corev1.EnvVar{ Name: utils.RAY_NODE_TYPE_NAME, ValueFrom: &corev1.EnvVarSource{ @@ -1034,3 +1038,157 @@ func isGPUResourceKey(key string) bool { match, _ := regexp.MatchString(`nvidia\.com/mig-\d+g\.\d+gb$`, key) return match } + +// containsEnvVar is a helper function to check if a container contains a specified EnvVar +func containsEnvVar(container corev1.Container, envVar string) bool { + for _, env := range container.Env { + if env.Name == envVar { + return true + } + } + return false +} + +// addDefaultRayNodeLabels passes default Ray node labels to Ray runtime environment +func addDefaultRayNodeLabels(pod *corev1.Pod) { + rayContainer := &pod.Spec.Containers[utils.RayContainerIndex] + envVars := rayContainer.Env + + if !containsEnvVar(*rayContainer, utils.RayNodeMarketType) { + // used to set the ray.io/market-type node label + envVars = append(envVars, corev1.EnvVar{ + Name: utils.RayNodeMarketType, + Value: string(getPodMarketType(pod)), + }) + } + if !containsEnvVar(*rayContainer, utils.RayNodeZone) { + envVars = append(envVars, getPodZoneEnvVar(pod)) + } + if !containsEnvVar(*rayContainer, utils.RayNodeRegion) { + envVars = append(envVars, getPodRegionEnvVar(pod)) + } + rayContainer.Env = envVars +} + +// getPodZoneEnvVar is a helper function to determine the ray.io/availability-zone label value +// based on a Pod spec - checking labels, nodeSelectors, and then falling back to downward API. +func getPodZoneEnvVar(pod *corev1.Pod) corev1.EnvVar { + if podZone, ok := pod.Labels[utils.K8sTopologyZoneLabel]; ok && podZone != "" { + return corev1.EnvVar{ + Name: utils.RayNodeZone, + Value: podZone, + } + } else if podZone, ok := pod.Spec.NodeSelector[utils.K8sTopologyZoneLabel]; ok && podZone != "" { + return corev1.EnvVar{ + Name: utils.RayNodeZone, + Value: podZone, + } + } + // uses downward api to set the ray.io/availability-zone node label + // Ref: https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesiozone + return corev1.EnvVar{ + Name: utils.RayNodeZone, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.labels['%s']", utils.K8sTopologyZoneLabel), + }, + }, + } +} + +// getPodRegionEnvVar is a helper function to determine the ray.io/availability-region label value +// based on a Pod spec - checking labels, nodeSelectors, and then falling back to downward API. +func getPodRegionEnvVar(pod *corev1.Pod) corev1.EnvVar { + if podRegion, ok := pod.Labels[utils.K8sTopologyRegionLabel]; ok && podRegion != "" { + return corev1.EnvVar{ + Name: utils.RayNodeRegion, + Value: podRegion, + } + } else if podRegion, ok := pod.Spec.NodeSelector[utils.K8sTopologyRegionLabel]; ok && podRegion != "" { + return corev1.EnvVar{ + Name: utils.RayNodeRegion, + Value: podRegion, + } + } + // uses downward api to set the ray.io/availability-region node label + // Ref: https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesioregion + return corev1.EnvVar{ + Name: utils.RayNodeRegion, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.labels['%s']", utils.K8sTopologyRegionLabel), + }, + }, + } +} + +// getPodMarketTypeFromNodeSelector is a helper function to determine the ray.io/market-type +// label value based on a Kubernetes Pod spec - checking labels, nodeSelector, and nodeAffinity. +func getPodMarketType(pod *corev1.Pod) utils.PodMarketType { + marketType := getPodMarketTypeFromNodeSelector(pod.Spec.NodeSelector) + + if marketType == utils.OnDemandMarketType && pod.Spec.Affinity != nil { + // check for NodeAffinity if nodeSelector specifying spot instance not found + marketType = getPodMarketTypeFromNodeAffinity(pod.Spec.Affinity.NodeAffinity) + } + return marketType +} + +// getPodMarketTypeFromNodeSelector returns a ray.io/market-type label +// based on user-provided Kubernetes nodeSelector values. +func getPodMarketTypeFromNodeSelector(selector map[string]string) utils.PodMarketType { + if selector == nil { + return utils.OnDemandMarketType + } + // check for GKE spot instance selector + if val, ok := selector[utils.GKESpotLabel]; ok && val == "true" { + return utils.SpotMarketType + } + // check for EKS spot instance selector + if val, ok := selector[utils.EKSCapacityTypeLabel]; ok && val == "SPOT" { + return utils.SpotMarketType + } + return utils.OnDemandMarketType +} + +// getPodMarketTypeFromNodeSelector returns a ray.io/market-type label +// based on user-provided Kubernetes nodeAffinity values. +func getPodMarketTypeFromNodeAffinity(nodeAffinity *corev1.NodeAffinity) utils.PodMarketType { + if nodeAffinity == nil { + return utils.OnDemandMarketType + } + + // Only add the spot instance label if the Pod is guaranteed to be on a node of + // that type when scheduled. + requiredTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution + if requiredTerms == nil { + return utils.OnDemandMarketType + } + + for _, term := range requiredTerms.NodeSelectorTerms { + for _, expr := range term.MatchExpressions { + switch expr.Key { + // GKE specific check + case utils.GKESpotLabel: + if expr.Operator == corev1.NodeSelectorOpIn { + for _, val := range expr.Values { + if val == "true" { + return utils.SpotMarketType + } + } + } + // Amazon EKS specific check + case utils.EKSCapacityTypeLabel: + if expr.Operator == corev1.NodeSelectorOpIn { + for _, val := range expr.Values { + if val == "SPOT" { + return utils.SpotMarketType + } + } + } + } + } + } + // Default to on-demand instance type + return utils.OnDemandMarketType +} diff --git a/ray-operator/controllers/ray/common/pod_test.go b/ray-operator/controllers/ray/common/pod_test.go index 39c2f3cf954..80ca310dcf4 100644 --- a/ray-operator/controllers/ray/common/pod_test.go +++ b/ray-operator/controllers/ray/common/pod_test.go @@ -1934,3 +1934,401 @@ func TestSetAutoscalerV2EnvVars(t *testing.T) { }) } } + +func TestGetPodMarketTypeFromNodeSelector(t *testing.T) { + tests := []struct { + name string + nodeSelector map[string]string + expectedType utils.PodMarketType + }{ + { + name: "GKE spot instance", + nodeSelector: map[string]string{utils.GKESpotLabel: "true"}, + expectedType: utils.SpotMarketType, + }, + { + name: "EKS spot instance", + nodeSelector: map[string]string{utils.EKSCapacityTypeLabel: "SPOT"}, + expectedType: utils.SpotMarketType, + }, + { + name: "on-demand instance (no selector provided)", + nodeSelector: nil, + expectedType: utils.OnDemandMarketType, + }, + { + name: "on-demand instance (non-spot selector provided)", + nodeSelector: map[string]string{"some-label": "value"}, + expectedType: utils.OnDemandMarketType, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualType := getPodMarketTypeFromNodeSelector(tt.nodeSelector) + if actualType != tt.expectedType { + t.Errorf("got market-type %v, but expected %v", actualType, tt.expectedType) + } + }) + } +} + +func TestGetPodMarketTypeFromNodeAffinity(t *testing.T) { + tests := []struct { + name string + nodeAffinity *corev1.NodeAffinity + expectedType utils.PodMarketType + }{ + { + name: "GKE spot instance from nodeAffinity", + nodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: utils.GKESpotLabel, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + expectedType: utils.SpotMarketType, + }, + { + name: "EKS spot instance from nodeAffinity", + nodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: utils.EKSCapacityTypeLabel, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"SPOT"}, + }, + }, + }, + }, + }, + }, + expectedType: utils.SpotMarketType, + }, + { + name: "nil nodeAffinity", + nodeAffinity: nil, + expectedType: utils.OnDemandMarketType, + }, + { + name: "nodeAffinity with other selectors provided", + nodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "region", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"us-west4"}, + }, + }, + }, + }, + }, + }, + expectedType: utils.OnDemandMarketType, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualType := getPodMarketTypeFromNodeAffinity(tt.nodeAffinity) + if actualType != tt.expectedType { + t.Errorf("got market-type %v, but expected %v", actualType, tt.expectedType) + } + }) + } +} + +func TestGetPodMarketType(t *testing.T) { + tests := []struct { + name string + pod *corev1.Pod + expectedType utils.PodMarketType + }{ + { + name: "GKE spot instance from nodeSelector", + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + NodeSelector: map[string]string{ + utils.GKESpotLabel: "true", + }, + }, + }, + expectedType: utils.SpotMarketType, + }, + { + name: "EKS spot from nodeAffinity when nodeSelector missing", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "eks-spot"}, + Spec: corev1.PodSpec{ + NodeSelector: nil, + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: utils.EKSCapacityTypeLabel, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"SPOT"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expectedType: utils.SpotMarketType, + }, + { + name: "No nodeSelectors or nodeAffinity provided", + pod: &corev1.Pod{}, + expectedType: utils.OnDemandMarketType, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + marketType := getPodMarketType(tt.pod) + if marketType != tt.expectedType { + t.Errorf("got market-type of %v, but expected %v", marketType, tt.expectedType) + } + }) + } +} + +func TestAddDefaultRayNodeLabels(t *testing.T) { + tests := []struct { + labels map[string]string + nodeSelector map[string]string + nodeAffinity *corev1.NodeAffinity + expectedEnv map[string]string + name string + }{ + { + name: "Availability zone vars set from region and zone topology labels", + labels: map[string]string{ + utils.K8sTopologyRegionLabel: "us-west4", + utils.K8sTopologyZoneLabel: "us-west4-a", + }, + expectedEnv: map[string]string{ + utils.RayNodeRegion: "us-west4", + utils.RayNodeZone: "us-west4-a", + }, + }, + { + name: "Availability zone vars set from region and zone topology nodeSelectors", + nodeSelector: map[string]string{ + utils.K8sTopologyRegionLabel: "us-central2", + utils.K8sTopologyZoneLabel: "us-central2-b", + }, + expectedEnv: map[string]string{ + utils.RayNodeRegion: "us-central2", + utils.RayNodeZone: "us-central2-b", + }, + }, + { + name: "Availability zone vars set from downward API", + expectedEnv: map[string]string{ + utils.RayNodeRegion: "metadata.labels['topology.kubernetes.io/region']", + utils.RayNodeZone: "metadata.labels['topology.kubernetes.io/zone']", + }, + }, + { + name: "Market type env var set from GKE Spot nodeSelector", + nodeSelector: map[string]string{ + utils.GKESpotLabel: "true", + utils.K8sTopologyRegionLabel: "me-central1", + utils.K8sTopologyZoneLabel: "me-central1-a", + }, + expectedEnv: map[string]string{ + utils.RayNodeMarketType: string(utils.SpotMarketType), + utils.RayNodeRegion: "me-central1", + utils.RayNodeZone: "me-central1-a", + }, + }, + { + name: "Market type env var set from EKS Spot nodeSelector", + nodeSelector: map[string]string{ + utils.EKSCapacityTypeLabel: "SPOT", + utils.K8sTopologyRegionLabel: "us-central1", + utils.K8sTopologyZoneLabel: "us-central1-c", + }, + expectedEnv: map[string]string{ + utils.RayNodeMarketType: string(utils.SpotMarketType), + utils.RayNodeRegion: "us-central1", + utils.RayNodeZone: "us-central1-c", + }, + }, + { + name: "Market type env var set from nodeAffinity", + nodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: utils.EKSCapacityTypeLabel, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"SPOT"}, + }, + }, + }, + }, + }, + }, + expectedEnv: map[string]string{ + utils.RayNodeMarketType: string(utils.SpotMarketType), + utils.RayNodeRegion: "metadata.labels['topology.kubernetes.io/region']", + utils.RayNodeZone: "metadata.labels['topology.kubernetes.io/zone']", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: tt.labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "ray"}}, + NodeSelector: tt.nodeSelector, + }, + } + if tt.nodeAffinity != nil { + pod.Spec.Affinity = &corev1.Affinity{NodeAffinity: tt.nodeAffinity} + } + // validate default labels are set correctly from Pod spec as env vars + addDefaultRayNodeLabels(pod) + rayContainer := pod.Spec.Containers[utils.RayContainerIndex] + for key, expectedVar := range tt.expectedEnv { + foundVar := false + for _, env := range rayContainer.Env { + if env.Name == key { + if env.Value != "" { + if env.Value != expectedVar { + t.Errorf("%s: got value %q, but expected %q", key, env.Value, expectedVar) + } + } else if env.ValueFrom != nil && env.ValueFrom.FieldRef != nil { + if env.ValueFrom.FieldRef.FieldPath != expectedVar { + t.Errorf("%s: got FieldPath %q, but expected %q", key, env.ValueFrom.FieldRef.FieldPath, expectedVar) + } + } else { + t.Errorf("%s: environment var not set as expected", key) + } + foundVar = true + break + } + } + if !foundVar { + t.Errorf("%s: not found in container env", key) + } + } + }) + } +} + +func TestGetPodZoneEnvVar(t *testing.T) { + tests := []struct { + name string + labels map[string]string + nodeSelector map[string]string + expectedVar string + }{ + { + name: "Retrieve topology zone from labels", + labels: map[string]string{utils.K8sTopologyZoneLabel: "us-west4-a"}, + expectedVar: "us-west4-a", + }, + { + name: "Retrieve topology zone from nodeSelector", + nodeSelector: map[string]string{utils.K8sTopologyZoneLabel: "us-central2-b"}, + expectedVar: "us-central2-b", + }, + { + name: "Zone set using downward API", + expectedVar: "metadata.labels['topology.kubernetes.io/zone']", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Labels: tt.labels}, + Spec: corev1.PodSpec{NodeSelector: tt.nodeSelector}, + } + // validate expected zone env var is parsed from Pod spec + result := getPodZoneEnvVar(pod) + if result.Value != "" { + if result.Value != tt.expectedVar { + t.Errorf("got env var %q, but expected %q", result.Value, tt.expectedVar) + } + } else if result.ValueFrom != nil { + if result.ValueFrom.FieldRef.FieldPath != tt.expectedVar { + t.Errorf("got FieldPath %q, but expected %q", result.ValueFrom.FieldRef.FieldPath, tt.expectedVar) + } + } else { + t.Errorf("getPodZoneEnvVar did not return expected env value") + } + }) + } +} + +func TestGetPodRegionEnvVar(t *testing.T) { + tests := []struct { + name string + labels map[string]string + nodeSelector map[string]string + expectedVar string + }{ + { + name: "Retrieve topology region from labels", + labels: map[string]string{utils.K8sTopologyRegionLabel: "us-central1"}, + expectedVar: "us-central1", + }, + { + name: "Retrieve topology region from nodeSelector", + nodeSelector: map[string]string{utils.K8sTopologyRegionLabel: "us-central2"}, + expectedVar: "us-central2", + }, + { + name: "Region set using downward API", + expectedVar: "metadata.labels['topology.kubernetes.io/region']", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Labels: tt.labels}, + Spec: corev1.PodSpec{NodeSelector: tt.nodeSelector}, + } + // validate expected region env var is parsed from Pod spec + result := getPodRegionEnvVar(pod) + if result.Value != "" { + if result.Value != tt.expectedVar { + t.Errorf("got env var %q, but expected %q", result.Value, tt.expectedVar) + } + } else if result.ValueFrom != nil { + if result.ValueFrom.FieldRef.FieldPath != tt.expectedVar { + t.Errorf("got FieldPath %q, but expected %q", result.ValueFrom.FieldRef.FieldPath, tt.expectedVar) + } + } else { + t.Errorf("getPodRegionEnvVar did not return expected env value") + } + }) + } +} diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 7a22bc76f8f..b599c7a71f3 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -224,6 +224,21 @@ const ( // MaxRayJobNameLength is the maximum RayJob name to make sure it pass the RayCluster validation // Minus 6 since we append 6 characters to the RayJob name to create the cluster (GenerateRayClusterName). MaxRayJobNameLength = MaxRayClusterNameLength - 6 + + RayNodeMarketType = "RAY_NODE_MARKET_TYPE" + RayNodeZone = "RAY_NODE_ZONE" + RayNodeRegion = "RAY_NODE_REGION" + GKESpotLabel = "cloud.google.com/gke-spot" + EKSCapacityTypeLabel = "eks.amazonaws.com/capacityType" + K8sTopologyRegionLabel = "topology.kubernetes.io/region" + K8sTopologyZoneLabel = "topology.kubernetes.io/zone" +) + +type PodMarketType string + +const ( + SpotMarketType PodMarketType = "spot" + OnDemandMarketType PodMarketType = "on-demand" ) type ServiceType string