Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/epp/backend/metrics/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"context"
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

Expand All @@ -42,13 +44,23 @@ func (fpm *FakePodMetrics) String() string {
func (fpm *FakePodMetrics) GetPod() *backend.Pod {
return fpm.Pod
}

func (fpm *FakePodMetrics) GetMetrics() *MetricsState {
return fpm.Metrics
}

func (fpm *FakePodMetrics) UpdatePod(pod *corev1.Pod) {
fpm.Pod = toInternalPod(pod)
}
func (fpm *FakePodMetrics) StopRefreshLoop() {} // noop

func (*FakePodMetrics) Put(string, datalayer.Cloneable) {}
func (*FakePodMetrics) Get(string) (datalayer.Cloneable, bool) { return nil, false }
func (*FakePodMetrics) Keys() []string { return nil }

func (fpm *FakePodMetrics) UpdateMetrics(updated *MetricsState) {
updated.UpdateTime = time.Now()
fpm.Metrics = updated
}

type FakePodMetricsClient struct {
errMu sync.RWMutex
Expand Down
12 changes: 3 additions & 9 deletions pkg/epp/backend/metrics/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/log"

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)
Expand All @@ -33,15 +33,9 @@ const (
debugPrintInterval = 5 * time.Second
)

type Datastore interface {
PoolGet() (*v1.InferencePool, error)
// PodMetrics operations
PodList(func(PodMetrics) bool) []PodMetrics
}

// StartMetricsLogger starts goroutines to 1) Print metrics debug logs if the DEBUG log level is
// enabled; 2) flushes Prometheus metrics about the backend servers.
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval, metricsStalenessThreshold time.Duration) {
func StartMetricsLogger(ctx context.Context, datastore datalayer.PoolInfo, refreshPrometheusMetricsInterval, metricsStalenessThreshold time.Duration) {
logger := log.FromContext(ctx)
ticker := time.NewTicker(refreshPrometheusMetricsInterval)
go func() {
Expand Down Expand Up @@ -82,7 +76,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
}
}

func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore, metricsStalenessThreshold time.Duration) {
func refreshPrometheusMetrics(logger logr.Logger, datastore datalayer.PoolInfo, metricsStalenessThreshold time.Duration) {
pool, err := datastore.PoolGet()
if err != nil {
// No inference pool or not initialize.
Expand Down
57 changes: 3 additions & 54 deletions pkg/epp/backend/metrics/metrics_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,64 +17,13 @@ limitations under the License.
package metrics

import (
"fmt"
"time"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
)

// NewMetricsState initializes a new MetricsState and returns its pointer.
func NewMetricsState() *MetricsState {
return &MetricsState{
ActiveModels: make(map[string]int),
WaitingModels: make(map[string]int),
}
return datalayer.NewMetrics()
}

// MetricsState holds the latest state of the metrics that were scraped from a pod.
type MetricsState struct {
// ActiveModels is a set of models(including LoRA adapters) that are currently cached to GPU.
ActiveModels map[string]int
WaitingModels map[string]int
// MaxActiveModels is the maximum number of models that can be loaded to GPU.
MaxActiveModels int
RunningQueueSize int
WaitingQueueSize int
KVCacheUsagePercent float64
KvCacheMaxTokenCapacity int

// UpdateTime record the last time when the metrics were updated.
UpdateTime time.Time
}

// String returns a string with all MetricState information
func (s *MetricsState) String() string {
if s == nil {
return ""
}
return fmt.Sprintf("%+v", *s)
}

// Clone creates a copy of MetricsState and returns its pointer.
// Clone returns nil if the object being cloned is nil.
func (s *MetricsState) Clone() *MetricsState {
if s == nil {
return nil
}
activeModels := make(map[string]int, len(s.ActiveModels))
for key, value := range s.ActiveModels {
activeModels[key] = value
}
waitingModels := make(map[string]int, len(s.WaitingModels))
for key, value := range s.WaitingModels {
waitingModels[key] = value
}
return &MetricsState{
ActiveModels: activeModels,
WaitingModels: waitingModels,
MaxActiveModels: s.MaxActiveModels,
RunningQueueSize: s.RunningQueueSize,
WaitingQueueSize: s.WaitingQueueSize,
KVCacheUsagePercent: s.KVCacheUsagePercent,
KvCacheMaxTokenCapacity: s.KvCacheMaxTokenCapacity,
UpdateTime: s.UpdateTime,
}
}
type MetricsState = datalayer.Metrics
21 changes: 16 additions & 5 deletions pkg/epp/backend/metrics/pod_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

Expand All @@ -39,7 +40,7 @@ type podMetrics struct {
pod atomic.Pointer[backend.Pod]
metrics atomic.Pointer[MetricsState]
pmc PodMetricsClient
ds Datastore
ds datalayer.PoolInfo
interval time.Duration

startOnce sync.Once // ensures the refresh loop goroutine is started only once
Expand Down Expand Up @@ -129,17 +130,27 @@ func (pm *podMetrics) refreshMetrics() error {
// this case, the updated metrics object will have partial updates. A partial update is
// considered better than no updates.
if updated != nil {
updated.UpdateTime = time.Now()
pm.logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", updated)
pm.metrics.Store(updated)
pm.UpdateMetrics(updated)
}

return nil
}

func (pm *podMetrics) StopRefreshLoop() {
func (pm *podMetrics) stopRefreshLoop() {
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod())
pm.stopOnce.Do(func() {
close(pm.done)
})
}

// Allowing forward compatibility between PodMetrics and datalayer.Endpoint, by
// implementing missing functions (e.g., extended attributes support) as no-op.
func (*podMetrics) Put(string, datalayer.Cloneable) {}
func (*podMetrics) Get(string) (datalayer.Cloneable, bool) { return nil, false }
func (*podMetrics) Keys() []string { return nil }

func (pm *podMetrics) UpdateMetrics(updated *MetricsState) {
updated.UpdateTime = time.Now()
pm.logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", updated)
pm.metrics.Store(updated)
}
4 changes: 2 additions & 2 deletions pkg/epp/backend/metrics/pod_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestMetricsRefresh(t *testing.T) {
pmf := NewPodMetricsFactory(pmc, time.Millisecond, time.Second*2)

// The refresher is initialized with empty metrics.
pm := pmf.NewPodMetrics(ctx, pod1, &fakeDataStore{})
pm := pmf.NewEndpoint(ctx, pod1, &fakeDataStore{})

namespacedName := types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
// Use SetRes to simulate an update of metrics from the pod.
Expand All @@ -78,7 +78,7 @@ func TestMetricsRefresh(t *testing.T) {

// Stop the loop, and simulate metric update again, this time the PodMetrics won't get the
// new update.
pm.StopRefreshLoop()
pmf.ReleaseEndpoint(pm)
time.Sleep(pmf.refreshMetricsInterval * 2 /* small buffer for robustness */)
pmc.SetRes(map[types.NamespacedName]*MetricsState{namespacedName: updated})
// Still expect the same condition (no metrics update).
Expand Down
16 changes: 8 additions & 8 deletions pkg/epp/backend/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
)

var (
Expand All @@ -46,7 +46,7 @@ type PodMetricsFactory struct {
metricsStalenessThreshold time.Duration
}

func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, in *corev1.Pod, ds datalayer.PoolInfo) PodMetrics {
pod := toInternalPod(in)
pm := &podMetrics{
pmc: f.pmc,
Expand All @@ -64,10 +64,10 @@ func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.
return pm
}

type PodMetrics interface {
GetPod() *backend.Pod
GetMetrics() *MetricsState
UpdatePod(*corev1.Pod)
StopRefreshLoop()
String() string
func (f *PodMetricsFactory) ReleaseEndpoint(ep PodMetrics) {
if pm, ok := ep.(*podMetrics); ok {
pm.stopRefreshLoop()
}
}

type PodMetrics = datalayer.Endpoint
35 changes: 2 additions & 33 deletions pkg/epp/backend/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,7 @@ limitations under the License.
package backend

import (
"fmt"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
)

type Pod struct {
NamespacedName types.NamespacedName
Address string
Labels map[string]string
}

func (p *Pod) String() string {
if p == nil {
return ""
}
return fmt.Sprintf("%+v", *p)
}

func (p *Pod) Clone() *Pod {
if p == nil {
return nil
}
clonedLabels := make(map[string]string, len(p.Labels))
for key, value := range p.Labels {
clonedLabels[key] = value
}
return &Pod{
NamespacedName: types.NamespacedName{
Name: p.NamespacedName.Name,
Namespace: p.NamespacedName.Namespace,
},
Address: p.Address,
Labels: clonedLabels,
}
}
type Pod = datalayer.PodInfo
3 changes: 2 additions & 1 deletion pkg/epp/datalayer/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type EndpointMetricsState interface {

// Endpoint represents an inference serving endpoint and its related attributes.
type Endpoint interface {
fmt.Stringer
EndpointPodState
EndpointMetricsState
AttributeMap
Expand All @@ -57,7 +58,7 @@ func NewEndpoint() *ModelServer {
}

// String returns a representation of the ModelServer. For brevity, only names of
// extended attributes are returned and not the values.
// extended attributes are returned and not their values.
func (srv *ModelServer) String() string {
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys())
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/epp/datalayer/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datalayer

import (
"context"

corev1 "k8s.io/api/core/v1"

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
)

// PoolInfo represents the DataStore information needed for endpoints.
// TODO:
// Consider if to remove/simplify in follow-ups. This is mostly for backward
// compatibility with backend.metrics' expectations and allowing a shared
// implementation during the transition.
// - Endpoint metric scraping uses PoolGet to access the pool's Port and Name.
// - Global metrics logging uses PoolGet solely for error return and PodList to enumerate
// all endpoints for metrics summarization.
type PoolInfo interface {
PoolGet() (*v1.InferencePool, error)
PodList(func(Endpoint) bool) []Endpoint
}

// EndpointFactory defines an interface for managing Endpoint lifecycle. Specifically,
// providing methods to allocate and retire endpoints. This can potentially be used for
// pooled memory or other management chores in the implementation.
type EndpointFactory interface {
NewEndpoint(parent context.Context, inpod *corev1.Pod, poolinfo PoolInfo) Endpoint
ReleaseEndpoint(ep Endpoint)
}
Loading