Skip to content

Commit bab851d

Browse files
authored
Pluggable data layer: transition backend/metrics to use type aliases from datalayer package (#1351)
* type alias from backend to datalayer Signed-off-by: Etai Lev Ran <[email protected]> * add String() to Endpoint Signed-off-by: Etai Lev Ran <[email protected]> * remove StopRefreshLoop from interface and delegate to factory Signed-off-by: Etai Lev Ran <[email protected]> * alias PodMetrics to datalayer.Endpoint Signed-off-by: Etai Lev Ran <[email protected]> * move interfaces from backend.metrics to datalayer Signed-off-by: Etai Lev Ran <[email protected]> --------- Signed-off-by: Etai Lev Ran <[email protected]>
1 parent f92d5bf commit bab851d

File tree

10 files changed

+102
-120
lines changed

10 files changed

+102
-120
lines changed

pkg/epp/backend/metrics/fake.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"time"
2324

2425
corev1 "k8s.io/api/core/v1"
2526
"k8s.io/apimachinery/pkg/types"
2627
"sigs.k8s.io/controller-runtime/pkg/log"
2728

2829
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
30+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2931
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3032
)
3133

@@ -42,13 +44,23 @@ func (fpm *FakePodMetrics) String() string {
4244
func (fpm *FakePodMetrics) GetPod() *backend.Pod {
4345
return fpm.Pod
4446
}
47+
4548
func (fpm *FakePodMetrics) GetMetrics() *MetricsState {
4649
return fpm.Metrics
4750
}
51+
4852
func (fpm *FakePodMetrics) UpdatePod(pod *corev1.Pod) {
4953
fpm.Pod = toInternalPod(pod)
5054
}
51-
func (fpm *FakePodMetrics) StopRefreshLoop() {} // noop
55+
56+
func (*FakePodMetrics) Put(string, datalayer.Cloneable) {}
57+
func (*FakePodMetrics) Get(string) (datalayer.Cloneable, bool) { return nil, false }
58+
func (*FakePodMetrics) Keys() []string { return nil }
59+
60+
func (fpm *FakePodMetrics) UpdateMetrics(updated *MetricsState) {
61+
updated.UpdateTime = time.Now()
62+
fpm.Metrics = updated
63+
}
5264

5365
type FakePodMetricsClient struct {
5466
errMu sync.RWMutex

pkg/epp/backend/metrics/logger.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/go-logr/logr"
2525
"sigs.k8s.io/controller-runtime/pkg/log"
2626

27-
v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2828
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
2929
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3030
)
@@ -33,15 +33,9 @@ const (
3333
debugPrintInterval = 5 * time.Second
3434
)
3535

36-
type Datastore interface {
37-
PoolGet() (*v1.InferencePool, error)
38-
// PodMetrics operations
39-
PodList(func(PodMetrics) bool) []PodMetrics
40-
}
41-
4236
// StartMetricsLogger starts goroutines to 1) Print metrics debug logs if the DEBUG log level is
4337
// enabled; 2) flushes Prometheus metrics about the backend servers.
44-
func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval, metricsStalenessThreshold time.Duration) {
38+
func StartMetricsLogger(ctx context.Context, datastore datalayer.PoolInfo, refreshPrometheusMetricsInterval, metricsStalenessThreshold time.Duration) {
4539
logger := log.FromContext(ctx)
4640
ticker := time.NewTicker(refreshPrometheusMetricsInterval)
4741
go func() {
@@ -80,7 +74,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
8074
}
8175
}
8276

83-
func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore, metricsStalenessThreshold time.Duration) {
77+
func refreshPrometheusMetrics(logger logr.Logger, datastore datalayer.PoolInfo, metricsStalenessThreshold time.Duration) {
8478
pool, err := datastore.PoolGet()
8579
if err != nil {
8680
// No inference pool or not initialize.

pkg/epp/backend/metrics/metrics_state.go

Lines changed: 3 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,64 +17,13 @@ limitations under the License.
1717
package metrics
1818

1919
import (
20-
"fmt"
21-
"time"
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2221
)
2322

2423
// NewMetricsState initializes a new MetricsState and returns its pointer.
2524
func NewMetricsState() *MetricsState {
26-
return &MetricsState{
27-
ActiveModels: make(map[string]int),
28-
WaitingModels: make(map[string]int),
29-
}
25+
return datalayer.NewMetrics()
3026
}
3127

3228
// MetricsState holds the latest state of the metrics that were scraped from a pod.
33-
type MetricsState struct {
34-
// ActiveModels is a set of models(including LoRA adapters) that are currently cached to GPU.
35-
ActiveModels map[string]int
36-
WaitingModels map[string]int
37-
// MaxActiveModels is the maximum number of models that can be loaded to GPU.
38-
MaxActiveModels int
39-
RunningQueueSize int
40-
WaitingQueueSize int
41-
KVCacheUsagePercent float64
42-
KvCacheMaxTokenCapacity int
43-
44-
// UpdateTime record the last time when the metrics were updated.
45-
UpdateTime time.Time
46-
}
47-
48-
// String returns a string with all MetricState information
49-
func (s *MetricsState) String() string {
50-
if s == nil {
51-
return ""
52-
}
53-
return fmt.Sprintf("%+v", *s)
54-
}
55-
56-
// Clone creates a copy of MetricsState and returns its pointer.
57-
// Clone returns nil if the object being cloned is nil.
58-
func (s *MetricsState) Clone() *MetricsState {
59-
if s == nil {
60-
return nil
61-
}
62-
activeModels := make(map[string]int, len(s.ActiveModels))
63-
for key, value := range s.ActiveModels {
64-
activeModels[key] = value
65-
}
66-
waitingModels := make(map[string]int, len(s.WaitingModels))
67-
for key, value := range s.WaitingModels {
68-
waitingModels[key] = value
69-
}
70-
return &MetricsState{
71-
ActiveModels: activeModels,
72-
WaitingModels: waitingModels,
73-
MaxActiveModels: s.MaxActiveModels,
74-
RunningQueueSize: s.RunningQueueSize,
75-
WaitingQueueSize: s.WaitingQueueSize,
76-
KVCacheUsagePercent: s.KVCacheUsagePercent,
77-
KvCacheMaxTokenCapacity: s.KvCacheMaxTokenCapacity,
78-
UpdateTime: s.UpdateTime,
79-
}
80-
}
29+
type MetricsState = datalayer.Metrics

pkg/epp/backend/metrics/pod_metrics.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"k8s.io/apimachinery/pkg/types"
2929

3030
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
31+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
3132
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3233
)
3334

@@ -39,7 +40,7 @@ type podMetrics struct {
3940
pod atomic.Pointer[backend.Pod]
4041
metrics atomic.Pointer[MetricsState]
4142
pmc PodMetricsClient
42-
ds Datastore
43+
ds datalayer.PoolInfo
4344
interval time.Duration
4445

4546
startOnce sync.Once // ensures the refresh loop goroutine is started only once
@@ -129,17 +130,27 @@ func (pm *podMetrics) refreshMetrics() error {
129130
// this case, the updated metrics object will have partial updates. A partial update is
130131
// considered better than no updates.
131132
if updated != nil {
132-
updated.UpdateTime = time.Now()
133-
pm.logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", updated)
134-
pm.metrics.Store(updated)
133+
pm.UpdateMetrics(updated)
135134
}
136135

137136
return nil
138137
}
139138

140-
func (pm *podMetrics) StopRefreshLoop() {
139+
func (pm *podMetrics) stopRefreshLoop() {
141140
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod())
142141
pm.stopOnce.Do(func() {
143142
close(pm.done)
144143
})
145144
}
145+
146+
// Allowing forward compatibility between PodMetrics and datalayer.Endpoint, by
147+
// implementing missing functions (e.g., extended attributes support) as no-op.
148+
func (*podMetrics) Put(string, datalayer.Cloneable) {}
149+
func (*podMetrics) Get(string) (datalayer.Cloneable, bool) { return nil, false }
150+
func (*podMetrics) Keys() []string { return nil }
151+
152+
func (pm *podMetrics) UpdateMetrics(updated *MetricsState) {
153+
updated.UpdateTime = time.Now()
154+
pm.logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", updated)
155+
pm.metrics.Store(updated)
156+
}

pkg/epp/backend/metrics/pod_metrics_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestMetricsRefresh(t *testing.T) {
6565
pmf := NewPodMetricsFactory(pmc, time.Millisecond)
6666

6767
// The refresher is initialized with empty metrics.
68-
pm := pmf.NewPodMetrics(ctx, pod1, &fakeDataStore{})
68+
pm := pmf.NewEndpoint(ctx, pod1, &fakeDataStore{})
6969

7070
namespacedName := types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
7171
// Use SetRes to simulate an update of metrics from the pod.
@@ -78,7 +78,7 @@ func TestMetricsRefresh(t *testing.T) {
7878

7979
// Stop the loop, and simulate metric update again, this time the PodMetrics won't get the
8080
// new update.
81-
pm.StopRefreshLoop()
81+
pmf.ReleaseEndpoint(pm)
8282
time.Sleep(pmf.refreshMetricsInterval * 2 /* small buffer for robustness */)
8383
pmc.SetRes(map[types.NamespacedName]*MetricsState{namespacedName: updated})
8484
// Still expect the same condition (no metrics update).

pkg/epp/backend/metrics/types.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
corev1 "k8s.io/api/core/v1"
2626
"sigs.k8s.io/controller-runtime/pkg/log"
2727

28-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
28+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2929
)
3030

3131
var (
@@ -53,7 +53,7 @@ type PodMetricsFactory struct {
5353
refreshMetricsInterval time.Duration
5454
}
5555

56-
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
56+
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, in *corev1.Pod, ds datalayer.PoolInfo) PodMetrics {
5757
pod := toInternalPod(in)
5858
pm := &podMetrics{
5959
pmc: f.pmc,
@@ -71,10 +71,10 @@ func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.
7171
return pm
7272
}
7373

74-
type PodMetrics interface {
75-
GetPod() *backend.Pod
76-
GetMetrics() *MetricsState
77-
UpdatePod(*corev1.Pod)
78-
StopRefreshLoop()
79-
String() string
74+
func (f *PodMetricsFactory) ReleaseEndpoint(ep PodMetrics) {
75+
if pm, ok := ep.(*podMetrics); ok {
76+
pm.stopRefreshLoop()
77+
}
8078
}
79+
80+
type PodMetrics = datalayer.Endpoint

pkg/epp/backend/pod.go

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,7 @@ limitations under the License.
1717
package backend
1818

1919
import (
20-
"fmt"
21-
22-
"k8s.io/apimachinery/pkg/types"
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2321
)
2422

25-
type Pod struct {
26-
NamespacedName types.NamespacedName
27-
Address string
28-
Labels map[string]string
29-
}
30-
31-
func (p *Pod) String() string {
32-
if p == nil {
33-
return ""
34-
}
35-
return fmt.Sprintf("%+v", *p)
36-
}
37-
38-
func (p *Pod) Clone() *Pod {
39-
if p == nil {
40-
return nil
41-
}
42-
clonedLabels := make(map[string]string, len(p.Labels))
43-
for key, value := range p.Labels {
44-
clonedLabels[key] = value
45-
}
46-
return &Pod{
47-
NamespacedName: types.NamespacedName{
48-
Name: p.NamespacedName.Name,
49-
Namespace: p.NamespacedName.Namespace,
50-
},
51-
Address: p.Address,
52-
Labels: clonedLabels,
53-
}
54-
}
23+
type Pod = datalayer.PodInfo

pkg/epp/datalayer/endpoint.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type EndpointMetricsState interface {
3737

3838
// Endpoint represents an inference serving endpoint and its related attributes.
3939
type Endpoint interface {
40+
fmt.Stringer
4041
EndpointPodState
4142
EndpointMetricsState
4243
AttributeMap
@@ -57,7 +58,7 @@ func NewEndpoint() *ModelServer {
5758
}
5859

5960
// String returns a representation of the ModelServer. For brevity, only names of
60-
// extended attributes are returned and not the values.
61+
// extended attributes are returned and not their values.
6162
func (srv *ModelServer) String() string {
6263
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys())
6364
}

pkg/epp/datalayer/factory.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datalayer
18+
19+
import (
20+
"context"
21+
22+
corev1 "k8s.io/api/core/v1"
23+
24+
v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
25+
)
26+
27+
// PoolInfo represents the DataStore information needed for endpoints.
28+
// TODO:
29+
// Consider if to remove/simplify in follow-ups. This is mostly for backward
30+
// compatibility with backend.metrics' expectations and allowing a shared
31+
// implementation during the transition.
32+
// - Endpoint metric scraping uses PoolGet to access the pool's Port and Name.
33+
// - Global metrics logging uses PoolGet solely for error return and PodList to enumerate
34+
// all endpoints for metrics summarization.
35+
type PoolInfo interface {
36+
PoolGet() (*v1.InferencePool, error)
37+
PodList(func(Endpoint) bool) []Endpoint
38+
}
39+
40+
// EndpointFactory defines an interface for managing Endpoint lifecycle. Specifically,
41+
// providing methods to allocate and retire endpoints. This can potentially be used for
42+
// pooled memory or other management chores in the implementation.
43+
type EndpointFactory interface {
44+
NewEndpoint(parent context.Context, inpod *corev1.Pod, poolinfo PoolInfo) Endpoint
45+
ReleaseEndpoint(ep Endpoint)
46+
}

0 commit comments

Comments
 (0)