Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/prometheus/client_golang v1.23.0
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.65.0
github.com/prometheus/prometheus v0.305.0
github.com/stretchr/testify v1.10.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
Expand Down Expand Up @@ -43,14 +44,15 @@ require (
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
Expand All @@ -64,9 +66,10 @@ require (
github.com/google/btree v1.1.3 // indirect
github.com/google/cel-go v0.23.2 // indirect
github.com/google/gnostic-models v0.6.9 // indirect
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
github.com/google/pprof v0.0.0-20250607225305-033d6d78b36a // indirect
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
github.com/huandu/xstrings v1.3.3 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand All @@ -90,30 +93,31 @@ require (
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
go.opentelemetry.io/otel v1.36.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.36.0 // indirect
go.opentelemetry.io/otel/metric v1.36.0 // indirect
go.opentelemetry.io/otel/sdk v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
go.opentelemetry.io/proto/otlp v1.6.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 // indirect
golang.org/x/mod v0.26.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/term v0.33.0 // indirect
golang.org/x/text v0.27.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/tools v0.35.0 // indirect
golang.org/x/tools/go/expect v0.1.1-deprecated // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250528174236-200df99c418a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
123 changes: 99 additions & 24 deletions go.sum

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions pkg/epp/datalayer/attributemap.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ type AttributeMap interface {
Put(string, Cloneable)
Get(string) (Cloneable, bool)
Keys() []string
Clone() *Attributes
}

// Attributes provides a goroutine-safe implementation of AttributeMap.
type Attributes struct {
data sync.Map
data sync.Map // key: attribute name (string), value: attribute value (opaque, Cloneable)
}

// NewAttributes returns a new instance of Attributes.
Expand Down Expand Up @@ -76,7 +75,7 @@ func (a *Attributes) Keys() []string {
return keys
}

// Clone creates a deep copy of the entire Attributes map.
// Clone creates a deep copy of the entire attribute map.
func (a *Attributes) Clone() *Attributes {
clone := NewAttributes()
a.data.Range(func(key, value any) bool {
Expand Down
137 changes: 137 additions & 0 deletions pkg/epp/datalayer/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datalayer

import (
"context"
"errors"
"sync"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

// TODO:
// currently the data store is expected to manage the state of multiple
// Collectors (e.g., using sync.Map mapping pod to its Collector). Alternatively,
// this can be encapsulated in this file, providing the data store with an interface
// to only update on endpoint addition/change and deletion. This can also be used
// to centrally track statistics such errors, active routines, etc.

const (
defaultCollectionTimeout = time.Second
)

// Ticker implements a time source for periodic invocation.
// The Ticker is passed in as parameter a Collector to allow control over time
// progress in tests, ensuring tests are deterministic and fast.
type Ticker interface {
Channel() <-chan time.Time
Stop()
}

// TimeTicker implements a Ticker based on time.Ticker.
type TimeTicker struct {
*time.Ticker
}

// NewTimeTicker returns a new time.Ticker with the configured duration.
func NewTimeTicker(d time.Duration) Ticker {
return &TimeTicker{
Ticker: time.NewTicker(d),
}
}

// Channel exposes the ticker's channel.
func (t *TimeTicker) Channel() <-chan time.Time {
return t.C
}

// Collector runs the data collection for a single endpoint.
type Collector struct {
// per-endpoint context and cancellation
ctx context.Context
cancel context.CancelFunc

// goroutine management
startOnce sync.Once
stopOnce sync.Once

// TODO: optional metrics tracking collection (e.g., errors, invocations, ...)
}

// NewCollector returns a new collector.
func NewCollector() *Collector {
return &Collector{}
}

// Start initiates data source collection for the endpoint.
func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sources []DataSource) error {
started := false
c.startOnce.Do(func() {
c.ctx, c.cancel = context.WithCancel(ctx)
started = true

go func(endpoint Endpoint, sources []DataSource) {
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
logger.V(logging.DEFAULT).Info("starting collection")

defer func() {
logger.V(logging.DEFAULT).Info("terminating collection")
ticker.Stop()
}()

for {
select {
case <-c.ctx.Done(): // per endpoint context cancelled
return
case <-ticker.Channel():
for _, src := range sources {
ctx, cancel := context.WithTimeout(c.ctx, defaultCollectionTimeout)
_ = src.Collect(ctx, endpoint) // TODO: track errors per collector?
cancel() // release the ctx timeout resources
}
}
}
}(ep, sources)
})

if !started {
return errors.New("collector start called multiple times")
}
return nil
}

// Stop terminates the collector.
func (c *Collector) Stop() error {
if c.ctx == nil || c.cancel == nil {
return errors.New("collector stop called before start")
}

stopped := false
c.stopOnce.Do(func() {
stopped = true
c.cancel()
})

if !stopped {
return errors.New("collector stop called multiple times")
}
return nil
}
130 changes: 130 additions & 0 deletions pkg/epp/datalayer/collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datalayer

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/mocks"
)

// --- Test Stubs ---

type DummySource struct {
callCount int64
}

func (d *DummySource) Name() string { return "test-dummy-data-source" }
func (d *DummySource) AddExtractor(_ Extractor) error { return nil }
func (d *DummySource) Collect(ctx context.Context, ep Endpoint) error {
atomic.AddInt64(&d.callCount, 1)
return nil
}

func defaultEndpoint() Endpoint {
ms := NewEndpoint()
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-name",
Namespace: "default",
},
Status: corev1.PodStatus{
PodIP: "1.2.3.4",
},
}
ms.UpdatePod(pod)
return ms
}

// --- Tests ---

var (
endpoint = defaultEndpoint()
sources = []DataSource{&DummySource{}}
)

func TestCollectorCanStartOnlyOnce(t *testing.T) {
c := NewCollector()
ctx := context.Background()
ticker := mocks.NewTicker()

err := c.Start(ctx, ticker, endpoint, sources)
require.NoError(t, err, "first Start call should succeed")

err = c.Start(ctx, ticker, endpoint, sources)
assert.Error(t, err, "multiple collector start should error")
}

func TestCollectorStopBeforeStartIsAnError(t *testing.T) {
c := NewCollector()
err := c.Stop()
assert.Error(t, err, "collector stop called before start should error")
}

func TestCollectorCanStopOnlyOnce(t *testing.T) {
c := NewCollector()
ctx := context.Background()
ticker := mocks.NewTicker()

require.NoError(t, c.Start(ctx, ticker, endpoint, sources))
require.NoError(t, c.Stop(), "first Stop should succeed")
assert.Error(t, c.Stop(), "second Stop should fail")
}

func TestCollectorCollectsOnTicks(t *testing.T) {
source := &DummySource{}
c := NewCollector()
ticker := mocks.NewTicker()
ctx := context.Background()
require.NoError(t, c.Start(ctx, ticker, endpoint, []DataSource{source}))

ticker.Tick()
ticker.Tick()
time.Sleep(20 * time.Millisecond) // let collector process the ticks

got := atomic.LoadInt64(&source.callCount)
want := int64(2)
assert.Equal(t, want, got, "call count mismatch")
require.NoError(t, c.Stop())
}

func TestCollectorStopCancelsContext(t *testing.T) {
source := &DummySource{}
c := NewCollector()
ticker := mocks.NewTicker()
ctx := context.Background()

require.NoError(t, c.Start(ctx, ticker, endpoint, []DataSource{source}))
ticker.Tick() // should be processed
time.Sleep(20 * time.Millisecond)

require.NoError(t, c.Stop())
before := atomic.LoadInt64(&source.callCount)

ticker.Tick()
time.Sleep(20 * time.Millisecond) // let collector run again
after := atomic.LoadInt64(&source.callCount)
assert.Equal(t, before, after, "call count changed after stop")
}
5 changes: 3 additions & 2 deletions pkg/epp/datalayer/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package datalayer

import (
"context"
"errors"
"fmt"
"reflect"
Expand All @@ -36,7 +37,7 @@ type DataSource interface {
// Collect is triggered by the data layer framework to fetch potentially new
// data for an endpoint. Collect calls registered Extractors to convert the
// raw data into structured attributes.
Collect(ep Endpoint)
Collect(ctx context.Context, ep Endpoint) error
}

// Extractor transforms raw data into structured attributes.
Expand All @@ -46,7 +47,7 @@ type Extractor interface {
ExpectedInputType() reflect.Type
// Extract transforms the raw data source output into a concrete structured
// attribute, stored on the given endpoint.
Extract(data any, ep Endpoint)
Extract(ctx context.Context, data any, ep Endpoint) error
}

var defaultDataSources = DataSourceRegistry{}
Expand Down
Loading