Skip to content

Commit 809ed0c

Browse files
committed
review comments on use of client
Signed-off-by: Etai Lev Ran <[email protected]>
1 parent 77a2bca commit 809ed0c

File tree

4 files changed

+51
-96
lines changed

4 files changed

+51
-96
lines changed

pkg/epp/datalayer/collector.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ import (
2121
"errors"
2222
"sync"
2323
"time"
24+
25+
"sigs.k8s.io/controller-runtime/pkg/log"
26+
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2428
)
2529

2630
// TODO:
@@ -37,7 +41,6 @@ const (
3741
// Ticker implements a time source for periodic invocation.
3842
// The Ticker is passed in as parameter a Collector to allow control over time
3943
// progress in tests, ensuring tests are deterministic and fast.
40-
4144
type Ticker interface {
4245
Channel() <-chan time.Time
4346
Stop()
@@ -86,8 +89,11 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
8689
started = true
8790

8891
go func(endpoint Endpoint, sources []DataSource) {
92+
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
93+
logger.V(logging.DEFAULT).Info("starting collection")
94+
8995
defer func() {
90-
// TODO: log end of collection for endpoint
96+
logger.V(logging.DEFAULT).Info("terminating collection")
9197
ticker.Stop()
9298
}()
9399

@@ -98,7 +104,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
98104
case <-ticker.Channel():
99105
for _, src := range sources {
100106
ctx, cancel := context.WithTimeout(c.ctx, defaultCollectionTimeout)
101-
_ = src.Collect(ctx, endpoint) // TODO: track errors per collector
107+
_ = src.Collect(ctx, endpoint) // TODO: track errors per collector?
102108
cancel() // release the ctx timeout resources
103109
}
104110
}

pkg/epp/datalayer/collector_test.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424

2525
"github.com/stretchr/testify/assert"
2626
"github.com/stretchr/testify/require"
27+
corev1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2729

2830
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/mocks"
2931
)
@@ -41,10 +43,25 @@ func (d *DummySource) Collect(ctx context.Context, ep Endpoint) error {
4143
return nil
4244
}
4345

46+
func defaultEndpoint() Endpoint {
47+
ms := NewEndpoint()
48+
pod := &corev1.Pod{
49+
ObjectMeta: metav1.ObjectMeta{
50+
Name: "pod-name",
51+
Namespace: "default",
52+
},
53+
Status: corev1.PodStatus{
54+
PodIP: "1.2.3.4",
55+
},
56+
}
57+
ms.UpdatePod(pod)
58+
return ms
59+
}
60+
4461
// --- Tests ---
4562

4663
var (
47-
endpoint = NewEndpoint()
64+
endpoint = defaultEndpoint()
4865
sources = []DataSource{&DummySource{}}
4966
)
5067

pkg/epp/datalayer/metrics/client.go

Lines changed: 17 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"fmt"
2222
"net/http"
2323
"net/url"
24-
"sync"
2524
"time"
2625

2726
"github.com/prometheus/common/expfmt"
@@ -34,98 +33,36 @@ type Client interface {
3433
Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error)
3534
}
3635

37-
// ClientFactory returns a Client suitable for an endpoint.
38-
// Implementations may return a new Client each time, or use a cached
39-
// copy for optimized retrieval.
40-
type ClientFactory interface {
41-
GetClientForEndpoint(ep datalayer.Addressable) (Client, error)
42-
}
43-
44-
// GetDefaultClientFactory returns a default implementation of the
45-
// ClientFactory, which caches and reuses client across calls.
46-
func GetDefaultClientFactory() ClientFactory {
47-
return defaultClientFactory
48-
}
49-
5036
// -- package implementations --
51-
var (
52-
cleanupTick = 30 * time.Second
53-
maxIdleTime = time.Minute
54-
defaultClientFactory = newClientFactory()
37+
const (
38+
maxIdleConnections = 5000
39+
maxIdleTime = 10 * time.Second
40+
timeout = 10 * time.Second
5541
)
5642

57-
type client struct {
58-
cl *http.Client
59-
lastUsed time.Time
60-
}
61-
62-
type clientmap struct {
63-
cleanupOnce sync.Once
64-
clients sync.Map // key: target (Pod) IP address, value: (cached) HTTP client
65-
}
66-
67-
func newClientFactory() *clientmap {
68-
clm := &clientmap{}
69-
clm.startCleanupGoroutine()
70-
return clm
71-
}
72-
73-
func (clm *clientmap) startCleanupGoroutine() {
74-
clm.cleanupOnce.Do(func() {
75-
go func() {
76-
for {
77-
time.Sleep(cleanupTick)
78-
now := time.Now()
79-
80-
clm.clients.Range(func(key, value any) bool {
81-
entry := value.(*client)
82-
if now.Sub(entry.lastUsed) > maxIdleTime {
83-
clm.clients.Delete(key)
84-
}
85-
return true
86-
})
87-
}
88-
}()
89-
})
90-
}
91-
92-
func (clm *clientmap) GetClientForEndpoint(ep datalayer.Addressable) (Client, error) {
93-
id := ep.GetIPAddress()
94-
95-
if value, found := clm.clients.Load(id); found {
96-
cl, ok := value.(*client)
97-
if !ok {
98-
return nil, fmt.Errorf("invalid client stored for %s(%s)", id, ep.GetNamespacedName().String())
99-
}
100-
return cl, nil
101-
}
102-
103-
value, _ := clm.clients.LoadOrStore(id, newClient()) // if stored, will return the new value
104-
cl, ok := value.(*client)
105-
if !ok {
106-
return nil, fmt.Errorf("invalid client stored for %s(%s)", id, ep.GetNamespacedName().String())
107-
}
108-
return cl, nil
109-
}
110-
111-
func newClient() *client {
112-
return &client{
113-
cl: &http.Client{
114-
Timeout: 10 * time.Second,
43+
var (
44+
defaultClient = &client{
45+
Client: http.Client{
46+
Timeout: timeout,
47+
Transport: &http.Transport{
48+
MaxIdleConns: maxIdleConnections,
49+
MaxIdleConnsPerHost: 4, // host is defined as scheme://host:port
50+
},
11551
// TODO: set additional timeouts, transport options, etc.
11652
},
117-
lastUsed: time.Now(),
11853
}
54+
)
55+
56+
type client struct {
57+
http.Client
11958
}
12059

12160
func (cl *client) Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error) {
122-
cl.lastUsed = time.Now()
123-
12461
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
12562
if err != nil {
12663
return nil, fmt.Errorf("failed to create request: %v", err)
12764
}
128-
resp, err := cl.cl.Do(req)
65+
resp, err := defaultClient.Do(req)
12966
if err != nil {
13067
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", ep.GetNamespacedName(), err)
13168
}

pkg/epp/datalayer/metrics/datasource.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,21 @@ type DataSource struct {
4040
metricsPort atomic.Pointer[string] // target port to use in metrics URL
4141
metricsPath string // path to use in metrics URL
4242

43-
clients ClientFactory
43+
client Client // client (e.g. a wrapped http.Client) used to get metrics
4444
extractors sync.Map // key: name, value: extractor
4545
}
4646

4747
// NewDataSource returns a new MSP compliant metrics data source, configured with the provided
4848
// client factory. If ClientFactory is nil, a default factory is used.
49-
func NewDataSource(metricsScheme string, metricsPort int32, metricsPath string, clf ClientFactory) *DataSource {
50-
if clf == nil {
51-
clf = GetDefaultClientFactory()
49+
func NewDataSource(metricsScheme string, metricsPort int32, metricsPath string, cl Client) *DataSource {
50+
if cl == nil {
51+
cl = defaultClient
5252
}
5353

5454
dataSrc := &DataSource{
5555
metricsScheme: metricsScheme,
5656
metricsPath: metricsPath,
57-
clients: clf,
57+
client: cl,
5858
}
5959
dataSrc.SetPort(metricsPort)
6060
return dataSrc
@@ -86,15 +86,10 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
8686
// Collect is triggered by the data layer framework to fetch potentially new
8787
// MSP metrics data for an endpoint.
8888
func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
89-
cl, err := dataSrc.clients.GetClientForEndpoint(ep.GetPod())
90-
if cl == nil {
91-
return err // TODO log error
92-
}
93-
9489
target := dataSrc.getMetricsEndpoint(ep.GetPod())
95-
families, err := cl.Get(ctx, target, ep.GetPod())
90+
families, err := dataSrc.client.Get(ctx, target, ep.GetPod())
9691

97-
if err != nil { // TODO log error
92+
if err != nil {
9893
return err
9994
}
10095

0 commit comments

Comments
 (0)