Skip to content

Commit acd7103

Browse files
authored
Implement EPP Plugins by datalayer objects (#1901)
* decouple handle from data layer types Signed-off-by: Etai Lev Ran <[email protected]> * make datasource and extractor comply with Plugin interface Signed-off-by: Etai Lev Ran <[email protected]> * remove unused client from NewDataSource function Signed-off-by: Etai Lev Ran <[email protected]> * define datalayer configuration object Signed-off-by: Etai Lev Ran <[email protected]> * import grouping Signed-off-by: Etai Lev Ran <[email protected]> * change config field names per PR review Signed-off-by: Etai Lev Ran <[email protected]> * embed Plugin interface directly Signed-off-by: Etai Lev Ran <[email protected]> * address PR comments on use of type/name Signed-off-by: Etai Lev Ran <[email protected]> * Address remaining review comments. - use Name as the unique key to allow future extension to support multiple instances of the same type. - remove GetSourceByName/Type. Signed-off-by: Etai Lev Ran <[email protected]> --------- Signed-off-by: Etai Lev Ran <[email protected]>
1 parent 337eb69 commit acd7103

File tree

12 files changed

+126
-96
lines changed

12 files changed

+126
-96
lines changed

cmd/epp/runner/runner.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -473,9 +473,22 @@ func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.End
473473
return rawConfig, nil
474474
}
475475

476+
// Return a function that can be used in the EPP Handle to list pod names.
477+
func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName {
478+
return func() []types.NamespacedName {
479+
pods := ds.PodList(func(_ backendmetrics.PodMetrics) bool { return true })
480+
names := make([]types.NamespacedName, 0, len(pods))
481+
482+
for _, p := range pods {
483+
names = append(names, p.GetPod().NamespacedName)
484+
}
485+
return names
486+
}
487+
}
488+
476489
func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *configapi.EndpointPickerConfig, ds datastore.Datastore) (*config.Config, error) {
477490
logger := log.FromContext(ctx)
478-
handle := plugins.NewEppHandle(ctx, ds.PodList)
491+
handle := plugins.NewEppHandle(ctx, makePodListFunc(ds))
479492
cfg, err := loader.LoadConfigPhaseTwo(rawConfig, handle, logger)
480493

481494
if err != nil {
@@ -604,8 +617,7 @@ func setupDatalayer(logger logr.Logger) (datalayer.EndpointFactory, error) {
604617
// create and register a metrics data source and extractor.
605618
source := dlmetrics.NewDataSource(*modelServerMetricsScheme,
606619
*modelServerMetricsPath,
607-
*modelServerMetricsHttpsInsecureSkipVerify,
608-
nil)
620+
*modelServerMetricsHttpsInsecureSkipVerify)
609621
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
610622
*totalRunningRequestsMetric,
611623
*kvCacheUsagePercentageMetric,
@@ -624,7 +636,7 @@ func setupDatalayer(logger logr.Logger) (datalayer.EndpointFactory, error) {
624636
// TODO: this could be moved to the configuration loading functions once ported over.
625637
sources := datalayer.GetSources()
626638
for _, src := range sources {
627-
logger.Info("data layer configuration", "source", src.Name(), "extractors", src.Extractors())
639+
logger.Info("data layer configuration", "source", src.TypedName().String(), "extractors", src.Extractors())
628640
}
629641
factory := datalayer.NewEndpointFactory(sources, *refreshMetricsInterval)
630642
return factory, nil

pkg/epp/datalayer/collector_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/apimachinery/pkg/types"
2828

2929
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/mocks"
30+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
3031
)
3132

3233
// --- Test Stubs ---
@@ -35,7 +36,16 @@ type DummySource struct {
3536
callCount int64
3637
}
3738

38-
func (d *DummySource) Name() string { return "test-dummy-data-source" }
39+
const (
40+
dummySource = "test-dummy-data-source"
41+
)
42+
43+
func (d *DummySource) TypedName() plugins.TypedName {
44+
return plugins.TypedName{
45+
Type: dummySource,
46+
Name: dummySource,
47+
}
48+
}
3949
func (d *DummySource) Extractors() []string { return []string{} }
4050
func (d *DummySource) AddExtractor(_ Extractor) error { return nil }
4151
func (d *DummySource) Collect(ctx context.Context, ep Endpoint) error {

pkg/epp/datalayer/config.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datalayer
18+
19+
// Config defines the configuration of EPP data layer, as the set of DataSources and
20+
// Extractors defined on them.
21+
type Config struct {
22+
Sources []DataSourceConfig // the data sources configured in the data layer
23+
}
24+
25+
// DataSourceConfig defines the configuration of a specific DataSource
26+
type DataSourceConfig struct {
27+
Plugin DataSource // the data source plugin instance
28+
Extractors []Extractor // extractors defined for the data source
29+
}

pkg/epp/datalayer/datasource.go

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ import (
2222
"fmt"
2323
"reflect"
2424
"sync"
25+
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2527
)
2628

2729
// DataSource provides raw data to registered Extractors.
2830
type DataSource interface {
29-
// Name of this data source.
30-
Name() string
31+
plugins.Plugin
3132
// Extractors returns a list of registered Extractor names.
3233
Extractors() []string
3334
// AddExtractor adds an extractor to the data source. Multiple
@@ -45,7 +46,7 @@ type DataSource interface {
4546

4647
// Extractor transforms raw data into structured attributes.
4748
type Extractor interface {
48-
Name() string
49+
plugins.Plugin
4950
// ExpectedType defines the type expected by the extractor.
5051
ExpectedInputType() reflect.Type
5152
// Extract transforms the raw data source output into a concrete structured
@@ -65,22 +66,12 @@ func (dsr *DataSourceRegistry) Register(src DataSource) error {
6566
if src == nil {
6667
return errors.New("unable to register a nil data source")
6768
}
68-
if _, loaded := dsr.sources.LoadOrStore(src.Name(), src); loaded {
69-
return fmt.Errorf("unable to register duplicate data source: %s", src.Name())
69+
if _, loaded := dsr.sources.LoadOrStore(src.TypedName().Name, src); loaded {
70+
return fmt.Errorf("unable to register duplicate data source: %s", src.TypedName().String())
7071
}
7172
return nil
7273
}
7374

74-
// GetNamedSource fetches a source by name.
75-
func (dsr *DataSourceRegistry) GetNamedSource(name string) (DataSource, bool) {
76-
if val, ok := dsr.sources.Load(name); ok {
77-
if ds, ok := val.(DataSource); ok {
78-
return ds, true
79-
}
80-
}
81-
return nil, false
82-
}
83-
8475
// GetSources returns all registered sources.
8576
func (dsr *DataSourceRegistry) GetSources() []DataSource {
8677
var result []DataSource
@@ -100,21 +91,6 @@ func RegisterSource(src DataSource) error {
10091
return defaultDataSources.Register(src)
10192
}
10293

103-
// GetNamedSource returns a typed data source from the default registry.
104-
func GetNamedSource[T DataSource](name string) (T, bool) {
105-
v, ok := defaultDataSources.GetNamedSource(name)
106-
if !ok {
107-
var zero T
108-
return zero, false
109-
}
110-
src, ok := v.(T)
111-
if !ok {
112-
var zero T
113-
return zero, false
114-
}
115-
return src, true
116-
}
117-
11894
// GetSources returns the list of data sources registered in the default registry.
11995
func GetSources() []DataSource {
12096
return defaultDataSources.GetSources()

pkg/epp/datalayer/datasource_test.go

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,26 @@ import (
2222
"testing"
2323

2424
"github.com/stretchr/testify/assert"
25+
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
27+
)
28+
29+
const (
30+
testType = "test-ds-type"
2531
)
2632

2733
type mockDataSource struct {
28-
name string
34+
tn plugins.TypedName
2935
}
3036

31-
func (m *mockDataSource) Name() string { return m.name }
37+
func (m *mockDataSource) TypedName() plugins.TypedName { return m.tn }
3238
func (m *mockDataSource) Extractors() []string { return []string{} }
3339
func (m *mockDataSource) AddExtractor(_ Extractor) error { return nil }
3440
func (m *mockDataSource) Collect(_ context.Context, _ Endpoint) error { return nil }
3541

3642
func TestRegisterAndGetSource(t *testing.T) {
3743
reg := DataSourceRegistry{}
38-
ds := &mockDataSource{name: "test"}
44+
ds := &mockDataSource{tn: plugins.TypedName{Type: testType, Name: testType}}
3945

4046
err := reg.Register(ds)
4147
assert.NoError(t, err, "expected no error on first registration")
@@ -47,35 +53,25 @@ func TestRegisterAndGetSource(t *testing.T) {
4753
err = reg.Register(nil)
4854
assert.Error(t, err, "expected error on nil")
4955

50-
// Get by name
51-
got, found := reg.GetNamedSource("test")
52-
assert.True(t, found, "expected to find registered data source")
53-
assert.Equal(t, "test", got.Name())
54-
5556
// Get all sources
5657
all := reg.GetSources()
5758
assert.Len(t, all, 1)
58-
assert.Equal(t, "test", all[0].Name())
59+
assert.Equal(t, testType, all[0].TypedName().Type)
5960

6061
// Default registry
6162
err = RegisterSource(ds)
6263
assert.NoError(t, err, "expected no error on registration")
6364

64-
// Get by name
65-
got, found = GetNamedSource[*mockDataSource]("test")
66-
assert.True(t, found, "expected to find registered data source")
67-
assert.Equal(t, "test", got.Name())
68-
6965
// Get all sources
7066
all = GetSources()
7167
assert.Len(t, all, 1)
72-
assert.Equal(t, "test", all[0].Name())
68+
assert.Equal(t, testType, all[0].TypedName().Type)
7369
}
7470

75-
func TestGetNamedSourceWhenNotFound(t *testing.T) {
71+
func TestGetSourceWhenNoneAreRegistered(t *testing.T) {
7672
reg := DataSourceRegistry{}
77-
_, found := reg.GetNamedSource("missing")
78-
assert.False(t, found, "expected source to be missing")
73+
found := reg.GetSources()
74+
assert.Empty(t, found, "expected no sources to be returned")
7975
}
8076

8177
func TestValidateExtractorType(t *testing.T) {

pkg/epp/datalayer/metrics/datasource.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,17 @@ import (
2525
"sync"
2626

2727
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
28+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2829
)
2930

3031
const (
31-
DataSourceName = "metrics-data-source"
32+
DataSourceType = "metrics-data-source"
3233
)
3334

3435
// DataSource is a Model Server Protocol (MSP) compliant metrics data source,
3536
// returning Prometheus formatted metrics for an endpoint.
3637
type DataSource struct {
38+
tn plugins.TypedName
3739
metricsScheme string // scheme to use in metrics URL
3840
metricsPath string // path to use in metrics URL
3941

@@ -42,10 +44,9 @@ type DataSource struct {
4244
}
4345

4446
// NewDataSource returns a new MSP compliant metrics data source, configured with
45-
// the provided client factory. If ClientFactory is nil, a default factory is used.
46-
// The Scheme, port and path are command line options. It should be noted that
47-
// a port value of zero is set if the command line is unspecified.
48-
func NewDataSource(metricsScheme string, metricsPath string, skipCertVerification bool, cl Client) *DataSource {
47+
// the provided client configuration.
48+
// The Scheme, path and certificate validation setting are command line options.
49+
func NewDataSource(metricsScheme string, metricsPath string, skipCertVerification bool) *DataSource {
4950
if metricsScheme == "https" {
5051
httpsTransport := baseTransport.Clone()
5152
httpsTransport.TLSClientConfig = &tls.Config{
@@ -54,33 +55,33 @@ func NewDataSource(metricsScheme string, metricsPath string, skipCertVerificatio
5455
defaultClient.Transport = httpsTransport
5556
}
5657

57-
if cl == nil {
58-
cl = defaultClient
59-
}
60-
6158
dataSrc := &DataSource{
59+
tn: plugins.TypedName{
60+
Type: DataSourceType,
61+
Name: DataSourceType,
62+
},
6263
metricsScheme: metricsScheme,
6364
metricsPath: metricsPath,
64-
client: cl,
65+
client: defaultClient,
6566
}
6667
return dataSrc
6768
}
6869

69-
// Name returns the metrics data source name.
70-
func (dataSrc *DataSource) Name() string {
71-
return DataSourceName
70+
// TypedName returns the metrics data source type and name.
71+
func (dataSrc *DataSource) TypedName() plugins.TypedName {
72+
return dataSrc.tn
7273
}
7374

7475
// Extractors returns a list of registered Extractor names.
7576
func (dataSrc *DataSource) Extractors() []string {
76-
names := []string{}
77+
extractors := []string{}
7778
dataSrc.extractors.Range(func(_, val any) bool {
7879
if ex, ok := val.(datalayer.Extractor); ok {
79-
names = append(names, ex.Name())
80+
extractors = append(extractors, ex.TypedName().String())
8081
}
8182
return true // continue iteration
8283
})
83-
return names
84+
return extractors
8485
}
8586

8687
// AddExtractor adds an extractor to the data source, validating it can process
@@ -89,8 +90,8 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
8990
if err := datalayer.ValidateExtractorType(PrometheusMetricType, extractor.ExpectedInputType()); err != nil {
9091
return err
9192
}
92-
if _, loaded := dataSrc.extractors.LoadOrStore(extractor.Name(), extractor); loaded {
93-
return fmt.Errorf("attempt to add extractor with duplicate name %s to %s", extractor.Name(), dataSrc.Name())
93+
if _, loaded := dataSrc.extractors.LoadOrStore(extractor.TypedName().Name, extractor); loaded {
94+
return fmt.Errorf("attempt to add duplicate extractor %s to %s", extractor.TypedName(), dataSrc.TypedName())
9495
}
9596
return nil
9697
}

pkg/epp/datalayer/metrics/datasource_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ import (
2828
)
2929

3030
func TestDatasource(t *testing.T) {
31-
source := NewDataSource("https", "/metrics", true, nil)
31+
source := NewDataSource("https", "/metrics", true)
3232
extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, "", "", "", "")
3333
assert.Nil(t, err, "failed to create extractor")
3434

35-
name := source.Name()
36-
assert.Equal(t, DataSourceName, name)
35+
dsType := source.TypedName().Type
36+
assert.Equal(t, DataSourceType, dsType)
3737

3838
err = source.AddExtractor(extractor)
3939
assert.Nil(t, err, "failed to add extractor")
@@ -43,7 +43,7 @@ func TestDatasource(t *testing.T) {
4343

4444
extractors := source.Extractors()
4545
assert.Len(t, extractors, 1)
46-
assert.Equal(t, extractor.Name(), extractors[0])
46+
assert.Equal(t, extractor.TypedName().String(), extractors[0])
4747

4848
err = datalayer.RegisterSource(source)
4949
assert.Nil(t, err, "failed to register")

pkg/epp/datalayer/metrics/extractor.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ import (
3030

3131
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
3232
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
33+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
3334
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3435
)
3536

3637
const (
37-
extractorName = "model-server-protocol-metrics"
38+
extractorType = "model-server-protocol-metrics"
3839

3940
// LoRA metrics based on MSP
4041
LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
@@ -48,6 +49,7 @@ const (
4849
// Extractor implements the metrics extraction based on the model
4950
// server protocol standard.
5051
type Extractor struct {
52+
tn plugins.TypedName
5153
mapping *Mapping
5254
}
5355

@@ -72,13 +74,17 @@ func NewExtractor(queueSpec, runningSpec, kvusageSpec, loraSpec, cacheInfoSpec s
7274
return nil, fmt.Errorf("failed to create extractor metrics Mapping - %w", err)
7375
}
7476
return &Extractor{
77+
tn: plugins.TypedName{
78+
Type: extractorType,
79+
Name: extractorType,
80+
},
7581
mapping: mapping,
7682
}, nil
7783
}
7884

79-
// Name returns the name of the metrics.Extractor.
80-
func (ext *Extractor) Name() string {
81-
return extractorName
85+
// TypedName returns the type and name of the metrics.Extractor.
86+
func (ext *Extractor) TypedName() plugins.TypedName {
87+
return ext.tn
8288
}
8389

8490
// ExpectedType defines the type expected by the metrics.Extractor - a

0 commit comments

Comments
 (0)