From ddb63d569795b50990b37012fd80fd86b71ff9cf Mon Sep 17 00:00:00 2001 From: oliveromahony Date: Tue, 6 Aug 2024 12:17:23 +0100 Subject: [PATCH] Increase timeout period for collecting metrics (#755) * added parallel calls for getting plus stats to speed up the metrics retrieval process --- src/core/metrics/sources/nginx_plus.go | 360 +++++++++++++++++- src/core/metrics/sources/nginx_plus_test.go | 151 +++++++- src/core/nginx.go | 2 +- src/plugins/metrics.go | 8 +- src/plugins/metrics_throlling.go | 1 + .../nginx/agent/v2/src/core/nginx.go | 2 +- .../v2/src/core/metrics/sources/nginx_plus.go | 360 +++++++++++++++++- .../nginx/agent/v2/src/core/nginx.go | 2 +- .../nginx/agent/v2/src/plugins/metrics.go | 8 +- .../agent/v2/src/plugins/metrics_throlling.go | 1 + 10 files changed, 868 insertions(+), 27 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index 7afb45d7d5..3077083ba8 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -10,10 +10,12 @@ package sources import ( "context" "encoding/json" + "errors" "fmt" "io" "math" "net/http" + "slices" "sync" "time" @@ -37,6 +39,29 @@ const ( valueFloat64Zero = float64(0) ) +type Client interface { + GetAvailableEndpoints() ([]string, error) + GetAvailableStreamEndpoints() ([]string, error) + GetStreamServerZones() (*plusclient.StreamServerZones, error) + GetStreamUpstreams() (*plusclient.StreamUpstreams, error) + GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) + GetStreamZoneSync() (*plusclient.StreamZoneSync, error) + GetNginxInfo() (*plusclient.NginxInfo, error) + GetCaches() (*plusclient.Caches, error) + GetProcesses() (*plusclient.Processes, error) + GetSlabs() (*plusclient.Slabs, error) + GetConnections() (*plusclient.Connections, error) + GetHTTPRequests() (*plusclient.HTTPRequests, error) + GetSSL() (*plusclient.SSL, error) + GetServerZones() (*plusclient.ServerZones, error) + GetUpstreams() 
(*plusclient.Upstreams, error) + GetLocationZones() (*plusclient.LocationZones, error) + GetResolvers() (*plusclient.Resolvers, error) + GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) + GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) + GetWorkers() ([]*plusclient.Workers, error) +} + // NginxPlus generates metrics from NGINX Plus API type NginxPlus struct { baseDimensions *metrics.CommonDim @@ -50,6 +75,12 @@ type NginxPlus struct { logger *MetricSourceLogger } +type ExtendedStats struct { + *plusclient.Stats + endpoints []string + streamEndpoints []string +} + func NewNginxPlus(baseDimensions *metrics.CommonDim, nginxNamespace, plusNamespace, plusAPI string, clientVersion int) *NginxPlus { return &NginxPlus{baseDimensions: baseDimensions, nginxNamespace: nginxNamespace, plusNamespace: plusNamespace, plusAPI: plusAPI, clientVersion: clientVersion, logger: NewMetricSourceLogger()} } @@ -71,7 +102,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } - c.prevStats, err = cl.GetStats() + c.prevStats, err = c.getStats(cl) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) @@ -87,13 +118,17 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } - stats, err := cl.GetStats() + log.Debug("NGINX_plus_Collect: getting stats") + + stats, err := c.getStats(cl) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) return } + log.Debug("NGINX_plus_Collect: got stats") + if c.prevStats == nil { c.prevStats = stats } @@ -105,23 +140,325 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.baseDimensions.NginxVersion = stats.NginxInfo.Version c.sendMetrics(ctx, m, c.collectMetrics(stats, c.prevStats)...) 
+ log.Debug("NGINX_plus_Collect: metrics sent") c.prevStats = stats } +func (c *NginxPlus) defaultStats() *plusclient.Stats { + return &plusclient.Stats{ + Upstreams: map[string]plusclient.Upstream{}, + ServerZones: map[string]plusclient.ServerZone{}, + StreamServerZones: map[string]plusclient.StreamServerZone{}, + StreamUpstreams: map[string]plusclient.StreamUpstream{}, + Slabs: map[string]plusclient.Slab{}, + Caches: map[string]plusclient.HTTPCache{}, + HTTPLimitConnections: map[string]plusclient.LimitConnection{}, + StreamLimitConnections: map[string]plusclient.LimitConnection{}, + HTTPLimitRequests: map[string]plusclient.HTTPLimitRequest{}, + Resolvers: map[string]plusclient.Resolver{}, + LocationZones: map[string]plusclient.LocationZone{}, + StreamZoneSync: &plusclient.StreamZoneSync{}, + Workers: []*plusclient.Workers{}, + NginxInfo: plusclient.NginxInfo{}, + SSL: plusclient.SSL{}, + Connections: plusclient.Connections{}, + HTTPRequests: plusclient.HTTPRequests{}, + Processes: plusclient.Processes{}, + } +} + +func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { + var initialStatsWg sync.WaitGroup + initialStatsErrChan := make(chan error, 15) + stats := &ExtendedStats{ + Stats: c.defaultStats(), + } + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + endpoints, err := client.GetAvailableEndpoints() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Available Endpoints: %w", err) + return + } + stats.endpoints = endpoints + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + streamEndpoints, err := client.GetAvailableStreamEndpoints() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Available Stream Endpoints: %w", err) + return + } + stats.streamEndpoints = streamEndpoints + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + nginxInfo, err := client.GetNginxInfo() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get 
NGINX info: %w", err) + return + } + stats.NginxInfo = *nginxInfo + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + caches, err := client.GetCaches() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get NGINX info: %w", err) + return + } + stats.Caches = *caches + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + processes, err := client.GetProcesses() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get processes: %w", err) + return + } + stats.Processes = *processes + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + slabs, err := client.GetSlabs() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get processes: %w", err) + return + } + stats.Slabs = *slabs + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpRequests, err := client.GetHTTPRequests() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPRequests: %w", err) + return + } + stats.HTTPRequests = *httpRequests + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + ssl, err := client.GetSSL() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get SSL: %w", err) + return + } + stats.SSL = *ssl + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + serverZones, err := client.GetServerZones() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get ServerZones: %w", err) + return + } + stats.ServerZones = *serverZones + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + upstreams, err := client.GetUpstreams() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Upstreams: %w", err) + return + } + stats.Upstreams = *upstreams + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + locationZones, err := client.GetLocationZones() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get 
LocationZones: %w", err) + return + } + stats.LocationZones = *locationZones + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + resolvers, err := client.GetResolvers() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Resolvers: %w", err) + return + } + stats.Resolvers = *resolvers + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpLimitRequests, err := client.GetHTTPLimitReqs() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPLimitRequests: %w", err) + return + } + stats.HTTPLimitRequests = *httpLimitRequests + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpLimitConnections, err := client.GetHTTPConnectionsLimit() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPLimitConnections: %w", err) + return + } + stats.HTTPLimitConnections = *httpLimitConnections + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + workers, err := client.GetWorkers() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Workers: %w", err) + return + } + stats.Workers = workers + }() + + initialStatsWg.Wait() + close(initialStatsErrChan) + + // only error if all the stats are empty + if len(initialStatsErrChan) == 16 { + return nil, errors.New("no useful metrics found") + } + + if slices.Contains(stats.endpoints, "stream") { + var streamStatsWg sync.WaitGroup + // this may never be 4 depending on the if conditions + streamStatsErrChan := make(chan error, 4) + + if slices.Contains(stats.streamEndpoints, "server_zones") { + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamServerZones, err := client.GetStreamServerZones() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get streamServerZones: %w", err) + return + } + stats.StreamServerZones = *streamServerZones + }() + } + + if slices.Contains(stats.streamEndpoints, "upstreams") { + streamStatsWg.Add(1) + go 
func() { + defer streamStatsWg.Done() + streamUpstreams, err := client.GetStreamUpstreams() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamUpstreams: %w", err) + return + } + stats.StreamUpstreams = *streamUpstreams + }() + } + + if slices.Contains(stats.streamEndpoints, "limit_conns") { + + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamConnectionsLimit, err := client.GetStreamConnectionsLimit() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamLimitConnections: %w", err) + return + } + stats.StreamLimitConnections = *streamConnectionsLimit + }() + + } + + if slices.Contains(stats.streamEndpoints, "limit_conns") { + + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamZoneSync, err := client.GetStreamZoneSync() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamZoneSync: %w", err) + return + } + stats.StreamZoneSync = streamZoneSync + }() + + } + streamStatsWg.Wait() + close(streamStatsErrChan) + + if len(streamStatsErrChan) > 0 { + log.Warnf("no useful metrics found in stream stats") + } + } + + // report connection metrics separately so it does not influence the results (all http requests show in the metrics) + var connectionsWg sync.WaitGroup + connectionsErrChan := make(chan error, 1) + + connectionsWg.Add(1) + go func() { + defer connectionsWg.Done() + connections, err := client.GetConnections() + if err != nil { + connectionsErrChan <- fmt.Errorf("failed to get connections: %w", err) + return + } + stats.Connections = *connections + }() + + connectionsWg.Wait() + close(connectionsErrChan) + + if len(connectionsErrChan) > 0 { + log.Warnf("connections metrics not found") + } + + return stats.Stats, nil +} + func (c *NginxPlus) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.baseDimensions = dimensions c.plusAPI = collectorConf.PlusAPI } func (c *NginxPlus) Stop() { - log.Debugf("Stopping NginxPlus source 
for nginx id: %v", c.baseDimensions.NginxId) + log.Debugf("Stopping NGINX Plus source for NGINX id: %v", c.baseDimensions.NginxId) } func (c *NginxPlus) sendMetrics(ctx context.Context, m chan<- *metrics.StatsEntityWrapper, entries ...*metrics.StatsEntityWrapper) { for _, entry := range entries { select { case <-ctx.Done(): + err := ctx.Err() + if err != nil { + log.Errorf("sendMetrics: error in done context %v", err) + } + log.Debug("sendMetrics: context done") return case m <- entry: } @@ -161,6 +498,7 @@ func (c *NginxPlus) instanceMetrics(stats, prevStats *plusclient.Stats) *metrics }) dims := c.baseDimensions.ToDimensions() + return metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE) } @@ -227,6 +565,7 @@ func (c *NginxPlus) sslMetrics(stats, prevStats *plusclient.Stats) *metrics.Stat func (c *NginxPlus) serverZoneMetrics(stats, prevStats *plusclient.Stats) []*metrics.StatsEntityWrapper { zoneMetrics := make([]*metrics.StatsEntityWrapper, 0) + for name, sz := range stats.ServerZones { l := &namedMetric{namespace: c.plusNamespace, group: "http"} @@ -305,6 +644,8 @@ func (c *NginxPlus) serverZoneMetrics(stats, prevStats *plusclient.Stats) []*met zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("server zone metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -362,6 +703,8 @@ func (c *NginxPlus) streamServerZoneMetrics(stats, prevStats *plusclient.Stats) dims = append(dims, &proto.Dimension{Name: "server_zone", Value: name}) zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("stream server zone metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -428,6 +771,7 @@ func (c *NginxPlus) locationZoneMetrics(stats, prevStats *plusclient.Stats) []*m dims = append(dims, &proto.Dimension{Name: "location_zone", Value: name}) zoneMetrics = append(zoneMetrics, 
metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("location zone metrics count %d", len(zoneMetrics)) return zoneMetrics } @@ -572,6 +916,7 @@ func (c *NginxPlus) httpUpstreamMetrics(stats, prevStats *plusclient.Stats) []*m upstreamDims = append(upstreamDims, &proto.Dimension{Name: "upstream_zone", Value: u.Zone}) upstreamMetrics = append(upstreamMetrics, metrics.NewStatsEntityWrapper(upstreamDims, simpleMetrics, proto.MetricsReport_UPSTREAMS)) } + log.Debugf("upstream metrics count %d", len(upstreamMetrics)) return upstreamMetrics } @@ -672,6 +1017,7 @@ func (c *NginxPlus) streamUpstreamMetrics(stats, prevStats *plusclient.Stats) [] upstreamDims = append(upstreamDims, &proto.Dimension{Name: "upstream_zone", Value: u.Zone}) upstreamMetrics = append(upstreamMetrics, metrics.NewStatsEntityWrapper(upstreamDims, simpleMetrics, proto.MetricsReport_UPSTREAMS)) } + log.Debugf("stream upstream metrics count %d", len(upstreamMetrics)) return upstreamMetrics } @@ -762,6 +1108,8 @@ func (c *NginxPlus) cacheMetrics(stats, prevStats *plusclient.Stats) []*metrics. 
zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_CACHE_ZONE)) } + log.Debugf("cache metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -799,6 +1147,7 @@ func (c *NginxPlus) slabMetrics(stats *plusclient.Stats) []*metrics.StatsEntityW slabMetrics = append(slabMetrics, metrics.NewStatsEntityWrapper(dims, slotSimpleMetrics, proto.MetricsReport_INSTANCE)) } } + log.Debugf("slab metrics count %d", len(slabMetrics)) return slabMetrics } @@ -818,6 +1167,8 @@ func (c *NginxPlus) httpLimitConnsMetrics(stats, prevStats *plusclient.Stats) [] limitConnsMetrics = append(limitConnsMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("http limit connection metrics count %d", len(limitConnsMetrics)) + return limitConnsMetrics } @@ -838,6 +1189,8 @@ func (c *NginxPlus) httpLimitRequestMetrics(stats, prevStats *plusclient.Stats) limitRequestMetrics = append(limitRequestMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("http limit request metrics count %d", len(limitRequestMetrics)) + return limitRequestMetrics } @@ -871,6 +1224,7 @@ func (c *NginxPlus) workerMetrics(stats, prevStats *plusclient.Stats) []*metrics dims = append(dims, &proto.Dimension{Name: "process_id", Value: fmt.Sprint(w.ProcessID)}) workerMetrics = append(workerMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("worker metrics count %d", len(workerMetrics)) return workerMetrics } diff --git a/src/core/metrics/sources/nginx_plus_test.go b/src/core/metrics/sources/nginx_plus_test.go index dd845411be..b067a987ad 100644 --- a/src/core/metrics/sources/nginx_plus_test.go +++ b/src/core/metrics/sources/nginx_plus_test.go @@ -54,8 +54,6 @@ const ( currentPeer2UpstreamHeaderTime = 80 currentPeer1UpstreamResponseTime = 100 currentPeer2UpstreamResponseTime = 80 - currentUpstreamResponseTime = 100 - 
currentUpstreamConnectTime = 80 currentUpstreamFirstByteTime = 50 previousUpstreamHeaderTime = 98 previousUpstreamResponseTime = 98 @@ -97,14 +95,28 @@ const ( workerProcessID = 12345 ) -type FakeNginxPlus struct { - *NginxPlus -} - -// Collect is fake collector that hard codes a stats struct response to avoid dependency on external NGINX Plus api -func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) { - defer wg.Done() - stats := plusclient.Stats{ +var ( + availableZones = []string{"server_zones", "upstreams", "limit_conns", "zone_sync"} + stats = plusclient.Stats{ + StreamZoneSync: &plusclient.StreamZoneSync{ + Zones: map[string]plusclient.SyncZone{ + "0": { + RecordsPending: 1, + RecordsTotal: 2, + }, + "1": { + RecordsPending: 3, + RecordsTotal: 4, + }, + }, + Status: plusclient.StreamZoneSyncStatus{ + BytesIn: 1, + MsgsIn: 2, + MsgsOut: 3, + BytesOut: 4, + NodesOnline: 5, + }, + }, HTTPRequests: plusclient.HTTPRequests{ Total: currentHTTPRequestTotal, Current: currentHTTPRequestCurrent, @@ -387,8 +399,7 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan< }, }, } - - prevStats := plusclient.Stats{ + prevStats = plusclient.Stats{ HTTPRequests: plusclient.HTTPRequests{ Total: previousHTTPRequestTotal, Current: previousHTTPRequestCurrent, @@ -547,6 +558,15 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan< }, }, } +) + +type FakeNginxPlus struct { + *NginxPlus +} + +// Collect is fake collector that hard codes a stats struct response to avoid dependency on external NGINX Plus api +func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) { + defer wg.Done() f.baseDimensions.NginxType = "plus" f.baseDimensions.PublishedAPI = f.plusAPI @@ -556,6 +576,113 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan< f.sendMetrics(ctx, m, f.collectMetrics(&stats, 
&prevStats)...) } +var _ Client = (*MockClient)(nil) + +type MockClient struct{} + +func (m *MockClient) GetAvailableEndpoints() ([]string, error) { + return []string{"stream"}, nil +} + +func (m *MockClient) GetAvailableStreamEndpoints() ([]string, error) { + return availableZones, nil +} + +func (m *MockClient) GetStreamServerZones() (*plusclient.StreamServerZones, error) { + return &stats.StreamServerZones, nil +} + +func (m *MockClient) GetStreamUpstreams() (*plusclient.StreamUpstreams, error) { + return &stats.StreamUpstreams, nil +} + +func (m *MockClient) GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) { + return &stats.StreamLimitConnections, nil +} + +func (m *MockClient) GetStreamZoneSync() (*plusclient.StreamZoneSync, error) { + return &plusclient.StreamZoneSync{ + Zones: stats.StreamZoneSync.Zones, + Status: stats.StreamZoneSync.Status, + }, nil +} + +func (m *MockClient) GetNginxInfo() (*plusclient.NginxInfo, error) { + return &stats.NginxInfo, nil +} + +func (m *MockClient) GetCaches() (*plusclient.Caches, error) { + return &stats.Caches, nil +} + +func (m *MockClient) GetProcesses() (*plusclient.Processes, error) { + return &stats.Processes, nil +} + +func (m *MockClient) GetSlabs() (*plusclient.Slabs, error) { + return &stats.Slabs, nil +} + +func (m *MockClient) GetConnections() (*plusclient.Connections, error) { + return &stats.Connections, nil +} + +func (m *MockClient) GetHTTPRequests() (*plusclient.HTTPRequests, error) { + return &stats.HTTPRequests, nil +} + +func (m *MockClient) GetSSL() (*plusclient.SSL, error) { + return &stats.SSL, nil +} + +func (m *MockClient) GetServerZones() (*plusclient.ServerZones, error) { + return &stats.ServerZones, nil +} + +func (m *MockClient) GetUpstreams() (*plusclient.Upstreams, error) { + return &stats.Upstreams, nil +} + +func (m *MockClient) GetLocationZones() (*plusclient.LocationZones, error) { + return &stats.LocationZones, nil +} + +func (m *MockClient) GetResolvers() 
(*plusclient.Resolvers, error) { + return &stats.Resolvers, nil +} + +func (m *MockClient) GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) { + return &stats.HTTPLimitRequests, nil +} + +func (m *MockClient) GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) { + return &stats.HTTPLimitConnections, nil +} + +func (m *MockClient) GetWorkers() ([]*plusclient.Workers, error) { + return stats.Workers, nil +} + +func TestGetStats(t *testing.T) { + client := &MockClient{} + + source := NewNginxPlus(nil, "", "", "", 9) + + tests := []struct { + stats plusclient.Stats + }{ + { + stats: stats, + }, + } + + for _, test := range tests { + resultStats, err := source.getStats(client) + require.NoError(t, err) + assert.Equal(t, test.stats, *resultStats) + } +} + func TestNginxPlusUpdate(t *testing.T) { nginxPlus := NewNginxPlus(&metrics.CommonDim{}, "test", PlusNamespace, "http://localhost:8080/api", 6) diff --git a/src/core/nginx.go b/src/core/nginx.go index f29f410ee1..b85451dcfe 100644 --- a/src/core/nginx.go +++ b/src/core/nginx.go @@ -218,7 +218,7 @@ func (n *NginxBinaryType) GetNginxDetailsFromProcess(nginxProcess *Process) *pro if urlsLength == 0 || nginxStatus == "" { stubStatusApiUrl, err := sdk.GetStubStatusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) if err != nil { - log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure aStub Status API to get NGINX OSS metrics: %v", err) + log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. 
please configure a Stub Status API to get NGINX OSS metrics: %v", err) } nginxPlusApiUrl, err := sdk.GetNginxPlusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index ac493ddc97..de869794bc 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -211,8 +211,8 @@ func (m *Metrics) metricsGoroutine() { } func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { - // setups a collect duration of half-time of the poll interval - ctx, cancel := context.WithTimeout(m.ctx, m.interval/2) + // set a timeout for a millisecond less than the collection interval + ctx, cancel := context.WithTimeout(m.ctx, (m.interval - 1*time.Millisecond)) defer cancel() // locks the m.collectors to make sure it doesn't get deleted in the middle // of collection, as we will delete the old one if config changes. @@ -232,7 +232,9 @@ func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { // drain the buf, since our sources/collectors are all done, we can rely on buffer length select { case <-ctx.Done(): - err := m.ctx.Err() + log.Debugf("context done in %s collectStats", time.Since(start)) + + err := ctx.Err() if err != nil { log.Errorf("error in done context collectStats %v", err) } diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index ee0cf6bd52..770bf27331 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -154,6 +154,7 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W return case <-r.ticker.C: reports := r.getAggregatedReports() + log.Debugf("metricsThrottle: metricsReportGoroutine, got %d reports to send", len(reports)) if len(reports) > 0 { r.messagePipeline.Process(core.NewMessage(core.CommMetrics, reports)) } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 
f29f410ee1..b85451dcfe 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -218,7 +218,7 @@ func (n *NginxBinaryType) GetNginxDetailsFromProcess(nginxProcess *Process) *pro if urlsLength == 0 || nginxStatus == "" { stubStatusApiUrl, err := sdk.GetStubStatusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) if err != nil { - log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure aStub Status API to get NGINX OSS metrics: %v", err) + log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure a Stub Status API to get NGINX OSS metrics: %v", err) } nginxPlusApiUrl, err := sdk.GetNginxPlusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index 7afb45d7d5..3077083ba8 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -10,10 +10,12 @@ package sources import ( "context" "encoding/json" + "errors" "fmt" "io" "math" "net/http" + "slices" "sync" "time" @@ -37,6 +39,29 @@ const ( valueFloat64Zero = float64(0) ) +type Client interface { + GetAvailableEndpoints() ([]string, error) + GetAvailableStreamEndpoints() ([]string, error) + GetStreamServerZones() (*plusclient.StreamServerZones, error) + GetStreamUpstreams() (*plusclient.StreamUpstreams, error) + GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) + GetStreamZoneSync() (*plusclient.StreamZoneSync, error) + GetNginxInfo() (*plusclient.NginxInfo, error) + GetCaches() 
(*plusclient.Caches, error) + GetProcesses() (*plusclient.Processes, error) + GetSlabs() (*plusclient.Slabs, error) + GetConnections() (*plusclient.Connections, error) + GetHTTPRequests() (*plusclient.HTTPRequests, error) + GetSSL() (*plusclient.SSL, error) + GetServerZones() (*plusclient.ServerZones, error) + GetUpstreams() (*plusclient.Upstreams, error) + GetLocationZones() (*plusclient.LocationZones, error) + GetResolvers() (*plusclient.Resolvers, error) + GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) + GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) + GetWorkers() ([]*plusclient.Workers, error) +} + // NginxPlus generates metrics from NGINX Plus API type NginxPlus struct { baseDimensions *metrics.CommonDim @@ -50,6 +75,12 @@ type NginxPlus struct { logger *MetricSourceLogger } +type ExtendedStats struct { + *plusclient.Stats + endpoints []string + streamEndpoints []string +} + func NewNginxPlus(baseDimensions *metrics.CommonDim, nginxNamespace, plusNamespace, plusAPI string, clientVersion int) *NginxPlus { return &NginxPlus{baseDimensions: baseDimensions, nginxNamespace: nginxNamespace, plusNamespace: plusNamespace, plusAPI: plusAPI, clientVersion: clientVersion, logger: NewMetricSourceLogger()} } @@ -71,7 +102,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } - c.prevStats, err = cl.GetStats() + c.prevStats, err = c.getStats(cl) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) @@ -87,13 +118,17 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } - stats, err := cl.GetStats() + log.Debug("NGINX_plus_Collect: getting stats") + + stats, err := c.getStats(cl) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) return } + log.Debug("NGINX_plus_Collect: got 
stats") + if c.prevStats == nil { c.prevStats = stats } @@ -105,23 +140,325 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.baseDimensions.NginxVersion = stats.NginxInfo.Version c.sendMetrics(ctx, m, c.collectMetrics(stats, c.prevStats)...) + log.Debug("NGINX_plus_Collect: metrics sent") c.prevStats = stats } +func (c *NginxPlus) defaultStats() *plusclient.Stats { + return &plusclient.Stats{ + Upstreams: map[string]plusclient.Upstream{}, + ServerZones: map[string]plusclient.ServerZone{}, + StreamServerZones: map[string]plusclient.StreamServerZone{}, + StreamUpstreams: map[string]plusclient.StreamUpstream{}, + Slabs: map[string]plusclient.Slab{}, + Caches: map[string]plusclient.HTTPCache{}, + HTTPLimitConnections: map[string]plusclient.LimitConnection{}, + StreamLimitConnections: map[string]plusclient.LimitConnection{}, + HTTPLimitRequests: map[string]plusclient.HTTPLimitRequest{}, + Resolvers: map[string]plusclient.Resolver{}, + LocationZones: map[string]plusclient.LocationZone{}, + StreamZoneSync: &plusclient.StreamZoneSync{}, + Workers: []*plusclient.Workers{}, + NginxInfo: plusclient.NginxInfo{}, + SSL: plusclient.SSL{}, + Connections: plusclient.Connections{}, + HTTPRequests: plusclient.HTTPRequests{}, + Processes: plusclient.Processes{}, + } +} + +func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { + var initialStatsWg sync.WaitGroup + initialStatsErrChan := make(chan error, 15) + stats := &ExtendedStats{ + Stats: c.defaultStats(), + } + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + endpoints, err := client.GetAvailableEndpoints() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Available Endpoints: %w", err) + return + } + stats.endpoints = endpoints + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + streamEndpoints, err := client.GetAvailableStreamEndpoints() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get 
Available Stream Endpoints: %w", err) + return + } + stats.streamEndpoints = streamEndpoints + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + nginxInfo, err := client.GetNginxInfo() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get NGINX info: %w", err) + return + } + stats.NginxInfo = *nginxInfo + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + caches, err := client.GetCaches() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get NGINX info: %w", err) + return + } + stats.Caches = *caches + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + processes, err := client.GetProcesses() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get processes: %w", err) + return + } + stats.Processes = *processes + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + slabs, err := client.GetSlabs() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get processes: %w", err) + return + } + stats.Slabs = *slabs + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpRequests, err := client.GetHTTPRequests() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPRequests: %w", err) + return + } + stats.HTTPRequests = *httpRequests + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + ssl, err := client.GetSSL() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get SSL: %w", err) + return + } + stats.SSL = *ssl + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + serverZones, err := client.GetServerZones() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get ServerZones: %w", err) + return + } + stats.ServerZones = *serverZones + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + upstreams, err := client.GetUpstreams() + if err != nil { + initialStatsErrChan <- 
fmt.Errorf("failed to get Upstreams: %w", err) + return + } + stats.Upstreams = *upstreams + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + locationZones, err := client.GetLocationZones() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get LocationZones: %w", err) + return + } + stats.LocationZones = *locationZones + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + resolvers, err := client.GetResolvers() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Resolvers: %w", err) + return + } + stats.Resolvers = *resolvers + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpLimitRequests, err := client.GetHTTPLimitReqs() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPLimitRequests: %w", err) + return + } + stats.HTTPLimitRequests = *httpLimitRequests + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpLimitConnections, err := client.GetHTTPConnectionsLimit() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPLimitConnections: %w", err) + return + } + stats.HTTPLimitConnections = *httpLimitConnections + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + workers, err := client.GetWorkers() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Workers: %w", err) + return + } + stats.Workers = workers + }() + + initialStatsWg.Wait() + close(initialStatsErrChan) + + // only error if all the stats are empty (15 producer goroutines feed this channel) + if len(initialStatsErrChan) == 15 { + return nil, errors.New("no useful metrics found") + } + + if slices.Contains(stats.endpoints, "stream") { + var streamStatsWg sync.WaitGroup + // this may never be 4 depending on the if conditions + streamStatsErrChan := make(chan error, 4) + + if slices.Contains(stats.streamEndpoints, "server_zones") { + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamServerZones, err := 
client.GetStreamServerZones() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get streamServerZones: %w", err) + return + } + stats.StreamServerZones = *streamServerZones + }() + } + + if slices.Contains(stats.streamEndpoints, "upstreams") { + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamUpstreams, err := client.GetStreamUpstreams() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamUpstreams: %w", err) + return + } + stats.StreamUpstreams = *streamUpstreams + }() + } + + if slices.Contains(stats.streamEndpoints, "limit_conns") { + + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamConnectionsLimit, err := client.GetStreamConnectionsLimit() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamLimitConnections: %w", err) + return + } + stats.StreamLimitConnections = *streamConnectionsLimit + }() + + } + + if slices.Contains(stats.streamEndpoints, "zone_sync") { + + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamZoneSync, err := client.GetStreamZoneSync() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamZoneSync: %w", err) + return + } + stats.StreamZoneSync = streamZoneSync + }() + + } + streamStatsWg.Wait() + close(streamStatsErrChan) + + if len(streamStatsErrChan) > 0 { + log.Warnf("no useful metrics found in stream stats") + } + } + + // report connection metrics separately so it does not influence the results (all http requests show in the metrics) + var connectionsWg sync.WaitGroup + connectionsErrChan := make(chan error, 1) + + connectionsWg.Add(1) + go func() { + defer connectionsWg.Done() + connections, err := client.GetConnections() + if err != nil { + connectionsErrChan <- fmt.Errorf("failed to get connections: %w", err) + return + } + stats.Connections = *connections + }() + + connectionsWg.Wait() + close(connectionsErrChan) + + if len(connectionsErrChan) > 0 { + log.Warnf("connections metrics 
not found") + } + + return stats.Stats, nil +} + func (c *NginxPlus) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.baseDimensions = dimensions c.plusAPI = collectorConf.PlusAPI } func (c *NginxPlus) Stop() { - log.Debugf("Stopping NginxPlus source for nginx id: %v", c.baseDimensions.NginxId) + log.Debugf("Stopping NGINX Plus source for NGINX id: %v", c.baseDimensions.NginxId) } func (c *NginxPlus) sendMetrics(ctx context.Context, m chan<- *metrics.StatsEntityWrapper, entries ...*metrics.StatsEntityWrapper) { for _, entry := range entries { select { case <-ctx.Done(): + err := ctx.Err() + if err != nil { + log.Errorf("sendMetrics: error in done context %v", err) + } + log.Debug("sendMetrics: context done") return case m <- entry: } @@ -161,6 +498,7 @@ func (c *NginxPlus) instanceMetrics(stats, prevStats *plusclient.Stats) *metrics }) dims := c.baseDimensions.ToDimensions() + return metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE) } @@ -227,6 +565,7 @@ func (c *NginxPlus) sslMetrics(stats, prevStats *plusclient.Stats) *metrics.Stat func (c *NginxPlus) serverZoneMetrics(stats, prevStats *plusclient.Stats) []*metrics.StatsEntityWrapper { zoneMetrics := make([]*metrics.StatsEntityWrapper, 0) + for name, sz := range stats.ServerZones { l := &namedMetric{namespace: c.plusNamespace, group: "http"} @@ -305,6 +644,8 @@ func (c *NginxPlus) serverZoneMetrics(stats, prevStats *plusclient.Stats) []*met zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("server zone metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -362,6 +703,8 @@ func (c *NginxPlus) streamServerZoneMetrics(stats, prevStats *plusclient.Stats) dims = append(dims, &proto.Dimension{Name: "server_zone", Value: name}) zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("stream 
server zone metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -428,6 +771,7 @@ func (c *NginxPlus) locationZoneMetrics(stats, prevStats *plusclient.Stats) []*m dims = append(dims, &proto.Dimension{Name: "location_zone", Value: name}) zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("location zone metrics count %d", len(zoneMetrics)) return zoneMetrics } @@ -572,6 +916,7 @@ func (c *NginxPlus) httpUpstreamMetrics(stats, prevStats *plusclient.Stats) []*m upstreamDims = append(upstreamDims, &proto.Dimension{Name: "upstream_zone", Value: u.Zone}) upstreamMetrics = append(upstreamMetrics, metrics.NewStatsEntityWrapper(upstreamDims, simpleMetrics, proto.MetricsReport_UPSTREAMS)) } + log.Debugf("upstream metrics count %d", len(upstreamMetrics)) return upstreamMetrics } @@ -672,6 +1017,7 @@ func (c *NginxPlus) streamUpstreamMetrics(stats, prevStats *plusclient.Stats) [] upstreamDims = append(upstreamDims, &proto.Dimension{Name: "upstream_zone", Value: u.Zone}) upstreamMetrics = append(upstreamMetrics, metrics.NewStatsEntityWrapper(upstreamDims, simpleMetrics, proto.MetricsReport_UPSTREAMS)) } + log.Debugf("stream upstream metrics count %d", len(upstreamMetrics)) return upstreamMetrics } @@ -762,6 +1108,8 @@ func (c *NginxPlus) cacheMetrics(stats, prevStats *plusclient.Stats) []*metrics. 
zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_CACHE_ZONE)) } + log.Debugf("cache metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -799,6 +1147,7 @@ func (c *NginxPlus) slabMetrics(stats *plusclient.Stats) []*metrics.StatsEntityW slabMetrics = append(slabMetrics, metrics.NewStatsEntityWrapper(dims, slotSimpleMetrics, proto.MetricsReport_INSTANCE)) } } + log.Debugf("slab metrics count %d", len(slabMetrics)) return slabMetrics } @@ -818,6 +1167,8 @@ func (c *NginxPlus) httpLimitConnsMetrics(stats, prevStats *plusclient.Stats) [] limitConnsMetrics = append(limitConnsMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("http limit connection metrics count %d", len(limitConnsMetrics)) + return limitConnsMetrics } @@ -838,6 +1189,8 @@ func (c *NginxPlus) httpLimitRequestMetrics(stats, prevStats *plusclient.Stats) limitRequestMetrics = append(limitRequestMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("http limit request metrics count %d", len(limitRequestMetrics)) + return limitRequestMetrics } @@ -871,6 +1224,7 @@ func (c *NginxPlus) workerMetrics(stats, prevStats *plusclient.Stats) []*metrics dims = append(dims, &proto.Dimension{Name: "process_id", Value: fmt.Sprint(w.ProcessID)}) workerMetrics = append(workerMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("worker metrics count %d", len(workerMetrics)) return workerMetrics } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go index f29f410ee1..b85451dcfe 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -218,7 +218,7 @@ func (n *NginxBinaryType) 
GetNginxDetailsFromProcess(nginxProcess *Process) *pro if urlsLength == 0 || nginxStatus == "" { stubStatusApiUrl, err := sdk.GetStubStatusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) if err != nil { - log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure aStub Status API to get NGINX OSS metrics: %v", err) + log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure a Stub Status API to get NGINX OSS metrics: %v", err) } nginxPlusApiUrl, err := sdk.GetNginxPlusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index ac493ddc97..de869794bc 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -211,8 +211,8 @@ func (m *Metrics) metricsGoroutine() { } func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { - // setups a collect duration of half-time of the poll interval - ctx, cancel := context.WithTimeout(m.ctx, m.interval/2) + // set a timeout for a millisecond less than the collection interval + ctx, cancel := context.WithTimeout(m.ctx, (m.interval - 1*time.Millisecond)) defer cancel() // locks the m.collectors to make sure it doesn't get deleted in the middle // of collection, as we will delete the old one if config changes. 
@@ -232,7 +232,9 @@ func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { // drain the buf, since our sources/collectors are all done, we can rely on buffer length select { case <-ctx.Done(): - err := m.ctx.Err() + log.Debugf("context done in %s collectStats", time.Since(start)) + + err := ctx.Err() if err != nil { log.Errorf("error in done context collectStats %v", err) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index ee0cf6bd52..770bf27331 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -154,6 +154,7 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W return case <-r.ticker.C: reports := r.getAggregatedReports() + log.Debugf("metricsThrottle: metricsReportGoroutine, got %d reports to send", len(reports)) if len(reports) > 0 { r.messagePipeline.Process(core.NewMessage(core.CommMetrics, reports)) }