From 4425be77f5435be1f59403befec8b26c3820cda0 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Thu, 31 Jul 2025 15:42:43 -0700 Subject: [PATCH 01/14] changed to support both v1 and v1a2 ip # Conflicts: # pkg/epp/controller/inferencemodel_reconciler.go # pkg/epp/datastore/datastore.go # pkg/epp/server/controller_manager.go # pkg/epp/server/runserver.go # Conflicts: # cmd/epp/runner/runner.go --- cmd/epp/runner/runner.go | 17 +++++++- pkg/common/convert.go | 28 +++++++++++++ pkg/common/kubemeta.go | 41 +++++++++++++++++++ .../controller/inferencepool_reconciler.go | 6 ++- 4 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 pkg/common/convert.go create mode 100644 pkg/common/kubemeta.go diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 7d92c7898b..40a1f1d0a4 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -22,9 +22,11 @@ import ( "errors" "flag" "fmt" + "k8s.io/apimachinery/pkg/runtime/schema" "net/http" "net/http/pprof" "os" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" @@ -82,6 +84,10 @@ var ( "pool-name", runserver.DefaultPoolName, "Name of the InferencePool this Endpoint Picker is associated with.") + poolGroup = flag.String( + "pool-group", + runserver.DefaultPoolGroup, + "group of the InferencePool this Endpoint Picker is associated with.") poolNamespace = flag.String( "pool-namespace", runserver.DefaultPoolNamespace, @@ -301,7 +307,15 @@ func (r *Runner) Run(ctx context.Context) error { Name: *poolName, Namespace: *poolNamespace, } - mgr, err := runserver.NewDefaultManager(poolNamespacedName, cfg, metricsServerOptions) + poolGroupKind := schema.GroupKind{ + Group: *poolGroup, + Kind: "InferencePool", + } + poolGKNN := common.GKNN{ + NamespacedName: poolNamespacedName, + GroupKind: poolGroupKind, + } + mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions) if err != nil { setupLog.Error(err, "Failed to create controller manager") return err @@ -344,6 +358,7 @@ func (r *Runner) Run(ctx context.Context) error { DestinationEndpointHintKey: *destinationEndpointHintKey, FairnessIDHeaderKey: *fairnessIDHeaderKey, PoolNamespacedName: poolNamespacedName, + PoolGKNN: poolGKNN, Datastore: datastore, SecureServing: *secureServing, HealthChecking: *healthChecking, diff --git a/pkg/common/convert.go b/pkg/common/convert.go new file mode 100644 index 0000000000..c8dffafb1a --- /dev/null +++ b/pkg/common/convert.go @@ -0,0 +1,28 @@ +package common + +import ( + "fmt" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" +) + +func ToUnstructured(obj any) (*unstructured.Unstructured, error) { + u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + return nil, err + } + return &unstructured.Unstructured{Object: u}, nil +} + +var ToInferencePool = convert[v1.InferencePool] + +var ToXInferencePool = convert[v1.InferencePool] + +func convert[T any](u *unstructured.Unstructured) (*T, error) { + var res T + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &res); err != nil { + return nil, fmt.Errorf("error converting unstructured to T: %v", err) + } + return &res, nil +} diff --git a/pkg/common/kubemeta.go b/pkg/common/kubemeta.go new file mode 100644 index 0000000000..db2dfdfa53 --- /dev/null +++ b/pkg/common/kubemeta.go @@ -0,0 +1,41 @@ +// Package common defines structs for referring to fully qualified k8s resources. +package common + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "strings" +) + +// GKNN represents a fully qualified k8s resource. +type GKNN struct { + types.NamespacedName + schema.GroupKind +} + +// String implements Stringer. +func (g *GKNN) String() string { + return fmt.Sprintf("%s %s", g.GroupKind.String(), g.NamespacedName.String()) +} + +// Compare returns the comparison of a and b where less than, equal, and greater than return -1, 0, +// and 1 respectively. +func Compare(a, b GKNN) int { + if v := strings.Compare(a.Group, b.Group); v != 0 { + return v + } + if v := strings.Compare(a.Kind, b.Kind); v != 0 { + return v + } + if v := strings.Compare(a.Namespace, b.Namespace); v != 0 { + return v + } + return strings.Compare(a.Name, b.Name) +} + +// Less returns true if a is less than b. +func Less(a, b GKNN) bool { + return Compare(a, b) < 0 +} diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 94fc726c0d..325d7d33ed 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -18,6 +18,7 @@ package controller import ( "context" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" @@ -35,14 +36,16 @@ import ( type InferencePoolReconciler struct { client.Reader Datastore datastore.Datastore + PoolGKNN common.GKNN } func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithValues("inferencePool", req.NamespacedName).V(logutil.DEFAULT) ctx = ctrl.LoggerInto(ctx, logger) + c.PoolGKNN logger.Info("Reconciling InferencePool") - + // TODO: Change to unstructured infPool := &v1.InferencePool{} if err := c.Get(ctx, req.NamespacedName, infPool); err != nil { @@ -68,6 +71,7 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques } func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error { + //TODO: use c.PoolGKNN to register wither v1 InferencePool or v1a2 InferencePool return ctrl.NewControllerManagedBy(mgr). For(&v1.InferencePool{}). Complete(c) From ce810eea2f03478783901c7157dcbabeb4248bf1 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 1 Aug 2025 09:38:08 -0700 Subject: [PATCH 02/14] rebase with main --- .../inferenceobjective_reconciler.go | 9 +- .../inferenceobjective_reconciler_test.go | 89 ++++++++++--------- .../controller/inferencepool_reconciler.go | 1 - pkg/epp/server/controller_manager.go | 41 +++++---- pkg/epp/server/runserver.go | 37 +++++++- 5 files changed, 110 insertions(+), 67 deletions(-) diff --git a/pkg/epp/controller/inferenceobjective_reconciler.go b/pkg/epp/controller/inferenceobjective_reconciler.go index 8b029bcf2a..6e4f44a74a 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler.go +++ b/pkg/epp/controller/inferenceobjective_reconciler.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" @@ -35,8 +36,8 @@ import ( type InferenceObjectiveReconciler struct { client.Reader - Datastore datastore.Datastore - PoolNamespacedName types.NamespacedName + Datastore datastore.Datastore + PoolGKNN common.GKNN } func (c *InferenceObjectiveReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -55,7 +56,7 @@ func (c *InferenceObjectiveReconciler) Reconcile(ctx context.Context, req ctrl.R notFound = true } - if notFound || !infObjective.DeletionTimestamp.IsZero() || infObjective.Spec.PoolRef.Name != v1alpha2.ObjectName(c.PoolNamespacedName.Name) { + if notFound || !infObjective.DeletionTimestamp.IsZero() || infObjective.Spec.PoolRef.Name != v1alpha2.ObjectName(c.PoolGKNN.Name) { // InferenceObjective object got deleted or changed the referenced pool. err := c.handleObjectiveDeleted(ctx, req.NamespacedName) return ctrl.Result{}, err @@ -125,5 +126,5 @@ func (c *InferenceObjectiveReconciler) SetupWithManager(ctx context.Context, mgr } func (c *InferenceObjectiveReconciler) eventPredicate(infObjective *v1alpha2.InferenceObjective) bool { - return string(infObjective.Spec.PoolRef.Name) == c.PoolNamespacedName.Name + return string(infObjective.Spec.PoolRef.Name) == c.PoolGKNN.Name } diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 046cd1ecf0..79322deb32 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -18,6 +18,8 @@ package controller import ( "context" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "testing" "time" @@ -40,55 +42,55 @@ import ( var ( pool = utiltest.MakeInferencePool("test-pool1").Namespace("ns1").ObjRef() infObjective1 = utiltest.MakeInferenceObjective("model1"). - Namespace(pool.Namespace). - ModelName("fake model1"). - Criticality(v1alpha2.Standard). - CreationTimestamp(metav1.Unix(1000, 0)). - PoolName(pool.Name).ObjRef() + Namespace(pool.Namespace). + ModelName("fake model1"). + Criticality(v1alpha2.Standard). + CreationTimestamp(metav1.Unix(1000, 0)). + PoolName(pool.Name).ObjRef() infObjective1Pool2 = utiltest.MakeInferenceObjective(infObjective1.Name). - Namespace(infObjective1.Namespace). - ModelName(infObjective1.Spec.ModelName). - Criticality(*infObjective1.Spec.Criticality). - CreationTimestamp(metav1.Unix(1001, 0)). - PoolName("test-pool2").ObjRef() + Namespace(infObjective1.Namespace). + ModelName(infObjective1.Spec.ModelName). + Criticality(*infObjective1.Spec.Criticality). + CreationTimestamp(metav1.Unix(1001, 0)). + PoolName("test-pool2").ObjRef() infObjective1NS2 = utiltest.MakeInferenceObjective(infObjective1.Name). - Namespace("ns2"). - ModelName(infObjective1.Spec.ModelName). - Criticality(*infObjective1.Spec.Criticality). - CreationTimestamp(metav1.Unix(1002, 0)). - PoolName(pool.Name).ObjRef() + Namespace("ns2"). + ModelName(infObjective1.Spec.ModelName). + Criticality(*infObjective1.Spec.Criticality). + CreationTimestamp(metav1.Unix(1002, 0)). + PoolName(pool.Name).ObjRef() infObjective1Critical = utiltest.MakeInferenceObjective(infObjective1.Name). - Namespace(infObjective1.Namespace). - ModelName(infObjective1.Spec.ModelName). - Criticality(v1alpha2.Critical). - CreationTimestamp(metav1.Unix(1003, 0)). - PoolName(pool.Name).ObjRef() + Namespace(infObjective1.Namespace). + ModelName(infObjective1.Spec.ModelName). + Criticality(v1alpha2.Critical). + CreationTimestamp(metav1.Unix(1003, 0)). + PoolName(pool.Name).ObjRef() infObjective1Deleted = utiltest.MakeInferenceObjective(infObjective1.Name). - Namespace(infObjective1.Namespace). - ModelName(infObjective1.Spec.ModelName). - CreationTimestamp(metav1.Unix(1004, 0)). - DeletionTimestamp(). - PoolName(pool.Name).ObjRef() + Namespace(infObjective1.Namespace). + ModelName(infObjective1.Spec.ModelName). + CreationTimestamp(metav1.Unix(1004, 0)). + DeletionTimestamp(). + PoolName(pool.Name).ObjRef() // Same ModelName, different object with newer creation timestamp infObjective1Newer = utiltest.MakeInferenceObjective("model1-newer"). - Namespace(pool.Namespace). - ModelName("fake model1"). - Criticality(v1alpha2.Standard). - CreationTimestamp(metav1.Unix(1005, 0)). - PoolName(pool.Name).ObjRef() + Namespace(pool.Namespace). + ModelName("fake model1"). + Criticality(v1alpha2.Standard). + CreationTimestamp(metav1.Unix(1005, 0)). + PoolName(pool.Name).ObjRef() // Same ModelName, different object with older creation timestamp infObjective1Older = utiltest.MakeInferenceObjective("model1-older"). - Namespace(pool.Namespace). - ModelName("fake model1"). - Criticality(v1alpha2.Standard). - CreationTimestamp(metav1.Unix(999, 0)). - PoolName(pool.Name).ObjRef() + Namespace(pool.Namespace). + ModelName("fake model1"). + Criticality(v1alpha2.Standard). + CreationTimestamp(metav1.Unix(999, 0)). + PoolName(pool.Name).ObjRef() infObjective2 = utiltest.MakeInferenceObjective("model2"). - Namespace(pool.Namespace). - ModelName("fake model2"). - CreationTimestamp(metav1.Unix(1000, 0)). - PoolName(pool.Name).ObjRef() + Namespace(pool.Namespace). + ModelName("fake model2"). + CreationTimestamp(metav1.Unix(1000, 0)). + PoolName(pool.Name).ObjRef() ) func TestInferenceObjectiveReconciler(t *testing.T) { @@ -203,9 +205,12 @@ func TestInferenceObjectiveReconciler(t *testing.T) { } _ = ds.PoolSet(context.Background(), fakeClient, pool) reconciler := &InferenceObjectiveReconciler{ - Reader: fakeClient, - Datastore: ds, - PoolNamespacedName: types.NamespacedName{Name: pool.Name, Namespace: pool.Namespace}, + Reader: fakeClient, + Datastore: ds, + PoolGKNN: common.GKNN{ + NamespacedName: types.NamespacedName{Name: pool.Name, Namespace: pool.Namespace}, + GroupKind: schema.GroupKind{Group: pool.GroupVersionKind().Group, Kind: pool.GroupVersionKind().Kind}, + }, } if test.incomingReq == nil { test.incomingReq = &types.NamespacedName{Name: test.objective.Name, Namespace: test.objective.Namespace} diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 325d7d33ed..ec40d02e62 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -42,7 +42,6 @@ type InferencePoolReconciler struct { func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithValues("inferencePool", req.NamespacedName).V(logutil.DEFAULT) ctx = ctrl.LoggerInto(ctx, logger) - c.PoolGKNN logger.Info("Reconciling InferencePool") // TODO: Change to unstructured diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index 9be7ceb5c5..3aa790bf43 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -18,11 +18,11 @@ package server import ( "fmt" + "k8s.io/apimachinery/pkg/fields" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -45,39 +45,46 @@ func init() { } // defaultManagerOptions returns the default options used to create the manager. -func defaultManagerOptions(namespacedName types.NamespacedName, metricsServerOptions metricsserver.Options) ctrl.Options { - return ctrl.Options{ +func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.Options) ctrl.Options { + opt := ctrl.Options{ Scheme: scheme, Cache: cache.Options{ ByObject: map[client.Object]cache.ByObject{ &corev1.Pod{}: { Namespaces: map[string]cache.Config{ - namespacedName.Namespace: {}, - }, - }, - &v1.InferencePool{}: { - Namespaces: map[string]cache.Config{ - namespacedName.Namespace: { - FieldSelector: fields.SelectorFromSet(fields.Set{ - "metadata.name": namespacedName.Name, - }), - }, + gknn.Namespace: {}, }, }, &v1alpha2.InferenceObjective{}: { Namespaces: map[string]cache.Config{ - namespacedName.Namespace: {}, + gknn.Namespace: {}, }, }, }, }, Metrics: metricsServerOptions, } + if gknn.Group == v1alpha2.GroupName && gknn.Kind == "InferencePool" { + opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ + Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": gknn.Name, + })}}, + } + } + if gknn.Group == v1.GroupName && gknn.Kind == "InferencePool" { + opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ + Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": gknn.Name, + })}}, + } + } + + return opt } // NewDefaultManager creates a new controller manager with default configuration. -func NewDefaultManager(namespacedName types.NamespacedName, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) { - manager, err := ctrl.NewManager(restConfig, defaultManagerOptions(namespacedName, metricsServerOptions)) +func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) { + manager, err := ctrl.NewManager(restConfig, defaultManagerOptions(gknn, metricsServerOptions)) if err != nil { return nil, fmt.Errorf("failed to create controller manager: %v", err) } diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index 2cbcbec0ed..e1ac6c8f50 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -20,6 +20,8 @@ import ( "context" "crypto/tls" "fmt" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" @@ -48,6 +50,7 @@ type ExtProcServerRunner struct { DestinationEndpointHintKey string FairnessIDHeaderKey string PoolNamespacedName types.NamespacedName + PoolGKNN common.GKNN Datastore datastore.Datastore SecureServing bool HealthChecking bool @@ -83,18 +86,45 @@ const ( DefaultCertPath = "" // default for --cert-path DefaultConfigFile = "" // default for --config-file DefaultConfigText = "" // default for --config-text + DefaultGrpcPort = 9002 // default for --grpc-port + DefaultGrpcHealthPort = 9003 // default for --grpc-health-port + DefaultMetricsPort = 9090 // default for --metrics-port + DefaultDestinationEndpointHintMetadataNamespace = "envoy.lb" // default for --destinationEndpointHintMetadataNamespace + DefaultDestinationEndpointHintKey = "x-gateway-destination-endpoint" // default for --destination-endpoint-hint-key + DefaultPoolName = "" // required but no default + DefaultPoolNamespace = "default" // default for --pool-namespace + DefaultPoolGroup = "inference.networking.k8s.io" // default for --pool-group + DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refresh-metrics-interval + DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refresh-prometheus-metrics-interval + DefaultSecureServing = true // default for --secure-serving + DefaultHealthChecking = false // default for --health-checking + DefaultEnablePprof = true // default for --enable-pprof + DefaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting" // default for --total-queued-requests-metric + DefaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc" // default for --kv-cache-usage-percentage-metric + DefaultLoraInfoMetric = "vllm:lora_requests_info" // default for --lora-info-metric + DefaultCertPath = "" // default for --cert-path + DefaultConfigFile = "" // default for --config-file + DefaultConfigText = "" // default for --config-text DefaultMetricsStalenessThreshold = 2 * time.Second ) // NewDefaultExtProcServerRunner creates a runner with default values. // Note: Dependencies like Datastore, Scheduler, SD need to be set separately. func NewDefaultExtProcServerRunner() *ExtProcServerRunner { + poolGKNN := common.GKNN{ + NamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, + GroupKind: schema.GroupKind{ + Group: DefaultPoolGroup, + Kind: "InferencePool", + }, + } return &ExtProcServerRunner{ GrpcPort: DefaultGrpcPort, DestinationEndpointHintKey: DefaultDestinationEndpointHintKey, DestinationEndpointHintMetadataNamespace: DefaultDestinationEndpointHintMetadataNamespace, FairnessIDHeaderKey: DefaultFairnessIDHeaderKey, PoolNamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, + PoolGKNN: poolGKNN, SecureServing: DefaultSecureServing, HealthChecking: DefaultHealthChecking, RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, @@ -109,14 +139,15 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man if err := (&controller.InferencePoolReconciler{ Datastore: r.Datastore, Reader: mgr.GetClient(), + PoolGKNN: r.PoolGKNN, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err) } if err := (&controller.InferenceObjectiveReconciler{ - Datastore: r.Datastore, - Reader: mgr.GetClient(), - PoolNamespacedName: r.PoolNamespacedName, + Datastore: r.Datastore, + Reader: mgr.GetClient(), + PoolGKNN: r.PoolGKNN, }).SetupWithManager(ctx, mgr); err != nil { return fmt.Errorf("failed setting up InferenceObjectiveReconciler: %w", err) } From 97fe365f053ffc1a8c3de010b5d556f26e105653 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 1 Aug 2025 11:47:15 -0700 Subject: [PATCH 03/14] support both v1 and v1a2 IP --- pkg/common/convert.go | 5 +- .../inferenceobjective_reconciler_test.go | 78 ++++---- .../controller/inferencepool_reconciler.go | 84 ++++++-- .../inferencepool_reconciler_test.go | 180 +++++++++++++++++- pkg/epp/server/controller_manager.go | 20 +- pkg/epp/util/testing/wrappers.go | 57 ++++++ 6 files changed, 347 insertions(+), 77 deletions(-) diff --git a/pkg/common/convert.go b/pkg/common/convert.go index c8dffafb1a..0b4f64aa0e 100644 --- a/pkg/common/convert.go +++ b/pkg/common/convert.go @@ -1,10 +1,13 @@ +// Package common defines structs for referring to fully qualified k8s resources. package common import ( "fmt" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" ) func ToUnstructured(obj any) (*unstructured.Unstructured, error) { @@ -17,7 +20,7 @@ func ToUnstructured(obj any) (*unstructured.Unstructured, error) { var ToInferencePool = convert[v1.InferencePool] -var ToXInferencePool = convert[v1.InferencePool] +var ToXInferencePool = convert[v1alpha2.InferencePool] func convert[T any](u *unstructured.Unstructured) (*T, error) { var res T diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 79322deb32..33d44e3813 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -42,55 +42,55 @@ import ( var ( pool = utiltest.MakeInferencePool("test-pool1").Namespace("ns1").ObjRef() infObjective1 = utiltest.MakeInferenceObjective("model1"). - Namespace(pool.Namespace). - ModelName("fake model1"). - Criticality(v1alpha2.Standard). - CreationTimestamp(metav1.Unix(1000, 0)). - PoolName(pool.Name).ObjRef() + Namespace(pool.Namespace). + ModelName("fake model1"). + Criticality(v1alpha2.Standard). + CreationTimestamp(metav1.Unix(1000, 0)). + PoolName(pool.Name).ObjRef() infObjective1Pool2 = utiltest.MakeInferenceObjective(infObjective1.Name). - Namespace(infObjective1.Namespace). - ModelName(infObjective1.Spec.ModelName). - Criticality(*infObjective1.Spec.Criticality). - CreationTimestamp(metav1.Unix(1001, 0)). - PoolName("test-pool2").ObjRef() + Namespace(infObjective1.Namespace). + ModelName(infObjective1.Spec.ModelName). + Criticality(*infObjective1.Spec.Criticality). + CreationTimestamp(metav1.Unix(1001, 0)). + PoolName("test-pool2").ObjRef() infObjective1NS2 = utiltest.MakeInferenceObjective(infObjective1.Name). - Namespace("ns2"). - ModelName(infObjective1.Spec.ModelName). - Criticality(*infObjective1.Spec.Criticality). - CreationTimestamp(metav1.Unix(1002, 0)). - PoolName(pool.Name).ObjRef() + Namespace("ns2"). + ModelName(infObjective1.Spec.ModelName). + Criticality(*infObjective1.Spec.Criticality). + CreationTimestamp(metav1.Unix(1002, 0)). + PoolName(pool.Name).ObjRef() infObjective1Critical = utiltest.MakeInferenceObjective(infObjective1.Name). - Namespace(infObjective1.Namespace). - ModelName(infObjective1.Spec.ModelName). - Criticality(v1alpha2.Critical). - CreationTimestamp(metav1.Unix(1003, 0)). - PoolName(pool.Name).ObjRef() + Namespace(infObjective1.Namespace). + ModelName(infObjective1.Spec.ModelName). + Criticality(v1alpha2.Critical). + CreationTimestamp(metav1.Unix(1003, 0)). + PoolName(pool.Name).ObjRef() infObjective1Deleted = utiltest.MakeInferenceObjective(infObjective1.Name). - Namespace(infObjective1.Namespace). - ModelName(infObjective1.Spec.ModelName). - CreationTimestamp(metav1.Unix(1004, 0)). - DeletionTimestamp(). - PoolName(pool.Name).ObjRef() + Namespace(infObjective1.Namespace). + ModelName(infObjective1.Spec.ModelName). + CreationTimestamp(metav1.Unix(1004, 0)). + DeletionTimestamp(). + PoolName(pool.Name).ObjRef() // Same ModelName, different object with newer creation timestamp infObjective1Newer = utiltest.MakeInferenceObjective("model1-newer"). - Namespace(pool.Namespace). - ModelName("fake model1"). - Criticality(v1alpha2.Standard). - CreationTimestamp(metav1.Unix(1005, 0)). - PoolName(pool.Name).ObjRef() + Namespace(pool.Namespace). + ModelName("fake model1"). + Criticality(v1alpha2.Standard). + CreationTimestamp(metav1.Unix(1005, 0)). + PoolName(pool.Name).ObjRef() // Same ModelName, different object with older creation timestamp infObjective1Older = utiltest.MakeInferenceObjective("model1-older"). - Namespace(pool.Namespace). - ModelName("fake model1"). - Criticality(v1alpha2.Standard). - CreationTimestamp(metav1.Unix(999, 0)). - PoolName(pool.Name).ObjRef() + Namespace(pool.Namespace). + ModelName("fake model1"). + Criticality(v1alpha2.Standard). + CreationTimestamp(metav1.Unix(999, 0)). + PoolName(pool.Name).ObjRef() infObjective2 = utiltest.MakeInferenceObjective("model2"). - Namespace(pool.Namespace). - ModelName("fake model2"). - CreationTimestamp(metav1.Unix(1000, 0)). - PoolName(pool.Name).ObjRef() + Namespace(pool.Namespace). + ModelName("fake model2"). + CreationTimestamp(metav1.Unix(1000, 0)). + PoolName(pool.Name).ObjRef() ) func TestInferenceObjectiveReconciler(t *testing.T) { diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index ec40d02e62..edf0e3de78 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -18,6 +18,8 @@ package controller import ( "context" + "fmt" + "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "k8s.io/apimachinery/pkg/api/errors" @@ -40,38 +42,78 @@ type InferencePoolReconciler struct { } func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx).WithValues("inferencePool", req.NamespacedName).V(logutil.DEFAULT) + logger := log.FromContext(ctx).WithValues("inferencePool", c.PoolGKNN.Group, req.NamespacedName).V(logutil.DEFAULT) ctx = ctrl.LoggerInto(ctx, logger) - logger.Info("Reconciling InferencePool") - // TODO: Change to unstructured - infPool := &v1.InferencePool{} + logger.Info("Reconciling group %s InferencePool", c.PoolGKNN.Group) - if err := c.Get(ctx, req.NamespacedName, infPool); err != nil { - if errors.IsNotFound(err) { - logger.Info("InferencePool not found. Clearing the datastore") + if c.PoolGKNN.Group == v1alpha2.GroupName { + infPool := &v1alpha2.InferencePool{} + if err := c.Get(ctx, req.NamespacedName, infPool); err != nil { + if errors.IsNotFound(err) { + logger.Info("group %s InferencePool % s not found. Clearing the datastore", c.PoolGKNN.Group, req.NamespacedName) + c.Datastore.Clear() + return ctrl.Result{}, nil + } + logger.Error(err, "Unable to get InferencePool") + return ctrl.Result{}, err + } else if !infPool.DeletionTimestamp.IsZero() { + logger.Info("InferencePool is marked for deletion. Clearing the datastore") c.Datastore.Clear() return ctrl.Result{}, nil } - logger.Error(err, "Unable to get InferencePool") - return ctrl.Result{}, err - } else if !infPool.DeletionTimestamp.IsZero() { - logger.Info("InferencePool is marked for deletion. Clearing the datastore") - c.Datastore.Clear() - return ctrl.Result{}, nil + uns, err := common.ToUnstructured(infPool) + if err != nil { + logger.Error(err, "Failed to convert group % InferencePool to unstructured", c.PoolGKNN.Group) + } + v1infPool, err := common.ToInferencePool(uns) + if err != nil { + logger.Error(err, "Failed to convert unstructured to InferencePool") + return ctrl.Result{}, err + } + + // update pool in datastore + if err := c.Datastore.PoolSet(ctx, c.Reader, v1infPool); err != nil { + logger.Error(err, "Failed to update datastore") + return ctrl.Result{}, err + } } - // update pool in datastore - if err := c.Datastore.PoolSet(ctx, c.Reader, infPool); err != nil { - logger.Error(err, "Failed to update datastore") - return ctrl.Result{}, err + if c.PoolGKNN.Group == v1.GroupName { + infPool := &v1.InferencePool{} + if err := c.Get(ctx, req.NamespacedName, infPool); err != nil { + if errors.IsNotFound(err) { + logger.Info("InferencePool not found. Clearing the datastore") + c.Datastore.Clear() + return ctrl.Result{}, nil + } + logger.Error(err, "Unable to get InferencePool") + return ctrl.Result{}, err + } else if !infPool.DeletionTimestamp.IsZero() { + logger.Info("InferencePool is marked for deletion. Clearing the datastore") + c.Datastore.Clear() + return ctrl.Result{}, nil + } + // update pool in datastore + if err := c.Datastore.PoolSet(ctx, c.Reader, infPool); err != nil { + logger.Error(err, "Failed to update datastore") + return ctrl.Result{}, err + } } return ctrl.Result{}, nil } func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error { - //TODO: use c.PoolGKNN to register wither v1 InferencePool or v1a2 InferencePool - return ctrl.NewControllerManagedBy(mgr). - For(&v1.InferencePool{}). - Complete(c) + switch c.PoolGKNN.Group { + case v1alpha2.GroupName: + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha2.InferencePool{}). + Complete(c) + case v1.GroupName: + return ctrl.NewControllerManagedBy(mgr). + For(&v1.InferencePool{}). + Complete(c) + default: + return fmt.Errorf("unknown group %s", c.PoolGKNN.Group) + } } diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index d4f7253b68..9870eca8ee 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -18,6 +18,8 @@ package controller import ( "context" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "testing" "time" @@ -41,12 +43,7 @@ import ( var ( selector_v1 = map[string]string{"app": "vllm_v1"} selector_v2 = map[string]string{"app": "vllm_v2"} - pool1 = utiltest.MakeInferencePool("pool1"). - Namespace("pool1-ns"). - Selector(selector_v1). - TargetPortNumber(8080).ObjRef() - pool2 = utiltest.MakeInferencePool("pool2").Namespace("pool2-ns").ObjRef() - pods = []*corev1.Pod{ + pods = []*corev1.Pod{ // Two ready pods matching pool1 utiltest.MakePod("pod1"). Namespace("pool1-ns"). @@ -75,14 +72,23 @@ var ( func TestInferencePoolReconciler(t *testing.T) { // The best practice is to use table-driven tests, however in this scaenario it seems // more logical to do a single test with steps that depend on each other. + gvk := schema.GroupVersionKind{ + Group: v1.GroupVersion.Group, + Version: v1.GroupVersion.Version, + Kind: "InferencePool", + } + pool1 := utiltest.MakeInferencePool("pool1"). + GVK(gvk). + Namespace("pool1-ns"). + Selector(selector_v1). + TargetPortNumber(8080).ObjRef() + pool2 := utiltest.MakeInferencePool("pool2").GVK(gvk).Namespace("pool2-ns").ObjRef() // Set up the scheme. scheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(scheme) _ = v1alpha2.Install(scheme) _ = v1.Install(scheme) - - // Create a fake client with the pool and the pods. initialObjects := []client.Object{pool1, pool2} for i := range pods { initialObjects = append(initialObjects, pods[i]) @@ -94,12 +100,19 @@ func TestInferencePoolReconciler(t *testing.T) { // Create a request for the existing resource. namespacedName := types.NamespacedName{Name: pool1.Name, Namespace: pool1.Namespace} + gknn := common.GKNN{ + NamespacedName: namespacedName, + GroupKind: schema.GroupKind{ + Group: pool1.GroupVersionKind().Group, + Kind: pool1.GroupVersionKind().Kind, + }, + } req := ctrl.Request{NamespacedName: namespacedName} ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) datastore := datastore.NewDatastore(ctx, pmf) - inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore} + inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn} // Step 1: Inception, only ready pods matching pool1 are added to the store. if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { @@ -191,3 +204,152 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string { } return "" } + +// Duplicate it as it is just a temporary code +// "inference.networking.x-k8s.io" InferencePool will get removed in the near future. +func TestXInferencePoolReconciler(t *testing.T) { + // The best practice is to use table-driven tests, however in this scaenario it seems + // more logical to do a single test with steps that depend on each other. + gvk := schema.GroupVersionKind{ + Group: v1alpha2.GroupVersion.Group, + Version: v1alpha2.GroupVersion.Version, + Kind: "InferencePool", + } + pool1 := utiltest.MakeXInferencePool("pool1"). + GVK(gvk). + Namespace("pool1-ns"). + Selector(selector_v1). + TargetPortNumber(8080).ObjRef() + pool2 := utiltest.MakeXInferencePool("pool2").GVK(gvk).Namespace("pool2-ns").ObjRef() + + // Set up the scheme. + scheme := runtime.NewScheme() + _ = clientgoscheme.AddToScheme(scheme) + _ = v1alpha2.Install(scheme) + _ = v1.Install(scheme) + initialObjects := []client.Object{pool1, pool2} + for i := range pods { + initialObjects = append(initialObjects, pods[i]) + } + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(initialObjects...). + Build() + + // Create a request for the existing resource. + namespacedName := types.NamespacedName{Name: pool1.Name, Namespace: pool1.Namespace} + gknn := common.GKNN{ + NamespacedName: namespacedName, + GroupKind: schema.GroupKind{ + Group: pool1.GroupVersionKind().Group, + Kind: pool1.GroupVersionKind().Kind, + }, + } + req := ctrl.Request{NamespacedName: namespacedName} + ctx := context.Background() + + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) + datastore := datastore.NewDatastore(ctx, pmf) + inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn} + + // Step 1: Inception, only ready pods matching pool1 are added to the store. + if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { + t.Errorf("Unexpected InferencePool reconcile error: %v", err) + } + if diff := xDiffStore(t, datastore, xDiffStoreParams{wantPool: pool1, wantPods: []string{"pod1", "pod2"}}); diff != "" { + t.Errorf("Unexpected diff (+got/-want): %s", diff) + } + + newPool1 := &v1alpha2.InferencePool{} + if err := fakeClient.Get(ctx, req.NamespacedName, newPool1); err != nil { + t.Errorf("Unexpected pool get error: %v", err) + } + newPool1.Spec.Selector = map[v1alpha2.LabelKey]v1alpha2.LabelValue{"app": "vllm_v2"} + if err := fakeClient.Update(ctx, newPool1, &client.UpdateOptions{}); err != nil { + t.Errorf("Unexpected pool update error: %v", err) + } + + if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { + t.Errorf("Unexpected InferencePool reconcile error: %v", err) + } + if diff := xDiffStore(t, datastore, xDiffStoreParams{wantPool: newPool1, wantPods: []string{"pod5"}}); diff != "" { + t.Errorf("Unexpected diff (+got/-want): %s", diff) + } + + // Step 3: update the pool port + if err := fakeClient.Get(ctx, req.NamespacedName, newPool1); err != nil { + t.Errorf("Unexpected pool get error: %v", err) + } + newPool1.Spec.TargetPortNumber = 9090 + if err := fakeClient.Update(ctx, newPool1, &client.UpdateOptions{}); err != nil { + t.Errorf("Unexpected pool update error: %v", err) + } + if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { + t.Errorf("Unexpected InferencePool reconcile error: %v", err) + } + if diff := xDiffStore(t, datastore, xDiffStoreParams{wantPool: newPool1, wantPods: []string{"pod5"}}); diff != "" { + t.Errorf("Unexpected diff (+got/-want): %s", diff) + } + + // Step 4: delete the pool to trigger a datastore clear + if err := fakeClient.Get(ctx, req.NamespacedName, newPool1); err != nil { + t.Errorf("Unexpected pool get error: %v", err) + } + if err := fakeClient.Delete(ctx, newPool1, &client.DeleteOptions{}); err != nil { + t.Errorf("Unexpected pool delete error: %v", err) + } + if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { + t.Errorf("Unexpected InferencePool reconcile error: %v", err) + } + if diff := xDiffStore(t, datastore, xDiffStoreParams{wantPods: []string{}}); diff != "" { + t.Errorf("Unexpected diff (+got/-want): %s", diff) + } +} + +type xDiffStoreParams struct { + wantPool *v1alpha2.InferencePool + wantPods []string + wantObjectives []*v1alpha2.InferenceObjective +} + +func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStoreParams) string { + gotPool, _ := datastore.PoolGet() + if gotPool == nil && params.wantPool == nil { + return "" + } + uns, err := common.ToUnstructured(gotPool) + if err != nil { + t.Fatalf("failed to convert XInferencePool to Unstructured: %v", err) + } + gotXPool, err := common.ToXInferencePool(uns) + if err != nil { + t.Fatalf("failed to convert unstructured to InferencePool: %v", err) + } + if diff := cmp.Diff(params.wantPool, gotXPool); diff != "" { + return "pool:" + diff + } + + // Default wantPods if not set because PodGetAll returns an empty slice when empty. + if params.wantPods == nil { + params.wantPods = []string{} + } + gotPods := []string{} + for _, pm := range datastore.PodGetAll() { + gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) + } + if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { + return "pods:" + diff + } + + // Default wantModels if not set because ModelGetAll returns an empty slice when empty. + if params.wantObjectives == nil { + params.wantObjectives = []*v1alpha2.InferenceObjective{} + } + + if diff := cmp.Diff(params.wantObjectives, datastore.ObjectiveGetAll(), cmpopts.SortSlices(func(a, b *v1alpha2.InferenceObjective) bool { + return a.Name < b.Name + })); diff != "" { + return "models:" + diff + } + return "" +} diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index 3aa790bf43..7430f8c547 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -45,7 +45,7 @@ func init() { } // defaultManagerOptions returns the default options used to create the manager. -func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.Options) ctrl.Options { +func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.Options) (ctrl.Options, error) { opt := ctrl.Options{ Scheme: scheme, Cache: cache.Options{ @@ -64,27 +64,33 @@ func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver. }, Metrics: metricsServerOptions, } - if gknn.Group == v1alpha2.GroupName && gknn.Kind == "InferencePool" { + switch gknn.Group { + case v1alpha2.GroupName: opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ "metadata.name": gknn.Name, })}}, } - } - if gknn.Group == v1.GroupName && gknn.Kind == "InferencePool" { - opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ + case v1.GroupName: + opt.Cache.ByObject[&v1.InferencePool{}] = cache.ByObject{ Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ "metadata.name": gknn.Name, })}}, } + default: + return ctrl.Options{}, fmt.Errorf("unknown group: %s", gknn.Group) } - return opt + return opt, nil } // NewDefaultManager creates a new controller manager with default configuration. func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) { - manager, err := ctrl.NewManager(restConfig, defaultManagerOptions(gknn, metricsServerOptions)) + opt, err := defaultManagerOptions(gknn, metricsServerOptions) + if err != nil { + return nil, fmt.Errorf("failed to create controller manager options: %v", err) + } + manager, err := ctrl.NewManager(restConfig, opt) if err != nil { return nil, fmt.Errorf("failed to create controller manager: %v", err) } diff --git a/pkg/epp/util/testing/wrappers.go b/pkg/epp/util/testing/wrappers.go index 7905e1f684..706ee26f8f 100644 --- a/pkg/epp/util/testing/wrappers.go +++ b/pkg/epp/util/testing/wrappers.go @@ -19,6 +19,7 @@ package testing import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" @@ -191,6 +192,11 @@ func (m *InferencePoolWrapper) Namespace(ns string) *InferencePoolWrapper { return m } +func (m *InferencePoolWrapper) GVK(gvk schema.GroupVersionKind) *InferencePoolWrapper { + m.TypeMeta.SetGroupVersionKind(gvk) + return m +} + func (m *InferencePoolWrapper) Selector(selector map[string]string) *InferencePoolWrapper { s := make(map[v1.LabelKey]v1.LabelValue) for k, v := range selector { @@ -214,3 +220,54 @@ func (m *InferencePoolWrapper) ExtensionRef(name string) *InferencePoolWrapper { func (m *InferencePoolWrapper) ObjRef() *v1.InferencePool { return &m.InferencePool } + +// InferencePoolWrapper wraps an InferencePool. +type XInferencePoolWrapper struct { + v1alpha2.InferencePool +} + +// MakeXInferencePool creates a wrapper for a InferencePool. +func MakeXInferencePool(name string) *XInferencePoolWrapper { + return &XInferencePoolWrapper{ + v1alpha2.InferencePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1alpha2.InferencePoolSpec{}, + }, + } +} + +func (m *XInferencePoolWrapper) Namespace(ns string) *XInferencePoolWrapper { + m.ObjectMeta.Namespace = ns + return m +} + +func (m *XInferencePoolWrapper) GVK(gvk schema.GroupVersionKind) *XInferencePoolWrapper { + m.TypeMeta.SetGroupVersionKind(gvk) + return m +} + +func (m *XInferencePoolWrapper) Selector(selector map[string]string) *XInferencePoolWrapper { + s := make(map[v1alpha2.LabelKey]v1alpha2.LabelValue) + for k, v := range selector { + s[v1alpha2.LabelKey(k)] = v1alpha2.LabelValue(v) + } + m.Spec.Selector = s + return m +} + +func (m *XInferencePoolWrapper) TargetPortNumber(p int32) *XInferencePoolWrapper { + m.Spec.TargetPortNumber = p + return m +} + +func (m *XInferencePoolWrapper) ExtensionRef(name string) *XInferencePoolWrapper { + m.Spec.ExtensionRef = &v1alpha2.Extension{ExtensionReference: v1alpha2.ExtensionReference{Name: v1alpha2.ObjectName(name)}} + return m +} + +// Obj returns the wrapped InferencePool. +func (m *XInferencePoolWrapper) ObjRef() *v1alpha2.InferencePool { + return &m.InferencePool +} From 899e22ee2cec3556472c62681e011bf3dd07f79f Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 1 Aug 2025 11:49:48 -0700 Subject: [PATCH 04/14] change import order --- cmd/epp/runner/runner.go | 3 ++- pkg/common/kubemeta.go | 3 ++- pkg/epp/controller/inferenceobjective_reconciler_test.go | 5 +++-- pkg/epp/controller/inferencepool_reconciler.go | 1 + pkg/epp/controller/inferencepool_reconciler_test.go | 5 +++-- pkg/epp/server/controller_manager.go | 1 + pkg/epp/server/runserver.go | 3 ++- 7 files changed, 14 insertions(+), 7 deletions(-) diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 40a1f1d0a4..1b576131dc 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -22,10 +22,11 @@ import ( "errors" "flag" "fmt" - "k8s.io/apimachinery/pkg/runtime/schema" "net/http" "net/http/pprof" "os" + + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "github.com/go-logr/logr" diff --git a/pkg/common/kubemeta.go b/pkg/common/kubemeta.go index db2dfdfa53..67985bc6d9 100644 --- a/pkg/common/kubemeta.go +++ b/pkg/common/kubemeta.go @@ -4,9 +4,10 @@ package common import ( "fmt" + "strings" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "strings" ) // GKNN represents a fully qualified k8s resource. diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 33d44e3813..14b1902752 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -18,11 +18,12 @@ package controller import ( "context" - "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "testing" "time" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index edf0e3de78..e38a7e7e4a 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 9870eca8ee..576bbe626b 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -18,11 +18,12 @@ package controller import ( "context" - "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "testing" "time" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index 7430f8c547..d0f4ab79dc 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -18,6 +18,7 @@ package server import ( "fmt" + "k8s.io/apimachinery/pkg/fields" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index e1ac6c8f50..7d853f3de2 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -20,9 +20,10 @@ import ( "context" "crypto/tls" "fmt" + "time" + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" - "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/go-logr/logr" From ddf7ffd09b9c03538dd2ce5d105e34a748639627 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 1 Aug 2025 11:54:41 -0700 Subject: [PATCH 05/14] fixed imports --- cmd/epp/runner/runner.go | 5 ++--- pkg/common/kubemeta.go | 1 - pkg/epp/controller/inferenceobjective_reconciler.go | 3 +-- pkg/epp/controller/inferenceobjective_reconciler_test.go | 5 ++--- pkg/epp/controller/inferencepool_reconciler.go | 5 ++--- pkg/epp/controller/inferencepool_reconciler_test.go | 6 ++---- 6 files changed, 9 insertions(+), 16 deletions(-) diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 1b576131dc..d586dc982c 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -26,15 +26,13 @@ import ( "net/http/pprof" "os" - "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" - "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" uberzap "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" healthPb "google.golang.org/grpc/health/grpc_health_v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log" @@ -42,6 +40,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" diff --git a/pkg/common/kubemeta.go b/pkg/common/kubemeta.go index 67985bc6d9..ec3099a852 100644 --- a/pkg/common/kubemeta.go +++ b/pkg/common/kubemeta.go @@ -3,7 +3,6 @@ package common import ( "fmt" - "strings" "k8s.io/apimachinery/pkg/runtime/schema" diff --git a/pkg/epp/controller/inferenceobjective_reconciler.go b/pkg/epp/controller/inferenceobjective_reconciler.go index 6e4f44a74a..666fd74161 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler.go +++ b/pkg/epp/controller/inferenceobjective_reconciler.go @@ -27,9 +27,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" - "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 14b1902752..8747d4d801 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -21,17 +21,16 @@ import ( "testing" "time" - "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" - "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index e38a7e7e4a..ed499c209b 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -20,13 +20,12 @@ import ( "context" "fmt" - "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" - "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 576bbe626b..360b28e9fa 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -21,21 +21,19 @@ import ( "testing" "time" - "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" From 1164c2f4f6a4231fb69eae607f6b00a21e48b03b Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 1 Aug 2025 12:03:48 -0700 Subject: [PATCH 06/14] fixed pipeline --- Dockerfile | 1 + test/integration/epp/hermetic_test.go | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 21a629cbf5..d2fd2300b1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,6 +18,7 @@ RUN go mod download # Sources COPY cmd/epp ./cmd/epp +COPY pkg/common ./pkg/common COPY pkg/epp ./pkg/epp COPY internal ./internal COPY apix ./apix diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index ba17b64a2c..107a7d5d17 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -24,8 +24,10 @@ import ( "errors" "fmt" "io" + "k8s.io/apimachinery/pkg/runtime/schema" "os" "path/filepath" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "strings" "testing" "time" @@ -1086,7 +1088,10 @@ func BeforeSuite() func() { serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond, time.Second*2) // Adjust from defaults - serverRunner.PoolNamespacedName = types.NamespacedName{Name: testPoolName, Namespace: testNamespace} + serverRunner.PoolGKNN = common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: testNamespace, Name: testPoolName}, + GroupKind: schema.GroupKind{Group: v1.GroupVersion.Group, Kind: "InferencePool"}, + } serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf) kvCacheUtilizationScorer := scorer.NewKVCacheUtilizationScorer() From 6e1d5078849795a3d8c7d96a2f687c9b417fea12 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 1 Aug 2025 12:16:41 -0700 Subject: [PATCH 07/14] fixed comments Signed-off-by: Xiyue Yu --- pkg/epp/util/testing/wrappers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/epp/util/testing/wrappers.go b/pkg/epp/util/testing/wrappers.go index 706ee26f8f..549957d80f 100644 --- a/pkg/epp/util/testing/wrappers.go +++ b/pkg/epp/util/testing/wrappers.go @@ -170,7 +170,7 @@ func (m *InferenceObjectiveWrapper) CreationTimestamp(t metav1.Time) *InferenceO return m } -// InferencePoolWrapper wraps an InferencePool. +// InferencePoolWrapper wraps an group "inference.networking.k8s.io" InferencePool. type InferencePoolWrapper struct { v1.InferencePool } @@ -221,7 +221,7 @@ func (m *InferencePoolWrapper) ObjRef() *v1.InferencePool { return &m.InferencePool } -// InferencePoolWrapper wraps an InferencePool. +// XInferencePoolWrapper wraps an group "inference.networking.x-k8s.io" InferencePool. type XInferencePoolWrapper struct { v1alpha2.InferencePool } From 47cc62893a47b5d6f783d15c302181d2e47f27ca Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 1 Aug 2025 12:24:22 -0700 Subject: [PATCH 08/14] fixed merge failure --- pkg/epp/controller/inferencepool_reconciler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 360b28e9fa..680e219f37 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -247,7 +247,7 @@ func TestXInferencePoolReconciler(t *testing.T) { req := ctrl.Request{NamespacedName: namespacedName} ctx := context.Background() - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) datastore := datastore.NewDatastore(ctx, pmf) inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn} @@ -333,7 +333,7 @@ func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStorePa params.wantPods = []string{} } gotPods := []string{} - for _, pm := range datastore.PodGetAll() { + for _, pm := range datastore.PodList(backendmetrics.AllPodPredicate) { gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) } if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { From 1ff3cbe1c3adabbe062a04a1d90e4ea263cdd86c Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 1 Aug 2025 12:53:17 -0700 Subject: [PATCH 09/14] fixed missing arguments Signed-off-by: Xiyue Yu --- cmd/epp/runner/runner.go | 2 +- pkg/common/convert.go | 1 + pkg/epp/controller/inferenceobjective_reconciler.go | 1 + .../controller/inferenceobjective_reconciler_test.go | 2 +- pkg/epp/controller/inferencepool_reconciler.go | 10 +++++----- pkg/epp/controller/inferencepool_reconciler_test.go | 1 + pkg/epp/server/controller_manager.go | 5 ++--- pkg/epp/server/runserver.go | 5 ++--- test/integration/epp/hermetic_test.go | 4 ++-- 9 files changed, 16 insertions(+), 15 deletions(-) diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index d586dc982c..edcb7ff8e9 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -40,9 +40,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" diff --git a/pkg/common/convert.go b/pkg/common/convert.go index 0b4f64aa0e..3541610342 100644 --- a/pkg/common/convert.go +++ b/pkg/common/convert.go @@ -6,6 +6,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" ) diff --git a/pkg/epp/controller/inferenceobjective_reconciler.go b/pkg/epp/controller/inferenceobjective_reconciler.go index 666fd74161..bdeb039320 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler.go +++ b/pkg/epp/controller/inferenceobjective_reconciler.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 8747d4d801..39ce5b462e 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -30,10 +30,10 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index ed499c209b..66d4d09719 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -24,10 +24,10 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -42,7 +42,7 @@ type InferencePoolReconciler struct { } func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx).WithValues("inferencePool", c.PoolGKNN.Group, req.NamespacedName).V(logutil.DEFAULT) + logger := log.FromContext(ctx).WithValues("group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName).V(logutil.DEFAULT) ctx = ctrl.LoggerInto(ctx, logger) logger.Info("Reconciling group %s InferencePool", c.PoolGKNN.Group) @@ -64,11 +64,11 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques } uns, err := common.ToUnstructured(infPool) if err != nil { - logger.Error(err, "Failed to convert group % InferencePool to unstructured", c.PoolGKNN.Group) + logger.Error(err, "Failed to convert inferencePool to unstructured") } v1infPool, err := common.ToInferencePool(uns) if err != nil { - logger.Error(err, "Failed to convert unstructured to InferencePool") + logger.Error(err, "Failed to convert unstructured to inferencePool") return ctrl.Result{}, err } diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 680e219f37..d9d675a5fa 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -31,6 +31,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index d0f4ab79dc..05038f7c0d 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -19,10 +19,8 @@ package server import ( "fmt" - "k8s.io/apimachinery/pkg/fields" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" - corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -35,6 +33,7 @@ import ( v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" ) var scheme = runtime.NewScheme() diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index 7d853f3de2..b5c7fef983 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -22,21 +22,20 @@ import ( "fmt" "time" - "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" - extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/go-logr/logr" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/health" healthgrpc "google.golang.org/grpc/health/grpc_health_v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" tlsutil "sigs.k8s.io/gateway-api-inference-extension/internal/tls" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/controller" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 107a7d5d17..859a147163 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -24,10 +24,8 @@ import ( "errors" "fmt" "io" - "k8s.io/apimachinery/pkg/runtime/schema" "os" "path/filepath" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "strings" "testing" "time" @@ -44,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" k8syaml "k8s.io/apimachinery/pkg/util/yaml" @@ -61,6 +60,7 @@ import ( v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" From 2b3e66dab86b6ca9c445d01f770f4e113519a135 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 1 Aug 2025 12:57:51 -0700 Subject: [PATCH 10/14] fixed boilplate --- pkg/common/convert.go | 16 ++++++++++++++++ pkg/common/kubemeta.go | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/pkg/common/convert.go b/pkg/common/convert.go index 3541610342..bb67155984 100644 --- a/pkg/common/convert.go +++ b/pkg/common/convert.go @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + // Package common defines structs for referring to fully qualified k8s resources. package common diff --git a/pkg/common/kubemeta.go b/pkg/common/kubemeta.go index ec3099a852..5da69e1be8 100644 --- a/pkg/common/kubemeta.go +++ b/pkg/common/kubemeta.go @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + // Package common defines structs for referring to fully qualified k8s resources. package common From 8c90f07288089b40028035e307650f0765087d14 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 1 Aug 2025 13:10:48 -0700 Subject: [PATCH 11/14] pass verify --- pkg/epp/controller/inferencepool_reconciler.go | 2 +- pkg/epp/controller/inferencepool_reconciler_test.go | 10 ++++++---- pkg/epp/util/testing/wrappers.go | 12 ------------ 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 66d4d09719..9c967bdd50 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -45,7 +45,7 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques logger := log.FromContext(ctx).WithValues("group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName).V(logutil.DEFAULT) ctx = ctrl.LoggerInto(ctx, logger) - logger.Info("Reconciling group %s InferencePool", c.PoolGKNN.Group) + logger.Info("Reconciling InferencePool", "group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName) if c.PoolGKNN.Group == v1alpha2.GroupName { infPool := &v1alpha2.InferencePool{} diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index d9d675a5fa..a29d531b99 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -78,11 +78,12 @@ func TestInferencePoolReconciler(t *testing.T) { Kind: "InferencePool", } pool1 := utiltest.MakeInferencePool("pool1"). - GVK(gvk). Namespace("pool1-ns"). Selector(selector_v1). TargetPortNumber(8080).ObjRef() - pool2 := utiltest.MakeInferencePool("pool2").GVK(gvk).Namespace("pool2-ns").ObjRef() + pool1.SetGroupVersionKind(gvk) + pool2 := utiltest.MakeInferencePool("pool2").Namespace("pool2-ns").ObjRef() + pool2.SetGroupVersionKind(gvk) // Set up the scheme. scheme := runtime.NewScheme() @@ -216,11 +217,12 @@ func TestXInferencePoolReconciler(t *testing.T) { Kind: "InferencePool", } pool1 := utiltest.MakeXInferencePool("pool1"). - GVK(gvk). Namespace("pool1-ns"). Selector(selector_v1). TargetPortNumber(8080).ObjRef() - pool2 := utiltest.MakeXInferencePool("pool2").GVK(gvk).Namespace("pool2-ns").ObjRef() + pool2 := utiltest.MakeXInferencePool("pool2").Namespace("pool2-ns").ObjRef() + pool1.SetGroupVersionKind(gvk) + pool2.SetGroupVersionKind(gvk) // Set up the scheme. scheme := runtime.NewScheme() diff --git a/pkg/epp/util/testing/wrappers.go b/pkg/epp/util/testing/wrappers.go index 549957d80f..84a6015e57 100644 --- a/pkg/epp/util/testing/wrappers.go +++ b/pkg/epp/util/testing/wrappers.go @@ -19,8 +19,6 @@ package testing import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" ) @@ -192,11 +190,6 @@ func (m *InferencePoolWrapper) Namespace(ns string) *InferencePoolWrapper { return m } -func (m *InferencePoolWrapper) GVK(gvk schema.GroupVersionKind) *InferencePoolWrapper { - m.TypeMeta.SetGroupVersionKind(gvk) - return m -} - func (m *InferencePoolWrapper) Selector(selector map[string]string) *InferencePoolWrapper { s := make(map[v1.LabelKey]v1.LabelValue) for k, v := range selector { @@ -243,11 +236,6 @@ func (m *XInferencePoolWrapper) Namespace(ns string) *XInferencePoolWrapper { return m } -func (m *XInferencePoolWrapper) GVK(gvk schema.GroupVersionKind) *XInferencePoolWrapper { - m.TypeMeta.SetGroupVersionKind(gvk) - return m -} - func (m *XInferencePoolWrapper) Selector(selector map[string]string) *XInferencePoolWrapper { s := make(map[v1alpha2.LabelKey]v1alpha2.LabelValue) for k, v := range selector { From 3c84a17821b8444a59ecbd13dff6ba84962b3b28 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Mon, 4 Aug 2025 09:44:47 -0700 Subject: [PATCH 12/14] rebase main --- pkg/epp/server/runserver.go | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index b5c7fef983..629af48c8a 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -86,25 +86,7 @@ const ( DefaultCertPath = "" // default for --cert-path DefaultConfigFile = "" // default for --config-file DefaultConfigText = "" // default for --config-text - DefaultGrpcPort = 9002 // default for --grpc-port - DefaultGrpcHealthPort = 9003 // default for --grpc-health-port - DefaultMetricsPort = 9090 // default for --metrics-port - DefaultDestinationEndpointHintMetadataNamespace = "envoy.lb" // default for --destinationEndpointHintMetadataNamespace - DefaultDestinationEndpointHintKey = "x-gateway-destination-endpoint" // default for --destination-endpoint-hint-key - DefaultPoolName = "" // required but no default - DefaultPoolNamespace = "default" // default for --pool-namespace - DefaultPoolGroup = "inference.networking.k8s.io" // default for --pool-group - DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refresh-metrics-interval - DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refresh-prometheus-metrics-interval - DefaultSecureServing = true // default for --secure-serving - DefaultHealthChecking = false // default for --health-checking - DefaultEnablePprof = true // default for --enable-pprof - DefaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting" // default for --total-queued-requests-metric - DefaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc" // default for --kv-cache-usage-percentage-metric - DefaultLoraInfoMetric = "vllm:lora_requests_info" // default for --lora-info-metric - DefaultCertPath = "" // default for --cert-path - DefaultConfigFile = "" // default for --config-file - DefaultConfigText = "" // default for --config-text + DefaultPoolGroup = "inference.networking.k8s.io" // default for --pool-group DefaultMetricsStalenessThreshold = 2 * time.Second ) From 0fe9223795985b249146d55ca5f7d4b7ec5e0c84 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Tue, 5 Aug 2025 12:43:51 -0700 Subject: [PATCH 13/14] changed to avoid duplicate code --- .../controller/inferencepool_reconciler.go | 93 ++++++++++--------- 1 file changed, 51 insertions(+), 42 deletions(-) diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 9c967bdd50..08020ea0e0 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -21,6 +21,7 @@ import ( "fmt" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -45,59 +46,67 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques logger := log.FromContext(ctx).WithValues("group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName).V(logutil.DEFAULT) ctx = ctrl.LoggerInto(ctx, logger) - logger.Info("Reconciling InferencePool", "group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName) - - if c.PoolGKNN.Group == v1alpha2.GroupName { - infPool := &v1alpha2.InferencePool{} - if err := c.Get(ctx, req.NamespacedName, infPool); err != nil { - if errors.IsNotFound(err) { - logger.Info("group %s InferencePool % s not found. Clearing the datastore", c.PoolGKNN.Group, req.NamespacedName) - c.Datastore.Clear() - return ctrl.Result{}, nil - } - logger.Error(err, "Unable to get InferencePool") - return ctrl.Result{}, err - } else if !infPool.DeletionTimestamp.IsZero() { - logger.Info("InferencePool is marked for deletion. Clearing the datastore") + logger.Info("Reconciling InferencePool") + + // 1. Initialize a generic client.Object based on the group. + var obj client.Object + switch c.PoolGKNN.Group { + case v1.GroupName: + obj = &v1.InferencePool{} + case v1alpha2.GroupName: + obj = &v1alpha2.InferencePool{} + default: + // Handle unsupported groups gracefully. + err := fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group) + logger.Error(err, "Cannot reconcile InferencePool") + return ctrl.Result{}, err + } + + // 2. Perform a single, generic fetch for the object. + if err := c.Get(ctx, req.NamespacedName, obj); err != nil { + if errors.IsNotFound(err) { + logger.Info("InferencePool not found. Clearing the datastore") c.Datastore.Clear() return ctrl.Result{}, nil } - uns, err := common.ToUnstructured(infPool) + logger.Error(err, "Unable to get InferencePool") + return ctrl.Result{}, err + } + + // 3. Perform common checks using the client.Object interface. + if !obj.GetDeletionTimestamp().IsZero() { + logger.Info("InferencePool is marked for deletion. Clearing the datastore") + c.Datastore.Clear() + return ctrl.Result{}, nil + } + + // 4. Convert the fetched object to the canonical v1.InferencePool. + var v1infPool *v1.InferencePool + + switch pool := obj.(type) { + case *v1.InferencePool: + // If it's already a v1 object, just use it. + v1infPool = pool + case *v1alpha2.InferencePool: + // If it's a v1alpha2 object, convert it to v1. + var uns *unstructured.Unstructured + uns, err := common.ToUnstructured(pool) if err != nil { logger.Error(err, "Failed to convert inferencePool to unstructured") + return ctrl.Result{}, err } - v1infPool, err := common.ToInferencePool(uns) + v1infPool, err = common.ToInferencePool(uns) if err != nil { logger.Error(err, "Failed to convert unstructured to inferencePool") return ctrl.Result{}, err } - - // update pool in datastore - if err := c.Datastore.PoolSet(ctx, c.Reader, v1infPool); err != nil { - logger.Error(err, "Failed to update datastore") - return ctrl.Result{}, err - } + default: + return ctrl.Result{}, fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group) } - if c.PoolGKNN.Group == v1.GroupName { - infPool := &v1.InferencePool{} - if err := c.Get(ctx, req.NamespacedName, infPool); err != nil { - if errors.IsNotFound(err) { - logger.Info("InferencePool not found. Clearing the datastore") - c.Datastore.Clear() - return ctrl.Result{}, nil - } - logger.Error(err, "Unable to get InferencePool") - return ctrl.Result{}, err - } else if !infPool.DeletionTimestamp.IsZero() { - logger.Info("InferencePool is marked for deletion. Clearing the datastore") - c.Datastore.Clear() - return ctrl.Result{}, nil - } - // update pool in datastore - if err := c.Datastore.PoolSet(ctx, c.Reader, infPool); err != nil { - logger.Error(err, "Failed to update datastore") - return ctrl.Result{}, err - } + + if err := c.Datastore.PoolSet(ctx, c.Reader, v1infPool); err != nil { + logger.Error(err, "Failed to update datastore") + return ctrl.Result{}, err } return ctrl.Result{}, nil From d823af9008f3ebbb4620ff81b2c694af32edaf63 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Tue, 5 Aug 2025 12:51:11 -0700 Subject: [PATCH 14/14] changed logger info --- pkg/epp/controller/inferencepool_reconciler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 08020ea0e0..1d3c0aa9e7 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -46,7 +46,7 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques logger := log.FromContext(ctx).WithValues("group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName).V(logutil.DEFAULT) ctx = ctrl.LoggerInto(ctx, logger) - logger.Info("Reconciling InferencePool") + logger.Info("Reconciling InferencePool", "group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName) // 1. Initialize a generic client.Object based on the group. var obj client.Object