Skip to content
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ RUN go mod download

# Sources
COPY cmd/epp ./cmd/epp
COPY pkg/common ./pkg/common
COPY pkg/epp ./pkg/epp
COPY internal ./internal
COPY apix ./apix
Expand Down
17 changes: 16 additions & 1 deletion cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
healthPb "google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -41,6 +42,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
Expand Down Expand Up @@ -82,6 +84,10 @@ var (
"pool-name",
runserver.DefaultPoolName,
"Name of the InferencePool this Endpoint Picker is associated with.")
poolGroup = flag.String(
"pool-group",
runserver.DefaultPoolGroup,
"group of the InferencePool this Endpoint Picker is associated with.")
poolNamespace = flag.String(
"pool-namespace",
runserver.DefaultPoolNamespace,
Expand Down Expand Up @@ -301,7 +307,15 @@ func (r *Runner) Run(ctx context.Context) error {
Name: *poolName,
Namespace: *poolNamespace,
}
mgr, err := runserver.NewDefaultManager(poolNamespacedName, cfg, metricsServerOptions)
poolGroupKind := schema.GroupKind{
Group: *poolGroup,
Kind: "InferencePool",
}
poolGKNN := common.GKNN{
NamespacedName: poolNamespacedName,
GroupKind: poolGroupKind,
}
mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions)
if err != nil {
setupLog.Error(err, "Failed to create controller manager")
return err
Expand Down Expand Up @@ -344,6 +358,7 @@ func (r *Runner) Run(ctx context.Context) error {
DestinationEndpointHintKey: *destinationEndpointHintKey,
FairnessIDHeaderKey: *fairnessIDHeaderKey,
PoolNamespacedName: poolNamespacedName,
PoolGKNN: poolGKNN,
Datastore: datastore,
SecureServing: *secureServing,
HealthChecking: *healthChecking,
Expand Down
48 changes: 48 additions & 0 deletions pkg/common/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package common defines structs for referring to fully qualified k8s resources.
package common

import (
"fmt"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
)

func ToUnstructured(obj any) (*unstructured.Unstructured, error) {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, err
}
return &unstructured.Unstructured{Object: u}, nil
}

var ToInferencePool = convert[v1.InferencePool]

var ToXInferencePool = convert[v1alpha2.InferencePool]

func convert[T any](u *unstructured.Unstructured) (*T, error) {
var res T
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &res); err != nil {
return nil, fmt.Errorf("error converting unstructured to T: %v", err)
}
return &res, nil
}
57 changes: 57 additions & 0 deletions pkg/common/kubemeta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package common defines structs for referring to fully qualified k8s resources.
package common

import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)

// GKNN represents a fully qualified k8s resource.
type GKNN struct {
types.NamespacedName
schema.GroupKind
}

// String implements Stringer.
func (g *GKNN) String() string {
return fmt.Sprintf("%s %s", g.GroupKind.String(), g.NamespacedName.String())
}

// Compare returns the comparison of a and b where less than, equal, and greater than return -1, 0,
// and 1 respectively.
func Compare(a, b GKNN) int {
if v := strings.Compare(a.Group, b.Group); v != 0 {
return v
}
if v := strings.Compare(a.Kind, b.Kind); v != 0 {
return v
}
if v := strings.Compare(a.Namespace, b.Namespace); v != 0 {
return v
}
return strings.Compare(a.Name, b.Name)
}

// Less returns true if a is less than b.
func Less(a, b GKNN) bool {
return Compare(a, b) < 0
}
9 changes: 5 additions & 4 deletions pkg/epp/controller/inferenceobjective_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

type InferenceObjectiveReconciler struct {
client.Reader
Datastore datastore.Datastore
PoolNamespacedName types.NamespacedName
Datastore datastore.Datastore
PoolGKNN common.GKNN
}

func (c *InferenceObjectiveReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand All @@ -55,7 +56,7 @@ func (c *InferenceObjectiveReconciler) Reconcile(ctx context.Context, req ctrl.R
notFound = true
}

if notFound || !infObjective.DeletionTimestamp.IsZero() || infObjective.Spec.PoolRef.Name != v1alpha2.ObjectName(c.PoolNamespacedName.Name) {
if notFound || !infObjective.DeletionTimestamp.IsZero() || infObjective.Spec.PoolRef.Name != v1alpha2.ObjectName(c.PoolGKNN.Name) {
// InferenceObjective object got deleted or changed the referenced pool.
err := c.handleObjectiveDeleted(ctx, req.NamespacedName)
return ctrl.Result{}, err
Expand Down Expand Up @@ -125,5 +126,5 @@ func (c *InferenceObjectiveReconciler) SetupWithManager(ctx context.Context, mgr
}

func (c *InferenceObjectiveReconciler) eventPredicate(infObjective *v1alpha2.InferenceObjective) bool {
return string(infObjective.Spec.PoolRef.Name) == c.PoolNamespacedName.Name
return string(infObjective.Spec.PoolRef.Name) == c.PoolGKNN.Name
}
11 changes: 8 additions & 3 deletions pkg/epp/controller/inferenceobjective_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -32,6 +33,7 @@ import (

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
Expand Down Expand Up @@ -203,9 +205,12 @@ func TestInferenceObjectiveReconciler(t *testing.T) {
}
_ = ds.PoolSet(context.Background(), fakeClient, pool)
reconciler := &InferenceObjectiveReconciler{
Reader: fakeClient,
Datastore: ds,
PoolNamespacedName: types.NamespacedName{Name: pool.Name, Namespace: pool.Namespace},
Reader: fakeClient,
Datastore: ds,
PoolGKNN: common.GKNN{
NamespacedName: types.NamespacedName{Name: pool.Name, Namespace: pool.Namespace},
GroupKind: schema.GroupKind{Group: pool.GroupVersionKind().Group, Kind: pool.GroupVersionKind().Kind},
},
}
if test.incomingReq == nil {
test.incomingReq = &types.NamespacedName{Name: test.objective.Name, Namespace: test.objective.Namespace}
Expand Down
74 changes: 64 additions & 10 deletions pkg/epp/controller/inferencepool_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ package controller

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)
Expand All @@ -35,31 +39,72 @@ import (
type InferencePoolReconciler struct {
client.Reader
Datastore datastore.Datastore
PoolGKNN common.GKNN
}

func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithValues("inferencePool", req.NamespacedName).V(logutil.DEFAULT)
logger := log.FromContext(ctx).WithValues("group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName).V(logutil.DEFAULT)
ctx = ctrl.LoggerInto(ctx, logger)

logger.Info("Reconciling InferencePool")
logger.Info("Reconciling InferencePool", "group", c.PoolGKNN.Group, "inferencePool", req.NamespacedName)

infPool := &v1.InferencePool{}
// 1. Initialize a generic client.Object based on the group.
var obj client.Object
switch c.PoolGKNN.Group {
case v1.GroupName:
obj = &v1.InferencePool{}
case v1alpha2.GroupName:
obj = &v1alpha2.InferencePool{}
default:
// Handle unsupported groups gracefully.
err := fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group)
logger.Error(err, "Cannot reconcile InferencePool")
return ctrl.Result{}, err
}

if err := c.Get(ctx, req.NamespacedName, infPool); err != nil {
// 2. Perform a single, generic fetch for the object.
if err := c.Get(ctx, req.NamespacedName, obj); err != nil {
if errors.IsNotFound(err) {
logger.Info("InferencePool not found. Clearing the datastore")
c.Datastore.Clear()
return ctrl.Result{}, nil
}
logger.Error(err, "Unable to get InferencePool")
return ctrl.Result{}, err
} else if !infPool.DeletionTimestamp.IsZero() {
}

// 3. Perform common checks using the client.Object interface.
if !obj.GetDeletionTimestamp().IsZero() {
logger.Info("InferencePool is marked for deletion. Clearing the datastore")
c.Datastore.Clear()
return ctrl.Result{}, nil
}
// update pool in datastore
if err := c.Datastore.PoolSet(ctx, c.Reader, infPool); err != nil {

// 4. Convert the fetched object to the canonical v1.InferencePool.
var v1infPool *v1.InferencePool

switch pool := obj.(type) {
case *v1.InferencePool:
// If it's already a v1 object, just use it.
v1infPool = pool
case *v1alpha2.InferencePool:
// If it's a v1alpha2 object, convert it to v1.
var uns *unstructured.Unstructured
uns, err := common.ToUnstructured(pool)
if err != nil {
logger.Error(err, "Failed to convert inferencePool to unstructured")
return ctrl.Result{}, err
}
v1infPool, err = common.ToInferencePool(uns)
if err != nil {
logger.Error(err, "Failed to convert unstructured to inferencePool")
return ctrl.Result{}, err
}
default:
return ctrl.Result{}, fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group)
}

if err := c.Datastore.PoolSet(ctx, c.Reader, v1infPool); err != nil {
logger.Error(err, "Failed to update datastore")
return ctrl.Result{}, err
}
Expand All @@ -68,7 +113,16 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1.InferencePool{}).
Complete(c)
switch c.PoolGKNN.Group {
case v1alpha2.GroupName:
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha2.InferencePool{}).
Complete(c)
case v1.GroupName:
return ctrl.NewControllerManagedBy(mgr).
For(&v1.InferencePool{}).
Complete(c)
default:
return fmt.Errorf("unknown group %s", c.PoolGKNN.Group)
}
}
Loading