diff --git a/Dockerfile b/Dockerfile index 21a629cbf5..d2fd2300b1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,6 +18,7 @@ RUN go mod download # Sources COPY cmd/epp ./cmd/epp +COPY pkg/common ./pkg/common COPY pkg/epp ./pkg/epp COPY internal ./internal COPY apix ./apix diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 7d92c7898b..edcb7ff8e9 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -32,6 +32,7 @@ import ( "go.uber.org/zap/zapcore" "google.golang.org/grpc" healthPb "google.golang.org/grpc/health/grpc_health_v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log" @@ -41,6 +42,7 @@ import ( metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" @@ -82,6 +84,10 @@ var ( "pool-name", runserver.DefaultPoolName, "Name of the InferencePool this Endpoint Picker is associated with.") + poolGroup = flag.String( + "pool-group", + runserver.DefaultPoolGroup, + "group of the InferencePool this Endpoint Picker is associated with.") poolNamespace = flag.String( "pool-namespace", runserver.DefaultPoolNamespace, @@ -301,7 +307,15 @@ func (r *Runner) Run(ctx context.Context) error { Name: *poolName, Namespace: *poolNamespace, } - mgr, err := runserver.NewDefaultManager(poolNamespacedName, cfg, metricsServerOptions) + poolGroupKind := schema.GroupKind{ + Group: *poolGroup, + Kind: "InferencePool", + } + poolGKNN := common.GKNN{ + NamespacedName: poolNamespacedName, + GroupKind: poolGroupKind, + } + mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions) if err != nil 
{
 		setupLog.Error(err, "Failed to create controller manager")
 		return err
@@ -344,6 +358,7 @@ func (r *Runner) Run(ctx context.Context) error {
 		DestinationEndpointHintKey: *destinationEndpointHintKey,
 		FairnessIDHeaderKey: *fairnessIDHeaderKey,
 		PoolNamespacedName: poolNamespacedName,
+		PoolGKNN: poolGKNN,
 		Datastore: datastore,
 		SecureServing: *secureServing,
 		HealthChecking: *healthChecking,
diff --git a/pkg/common/convert.go b/pkg/common/convert.go
new file mode 100644
index 0000000000..bb67155984
--- /dev/null
+++ b/pkg/common/convert.go
@@ -0,0 +1,59 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package common
+
+import (
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+
+	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
+	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
+)
+
+// ToUnstructured converts obj into an unstructured.Unstructured using
+// runtime.DefaultUnstructuredConverter. A conversion error is returned as-is.
+func ToUnstructured(obj any) (*unstructured.Unstructured, error) {
+	u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+	if err != nil {
+		return nil, err
+	}
+	return &unstructured.Unstructured{Object: u}, nil
+}
+
+// ToInferencePool converts an unstructured object into a v1.InferencePool.
+var ToInferencePool = convert[v1.InferencePool]
+
+// ToXInferencePool converts an unstructured object into a
+// v1alpha2.InferencePool.
+var ToXInferencePool = convert[v1alpha2.InferencePool]
+
+// convert decodes u into a newly allocated T using
+// runtime.DefaultUnstructuredConverter. A nil u yields an error instead of a
+// nil-pointer panic, and conversion failures are wrapped with %w so callers
+// can inspect the cause with errors.Is or errors.As.
+func convert[T any](u *unstructured.Unstructured) (*T, error) {
+	var res T
+	if u == nil {
+		return nil, fmt.Errorf("cannot convert nil unstructured object to %T", res)
+	}
+	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &res); err != nil {
+		return nil, fmt.Errorf("error converting unstructured to %T: %w", res, err)
+	}
+	return &res, nil
+}
diff --git a/pkg/common/kubemeta.go b/pkg/common/kubemeta.go
new file mode 100644
index 0000000000..5da69e1be8
--- /dev/null
+++ b/pkg/common/kubemeta.go
@@ -0,0 +1,64 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package common provides helpers shared across packages: GKNN, a fully
+// qualified reference to a k8s resource, and converters between typed
+// InferencePool objects and their unstructured representation.
+package common
+
+import (
+	"fmt"
+	"strings"
+
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+// GKNN represents a fully qualified k8s resource: its API group and kind
+// together with its namespace and name.
+type GKNN struct {
+	types.NamespacedName
+	schema.GroupKind
+}
+
+// String implements fmt.Stringer, rendering the GroupKind followed by the
+// NamespacedName separated by a single space.
+//
+// The receiver is a value so that both GKNN and *GKNN satisfy fmt.Stringer;
+// with a pointer receiver, formatting a GKNN value would bypass this method.
+func (g GKNN) String() string {
+	return fmt.Sprintf("%s %s", g.GroupKind.String(), g.NamespacedName.String())
+}
+
+// Compare orders a and b by Group, then Kind, then Namespace, then Name,
+// returning -1, 0, or 1 when a is less than, equal to, or greater than b.
+func Compare(a, b GKNN) int {
+	if v := strings.Compare(a.Group, b.Group); v != 0 {
+		return v
+	}
+	if v := strings.Compare(a.Kind, b.Kind); v != 0 {
+		return v
+	}
+	if v := strings.Compare(a.Namespace, b.Namespace); v != 0 {
+		return v
+	}
+	return strings.Compare(a.Name, b.Name)
+}
+
+// Less returns true if a is less than b.
+func Less(a, b GKNN) bool {
+	return Compare(a, b) < 0
+}
diff --git a/pkg/epp/controller/inferenceobjective_reconciler.go b/pkg/epp/controller/inferenceobjective_reconciler.go
index 8b029bcf2a..bdeb039320 100644
--- a/pkg/epp/controller/inferenceobjective_reconciler.go
+++ b/pkg/epp/controller/inferenceobjective_reconciler.go
@@ -29,14 +29,15 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 
 	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
 	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
 type InferenceObjectiveReconciler struct {
 	client.Reader
-	Datastore          datastore.Datastore
-	PoolNamespacedName types.NamespacedName
+	Datastore datastore.Datastore
+	PoolGKNN  common.GKNN
 }
 
 func (c *InferenceObjectiveReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -55,7 +56,7 @@ func (c *InferenceObjectiveReconciler) Reconcile(ctx context.Context, req ctrl.R
 		notFound = true
 	}
 
-	if notFound || !infObjective.DeletionTimestamp.IsZero() || infObjective.Spec.PoolRef.Name != v1alpha2.ObjectName(c.PoolNamespacedName.Name) {
+	if notFound || !infObjective.DeletionTimestamp.IsZero() || infObjective.Spec.PoolRef.Name != v1alpha2.ObjectName(c.PoolGKNN.Name) {
 		// InferenceObjective object got deleted or changed the
referenced pool. err := c.handleObjectiveDeleted(ctx, req.NamespacedName) return ctrl.Result{}, err @@ -125,5 +126,5 @@ func (c *InferenceObjectiveReconciler) SetupWithManager(ctx context.Context, mgr } func (c *InferenceObjectiveReconciler) eventPredicate(infObjective *v1alpha2.InferenceObjective) bool { - return string(infObjective.Spec.PoolRef.Name) == c.PoolNamespacedName.Name + return string(infObjective.Spec.PoolRef.Name) == c.PoolGKNN.Name } diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 046cd1ecf0..39ce5b462e 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -24,6 +24,7 @@ import ( "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" @@ -32,6 +33,7 @@ import ( v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" @@ -203,9 +205,12 @@ func TestInferenceObjectiveReconciler(t *testing.T) { } _ = ds.PoolSet(context.Background(), fakeClient, pool) reconciler := &InferenceObjectiveReconciler{ - Reader: fakeClient, - Datastore: ds, - PoolNamespacedName: types.NamespacedName{Name: pool.Name, Namespace: pool.Namespace}, + Reader: fakeClient, + Datastore: ds, + PoolGKNN: common.GKNN{ + NamespacedName: types.NamespacedName{Name: pool.Name, Namespace: pool.Namespace}, + GroupKind: schema.GroupKind{Group: pool.GroupVersionKind().Group, Kind: 
pool.GroupVersionKind().Kind}, + }, } if test.incomingReq == nil { test.incomingReq = &types.NamespacedName{Name: test.objective.Name, Namespace: test.objective.Namespace} diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 94fc726c0d..1d3c0aa9e7 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -18,13 +18,17 @@ package controller import ( "context" + "fmt" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -35,17 +39,31 @@ import ( type InferencePoolReconciler struct { client.Reader Datastore datastore.Datastore + PoolGKNN common.GKNN } func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx).WithValues("inferencePool", req.NamespacedName).V(logutil.DEFAULT) + logger := log.FromContext(ctx).WithValues("group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName).V(logutil.DEFAULT) ctx = ctrl.LoggerInto(ctx, logger) - logger.Info("Reconciling InferencePool") + logger.Info("Reconciling InferencePool", "group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName) - infPool := &v1.InferencePool{} + // 1. Initialize a generic client.Object based on the group. + var obj client.Object + switch c.PoolGKNN.Group { + case v1.GroupName: + obj = &v1.InferencePool{} + case v1alpha2.GroupName: + obj = &v1alpha2.InferencePool{} + default: + // Handle unsupported groups gracefully. 
+ err := fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group) + logger.Error(err, "Cannot reconcile InferencePool") + return ctrl.Result{}, err + } - if err := c.Get(ctx, req.NamespacedName, infPool); err != nil { + // 2. Perform a single, generic fetch for the object. + if err := c.Get(ctx, req.NamespacedName, obj); err != nil { if errors.IsNotFound(err) { logger.Info("InferencePool not found. Clearing the datastore") c.Datastore.Clear() @@ -53,13 +71,40 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques } logger.Error(err, "Unable to get InferencePool") return ctrl.Result{}, err - } else if !infPool.DeletionTimestamp.IsZero() { + } + + // 3. Perform common checks using the client.Object interface. + if !obj.GetDeletionTimestamp().IsZero() { logger.Info("InferencePool is marked for deletion. Clearing the datastore") c.Datastore.Clear() return ctrl.Result{}, nil } - // update pool in datastore - if err := c.Datastore.PoolSet(ctx, c.Reader, infPool); err != nil { + + // 4. Convert the fetched object to the canonical v1.InferencePool. + var v1infPool *v1.InferencePool + + switch pool := obj.(type) { + case *v1.InferencePool: + // If it's already a v1 object, just use it. + v1infPool = pool + case *v1alpha2.InferencePool: + // If it's a v1alpha2 object, convert it to v1. 
+ var uns *unstructured.Unstructured + uns, err := common.ToUnstructured(pool) + if err != nil { + logger.Error(err, "Failed to convert inferencePool to unstructured") + return ctrl.Result{}, err + } + v1infPool, err = common.ToInferencePool(uns) + if err != nil { + logger.Error(err, "Failed to convert unstructured to inferencePool") + return ctrl.Result{}, err + } + default: + return ctrl.Result{}, fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group) + } + + if err := c.Datastore.PoolSet(ctx, c.Reader, v1infPool); err != nil { logger.Error(err, "Failed to update datastore") return ctrl.Result{}, err } @@ -68,7 +113,16 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques } func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&v1.InferencePool{}). - Complete(c) + switch c.PoolGKNN.Group { + case v1alpha2.GroupName: + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha2.InferencePool{}). + Complete(c) + case v1.GroupName: + return ctrl.NewControllerManagedBy(mgr). + For(&v1.InferencePool{}). 
+ Complete(c) + default: + return fmt.Errorf("unknown group %s", c.PoolGKNN.Group) + } } diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index d4f7253b68..a29d531b99 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -25,6 +25,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" @@ -33,6 +34,7 @@ import ( v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" @@ -41,12 +43,7 @@ import ( var ( selector_v1 = map[string]string{"app": "vllm_v1"} selector_v2 = map[string]string{"app": "vllm_v2"} - pool1 = utiltest.MakeInferencePool("pool1"). - Namespace("pool1-ns"). - Selector(selector_v1). - TargetPortNumber(8080).ObjRef() - pool2 = utiltest.MakeInferencePool("pool2").Namespace("pool2-ns").ObjRef() - pods = []*corev1.Pod{ + pods = []*corev1.Pod{ // Two ready pods matching pool1 utiltest.MakePod("pod1"). Namespace("pool1-ns"). @@ -75,14 +72,24 @@ var ( func TestInferencePoolReconciler(t *testing.T) { // The best practice is to use table-driven tests, however in this scaenario it seems // more logical to do a single test with steps that depend on each other. + gvk := schema.GroupVersionKind{ + Group: v1.GroupVersion.Group, + Version: v1.GroupVersion.Version, + Kind: "InferencePool", + } + pool1 := utiltest.MakeInferencePool("pool1"). + Namespace("pool1-ns"). 
+ Selector(selector_v1). + TargetPortNumber(8080).ObjRef() + pool1.SetGroupVersionKind(gvk) + pool2 := utiltest.MakeInferencePool("pool2").Namespace("pool2-ns").ObjRef() + pool2.SetGroupVersionKind(gvk) // Set up the scheme. scheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(scheme) _ = v1alpha2.Install(scheme) _ = v1.Install(scheme) - - // Create a fake client with the pool and the pods. initialObjects := []client.Object{pool1, pool2} for i := range pods { initialObjects = append(initialObjects, pods[i]) @@ -94,12 +101,19 @@ func TestInferencePoolReconciler(t *testing.T) { // Create a request for the existing resource. namespacedName := types.NamespacedName{Name: pool1.Name, Namespace: pool1.Namespace} + gknn := common.GKNN{ + NamespacedName: namespacedName, + GroupKind: schema.GroupKind{ + Group: pool1.GroupVersionKind().Group, + Kind: pool1.GroupVersionKind().Kind, + }, + } req := ctrl.Request{NamespacedName: namespacedName} ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) datastore := datastore.NewDatastore(ctx, pmf) - inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore} + inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn} // Step 1: Inception, only ready pods matching pool1 are added to the store. if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { @@ -191,3 +205,153 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string { } return "" } + +// Duplicate it as it is just a temporary code +// "inference.networking.x-k8s.io" InferencePool will get removed in the near future. +func TestXInferencePoolReconciler(t *testing.T) { + // The best practice is to use table-driven tests, however in this scaenario it seems + // more logical to do a single test with steps that depend on each other. 
+ gvk := schema.GroupVersionKind{ + Group: v1alpha2.GroupVersion.Group, + Version: v1alpha2.GroupVersion.Version, + Kind: "InferencePool", + } + pool1 := utiltest.MakeXInferencePool("pool1"). + Namespace("pool1-ns"). + Selector(selector_v1). + TargetPortNumber(8080).ObjRef() + pool2 := utiltest.MakeXInferencePool("pool2").Namespace("pool2-ns").ObjRef() + pool1.SetGroupVersionKind(gvk) + pool2.SetGroupVersionKind(gvk) + + // Set up the scheme. + scheme := runtime.NewScheme() + _ = clientgoscheme.AddToScheme(scheme) + _ = v1alpha2.Install(scheme) + _ = v1.Install(scheme) + initialObjects := []client.Object{pool1, pool2} + for i := range pods { + initialObjects = append(initialObjects, pods[i]) + } + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(initialObjects...). + Build() + + // Create a request for the existing resource. + namespacedName := types.NamespacedName{Name: pool1.Name, Namespace: pool1.Namespace} + gknn := common.GKNN{ + NamespacedName: namespacedName, + GroupKind: schema.GroupKind{ + Group: pool1.GroupVersionKind().Group, + Kind: pool1.GroupVersionKind().Kind, + }, + } + req := ctrl.Request{NamespacedName: namespacedName} + ctx := context.Background() + + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2) + datastore := datastore.NewDatastore(ctx, pmf) + inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: datastore, PoolGKNN: gknn} + + // Step 1: Inception, only ready pods matching pool1 are added to the store. 
+ if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { + t.Errorf("Unexpected InferencePool reconcile error: %v", err) + } + if diff := xDiffStore(t, datastore, xDiffStoreParams{wantPool: pool1, wantPods: []string{"pod1", "pod2"}}); diff != "" { + t.Errorf("Unexpected diff (+got/-want): %s", diff) + } + + newPool1 := &v1alpha2.InferencePool{} + if err := fakeClient.Get(ctx, req.NamespacedName, newPool1); err != nil { + t.Errorf("Unexpected pool get error: %v", err) + } + newPool1.Spec.Selector = map[v1alpha2.LabelKey]v1alpha2.LabelValue{"app": "vllm_v2"} + if err := fakeClient.Update(ctx, newPool1, &client.UpdateOptions{}); err != nil { + t.Errorf("Unexpected pool update error: %v", err) + } + + if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { + t.Errorf("Unexpected InferencePool reconcile error: %v", err) + } + if diff := xDiffStore(t, datastore, xDiffStoreParams{wantPool: newPool1, wantPods: []string{"pod5"}}); diff != "" { + t.Errorf("Unexpected diff (+got/-want): %s", diff) + } + + // Step 3: update the pool port + if err := fakeClient.Get(ctx, req.NamespacedName, newPool1); err != nil { + t.Errorf("Unexpected pool get error: %v", err) + } + newPool1.Spec.TargetPortNumber = 9090 + if err := fakeClient.Update(ctx, newPool1, &client.UpdateOptions{}); err != nil { + t.Errorf("Unexpected pool update error: %v", err) + } + if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { + t.Errorf("Unexpected InferencePool reconcile error: %v", err) + } + if diff := xDiffStore(t, datastore, xDiffStoreParams{wantPool: newPool1, wantPods: []string{"pod5"}}); diff != "" { + t.Errorf("Unexpected diff (+got/-want): %s", diff) + } + + // Step 4: delete the pool to trigger a datastore clear + if err := fakeClient.Get(ctx, req.NamespacedName, newPool1); err != nil { + t.Errorf("Unexpected pool get error: %v", err) + } + if err := fakeClient.Delete(ctx, newPool1, &client.DeleteOptions{}); err != nil { + t.Errorf("Unexpected pool 
delete error: %v", err) + } + if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { + t.Errorf("Unexpected InferencePool reconcile error: %v", err) + } + if diff := xDiffStore(t, datastore, xDiffStoreParams{wantPods: []string{}}); diff != "" { + t.Errorf("Unexpected diff (+got/-want): %s", diff) + } +} + +type xDiffStoreParams struct { + wantPool *v1alpha2.InferencePool + wantPods []string + wantObjectives []*v1alpha2.InferenceObjective +} + +func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStoreParams) string { + gotPool, _ := datastore.PoolGet() + if gotPool == nil && params.wantPool == nil { + return "" + } + uns, err := common.ToUnstructured(gotPool) + if err != nil { + t.Fatalf("failed to convert XInferencePool to Unstructured: %v", err) + } + gotXPool, err := common.ToXInferencePool(uns) + if err != nil { + t.Fatalf("failed to convert unstructured to InferencePool: %v", err) + } + if diff := cmp.Diff(params.wantPool, gotXPool); diff != "" { + return "pool:" + diff + } + + // Default wantPods if not set because PodGetAll returns an empty slice when empty. + if params.wantPods == nil { + params.wantPods = []string{} + } + gotPods := []string{} + for _, pm := range datastore.PodList(backendmetrics.AllPodPredicate) { + gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) + } + if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { + return "pods:" + diff + } + + // Default wantModels if not set because ModelGetAll returns an empty slice when empty. 
+	if params.wantObjectives == nil {
+		params.wantObjectives = []*v1alpha2.InferenceObjective{}
+	}
+
+	if diff := cmp.Diff(params.wantObjectives, datastore.ObjectiveGetAll(), cmpopts.SortSlices(func(a, b *v1alpha2.InferenceObjective) bool {
+		return a.Name < b.Name
+	})); diff != "" {
+		return "models:" + diff
+	}
+	return ""
+}
diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go
index 9be7ceb5c5..05038f7c0d 100644
--- a/pkg/epp/server/controller_manager.go
+++ b/pkg/epp/server/controller_manager.go
@@ -22,7 +22,6 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
@@ -34,6 +33,7 @@ import (
 
 	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
 	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
 )
 
 var scheme = runtime.NewScheme()
@@ -45,39 +45,58 @@ func init() {
 }
 
 // defaultManagerOptions returns the default options used to create the manager.
+// The cache is limited to gknn.Namespace for Pods and InferenceObjectives and to
+// the single InferencePool named gknn.Name, registered under the InferencePool
+// type that matches gknn.Group. An unrecognized group yields an error.
-func defaultManagerOptions(namespacedName types.NamespacedName, metricsServerOptions metricsserver.Options) ctrl.Options {
-	return ctrl.Options{
+func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.Options) (ctrl.Options, error) {
+	opt := ctrl.Options{
 		Scheme: scheme,
 		Cache: cache.Options{
 			ByObject: map[client.Object]cache.ByObject{
 				&corev1.Pod{}: {
 					Namespaces: map[string]cache.Config{
-						namespacedName.Namespace: {},
-					},
-				},
-				&v1.InferencePool{}: {
-					Namespaces: map[string]cache.Config{
-						namespacedName.Namespace: {
-							FieldSelector: fields.SelectorFromSet(fields.Set{
-								"metadata.name": namespacedName.Name,
-							}),
-						},
+						gknn.Namespace: {},
 					},
 				},
 				&v1alpha2.InferenceObjective{}: {
 					Namespaces: map[string]cache.Config{
-						namespacedName.Namespace: {},
+						gknn.Namespace: {},
 					},
 				},
 			},
 		},
 		Metrics: metricsServerOptions,
 	}
+	// The InferencePool cache entry is identical for every supported group;
+	// only the Go type used as the map key differs.
+	poolCache := cache.ByObject{
+		Namespaces: map[string]cache.Config{
+			gknn.Namespace: {
+				FieldSelector: fields.SelectorFromSet(fields.Set{
+					"metadata.name": gknn.Name,
+				}),
+			},
+		},
+	}
+	switch gknn.Group {
+	case v1alpha2.GroupName:
+		opt.Cache.ByObject[&v1alpha2.InferencePool{}] = poolCache
+	case v1.GroupName:
+		opt.Cache.ByObject[&v1.InferencePool{}] = poolCache
+	default:
+		return ctrl.Options{}, fmt.Errorf("unknown group: %q", gknn.Group)
+	}
+
+	return opt, nil
 }
 
 // NewDefaultManager creates a new controller manager with default configuration.
-func NewDefaultManager(namespacedName types.NamespacedName, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) {
-	manager, err := ctrl.NewManager(restConfig, defaultManagerOptions(namespacedName, metricsServerOptions))
+func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) {
+	opt, err := defaultManagerOptions(gknn, metricsServerOptions)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create controller manager options: %w", err)
+	}
+	manager, err := ctrl.NewManager(restConfig, opt)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create controller manager: %v", err)
 	}
diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go
index 2cbcbec0ed..629af48c8a 100644
--- a/pkg/epp/server/runserver.go
+++ b/pkg/epp/server/runserver.go
@@ -28,12 +28,14 @@ import (
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/health"
 	healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 
 	"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
 	tlsutil "sigs.k8s.io/gateway-api-inference-extension/internal/tls"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
 	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/controller"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
@@ -48,6 +50,7 @@ type ExtProcServerRunner struct {
 	DestinationEndpointHintKey string
 	FairnessIDHeaderKey string
 	PoolNamespacedName types.NamespacedName
+	PoolGKNN common.GKNN
 	Datastore datastore.Datastore
 	SecureServing bool
 	HealthChecking bool
@@ -83,18 +86,27 @@ const (
 	DefaultCertPath = "" // default for --cert-path
 	DefaultConfigFile = "" // default for --config-file
 	DefaultConfigText = "" //
default for --config-text + DefaultPoolGroup = "inference.networking.k8s.io" // default for --pool-group DefaultMetricsStalenessThreshold = 2 * time.Second ) // NewDefaultExtProcServerRunner creates a runner with default values. // Note: Dependencies like Datastore, Scheduler, SD need to be set separately. func NewDefaultExtProcServerRunner() *ExtProcServerRunner { + poolGKNN := common.GKNN{ + NamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, + GroupKind: schema.GroupKind{ + Group: DefaultPoolGroup, + Kind: "InferencePool", + }, + } return &ExtProcServerRunner{ GrpcPort: DefaultGrpcPort, DestinationEndpointHintKey: DefaultDestinationEndpointHintKey, DestinationEndpointHintMetadataNamespace: DefaultDestinationEndpointHintMetadataNamespace, FairnessIDHeaderKey: DefaultFairnessIDHeaderKey, PoolNamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, + PoolGKNN: poolGKNN, SecureServing: DefaultSecureServing, HealthChecking: DefaultHealthChecking, RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, @@ -109,14 +121,15 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man if err := (&controller.InferencePoolReconciler{ Datastore: r.Datastore, Reader: mgr.GetClient(), + PoolGKNN: r.PoolGKNN, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err) } if err := (&controller.InferenceObjectiveReconciler{ - Datastore: r.Datastore, - Reader: mgr.GetClient(), - PoolNamespacedName: r.PoolNamespacedName, + Datastore: r.Datastore, + Reader: mgr.GetClient(), + PoolGKNN: r.PoolGKNN, }).SetupWithManager(ctx, mgr); err != nil { return fmt.Errorf("failed setting up InferenceObjectiveReconciler: %w", err) } diff --git a/pkg/epp/util/testing/wrappers.go b/pkg/epp/util/testing/wrappers.go index 7905e1f684..84a6015e57 100644 --- a/pkg/epp/util/testing/wrappers.go +++ b/pkg/epp/util/testing/wrappers.go @@ 
-19,7 +19,6 @@ package testing
 import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-
 	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
 	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
 )
@@ -169,7 +168,7 @@ func (m *InferenceObjectiveWrapper) CreationTimestamp(t metav1.Time) *InferenceO
 	return m
 }
 
-// InferencePoolWrapper wraps an InferencePool.
+// InferencePoolWrapper wraps a group "inference.networking.k8s.io" InferencePool.
 type InferencePoolWrapper struct {
 	v1.InferencePool
 }
@@ -214,3 +213,54 @@ func (m *InferencePoolWrapper) ExtensionRef(name string) *InferencePoolWrapper {
 func (m *InferencePoolWrapper) ObjRef() *v1.InferencePool {
 	return &m.InferencePool
 }
+
+// XInferencePoolWrapper wraps a group "inference.networking.x-k8s.io" InferencePool.
+type XInferencePoolWrapper struct {
+	v1alpha2.InferencePool
+}
+
+// MakeXInferencePool creates a wrapper for an InferencePool with the given name.
+func MakeXInferencePool(name string) *XInferencePoolWrapper {
+	return &XInferencePoolWrapper{
+		v1alpha2.InferencePool{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: name,
+			},
+			Spec: v1alpha2.InferencePoolSpec{},
+		},
+	}
+}
+
+// Namespace sets the namespace of the wrapped InferencePool and returns the wrapper.
+func (m *XInferencePoolWrapper) Namespace(ns string) *XInferencePoolWrapper {
+	m.ObjectMeta.Namespace = ns
+	return m
+}
+
+// Selector sets the pool selector from the given labels, converting each entry
+// to the v1alpha2 label key and value types, and returns the wrapper.
+func (m *XInferencePoolWrapper) Selector(selector map[string]string) *XInferencePoolWrapper {
+	s := make(map[v1alpha2.LabelKey]v1alpha2.LabelValue)
+	for k, v := range selector {
+		s[v1alpha2.LabelKey(k)] = v1alpha2.LabelValue(v)
+	}
+	m.Spec.Selector = s
+	return m
+}
+
+// TargetPortNumber sets the pool's target port number and returns the wrapper.
+func (m *XInferencePoolWrapper) TargetPortNumber(p int32) *XInferencePoolWrapper {
+	m.Spec.TargetPortNumber = p
+	return m
+}
+
+// ExtensionRef sets the pool's extension reference to the given name and returns the wrapper.
+func (m *XInferencePoolWrapper) ExtensionRef(name string) *XInferencePoolWrapper {
+	m.Spec.ExtensionRef = &v1alpha2.Extension{ExtensionReference: v1alpha2.ExtensionReference{Name: v1alpha2.ObjectName(name)}}
+	return m
+}
+
+// ObjRef returns a pointer to the wrapped InferencePool.
+func (m *XInferencePoolWrapper) ObjRef() *v1alpha2.InferencePool { + return &m.InferencePool +} diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index ba17b64a2c..859a147163 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -42,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" k8syaml "k8s.io/apimachinery/pkg/util/yaml" @@ -59,6 +60,7 @@ import ( v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" @@ -1086,7 +1088,10 @@ func BeforeSuite() func() { serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond, time.Second*2) // Adjust from defaults - serverRunner.PoolNamespacedName = types.NamespacedName{Name: testPoolName, Namespace: testNamespace} + serverRunner.PoolGKNN = common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: testNamespace, Name: testPoolName}, + GroupKind: schema.GroupKind{Group: v1.GroupVersion.Group, Kind: "InferencePool"}, + } serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf) kvCacheUtilizationScorer := scorer.NewKVCacheUtilizationScorer()