Skip to content

Commit

Permalink
Adding capability to create ray cluster with serve support -clean (#1672
Browse files Browse the repository at this point in the history
)
  • Loading branch information
blublinsky authored Nov 22, 2023
1 parent 5c0e2e9 commit 6e4ac23
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 63 deletions.
97 changes: 97 additions & 0 deletions apiserver/CreatingServe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Creating a cluster with RayServe support

Up until rescently the only way to create a Ray cluster supporting RayServe was by using `Create ray service` APIs. Although it does work, quite often you want to create cluster supporting Ray serve so that you can experiment with serve APIs directly. Now it is possible by adding the following annotation to the cluster:

```json
"annotations" : {
"ray.io/enableAgentService": "true"
},
```

the complete curl command to creation such cluster is as follows:

```shell
curl -X POST 'localhost:31888/apis/v1/namespaces/default/clusters' \
--header 'Content-Type: application/json' \
--data '{
"name": "test-cluster",
"namespace": "default",
"user": "boris",
"annotations" : {
"ray.io/enableAgentService": "true"
},
"clusterSpec": {
"headGroupSpec": {
"computeTemplate": "default-template",
"image": "rayproject/ray:2.7.0-py310",
"serviceType": "ClusterIP",
"rayStartParams": {
"dashboard-host": "0.0.0.0",
"metrics-export-port": "8080",
"dashboard-agent-listen-port": "52365"
},
"volumes": [
{
"name": "code-sample",
"mountPath": "/home/ray/samples",
"volumeType": "CONFIGMAP",
"source": "ray-job-code-sample",
"items": {"sample_code.py" : "sample_code.py"}
}
]
},
"workerGroupSpec": [
{
"groupName": "small-wg",
"computeTemplate": "default-template",
"image": "rayproject/ray:2.7.0-py310",
"replicas": 1,
"minReplicas": 0,
"maxReplicas": 5,
"rayStartParams": {
"node-ip-address": "$MY_POD_IP"
},
"volumes": [
{
"name": "code-sample",
"mountPath": "/home/ray/samples",
"volumeType": "CONFIGMAP",
"source": "ray-job-code-sample",
"items": {"sample_code.py" : "sample_code.py"}
}
]
}
]
}
}'
```

Note, that before creating a cluster you need to install this [configmap](test/job/code.yaml) and create default template using the following command:

```shell
curl -X POST 'localhost:31888/apis/v1/namespaces/default/compute_templates' \
--header 'Content-Type: application/json' \
--data '{
"name": "default-template",
"namespace": "default",
"cpu": 2,
"memory": 4
}'
```

To confirm that the cluster is created correctly, check created services using that following command:

```shell
kubectl get service
```

that should return the following:

```shell
test-cluster-head-svc ClusterIP 10.96.19.185 <none> 8265/TCP,52365/TCP,10001/TCP,8080/TCP,6379/TCP,8000/TCP
test-cluster-serve-svc ClusterIP 10.96.144.162 <none> 8000/TCP
```

As you can see, in this case two services are created - one for the head node to be able to see the dashboard and configure the cluster and one for submission of the serve requests.

For the head node service note that the additional port - 52365 is created for serve configuration.
20 changes: 16 additions & 4 deletions apiserver/pkg/util/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@ type RayCluster struct {
// NewRayCluster creates a RayCluster.
// func NewRayCluster(apiCluster *api.Cluster, clusterRuntime *api.ClusterRuntime, computeRuntime *api.ComputeRuntime) *RayCluster {
func NewRayCluster(apiCluster *api.Cluster, computeTemplateMap map[string]*api.ComputeTemplate) (*RayCluster, error) {
// Check for "ray.io/enableAgentService"
enableAgentService := false
if enableAgentServiceValue, exist := apiCluster.Annotations["ray.io/enableAgentService"]; exist && enableAgentServiceValue == "true" {
enableAgentService = true
}

// Build cluster spec
spec, err := buildRayClusterSpec(apiCluster.Version, apiCluster.Envs, apiCluster.ClusterSpec, computeTemplateMap)
spec, err := buildRayClusterSpec(apiCluster.Version, apiCluster.Envs, apiCluster.ClusterSpec, computeTemplateMap, enableAgentService)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -64,9 +70,9 @@ func buildRayClusterAnnotations(cluster *api.Cluster) map[string]string {

// TODO(Basasuya & MissionToMars): The job spec depends on ClusterSpec which not all cluster-related configs are included,
// such as `metadata` and `envs`. We just put `imageVersion` and `envs` in the arguments list, and should be refactored later.
func buildRayClusterSpec(imageVersion string, envs *api.EnvironmentVariables, clusterSpec *api.ClusterSpec, computeTemplateMap map[string]*api.ComputeTemplate) (*rayv1api.RayClusterSpec, error) {
func buildRayClusterSpec(imageVersion string, envs *api.EnvironmentVariables, clusterSpec *api.ClusterSpec, computeTemplateMap map[string]*api.ComputeTemplate, enableAgentService bool) (*rayv1api.RayClusterSpec, error) {
computeTemplate := computeTemplateMap[clusterSpec.HeadGroupSpec.ComputeTemplate]
headPodTemplate, err := buildHeadPodTemplate(imageVersion, envs, clusterSpec.HeadGroupSpec, computeTemplate)
headPodTemplate, err := buildHeadPodTemplate(imageVersion, envs, clusterSpec.HeadGroupSpec, computeTemplate, enableAgentService)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -126,7 +132,7 @@ func buildNodeGroupAnnotations(computeTemplate *api.ComputeTemplate, image strin
}

// Build head node template
func buildHeadPodTemplate(imageVersion string, envs *api.EnvironmentVariables, spec *api.HeadGroupSpec, computeRuntime *api.ComputeTemplate) (*v1.PodTemplateSpec, error) {
func buildHeadPodTemplate(imageVersion string, envs *api.EnvironmentVariables, spec *api.HeadGroupSpec, computeRuntime *api.ComputeTemplate, enableAgentService bool) (*v1.PodTemplateSpec, error) {
image := constructRayImage(RayClusterDefaultImageRepository, imageVersion)
if len(spec.Image) != 0 {
image = spec.Image
Expand Down Expand Up @@ -224,6 +230,12 @@ func buildHeadPodTemplate(imageVersion string, envs *api.EnvironmentVariables, s
container.Env = append(container.Env, specEnv...)
}

// If enableAgentService add port
if enableAgentService {
container.Ports = append(container.Ports, v1.ContainerPort{Name: "dashboard-agent", ContainerPort: 52365})
container.Ports = append(container.Ports, v1.ContainerPort{Name: "serve", ContainerPort: 8000})
}

// Replace container
podTemplateSpec.Spec.Containers[index] = container
}
Expand Down
11 changes: 10 additions & 1 deletion apiserver/pkg/util/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func TestBuildVolumeMounts(t *testing.T) {
}

func TestBuildHeadPodTemplate(t *testing.T) {
podSpec, err := buildHeadPodTemplate("2.4", &api.EnvironmentVariables{}, &headGroup, &template)
podSpec, err := buildHeadPodTemplate("2.4", &api.EnvironmentVariables{}, &headGroup, &template, false)
assert.Nil(t, err)

if podSpec.Spec.ServiceAccountName != "account" {
Expand All @@ -453,6 +453,9 @@ func TestBuildHeadPodTemplate(t *testing.T) {
if len(podSpec.Spec.Containers[0].Env) != 6 {
t.Errorf("failed to propagate environment")
}
if len(podSpec.Spec.Containers[0].Ports) != 4 {
t.Errorf("failed build ports")
}
// Sort values for comparison
sort.SliceStable(podSpec.Spec.Containers[0].Env, func(i, j int) bool {
return podSpec.Spec.Containers[0].Env[i].Name < podSpec.Spec.Containers[0].Env[j].Name
Expand All @@ -478,6 +481,12 @@ func TestBuildHeadPodTemplate(t *testing.T) {
if !reflect.DeepEqual(podSpec.Labels, expectedLabels) {
t.Errorf("failed to convert labels, got %v, expected %v", podSpec.Labels, expectedLabels)
}

podSpec, err = buildHeadPodTemplate("2.4", &api.EnvironmentVariables{}, &headGroup, &template, true)
assert.Nil(t, err)
if len(podSpec.Spec.Containers[0].Ports) != 6 {
t.Errorf("failed build ports")
}
}

func TestBuildRayCluster(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/util/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewRayJob(apiJob *api.RayJob, computeTemplateMap map[string]*api.ComputeTem
},
}
if apiJob.ClusterSpec != nil {
clusterSpec, err := buildRayClusterSpec(rayJobDefaultVersion, nil, apiJob.ClusterSpec, computeTemplateMap)
clusterSpec, err := buildRayClusterSpec(rayJobDefaultVersion, nil, apiJob.ClusterSpec, computeTemplateMap, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/util/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func buildRayServiceSpec(apiService *api.RayService, computeTemplateMap map[stri
return nil, errors.New("two serve configuration are defined, only one is allowed")
}
// generate Ray cluster spec and buid cluster
newRayClusterSpec, err := buildRayClusterSpec(rayServiceDefaultVersion, nil, apiService.ClusterSpec, computeTemplateMap)
newRayClusterSpec, err := buildRayClusterSpec(rayServiceDefaultVersion, nil, apiService.ClusterSpec, computeTemplateMap, true)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,10 @@ func initHealthProbe(probe *v1.Probe, rayNodeType rayv1.RayNodeType) {
}

// BuildPod a pod config
func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayv1.RayNodeType, rayStartParams map[string]string, headPort string, enableRayAutoscaler *bool, creator string, fqdnRayIP string) (aPod v1.Pod) {
func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayv1.RayNodeType, rayStartParams map[string]string, headPort string, enableRayAutoscaler *bool, creator string, fqdnRayIP string, enableAgentService bool) (aPod v1.Pod) {
if enableAgentService {
podTemplateSpec.Labels[RayClusterServingServiceLabelKey] = EnableRayClusterServingServiceTrue
}
pod := v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand Down
30 changes: 20 additions & 10 deletions ray-operator/controllers/ray/common/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func TestBuildPod(t *testing.T) {
// Test head pod
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayv1.HeadNode) + DashSymbol + utils.FormatInt32(0))
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "", false)

// Check environment variables
rayContainer := pod.Spec.Containers[RayContainerIndex]
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestBuildPod(t *testing.T) {
podName = cluster.Name + DashSymbol + string(rayv1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
fqdnRayIP := utils.GenerateFQDNServiceName(*cluster, cluster.Namespace)
podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
pod = BuildPod(podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP)
pod = BuildPod(podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP, false)

// Check environment variables
rayContainer = pod.Spec.Containers[RayContainerIndex]
Expand All @@ -400,14 +400,24 @@ func TestBuildPod(t *testing.T) {
// Check Envs
rayContainer = pod.Spec.Containers[RayContainerIndex]
checkContainerEnv(t, rayContainer, "TEST_ENV_NAME", "TEST_ENV_VALUE")

// Try to build pod for serve
pod = BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "", true)
val, ok := pod.Labels[RayClusterServingServiceLabelKey]
if !ok {
t.Error("Expected serve label is not present")
}
if val != EnableRayClusterServingServiceTrue {
t.Error("Wrong serve label value")
}
}

func TestBuildPod_WithAutoscalerEnabled(t *testing.T) {
cluster := instance.DeepCopy()
cluster.Spec.EnableInTreeAutoscaling = &trueFlag
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayv1.HeadNode) + DashSymbol + utils.FormatInt32(0))
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, "", "")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, "", "", false)

actualResult := pod.Labels[RayClusterLabelKey]
expectedResult := cluster.Name
Expand Down Expand Up @@ -462,7 +472,7 @@ func TestBuildPod_WithCreatedByRayService(t *testing.T) {
cluster.Spec.EnableInTreeAutoscaling = &trueFlag
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayv1.HeadNode) + DashSymbol + utils.FormatInt32(0))
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, RayServiceCreatorLabelValue, "")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, RayServiceCreatorLabelValue, "", false)

hasCorrectDeathEnv := false
for _, container := range pod.Spec.Containers {
Expand Down Expand Up @@ -493,7 +503,7 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
// Build a head Pod.
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayv1.HeadNode) + DashSymbol + utils.FormatInt32(0))
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "", false)

// Check environment variable "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
rayContainer := pod.Spec.Containers[RayContainerIndex]
Expand All @@ -511,7 +521,7 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
cluster.Spec.HeadGroupSpec.Template.Spec.Containers[RayContainerIndex].Env = append(cluster.Spec.HeadGroupSpec.Template.Spec.Containers[RayContainerIndex].Env,
v1.EnvVar{Name: RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: "60"})
podTemplateSpec = DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod = BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "")
pod = BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "", false)
rayContainer = pod.Spec.Containers[RayContainerIndex]

// Check environment variable "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
Expand All @@ -528,7 +538,7 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
podName = cluster.Name + DashSymbol + string(rayv1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
fqdnRayIP := utils.GenerateFQDNServiceName(*cluster, cluster.Namespace)
podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
pod = BuildPod(podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP)
pod = BuildPod(podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP, false)

// Check the default value of "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
rayContainer = pod.Spec.Containers[RayContainerIndex]
Expand All @@ -545,7 +555,7 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
v1.EnvVar{Name: RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: "120"})
worker = cluster.Spec.WorkerGroupSpecs[0]
podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
pod = BuildPod(podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP)
pod = BuildPod(podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP, false)

// Check the default value of "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
rayContainer = pod.Spec.Containers[RayContainerIndex]
Expand Down Expand Up @@ -614,7 +624,7 @@ func TestBuildPodWithAutoscalerOptions(t *testing.T) {
SecurityContext: &customSecurityContext,
}
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, "", "")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, "", "", false)
expectedContainer := *autoscalerContainer.DeepCopy()
expectedContainer.Image = customAutoscalerImage
expectedContainer.ImagePullPolicy = customPullPolicy
Expand Down Expand Up @@ -783,7 +793,7 @@ func TestCleanupInvalidVolumeMounts(t *testing.T) {
// Test head pod
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayv1.HeadNode) + DashSymbol + utils.FormatInt32(0))
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "")
pod := BuildPod(podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "", false)

pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, []v1.VolumeMount{
{
Expand Down
Loading

0 comments on commit 6e4ac23

Please sign in to comment.