Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apiserver/deploy/local/e2e/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ images:
- name: kuberay/apiserver
newName: quay.io/kuberay/apiserver
newTag: latest
# Replace NodePort with ClusterIP as we do not need to receive requests from outside the Kubernetes cluster
Copy link
Contributor Author

@kenchung285 kenchung285 Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automatically modified by make start-local-apiserver-e2e

patches:
- patch: |-
- op: replace
Expand All @@ -28,7 +29,6 @@ patches:
kind: Deployment
name: kuberay-apiserver
version: v1
# Replace NodePort with ClusterIP as we do not need to receive requests from outside the Kubernetes cluster
- patch: |-
- op: replace
path: /spec/type
Expand Down
32 changes: 12 additions & 20 deletions apiserver/pkg/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,14 @@ import (
"io"
"net/http"
"strconv"
"time"

rpcStatus "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/encoding/protojson"

apiserverutil "github.com/ray-project/kuberay/apiserversdk/util"
apiserversdkutil "github.com/ray-project/kuberay/apiserversdk/util"
api "github.com/ray-project/kuberay/proto/go_client"
)

type RetryConfig struct {
MaxRetry int
BackoffFactor float64
InitBackoff time.Duration
MaxBackoff time.Duration
OverallTimeout time.Duration
}

type KuberayAPIServerClient struct {
httpClient *http.Client
marshaler *protojson.MarshalOptions
Expand All @@ -36,7 +27,7 @@ type KuberayAPIServerClient struct {
// Store http request handling function for unit test purpose.
executeHttpRequest func(httpRequest *http.Request, URL string) ([]byte, *rpcStatus.Status, error)
baseURL string
retryCfg RetryConfig
retryCfg apiserversdkutil.RetryConfig
}

type KuberayAPIServerClientError struct {
Expand All @@ -57,7 +48,7 @@ func IsNotFoundError(err error) bool {
return false
}

func NewKuberayAPIServerClient(baseURL string, httpClient *http.Client, retryCfg RetryConfig) *KuberayAPIServerClient {
func NewKuberayAPIServerClient(baseURL string, httpClient *http.Client, retryCfg apiserversdkutil.RetryConfig) *KuberayAPIServerClient {
client := &KuberayAPIServerClient{
httpClient: httpClient,
baseURL: baseURL,
Expand Down Expand Up @@ -704,7 +695,7 @@ func (krc *KuberayAPIServerClient) executeRequest(httpRequest *http.Request, URL
break
}

if apiserverutil.IsSuccessfulStatusCode(statusCode) {
if apiserversdkutil.IsSuccessfulStatusCode(statusCode) {
return bodyBytes, nil, nil
}

Expand All @@ -721,21 +712,22 @@ func (krc *KuberayAPIServerClient) executeRequest(httpRequest *http.Request, URL
HTTPStatusCode: statusCode,
}

if !apiserverutil.IsRetryableHTTPStatusCodes(statusCode) {
if !apiserversdkutil.IsRetryableHTTPStatusCodes(statusCode) {
break
}

// Backoff before retry
sleep := apiserverutil.GetRetryBackoff(attempt,
sleep := apiserversdkutil.GetRetryBackoff(attempt,
krc.retryCfg.InitBackoff,
krc.retryCfg.BackoffFactor,
krc.retryCfg.MaxBackoff)

select {
case <-time.After(sleep):
// continue to the next retry after backoff
case <-ctx.Done():
return nil, lastStatus, fmt.Errorf("overall timeout reached: %w", ctx.Err())
if ok := apiserversdkutil.CheckContextDeadline(ctx, sleep); !ok {
return nil, lastStatus, fmt.Errorf("retry timeout exceeded context deadline")
}

if err = apiserversdkutil.Sleep(ctx, sleep); err != nil {
return nil, lastStatus, fmt.Errorf("retry canceled during backoff: %w", err)
}

}
Expand Down
12 changes: 6 additions & 6 deletions apiserver/pkg/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (m *mockTransport) RoundTrip(_ *http.Request) (*http.Response, error) {
}

func TestUnmarshalHttpResponseOK(t *testing.T) {
retryCfg := RetryConfig{
retryCfg := util.RetryConfig{
MaxRetry: util.HTTPClientDefaultMaxRetry,
BackoffFactor: util.HTTPClientDefaultBackoffBase,
InitBackoff: util.HTTPClientDefaultInitBackoff,
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestUnmarshalHttpResponseOK(t *testing.T) {

// Unmarshal response fails and check error returned.
func TestUnmarshalHttpResponseFails(t *testing.T) {
retryCfg := RetryConfig{
retryCfg := util.RetryConfig{
MaxRetry: util.HTTPClientDefaultMaxRetry,
BackoffFactor: util.HTTPClientDefaultBackoffBase,
InitBackoff: util.HTTPClientDefaultInitBackoff,
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestAPIServerClientRetry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mockClient := &http.Client{Transport: tt.transport}

retryCfg := RetryConfig{
retryCfg := util.RetryConfig{
MaxRetry: tt.maxRetry,
BackoffFactor: util.HTTPClientDefaultBackoffBase,
InitBackoff: util.HTTPClientDefaultInitBackoff,
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestAPIServerClientBackoff(t *testing.T) {

mockClient := &http.Client{Transport: mockTransport}

retryCfg := RetryConfig{
retryCfg := util.RetryConfig{
MaxRetry: util.HTTPClientDefaultMaxRetry,
BackoffFactor: util.HTTPClientDefaultBackoffBase,
// Set short backoff time
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestAPIServerClientOverallTimeout(t *testing.T) {

mockClient := &http.Client{Transport: mockTransport}

retryCfg := RetryConfig{
retryCfg := util.RetryConfig{
MaxRetry: util.HTTPClientDefaultMaxRetry,
BackoffFactor: util.HTTPClientDefaultBackoffBase,
InitBackoff: 1 * time.Millisecond,
Expand All @@ -322,5 +322,5 @@ func TestAPIServerClientOverallTimeout(t *testing.T) {

// Expect a timeout error
require.Error(t, err)
require.Contains(t, err.Error(), "timeout")
require.Contains(t, err.Error(), "retry timeout exceeded context deadline")
}
2 changes: 1 addition & 1 deletion apiserver/test/e2e/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func withHttpClient() contextOption {
return func(_ *testing.T, testingContext *End2EndTestingContext) error {
testingContext.apiServerHttpClient = &http.Client{Timeout: time.Duration(10) * time.Second}

retryCfg := kuberayHTTP.RetryConfig{
retryCfg := util.RetryConfig{
MaxRetry: util.HTTPClientDefaultMaxRetry,
BackoffFactor: util.HTTPClientDefaultBackoffBase,
InitBackoff: util.HTTPClientDefaultInitBackoff,
Expand Down
60 changes: 29 additions & 31 deletions apiserversdk/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ package apiserversdk

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

apiserverutil "github.com/ray-project/kuberay/apiserversdk/util"
apiserversdkutil "github.com/ray-project/kuberay/apiserversdk/util"
rayutil "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

Expand Down Expand Up @@ -95,30 +95,33 @@ func requireKubeRayService(handler http.Handler, k8sClient *kubernetes.Clientset
// retryRoundTripper is a custom implementation of http.RoundTripper that retries HTTP requests.
// It verifies retryable HTTP status codes and retries using exponential backoff.
type retryRoundTripper struct {
base http.RoundTripper

// Num of retries after the initial attempt
maxRetries int

// Retry backoff settings
initBackoff time.Duration
backoffBase float64
maxBackoff time.Duration
base http.RoundTripper
retryCfg apiserversdkutil.RetryConfig
}

func newRetryRoundTripper(base http.RoundTripper) http.RoundTripper {
retryCfg := apiserversdkutil.RetryConfig{
MaxRetry: apiserversdkutil.HTTPClientDefaultMaxRetry,
BackoffFactor: apiserversdkutil.HTTPClientDefaultBackoffBase,
InitBackoff: apiserversdkutil.HTTPClientDefaultInitBackoff,
MaxBackoff: apiserversdkutil.HTTPClientDefaultMaxBackoff,
OverallTimeout: apiserversdkutil.HTTPClientDefaultOverallTimeout,
}

return &retryRoundTripper{
base: base,
maxRetries: apiserverutil.HTTPClientDefaultMaxRetry,
initBackoff: apiserverutil.HTTPClientDefaultInitBackoff,
backoffBase: apiserverutil.HTTPClientDefaultBackoffBase,
maxBackoff: apiserverutil.HTTPClientDefaultMaxBackoff,
base: base,
retryCfg: retryCfg,
}
}

func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()

ctx, cancel := context.WithTimeout(ctx, rrt.retryCfg.OverallTimeout)
defer cancel()

req = req.WithContext(ctx)

var bodyBytes []byte
var resp *http.Response
var err error
Expand All @@ -135,8 +138,8 @@ func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro
}
}

for attempt := 0; attempt <= rrt.maxRetries; attempt++ {
/* Try up to (rrt.maxRetries + 1) times: initial attempt + retries */
for attempt := 0; attempt <= rrt.retryCfg.MaxRetry; attempt++ {
/* Try up to (rrt.retryCfg.MaxRetry + 1) times: initial attempt + retries */

if bodyBytes != nil {
req.Body = io.NopCloser(bytes.NewReader(bodyBytes))
Expand All @@ -147,15 +150,15 @@ func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro
return resp, fmt.Errorf("request to %s %s failed with error: %w", req.Method, req.URL.String(), err)
}

if apiserverutil.IsSuccessfulStatusCode(resp.StatusCode) {
if apiserversdkutil.IsSuccessfulStatusCode(resp.StatusCode) {
return resp, nil
}

if !apiserverutil.IsRetryableHTTPStatusCodes(resp.StatusCode) {
if !apiserversdkutil.IsRetryableHTTPStatusCodes(resp.StatusCode) {
return resp, nil
}

if attempt == rrt.maxRetries {
if attempt == rrt.retryCfg.MaxRetry {
return resp, nil
}

Expand All @@ -169,20 +172,15 @@ func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro
}
}

sleepDuration := apiserverutil.GetRetryBackoff(attempt, rrt.initBackoff, rrt.backoffBase, rrt.maxBackoff)
sleepDuration := apiserversdkutil.GetRetryBackoff(attempt, rrt.retryCfg.InitBackoff, rrt.retryCfg.BackoffFactor, rrt.retryCfg.MaxBackoff)

// TODO: merge common utils for apiserver v1 and v2
if deadline, ok := ctx.Deadline(); ok {
remaining := time.Until(deadline)
if sleepDuration > remaining {
return resp, fmt.Errorf("retry timeout exceeded context deadline")
}
if ok := apiserversdkutil.CheckContextDeadline(ctx, sleepDuration); !ok {
return resp, fmt.Errorf("retry timeout exceeded context deadline")
}

select {
case <-time.After(sleepDuration):
case <-ctx.Done():
return resp, fmt.Errorf("retry canceled during backoff: %w", ctx.Err())
if err = apiserversdkutil.Sleep(ctx, sleepDuration); err != nil {
return resp, fmt.Errorf("retry canceled during backoff: %w", err)
}
}
return resp, err
Expand Down
8 changes: 8 additions & 0 deletions apiserversdk/util/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,11 @@ const (
// Overall timeout for retries
HTTPClientDefaultOverallTimeout = 30 * time.Second
)

type RetryConfig struct {
MaxRetry int
BackoffFactor float64
InitBackoff time.Duration
MaxBackoff time.Duration
OverallTimeout time.Duration
}
20 changes: 20 additions & 0 deletions apiserversdk/util/http.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
package util

import (
"context"
"math"
"net/http"
"time"
)

func Sleep(ctx context.Context, sleepDuration time.Duration) error {
select {
case <-time.After(sleepDuration):
case <-ctx.Done():
return ctx.Err()
}
return nil
}

func CheckContextDeadline(ctx context.Context, sleepDuration time.Duration) bool {
if deadline, ok := ctx.Deadline(); ok {
remaining := time.Until(deadline)
if sleepDuration > remaining {
return false
}
}
return true
}

func GetRetryBackoff(attempt int, initBackoff time.Duration, backoffBase float64, maxBackoff time.Duration) time.Duration {
sleepDuration := initBackoff * time.Duration(math.Pow(backoffBase, float64(attempt)))
if sleepDuration > maxBackoff {
Expand Down
Loading