Skip to content

Commit 091ebea

Browse files
authored
Enable pluggable datalayer as experimental feature (#1391)
* enable global metrics logging Signed-off-by: Etai Lev Ran <[email protected]> * enable v2 data layer Signed-off-by: Etai Lev Ran <[email protected]> * move feature flag to cmd Signed-off-by: Etai Lev Ran <[email protected]> * remove logger from collector Signed-off-by: Etai Lev Ran <[email protected]> * return a typed data source using generics Signed-off-by: Etai Lev Ran <[email protected]> * document client configuration constants Signed-off-by: Etai Lev Ran <[email protected]> * retrieve a typed data source Signed-off-by: Etai Lev Ran <[email protected]> * use structured logging Signed-off-by: Etai Lev Ran <[email protected]> * commandline specified port value has precedence Signed-off-by: Etai Lev Ran <[email protected]> * pass data sources in ctor Signed-off-by: Etai Lev Ran <[email protected]> * address remaining review comments Signed-off-by: Etai Lev Ran <[email protected]> --------- Signed-off-by: Etai Lev Ran <[email protected]>
1 parent bd26722 commit 091ebea

File tree

9 files changed

+378
-51
lines changed

9 files changed

+378
-51
lines changed

cmd/epp/runner/runner.go

Lines changed: 88 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ import (
4646
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
4747
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4848
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader"
49+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
50+
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
4951
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
5052
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
5153
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
@@ -59,10 +61,17 @@ import (
5961
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
6062
testfilter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test/filter"
6163
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
64+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
6265
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
6366
"sigs.k8s.io/gateway-api-inference-extension/version"
6467
)
6568

69+
const (
70+
// enableExperimentalDatalayerV2 defines the environment variable
71+
// used as feature flag for the pluggable data layer.
72+
enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2"
73+
)
74+
6675
var (
6776
grpcPort = flag.Int(
6877
"grpc-port",
@@ -245,40 +254,12 @@ func (r *Runner) Run(ctx context.Context) error {
245254
}
246255

247256
// --- Setup Datastore ---
248-
mapping, err := backendmetrics.NewMetricMapping(
249-
*totalQueuedRequestsMetric,
250-
*kvCacheUsagePercentageMetric,
251-
*loraInfoMetric,
252-
)
257+
useDatalayerV2 := env.GetEnvBool(enableExperimentalDatalayerV2, false, setupLog)
258+
epf, err := r.setupMetricsCollection(setupLog, useDatalayerV2)
253259
if err != nil {
254-
setupLog.Error(err, "Failed to create metric mapping from flags.")
255260
return err
256261
}
257-
verifyMetricMapping(*mapping, setupLog)
258-
259-
var metricsHttpClient *http.Client
260-
if *modelServerMetricsScheme == "https" {
261-
metricsHttpClient = &http.Client{
262-
Transport: &http.Transport{
263-
TLSClientConfig: &tls.Config{
264-
InsecureSkipVerify: *modelServerMetricsHttpsInsecureSkipVerify,
265-
},
266-
},
267-
}
268-
} else {
269-
metricsHttpClient = http.DefaultClient
270-
}
271-
272-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{
273-
MetricMapping: mapping,
274-
ModelServerMetricsPort: int32(*modelServerMetricsPort),
275-
ModelServerMetricsPath: *modelServerMetricsPath,
276-
ModelServerMetricsScheme: *modelServerMetricsScheme,
277-
Client: metricsHttpClient,
278-
},
279-
*refreshMetricsInterval)
280-
281-
datastore := datastore.NewDatastore(ctx, pmf)
262+
datastore := datastore.NewDatastore(ctx, epf)
282263

283264
// --- Setup Metrics Server ---
284265
customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)}
@@ -371,6 +352,7 @@ func (r *Runner) Run(ctx context.Context) error {
371352
MetricsStalenessThreshold: *metricsStalenessThreshold,
372353
Director: director,
373354
SaturationDetector: saturationDetector,
355+
UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag
374356
}
375357
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
376358
setupLog.Error(err, "Failed to setup EPP controllers")
@@ -446,6 +428,81 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
446428
return nil
447429
}
448430

431+
func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDatalayer bool) (datalayer.EndpointFactory, error) {
432+
if useExperimentalDatalayer {
433+
return setupDatalayer()
434+
}
435+
436+
if len(datalayer.GetSources()) != 0 {
437+
setupLog.Info("data sources registered but pluggable datalayer is disabled")
438+
}
439+
return setupMetricsV1(setupLog)
440+
}
441+
442+
func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
443+
mapping, err := backendmetrics.NewMetricMapping(
444+
*totalQueuedRequestsMetric,
445+
*kvCacheUsagePercentageMetric,
446+
*loraInfoMetric,
447+
)
448+
if err != nil {
449+
setupLog.Error(err, "Failed to create metric mapping from flags.")
450+
return nil, err
451+
}
452+
verifyMetricMapping(*mapping, setupLog)
453+
454+
var metricsHttpClient *http.Client
455+
if *modelServerMetricsScheme == "https" {
456+
metricsHttpClient = &http.Client{
457+
Transport: &http.Transport{
458+
TLSClientConfig: &tls.Config{
459+
InsecureSkipVerify: *modelServerMetricsHttpsInsecureSkipVerify,
460+
},
461+
},
462+
}
463+
} else {
464+
metricsHttpClient = http.DefaultClient
465+
}
466+
467+
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{
468+
MetricMapping: mapping,
469+
ModelServerMetricsPort: int32(*modelServerMetricsPort),
470+
ModelServerMetricsPath: *modelServerMetricsPath,
471+
ModelServerMetricsScheme: *modelServerMetricsScheme,
472+
Client: metricsHttpClient,
473+
},
474+
*refreshMetricsInterval)
475+
return pmf, nil
476+
}
477+
478+
func setupDatalayer() (datalayer.EndpointFactory, error) {
479+
// create and register a metrics data source and extractor. In the future,
480+
// data sources and extractors might be configured via a file. Once done,
481+
// this (and registering the sources with the endpoint factory) should
482+
// be moved accordingly.
483+
source := dlmetrics.NewDataSource(*modelServerMetricsScheme,
484+
int32(*modelServerMetricsPort), // start with (optional) command line port value
485+
*modelServerMetricsPath,
486+
*modelServerMetricsHttpsInsecureSkipVerify,
487+
nil)
488+
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
489+
*kvCacheUsagePercentageMetric,
490+
*loraInfoMetric)
491+
492+
if err != nil {
493+
return nil, err
494+
}
495+
if err := source.AddExtractor(extractor); err != nil {
496+
return nil, err
497+
}
498+
if err := datalayer.RegisterSource(source); err != nil {
499+
return nil, err
500+
}
501+
502+
factory := datalayer.NewEndpointFactory(datalayer.GetSources(), *refreshMetricsInterval)
503+
return factory, nil
504+
}
505+
449506
func initLogging(opts *zap.Options) {
450507
// Unless -zap-log-level is explicitly set, use -v
451508
useV := true

pkg/epp/datalayer/collector.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,18 @@ func NewCollector() *Collector {
8282
}
8383

8484
// Start initiates data source collection for the endpoint.
85+
// TODO: pass PoolInfo for backward compatibility
8586
func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sources []DataSource) error {
8687
var ready chan struct{}
8788
started := false
8889

8990
c.startOnce.Do(func() {
91+
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
9092
c.ctx, c.cancel = context.WithCancel(ctx)
9193
started = true
9294
ready = make(chan struct{})
9395

9496
go func(endpoint Endpoint, sources []DataSource) {
95-
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
9697
logger.V(logging.DEFAULT).Info("starting collection")
9798

9899
defer func() {
@@ -107,6 +108,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
107108
case <-c.ctx.Done(): // per endpoint context cancelled
108109
return
109110
case <-ticker.Channel():
111+
// TODO: do not collect if there's no pool specified?
110112
for _, src := range sources {
111113
ctx, cancel := context.WithTimeout(c.ctx, defaultCollectionTimeout)
112114
_ = src.Collect(ctx, endpoint) // TODO: track errors per collector?

pkg/epp/datalayer/datasource.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,27 @@ func (dsr *DataSourceRegistry) GetSources() []DataSource {
9292

9393
// --- default registry accessors ---
9494

95+
// RegisterSource adds a new data source to the default registry.
9596
func RegisterSource(src DataSource) error {
9697
return defaultDataSources.Register(src)
9798
}
9899

99-
func GetNamedSource(name string) (DataSource, bool) {
100-
return defaultDataSources.GetNamedSource(name)
100+
// GetNamedSource returns a typed data source from the default registry.
101+
func GetNamedSource[T DataSource](name string) (T, bool) {
102+
v, ok := defaultDataSources.GetNamedSource(name)
103+
if !ok {
104+
var zero T
105+
return zero, false
106+
}
107+
src, ok := v.(T)
108+
if !ok {
109+
var zero T
110+
return zero, false
111+
}
112+
return src, true
101113
}
102114

115+
// GetSources returns the list of data sources registered in the default registry.
103116
func GetSources() []DataSource {
104117
return defaultDataSources.GetSources()
105118
}

pkg/epp/datalayer/factory.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@ package datalayer
1818

1919
import (
2020
"context"
21+
"sync"
22+
"time"
2123

2224
corev1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/types"
26+
"sigs.k8s.io/controller-runtime/pkg/log"
2327

2428
v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
2529
)
@@ -44,3 +48,74 @@ type EndpointFactory interface {
4448
NewEndpoint(parent context.Context, inpod *corev1.Pod, poolinfo PoolInfo) Endpoint
4549
ReleaseEndpoint(ep Endpoint)
4650
}
51+
52+
// EndpointLifecycle manages the life cycle (creation and termination) of
53+
// endpoints.
54+
type EndpointLifecycle struct {
55+
sources []DataSource // data sources for collectors
56+
collectors sync.Map // collectors map. key: Pod namespaced name, value: *Collector
57+
refreshInterval time.Duration // metrics refresh interval
58+
}
59+
60+
// NewEndpointFactory returns a new endpoint factory, managing collectors for
61+
// its endpoints. This function assumes that sources are not modified afterwards.
62+
func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Duration) *EndpointLifecycle {
63+
return &EndpointLifecycle{
64+
sources: sources,
65+
collectors: sync.Map{},
66+
refreshInterval: refreshMetricsInterval,
67+
}
68+
}
69+
70+
// NewEndpoint implements EndpointFactory.NewEndpoint.
71+
// Creates a new endpoint and starts its associated collector with its own ticker.
72+
// Guards against multiple concurrent calls for the same endpoint.
73+
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *corev1.Pod, _ PoolInfo) Endpoint {
74+
key := types.NamespacedName{Namespace: inpod.Namespace, Name: inpod.Name}
75+
logger := log.FromContext(parent).WithValues("pod", key)
76+
77+
if _, ok := lc.collectors.Load(key); ok {
78+
logger.Info("collector already running for endpoint", "endpoint", key)
79+
return nil
80+
}
81+
82+
endpoint := NewEndpoint()
83+
endpoint.UpdatePod(inpod)
84+
collector := NewCollector() // for full backward compatibility, set the logger and poolinfo
85+
86+
if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded {
87+
// another goroutine already created and stored a collector for this endpoint.
88+
// No need to start the new collector.
89+
logger.Info("collector already running for endpoint", "endpoint", key)
90+
return nil
91+
}
92+
93+
ticker := NewTimeTicker(lc.refreshInterval)
94+
if err := collector.Start(parent, ticker, endpoint, lc.sources); err != nil {
95+
logger.Error(err, "failed to start collector for endpoint", "endpoint", key)
96+
lc.collectors.Delete(key)
97+
}
98+
99+
return endpoint
100+
}
101+
102+
// ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint
103+
// Stops the collector and cleans up resources for the endpoint
104+
func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) {
105+
key := ep.GetPod().GetNamespacedName()
106+
107+
if value, ok := lc.collectors.LoadAndDelete(key); ok {
108+
collector := value.(*Collector)
109+
_ = collector.Stop()
110+
}
111+
}
112+
113+
// Shutdown gracefully stops all collectors and cleans up all resources.
114+
func (lc *EndpointLifecycle) Shutdown() {
115+
lc.collectors.Range(func(key, value any) bool {
116+
collector := value.(*Collector)
117+
_ = collector.Stop()
118+
lc.collectors.Delete(key)
119+
return true
120+
})
121+
}

pkg/epp/datalayer/metrics/client.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,31 @@ type Client interface {
3333
Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error)
3434
}
3535

36-
// -- package implementations --
3736
const (
37+
// the maximum idle connection count is shared by all endpoints. The value is
38+
// set high to ensure the number of idle connections is above the expected
39+
// endpoint count. Setting it too low would cause thrashing of the idle connection
40+
// pool and incur higher overheads for every GET (e.g., socket initiation, certificate
41+
// exchange, connections in timed wait state, etc.).
3842
maxIdleConnections = 5000
39-
maxIdleTime = 10 * time.Second
40-
timeout = 10 * time.Second
43+
maxIdleTime = 10 * time.Second // once an endpoint goes down, allow closing.
44+
timeout = 10 * time.Second // mostly guard against unresponsive endpoints.
45+
// allow some grace when connections are not made idle immediately (e.g., parsing
46+
// and updating might take some time). This allows maintaining up to two idle connections
47+
// per endpoint (defined as scheme://host:port).
48+
maxIdleConnsPerHost = 2
4149
)
4250

4351
var (
52+
baseTransport = &http.Transport{
53+
MaxIdleConns: maxIdleConnections,
54+
MaxIdleConnsPerHost: maxIdleConnsPerHost,
55+
// TODO: set additional timeouts, transport options, etc.
56+
}
4457
defaultClient = &client{
4558
Client: http.Client{
46-
Timeout: timeout,
47-
Transport: &http.Transport{
48-
MaxIdleConns: maxIdleConnections,
49-
MaxIdleConnsPerHost: 4, // host is defined as scheme://host:port
50-
},
51-
// TODO: set additional timeouts, transport options, etc.
59+
Timeout: timeout,
60+
Transport: baseTransport,
5261
},
5362
}
5463
)

0 commit comments

Comments
 (0)