diff --git a/xds/internal/clients/lrsclient/internal/internal.go b/xds/internal/clients/lrsclient/internal/internal.go new file mode 100644 index 000000000000..86eb14ea4ca4 --- /dev/null +++ b/xds/internal/clients/lrsclient/internal/internal.go @@ -0,0 +1,26 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package internal contains functionality internal to the lrsclient package. +package internal + +import "time" + +var ( + // TimeNow is used to get the current time. It can be overridden in tests. + TimeNow func() time.Time +) diff --git a/xds/internal/clients/lrsclient/load_store.go b/xds/internal/clients/lrsclient/load_store.go index fd363ad62145..0a3de8b3a5bb 100644 --- a/xds/internal/clients/lrsclient/load_store.go +++ b/xds/internal/clients/lrsclient/load_store.go @@ -23,6 +23,8 @@ import ( "sync" "sync/atomic" "time" + + lrsclientinternal "google.golang.org/grpc/xds/internal/clients/lrsclient/internal" ) // A LoadStore aggregates loads for multiple clusters and services that are @@ -49,6 +51,10 @@ type LoadStore struct { clusters map[string]map[string]*PerClusterReporter } +func init() { + lrsclientinternal.TimeNow = time.Now +} + // newLoadStore creates a LoadStore. func newLoadStore() *LoadStore { return &LoadStore{ @@ -82,8 +88,9 @@ func (ls *LoadStore) ReporterForCluster(clusterName, serviceName string) *PerClu return p } p := &PerClusterReporter{ - cluster: clusterName, - service: serviceName, + cluster: clusterName, + service: serviceName, + lastLoadReportAt: lrsclientinternal.TimeNow(), } c[serviceName] = p return p @@ -244,8 +251,8 @@ func (p *PerClusterReporter) stats() *loadData { }) p.mu.Lock() - sd.reportInterval = time.Since(p.lastLoadReportAt) - p.lastLoadReportAt = time.Now() + sd.reportInterval = lrsclientinternal.TimeNow().Sub(p.lastLoadReportAt) + p.lastLoadReportAt = lrsclientinternal.TimeNow() p.mu.Unlock() if sd.totalDrops == 0 && len(sd.drops) == 0 && len(sd.localityStats) == 0 { diff --git a/xds/internal/clients/lrsclient/load_store_test.go b/xds/internal/clients/lrsclient/load_store_test.go index a21ac71defca..a4c191921544 100644 --- a/xds/internal/clients/lrsclient/load_store_test.go +++ b/xds/internal/clients/lrsclient/load_store_test.go @@ -22,9 +22,11 @@ import ( "sort" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + lrsclientinternal "google.golang.org/grpc/xds/internal/clients/lrsclient/internal" ) var ( @@ -471,3 +473,52 @@ func TestStoreStatsEmptyDataNotReported(t *testing.T) { t.Error(err) } } + +// TestStoreReportInterval verify that the load report interval gets +// calculated at every stats() call and is the duration between start of last +// load reporting to next stats() call. +func TestStoreReportInterval(t *testing.T) { + originalTimeNow := lrsclientinternal.TimeNow + t.Cleanup(func() { lrsclientinternal.TimeNow = originalTimeNow }) + + // Initial time for reporter creation + currentTime := time.Now() + lrsclientinternal.TimeNow = func() time.Time { + return currentTime + } + + store := newLoadStore() + reporter := store.ReporterForCluster("test-cluster", "test-service") + // Report dummy drop to ensure stats1 is not nil. + reporter.CallDropped("dummy-category") + + // Update currentTime to simulate the passage of time between the reporter + // creation and first stats() call. + currentTime = currentTime.Add(5 * time.Second) + stats1 := reporter.stats() + + if stats1 == nil { + t.Fatalf("stats1 is nil after reporting a drop, want non-nil") + } + // Verify stats() call calculate the report interval from the time of + // reporter creation. + if got, want := stats1.reportInterval, 5*time.Second; got != want { + t.Errorf("stats1.reportInterval = %v, want %v", stats1.reportInterval, want) + } + + // Update currentTime to simulate the passage of time between the first + // and second stats() call. + currentTime = currentTime.Add(10 * time.Second) + // Report another dummy drop to ensure stats2 is not nil. + reporter.CallDropped("dummy-category-2") + stats2 := reporter.stats() + + if stats2 == nil { + t.Fatalf("stats2 is nil after reporting a drop, want non-nil") + } + // Verify stats() call calculate the report interval from the time of first + // stats() call. + if got, want := stats2.reportInterval, 10*time.Second; got != want { + t.Errorf("stats2.reportInterval = %v, want %v", stats2.reportInterval, want) + } +} diff --git a/xds/internal/clients/lrsclient/loadreport_test.go b/xds/internal/clients/lrsclient/loadreport_test.go index bcdec9c63492..ed79c6162202 100644 --- a/xds/internal/clients/lrsclient/loadreport_test.go +++ b/xds/internal/clients/lrsclient/loadreport_test.go @@ -37,6 +37,7 @@ import ( "google.golang.org/grpc/xds/internal/clients/internal/testutils/e2e" "google.golang.org/grpc/xds/internal/clients/internal/testutils/fakeserver" "google.golang.org/grpc/xds/internal/clients/lrsclient" + lrsclientinternal "google.golang.org/grpc/xds/internal/clients/lrsclient/internal" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/durationpb" @@ -610,3 +611,99 @@ func (s) TestReportLoad_StopWithContext(t *testing.T) { t.Fatal("Timeout waiting for LRS stream to close") } } + +// TestReportLoad_LoadReportInterval tests verify that the load report interval +// received by the LRS server is the duration between start of last load +// reporting by the client and the time when the load is reported to the LRS +// server. +func (s) TestReportLoad_LoadReportInterval(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + originalTimeNow := lrsclientinternal.TimeNow + t.Cleanup(func() { lrsclientinternal.TimeNow = originalTimeNow }) + + // Create a management server that serves LRS. + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{SupportLoadReportingService: true}) + + // Create an LRS client with configuration pointing to the above server. + nodeID := uuid.New().String() + + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + config := lrsclient.Config{ + Node: clients.Node{ID: nodeID, UserAgentName: "user-agent", UserAgentVersion: "0.0.0.0"}, + TransportBuilder: grpctransport.NewBuilder(configs), + } + client, err := lrsclient.New(config) + if err != nil { + t.Fatalf("lrsclient.New() failed: %v", err) + } + + // Call the load reporting API, and ensure that an LRS stream is created. + serverIdentifier := clients.ServerIdentifier{ServerURI: mgmtServer.Address, Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}} + loadStore1, err := client.ReportLoad(serverIdentifier) + if err != nil { + t.Fatalf("client.ReportLoad() failed: %v", err) + } + lrsServer := mgmtServer.LRSServer + if _, err := lrsServer.LRSStreamOpenChan.Receive(ctx); err != nil { + t.Fatalf("Timeout when waiting for LRS stream to be created: %v", err) + } + + // Initial time for reporter creation + currentTime := time.Now() + lrsclientinternal.TimeNow = func() time.Time { + return currentTime + } + + // Report dummy drop to ensure stats is not nil. + loadStore1.ReporterForCluster("cluster1", "eds1").CallDropped("test") + + // Update currentTime to simulate the passage of time between the reporter + // creation and first stats() call. + currentTime = currentTime.Add(5 * time.Second) + + // Ensure the initial load reporting request is received at the server. + req, err := lrsServer.LRSRequestChan.Receive(ctx) + if err != nil { + t.Fatalf("Timeout when waiting for initial LRS request: %v", err) + } + gotInitialReq := req.(*fakeserver.Request).Req.(*v3lrspb.LoadStatsRequest) + nodeProto := &v3corepb.Node{ + Id: nodeID, + UserAgentName: "user-agent", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: "0.0.0.0"}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw", "envoy.lrs.supports_send_all_clusters"}, + } + wantInitialReq := &v3lrspb.LoadStatsRequest{Node: nodeProto} + if diff := cmp.Diff(gotInitialReq, wantInitialReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in initial LRS request (-got, +want):\n%s", diff) + } + + // Send a response from the server with a small deadline. + lrsServer.LRSResponseChan <- &fakeserver.Response{ + Resp: &v3lrspb.LoadStatsResponse{ + SendAllClusters: true, + LoadReportingInterval: &durationpb.Duration{Nanos: 50000000}, // 50ms + }, + } + + // Ensure that loads are seen on the server. + req, err = lrsServer.LRSRequestChan.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for LRS request with loads") + } + gotLoad := req.(*fakeserver.Request).Req.(*v3lrspb.LoadStatsRequest).ClusterStats + if l := len(gotLoad); l != 1 { + t.Fatalf("Received load for %d clusters, want 1", l) + } + // Verify load received at LRS server has load report interval calculated + // from the time of reporter creation. + if got, want := gotLoad[0].GetLoadReportInterval().AsDuration(), 5*time.Second; got != want { + t.Errorf("Got load report interval %v, want %v", got, want) + } + + ssCtx, ssCancel := context.WithTimeout(context.Background(), time.Millisecond) + defer ssCancel() + loadStore1.Stop(ssCtx) +} diff --git a/xds/internal/xdsclient/load/store.go b/xds/internal/xdsclient/load/store.go index f1e265ee7ddf..6c370ac3d935 100644 --- a/xds/internal/xdsclient/load/store.go +++ b/xds/internal/xdsclient/load/store.go @@ -25,6 +25,9 @@ import ( const negativeOneUInt64 = ^uint64(0) +// timeNow is used to get the current time. It can be overridden in tests. +var timeNow = time.Now + // Store keeps the loads for multiple clusters and services to be reported via // LRS. It contains loads to reported to one LRS server. Create multiple stores // for multiple servers. @@ -115,8 +118,9 @@ func (s *Store) PerCluster(clusterName, serviceName string) PerClusterReporter { return p } p := &perClusterStore{ - cluster: clusterName, - service: serviceName, + cluster: clusterName, + service: serviceName, + lastLoadReportAt: timeNow(), } c[serviceName] = p return p @@ -329,8 +333,8 @@ func (ls *perClusterStore) stats() *Data { }) ls.mu.Lock() - sd.ReportInterval = time.Since(ls.lastLoadReportAt) - ls.lastLoadReportAt = time.Now() + sd.ReportInterval = timeNow().Sub(ls.lastLoadReportAt) + ls.lastLoadReportAt = timeNow() ls.mu.Unlock() if sd.TotalDrops == 0 && len(sd.Drops) == 0 && len(sd.LocalityStats) == 0 { diff --git a/xds/internal/xdsclient/load/store_test.go b/xds/internal/xdsclient/load/store_test.go index 44618966859c..a8a4ac9eeacf 100644 --- a/xds/internal/xdsclient/load/store_test.go +++ b/xds/internal/xdsclient/load/store_test.go @@ -22,6 +22,7 @@ import ( "sort" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -466,3 +467,52 @@ func TestStoreStatsEmptyDataNotReported(t *testing.T) { t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) } } + +// TestStoreReportInterval verify that the load report interval gets +// calculated at every stats() call and is the duration between start of last +// load reporting to next stats() call. +func TestStoreReportInterval(t *testing.T) { + originaltimeNow := timeNow + t.Cleanup(func() { timeNow = originaltimeNow }) + + // Initial time for reporter creation + currentTime := time.Now() + timeNow = func() time.Time { + return currentTime + } + + store := NewStore() + reporter := store.PerCluster("test-cluster", "test-service") + // Report dummy drop to ensure stats1 is not nil. + reporter.CallDropped("dummy-category") + + // Update currentTime to simulate the passage of time between the reporter + // creation and first stats() call. + currentTime = currentTime.Add(5 * time.Second) + stats1 := store.Stats(nil) + + if len(stats1) == 0 { + t.Fatalf("stats1 is empty after reporting a drop, want non-nil") + } + // Verify Stats() call calculate the report interval from the time of + // reporter creation. + if got, want := stats1[0].ReportInterval, 5*time.Second; got != want { + t.Errorf("stats1[0].ReportInterval = %v, want %v", stats1[0].ReportInterval, want) + } + + // Update currentTime to simulate the passage of time between the first + // and second stats() call. + currentTime = currentTime.Add(10 * time.Second) + // Report another dummy drop to ensure stats2 is not nil. + reporter.CallDropped("dummy-category-2") + stats2 := store.Stats(nil) + + if len(stats2) == 0 { + t.Fatalf("stats2 is empty after reporting a drop, want non-nil") + } + // Verify Stats() call calculate the report interval from the time of first + // Stats() call. + if got, want := stats2[0].ReportInterval, 10*time.Second; got != want { + t.Errorf("stats2[0].ReportInterval = %v, want %v", stats2[0].ReportInterval, want) + } +}