Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,22 @@ func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.End
return rawConfig, nil
}

// Return a function that can be used in the EPP Handle to list pod names.
func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName {
return func() []types.NamespacedName {
pods := ds.PodList(func(_ backendmetrics.PodMetrics) bool { return true })
names := make([]types.NamespacedName, 0, len(pods))

for _, p := range pods {
names = append(names, p.GetPod().NamespacedName)
}
return names
}
}

func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *configapi.EndpointPickerConfig, ds datastore.Datastore) (*config.Config, error) {
logger := log.FromContext(ctx)
handle := plugins.NewEppHandle(ctx, ds.PodList)
handle := plugins.NewEppHandle(ctx, makePodListFunc(ds))
cfg, err := loader.LoadConfigPhaseTwo(rawConfig, handle, logger)

if err != nil {
Expand Down Expand Up @@ -604,8 +617,7 @@ func setupDatalayer(logger logr.Logger) (datalayer.EndpointFactory, error) {
// create and register a metrics data source and extractor.
source := dlmetrics.NewDataSource(*modelServerMetricsScheme,
*modelServerMetricsPath,
*modelServerMetricsHttpsInsecureSkipVerify,
nil)
*modelServerMetricsHttpsInsecureSkipVerify)
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
*totalRunningRequestsMetric,
*kvCacheUsagePercentageMetric,
Expand All @@ -624,7 +636,7 @@ func setupDatalayer(logger logr.Logger) (datalayer.EndpointFactory, error) {
// TODO: this could be moved to the configuration loading functions once ported over.
sources := datalayer.GetSources()
for _, src := range sources {
logger.Info("data layer configuration", "source", src.Name(), "extractors", src.Extractors())
logger.Info("data layer configuration", "source", src.TypedName().String(), "extractors", src.Extractors())
}
factory := datalayer.NewEndpointFactory(sources, *refreshMetricsInterval)
return factory, nil
Expand Down
12 changes: 11 additions & 1 deletion pkg/epp/datalayer/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/mocks"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

// --- Test Stubs ---
Expand All @@ -35,7 +36,16 @@ type DummySource struct {
callCount int64
}

func (d *DummySource) Name() string { return "test-dummy-data-source" }
const (
dummySource = "test-dummy-data-source"
)

func (d *DummySource) TypedName() plugins.TypedName {
return plugins.TypedName{
Type: dummySource,
Name: dummySource,
}
}
func (d *DummySource) Extractors() []string { return []string{} }
func (d *DummySource) AddExtractor(_ Extractor) error { return nil }
func (d *DummySource) Collect(ctx context.Context, ep Endpoint) error {
Expand Down
29 changes: 29 additions & 0 deletions pkg/epp/datalayer/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datalayer

// Config defines the configuration of EPP data layer, as the set of DataSources and
// Extractors defined on them.
type Config struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this struct a prep for next PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct!
Mainly to confirm the interface between the datalayer and the configuration setting code so they can proceed in parallel.

Sources []DataSourceConfig // the data sources configured in the data layer
}

// DataSourceConfig defines the configuration of a specific DataSource
type DataSourceConfig struct {
Plugin DataSource // the data source plugin instance
Extractors []Extractor // extractors defined for the data source
}
36 changes: 6 additions & 30 deletions pkg/epp/datalayer/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"fmt"
"reflect"
"sync"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

// DataSource provides raw data to registered Extractors.
type DataSource interface {
// Name of this data source.
Name() string
plugins.Plugin
// Extractors returns a list of registered Extractor names.
Extractors() []string
// AddExtractor adds an extractor to the data source. Multiple
Expand All @@ -45,7 +46,7 @@ type DataSource interface {

// Extractor transforms raw data into structured attributes.
type Extractor interface {
Name() string
plugins.Plugin
// ExpectedType defines the type expected by the extractor.
ExpectedInputType() reflect.Type
// Extract transforms the raw data source output into a concrete structured
Expand All @@ -65,22 +66,12 @@ func (dsr *DataSourceRegistry) Register(src DataSource) error {
if src == nil {
return errors.New("unable to register a nil data source")
}
if _, loaded := dsr.sources.LoadOrStore(src.Name(), src); loaded {
return fmt.Errorf("unable to register duplicate data source: %s", src.Name())
if _, loaded := dsr.sources.LoadOrStore(src.TypedName().Name, src); loaded {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit confusing.
generally in the codebase, plugins have TypeName, where Type is mandatory and Name is optional.
when using ConfigFile Name defaults to Type is not mentioned otherwise, but that is not the case if configured through code (e.g., in unit tests name remains empty).
Additionally, in other plugins - Type is guaranteed to be unique, while Name is not.

maybe we can use TypedName.String() as the key of the map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was originally using Type as the key, went back and forth on this given @shmuelk feedback on usage for configuration file. You can see some of the discussion context in resolved conversations.

The plugin references in the configuration file use the plugin name as the reference key, hence the same is applied here.
At least in theory, you could have multiple sources or extractors of the same type registered (same as plugins).
If we want to change to use TypedName.String() should probaly do so across the entire config handling code.
@shmuelk ^^

return fmt.Errorf("unable to register duplicate data source: %s", src.TypedName().String())
}
return nil
}

// GetNamedSource fetches a source by name.
func (dsr *DataSourceRegistry) GetNamedSource(name string) (DataSource, bool) {
if val, ok := dsr.sources.Load(name); ok {
if ds, ok := val.(DataSource); ok {
return ds, true
}
}
return nil, false
}

// GetSources returns all registered sources.
func (dsr *DataSourceRegistry) GetSources() []DataSource {
var result []DataSource
Expand All @@ -100,21 +91,6 @@ func RegisterSource(src DataSource) error {
return defaultDataSources.Register(src)
}

// GetNamedSource returns a typed data source from the default registry.
func GetNamedSource[T DataSource](name string) (T, bool) {
v, ok := defaultDataSources.GetNamedSource(name)
if !ok {
var zero T
return zero, false
}
src, ok := v.(T)
if !ok {
var zero T
return zero, false
}
return src, true
}

// GetSources returns the list of data sources registered in the default registry.
func GetSources() []DataSource {
return defaultDataSources.GetSources()
Expand Down
32 changes: 14 additions & 18 deletions pkg/epp/datalayer/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,26 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

const (
testType = "test-ds-type"
)

type mockDataSource struct {
name string
tn plugins.TypedName
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typedName?

}

func (m *mockDataSource) Name() string { return m.name }
func (m *mockDataSource) TypedName() plugins.TypedName { return m.tn }
func (m *mockDataSource) Extractors() []string { return []string{} }
func (m *mockDataSource) AddExtractor(_ Extractor) error { return nil }
func (m *mockDataSource) Collect(_ context.Context, _ Endpoint) error { return nil }

func TestRegisterAndGetSource(t *testing.T) {
reg := DataSourceRegistry{}
ds := &mockDataSource{name: "test"}
ds := &mockDataSource{tn: plugins.TypedName{Type: testType, Name: testType}}

err := reg.Register(ds)
assert.NoError(t, err, "expected no error on first registration")
Expand All @@ -47,35 +53,25 @@ func TestRegisterAndGetSource(t *testing.T) {
err = reg.Register(nil)
assert.Error(t, err, "expected error on nil")

// Get by name
got, found := reg.GetNamedSource("test")
assert.True(t, found, "expected to find registered data source")
assert.Equal(t, "test", got.Name())

// Get all sources
all := reg.GetSources()
assert.Len(t, all, 1)
assert.Equal(t, "test", all[0].Name())
assert.Equal(t, testType, all[0].TypedName().Type)

// Default registry
err = RegisterSource(ds)
assert.NoError(t, err, "expected no error on registration")

// Get by name
got, found = GetNamedSource[*mockDataSource]("test")
assert.True(t, found, "expected to find registered data source")
assert.Equal(t, "test", got.Name())

// Get all sources
all = GetSources()
assert.Len(t, all, 1)
assert.Equal(t, "test", all[0].Name())
assert.Equal(t, testType, all[0].TypedName().Type)
}

func TestGetNamedSourceWhenNotFound(t *testing.T) {
func TestGetSourceWhenNoneAreRegistered(t *testing.T) {
reg := DataSourceRegistry{}
_, found := reg.GetNamedSource("missing")
assert.False(t, found, "expected source to be missing")
found := reg.GetSources()
assert.Empty(t, found, "expected no sources to be returned")
}

func TestValidateExtractorType(t *testing.T) {
Expand Down
37 changes: 19 additions & 18 deletions pkg/epp/datalayer/metrics/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ import (
"sync"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

const (
DataSourceName = "metrics-data-source"
DataSourceType = "metrics-data-source"
)

// DataSource is a Model Server Protocol (MSP) compliant metrics data source,
// returning Prometheus formatted metrics for an endpoint.
type DataSource struct {
tn plugins.TypedName
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consistency with other plugins?

Suggested change
tn plugins.TypedName
typedName plugins.TypedName

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

current plugins use both typedName (most) and tn (e.g., lora affinity, slo scorer, some mocks in test files).
The scope (i.e., distance between where defined and used) is small enough to use tn here.
If you think it's important, can open another PR to standardize on on typedName across the entire code base.

metricsScheme string // scheme to use in metrics URL
metricsPath string // path to use in metrics URL

Expand All @@ -42,10 +44,9 @@ type DataSource struct {
}

// NewDataSource returns a new MSP compliant metrics data source, configured with
// the provided client factory. If ClientFactory is nil, a default factory is used.
// The Scheme, port and path are command line options. It should be noted that
// a port value of zero is set if the command line is unspecified.
func NewDataSource(metricsScheme string, metricsPath string, skipCertVerification bool, cl Client) *DataSource {
// the provided client configuration.
// The Scheme, path and certificate validation setting are command line options.
func NewDataSource(metricsScheme string, metricsPath string, skipCertVerification bool) *DataSource {
if metricsScheme == "https" {
httpsTransport := baseTransport.Clone()
httpsTransport.TLSClientConfig = &tls.Config{
Expand All @@ -54,33 +55,33 @@ func NewDataSource(metricsScheme string, metricsPath string, skipCertVerificatio
defaultClient.Transport = httpsTransport
}

if cl == nil {
cl = defaultClient
}

dataSrc := &DataSource{
tn: plugins.TypedName{
Type: DataSourceType,
Name: DataSourceType,
},
metricsScheme: metricsScheme,
metricsPath: metricsPath,
client: cl,
client: defaultClient,
}
return dataSrc
}

// Name returns the metrics data source name.
func (dataSrc *DataSource) Name() string {
return DataSourceName
// TypedName returns the metrics data source type and name.
func (dataSrc *DataSource) TypedName() plugins.TypedName {
return dataSrc.tn
}

// Extractors returns a list of registered Extractor names.
func (dataSrc *DataSource) Extractors() []string {
names := []string{}
extractors := []string{}
dataSrc.extractors.Range(func(_, val any) bool {
if ex, ok := val.(datalayer.Extractor); ok {
names = append(names, ex.Name())
extractors = append(extractors, ex.TypedName().String())
}
return true // continue iteration
})
return names
return extractors
}

// AddExtractor adds an extractor to the data source, validating it can process
Expand All @@ -89,8 +90,8 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
if err := datalayer.ValidateExtractorType(PrometheusMetricType, extractor.ExpectedInputType()); err != nil {
return err
}
if _, loaded := dataSrc.extractors.LoadOrStore(extractor.Name(), extractor); loaded {
return fmt.Errorf("attempt to add extractor with duplicate name %s to %s", extractor.Name(), dataSrc.Name())
if _, loaded := dataSrc.extractors.LoadOrStore(extractor.TypedName().Name, extractor); loaded {
return fmt.Errorf("attempt to add duplicate extractor %s to %s", extractor.TypedName(), dataSrc.TypedName())
}
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/epp/datalayer/metrics/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ import (
)

func TestDatasource(t *testing.T) {
source := NewDataSource("https", "/metrics", true, nil)
source := NewDataSource("https", "/metrics", true)
extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, "", "", "", "")
assert.Nil(t, err, "failed to create extractor")

name := source.Name()
assert.Equal(t, DataSourceName, name)
dsType := source.TypedName().Type
assert.Equal(t, DataSourceType, dsType)

err = source.AddExtractor(extractor)
assert.Nil(t, err, "failed to add extractor")
Expand All @@ -43,7 +43,7 @@ func TestDatasource(t *testing.T) {

extractors := source.Extractors()
assert.Len(t, extractors, 1)
assert.Equal(t, extractor.Name(), extractors[0])
assert.Equal(t, extractor.TypedName().String(), extractors[0])

err = datalayer.RegisterSource(source)
assert.Nil(t, err, "failed to register")
Expand Down
14 changes: 10 additions & 4 deletions pkg/epp/datalayer/metrics/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

const (
extractorName = "model-server-protocol-metrics"
extractorType = "model-server-protocol-metrics"

// LoRA metrics based on MSP
LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
Expand All @@ -48,6 +49,7 @@ const (
// Extractor implements the metrics extraction based on the model
// server protocol standard.
type Extractor struct {
tn plugins.TypedName
mapping *Mapping
}

Expand All @@ -72,13 +74,17 @@ func NewExtractor(queueSpec, runningSpec, kvusageSpec, loraSpec, cacheInfoSpec s
return nil, fmt.Errorf("failed to create extractor metrics Mapping - %w", err)
}
return &Extractor{
tn: plugins.TypedName{
Type: extractorType,
Name: extractorType,
},
mapping: mapping,
}, nil
}

// Name returns the name of the metrics.Extractor.
func (ext *Extractor) Name() string {
return extractorName
// TypedName returns the type and name of the metrics.Extractor.
func (ext *Extractor) TypedName() plugins.TypedName {
return ext.tn
}

// ExpectedType defines the type expected by the metrics.Extractor - a
Expand Down
Loading