2 changes: 1 addition & 1 deletion apiserver/pkg/server/ray_job_submission_service_server.go
@@ -41,7 +41,7 @@ type RayJobSubmissionServiceServer struct {
// Create RayJobSubmissionServiceServer
func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer {
zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel)
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false)}
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, nil)}
}

// Submit Ray job
1 change: 1 addition & 0 deletions go.mod
@@ -82,6 +82,7 @@ require (
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion ray-operator/apis/config/v1alpha1/configuration_types.go
@@ -1,13 +1,15 @@
package v1alpha1

import (
cmap "github.com/orcaman/concurrent-map/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

//+kubebuilder:object:root=true
@@ -86,7 +88,8 @@ type Configuration struct {
}

func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
jobInfoMap := cmap.New[*utiltypes.RayJobCache]()
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, &jobInfoMap)
}

func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface {
32 changes: 24 additions & 8 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -30,6 +30,7 @@ import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
"github.com/ray-project/kuberay/ray-operator/pkg/features"
)

@@ -41,9 +42,8 @@ const (
// RayJobReconciler reconciles a RayJob object
type RayJobReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder

Scheme *runtime.Scheme
Recorder record.EventRecorder
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
options RayJobReconcilerOptions
}
@@ -275,10 +275,24 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if err != nil {
// If the Ray job was not found, GetJobInfo returns a BadRequest error.
var jobInfo *utiltypes.RayJobInfo
jobCache := rayDashboardClient.GetJobInfoFromCache(rayJobInstance.Status.JobId)
if jobCache != nil {
if jobCache.Err != nil {
if errors.IsBadRequest(jobCache.Err) && isSubmitterFinished {
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
rayJobInstance.Status.Reason = rayv1.AppFailed
rayJobInstance.Status.Message = "Submitter completed but Ray job not found in RayCluster."
break
}
logger.Error(jobCache.Err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, jobCache.Err
}
jobInfo = jobCache.JobInfo
} else {
// Cache miss: try a direct fetch to disambiguate a not-found job from a transient error.
jobInfo, err = rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if errors.IsBadRequest(err) {
if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode {
logger.Info("The Ray job was not found. Submit a Ray job via an HTTP request.", "JobId", rayJobInstance.Status.JobId)
@@ -295,11 +309,13 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
break
}
}
}

rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId)
if jobInfo == nil {
logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

// If the JobStatus is in a terminal status, such as SUCCEEDED, FAILED, or STOPPED, it is impossible for the Ray job
// to transition to any other. Additionally, RayJob does not currently support retries. Hence, we can mark the RayJob
// as "Complete" or "Failed" to avoid unnecessary reconciliation.
@@ -9,6 +9,7 @@ import (
"net/url"
"strings"

cmap "github.com/orcaman/concurrent-map/v2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/yaml"
@@ -27,28 +28,36 @@ var (
)

type RayDashboardClientInterface interface {
InitClient(client *http.Client, dashboardURL string)
InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}])
UpdateDeployments(ctx context.Context, configJson []byte) error
// V2/multi-app Rest API
GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error)
GetMultiApplicationStatus(context.Context) (map[string]*utiltypes.ServeApplicationStatus, error)
GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error)
AsyncGetJobInfo(ctx context.Context, jobId string)
ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error)
SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error)
SubmitJobReq(ctx context.Context, request *utiltypes.RayJobRequest) (string, error)
GetJobLog(ctx context.Context, jobName string) (*string, error)
StopJob(ctx context.Context, jobName string) error
DeleteJob(ctx context.Context, jobName string) error
GetJobInfoFromCache(jobId string) *utiltypes.RayJobCache
}

type RayDashboardClient struct {
client *http.Client
dashboardURL string
client *http.Client
workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}]
workerPool *WorkerPool
jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]
dashboardURL string
}

func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string) {
func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}]) {
r.client = client
r.dashboardURL = dashboardURL
r.workerPool = workerPool
r.jobInfoMap = jobInfoMap
r.workerPoolChannelContent = workerPoolChannelContent
}

// UpdateDeployments update the deployments in the Ray cluster.
@@ -171,6 +180,25 @@ func (r *RayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (*uti
return &jobInfo, nil
}

func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) {
if _, ok := r.workerPoolChannelContent.Get(jobId); ok {
return
}
r.workerPoolChannelContent.Set(jobId, struct{}{})
r.workerPool.taskQueue <- func() {
defer r.workerPoolChannelContent.Remove(jobId)
jobInfo, err := r.GetJobInfo(ctx, jobId)
if err != nil {
err = fmt.Errorf("failed to get job info: %w", err)
r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: nil, Err: err})
return
}
if jobInfo != nil {
r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: jobInfo, Err: nil})
}
}
}
Comment on lines 183 to 200

Member:

There's actually an edge case. Let's assume:

1. The RayJob finalizer deletes the jobID entry from jobInfoMap and workerPool.channelContent.
2. The background goroutine pool retrieves the task for that jobID from r.workerPool.taskQueue.
3. The worker queries job info using the jobID from step 2.
4. The worker stores the result from step 3 in jobInfoMap.

In this case, we shouldn't store the result. However, this edge case is hard to handle, and each stored entry is only around 100 bytes. Is it OK not to handle it? (Doing the math: with 100,000 RayJob CRs, the most stale cache we could accumulate is about 10 MB, i.e. 100 bytes × 100,000.)

I think the way to handle this edge case is to run another background goroutine that lists all RayJob CRs, checks whether jobInfoMap holds any keys without a matching CR, and deletes them.

cc @rueian @andrewsykim, we need your advice.
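
(Editorial sketch, not from this PR: a minimal illustration of the janitor idea above. The function name, sweep interval, and wiring are all assumptions.)

// Hypothetical cleanup loop: periodically list RayJob CRs and evict any
// jobInfoMap keys that no longer belong to a live RayJob.
func cleanStaleJobInfo(ctx context.Context, c client.Client, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) {
	ticker := time.NewTicker(10 * time.Minute) // assumed sweep interval
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rayJobs := &rayv1.RayJobList{}
			if err := c.List(ctx, rayJobs); err != nil {
				continue // transient list failure; retry on the next tick
			}
			live := make(map[string]struct{}, len(rayJobs.Items))
			for i := range rayJobs.Items {
				live[rayJobs.Items[i].Status.JobId] = struct{}{}
			}
			for _, key := range jobInfoMap.Keys() {
				if _, ok := live[key]; !ok {
					jobInfoMap.Remove(key) // no matching RayJob CR, so the entry is stale
				}
			}
		}
	}
}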

Collaborator:

I think that is not hard to avoid. We just need to put a placeholder into the map and only update the map if the placeholder exists.

rueian (Collaborator), Oct 9, 2025:

And we also need to clear the jobInfoMap before each job retry and deletion.
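
(Editorial sketch, not from this PR: one way the placeholder idea could look inside the dashboardclient package. The sentinel, the method name, and the exact flow are assumptions.)

// Hypothetical variant of AsyncGetJobInfo using a sentinel cache entry.
// SetIfAbsent is atomic, so it also closes the check-then-set race between
// workerPoolChannelContent.Get and .Set in the current implementation.
var inFlight = &utiltypes.RayJobCache{} // sentinel: a fetch is in progress

func (r *RayDashboardClient) asyncGetJobInfoOnce(ctx context.Context, jobId string) {
	if !r.workerPoolChannelContent.SetIfAbsent(jobId, struct{}{}) {
		return // a fetch for this job is already queued
	}
	// Reserve the key so the worker can tell "still wanted" from "deleted".
	r.jobInfoMap.SetIfAbsent(jobId, inFlight)
	r.workerPool.taskQueue <- func() {
		defer r.workerPoolChannelContent.Remove(jobId)
		jobInfo, err := r.GetJobInfo(ctx, jobId)
		// Only update while the key is still present; if the RayJob finalizer
		// removed it in the meantime, drop the result rather than resurrect it.
		// (Get followed by Set is not atomic, so a tiny window remains, but it
		// leaks at most one entry per job.)
		if _, ok := r.jobInfoMap.Get(jobId); !ok {
			return
		}
		if err != nil {
			r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{Err: fmt.Errorf("failed to get job info: %w", err)})
			return
		}
		r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: jobInfo})
	}
}

Under this scheme, GetJobInfoFromCache would also have to treat the sentinel as a cache miss, and, as noted above, the map would still need to be cleared on each job retry and deletion.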


func (r *RayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.dashboardURL+JobPath, nil)
if err != nil {
@@ -221,6 +249,7 @@ func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *utiltype
}

req.Header.Set("Content-Type", "application/json")

resp, err := r.client.Do(req)
if err != nil {
return
@@ -333,6 +362,13 @@ func (r *RayDashboardClient) DeleteJob(ctx context.Context, jobName string) erro
return nil
}

func (r *RayDashboardClient) GetJobInfoFromCache(jobId string) *utiltypes.RayJobCache {
if jobInfo, ok := r.jobInfoMap.Get(jobId); ok {
return jobInfo
}
return nil
}

func ConvertRayJobToReq(rayJob *rayv1.RayJob) (*utiltypes.RayJobRequest, error) {
req := &utiltypes.RayJobRequest{
Entrypoint: rayJob.Spec.Entrypoint,
Expand Down
@@ -0,0 +1,48 @@
package dashboardclient

import (
"sync"
)

type WorkerPool struct {
taskQueue chan func()
stopChan chan struct{}
wg sync.WaitGroup
workers int
}

func NewWorkerPool(taskQueue chan func()) *WorkerPool {
Collaborator:

Suggested change:
-func NewWorkerPool(taskQueue chan func()) *WorkerPool {
+func NewWorkerPool(workers int) *WorkerPool {

Collaborator:

Passing in a task queue channel is awkward; specifying a worker count is more understandable. You could also create a buffered channel internally, based on the worker count.

wp := &WorkerPool{
taskQueue: taskQueue,
workers: 10,
stopChan: make(chan struct{}),
}

// Start workers immediately
wp.start()
return wp
}
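
(Editorial sketch, not from this PR: the constructor shape the suggestion above describes. The name NewWorkerPoolWithCount and the Submit helper are assumptions, chosen to avoid clashing with the code as written.)

// Hypothetical constructor: callers pass a worker count and the pool owns its
// queue, buffered to the worker count so producers rarely block.
func NewWorkerPoolWithCount(workers int) *WorkerPool {
	wp := &WorkerPool{
		taskQueue: make(chan func(), workers),
		workers:   workers,
		stopChan:  make(chan struct{}),
	}
	wp.start()
	return wp
}

// Submit enqueues a task, so callers never touch the channel directly.
func (wp *WorkerPool) Submit(task func()) {
	wp.taskQueue <- task
}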

// Start launches worker goroutines to consume from queue
func (wp *WorkerPool) start() {
for i := 0; i < wp.workers; i++ {
wp.wg.Add(1)
go wp.worker()
}
}

// worker consumes and executes tasks from the queue
func (wp *WorkerPool) worker() {
defer wp.wg.Done()

for {
select {
case <-wp.stopChan:
return
case task := <-wp.taskQueue:
if task != nil {
task() // Execute the job
}
}
}
}
11 changes: 10 additions & 1 deletion ray-operator/controllers/ray/utils/fake_serve_httpclient.go
@@ -6,6 +6,8 @@ import (
"net/http"
"sync/atomic"

cmap "github.com/orcaman/concurrent-map/v2"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
@@ -19,7 +21,7 @@ type FakeRayDashboardClient struct {

var _ dashboardclient.RayDashboardClientInterface = (*FakeRayDashboardClient)(nil)

func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string) {
func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ *dashboardclient.WorkerPool, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], _ *cmap.ConcurrentMap[string, struct{}]) {
}

func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error {
@@ -46,6 +48,9 @@ func (r *FakeRayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (
return &utiltypes.RayJobInfo{JobStatus: rayv1.JobStatusRunning}, nil
}

func (r *FakeRayDashboardClient) AsyncGetJobInfo(_ context.Context, _ string) {
}

func (r *FakeRayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) {
if mock := r.GetJobInfoMock.Load(); mock != nil {
info, err := (*mock)(ctx, "job_id")
@@ -77,3 +82,7 @@ func (r *FakeRayDashboardClient) StopJob(_ context.Context, _ string) (err error
func (r *FakeRayDashboardClient) DeleteJob(_ context.Context, _ string) error {
return nil
}

func (r *FakeRayDashboardClient) GetJobInfoFromCache(_ string) *utiltypes.RayJobCache {
return nil
}
@@ -46,3 +46,8 @@ type RayJobStopResponse struct {
type RayJobLogsResponse struct {
Logs string `json:"logs,omitempty"`
}

type RayJobCache struct {
JobInfo *RayJobInfo
Err error
}
12 changes: 10 additions & 2 deletions ray-operator/controllers/ray/utils/util.go
@@ -14,6 +14,7 @@ import (
"time"
"unicode"

cmap "github.com/orcaman/concurrent-map/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -27,6 +28,7 @@ import (

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

const (
Expand Down Expand Up @@ -758,7 +760,10 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray
return headServiceURL, nil
}

func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
taskQueue := make(chan func())
workerPool := dashboardclient.NewWorkerPool(taskQueue)
workerPoolChannelContent := cmap.New[struct{}]()
return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
dashboardClient := &dashboardclient.RayDashboardClient{}
if useKubernetesProxy {
@@ -777,13 +782,16 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
// configured to communicate with the Kubernetes API server.
mgr.GetHTTPClient(),
fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName),
workerPool,
jobInfoMap,
&workerPoolChannelContent,
)
return dashboardClient, nil
}

dashboardClient.InitClient(&http.Client{
Timeout: 2 * time.Second,
}, "http://"+url)
}, "http://"+url, workerPool, jobInfoMap, &workerPoolChannelContent)
return dashboardClient, nil
}
}
2 changes: 1 addition & 1 deletion ray-operator/rayjob-submitter/cmd/main.go
@@ -64,7 +64,7 @@ func main() {
}
rayDashboardClient := &dashboardclient.RayDashboardClient{}
address = rayjobsubmitter.JobSubmissionURL(address)
rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address)
rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address, nil, nil, nil)
submissionId, err := rayDashboardClient.SubmitJobReq(context.Background(), &req)
if err != nil {
if strings.Contains(err.Error(), "Please use a different submission_id") {
2 changes: 1 addition & 1 deletion ray-operator/test/e2erayjob/rayjob_retry_test.go
@@ -107,7 +107,7 @@ func TestRayJobRetry(t *testing.T) {
LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)

// Ensure JobDeploymentStatus transit to Failed
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
// Ensure JobStatus is empty
g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
2 changes: 1 addition & 1 deletion ray-operator/test/sampleyaml/support.go
@@ -75,7 +75,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg

g.Expect(err).ToNot(HaveOccurred())
url := fmt.Sprintf("127.0.0.1:%d", localPort)
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false)
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, nil)
rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url)
g.Expect(err).ToNot(HaveOccurred())
serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx())