From bfb5de6a2057313b85c1eea0cb03ac327f5f2993 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Tue, 23 Jul 2024 14:20:03 +0100 Subject: [PATCH 01/17] updated the debugging for context --- src/plugins/metrics.go | 9 ++++++--- .../github.com/nginx/agent/v2/src/plugins/metrics.go | 9 ++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index ac493ddc9..a8ae7241e 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -9,6 +9,7 @@ package plugins import ( "context" + "math" "sync" "time" @@ -211,8 +212,8 @@ func (m *Metrics) metricsGoroutine() { } func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { - // setups a collect duration of half-time of the poll interval - ctx, cancel := context.WithTimeout(m.ctx, m.interval/2) + // set a timeout for a millisecond less than the collection interval + ctx, cancel := context.WithTimeout(m.ctx, (m.interval - 1 * time.Millisecond)) defer cancel() // locks the m.collectors to make sure it doesn't get deleted in the middle // of collection, as we will delete the old one if config changes. @@ -232,7 +233,9 @@ func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { // drain the buf, since our sources/collectors are all done, we can rely on buffer length select { case <-ctx.Done(): - err := m.ctx.Err() + log.Debugf("context done in %s collectStats", time.Since(start)) + + err := ctx.Err() if err != nil { log.Errorf("error in done context collectStats %v", err) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index ac493ddc9..a8ae7241e 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -9,6 +9,7 @@ package plugins import ( "context" + "math" "sync" "time" @@ -211,8 +212,8 @@ func (m *Metrics) metricsGoroutine() { } func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { - // setups a collect duration of half-time of the poll interval - ctx, cancel := context.WithTimeout(m.ctx, m.interval/2) + // set a timeout for a millisecond less than the collection interval + ctx, cancel := context.WithTimeout(m.ctx, (m.interval - 1 * time.Millisecond)) defer cancel() // locks the m.collectors to make sure it doesn't get deleted in the middle // of collection, as we will delete the old one if config changes. @@ -232,7 +233,9 @@ func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { // drain the buf, since our sources/collectors are all done, we can rely on buffer length select { case <-ctx.Done(): - err := m.ctx.Err() + log.Debugf("context done in %s collectStats", time.Since(start)) + + err := ctx.Err() if err != nil { log.Errorf("error in done context collectStats %v", err) } From 7251390890100f5f03651e53ae1a81d8b61706e5 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Tue, 23 Jul 2024 14:22:04 +0100 Subject: [PATCH 02/17] local changes --- src/plugins/metrics.go | 3 +-- .../vendor/github.com/nginx/agent/v2/src/plugins/metrics.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index a8ae7241e..de869794b 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -9,7 +9,6 @@ package plugins import ( "context" - "math" "sync" "time" @@ -213,7 +212,7 @@ func (m *Metrics) metricsGoroutine() { func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { // set a timeout for a millisecond less than the collection interval - ctx, cancel := context.WithTimeout(m.ctx, (m.interval - 1 * time.Millisecond)) + ctx, cancel := context.WithTimeout(m.ctx, (m.interval - 1*time.Millisecond)) defer cancel() // locks the m.collectors to make sure it doesn't get deleted in the middle // of collection, as we will delete the old one if config changes. diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index a8ae7241e..de869794b 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -9,7 +9,6 @@ package plugins import ( "context" - "math" "sync" "time" @@ -213,7 +212,7 @@ func (m *Metrics) metricsGoroutine() { func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { // set a timeout for a millisecond less than the collection interval - ctx, cancel := context.WithTimeout(m.ctx, (m.interval - 1 * time.Millisecond)) + ctx, cancel := context.WithTimeout(m.ctx, (m.interval - 1*time.Millisecond)) defer cancel() // locks the m.collectors to make sure it doesn't get deleted in the middle // of collection, as we will delete the old one if config changes. From 12da4c1400eabae04132ec92c32b7b387c9431b3 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Tue, 23 Jul 2024 17:04:27 +0100 Subject: [PATCH 03/17] added in more context debugging --- src/core/metrics/sources/nginx_plus.go | 8 +++++++- src/plugins/metrics_throlling.go | 1 + .../nginx/agent/v2/src/core/metrics/sources/nginx_plus.go | 8 +++++++- .../nginx/agent/v2/src/plugins/metrics_throlling.go | 1 + 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index 7afb45d7d..e43476635 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -105,6 +105,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.baseDimensions.NginxVersion = stats.NginxInfo.Version c.sendMetrics(ctx, m, c.collectMetrics(stats, c.prevStats)...) + log.Debug("NGINX_plus_Collect: metrics sent") c.prevStats = stats } @@ -115,13 +116,18 @@ func (c *NginxPlus) Update(dimensions *metrics.CommonDim, collectorConf *metrics } func (c *NginxPlus) Stop() { - log.Debugf("Stopping NginxPlus source for nginx id: %v", c.baseDimensions.NginxId) + log.Debugf("Stopping NGINX Plus source for NGINX id: %v", c.baseDimensions.NginxId) } func (c *NginxPlus) sendMetrics(ctx context.Context, m chan<- *metrics.StatsEntityWrapper, entries ...*metrics.StatsEntityWrapper) { for _, entry := range entries { select { case <-ctx.Done(): + err := ctx.Err() + if err != nil { + log.Errorf("sendMetrics: error in done context %v", err) + } + log.Debug("sendMetrics: context done") return case m <- entry: } diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index ee0cf6bd5..770bf2733 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -154,6 +154,7 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W return case <-r.ticker.C: reports := r.getAggregatedReports() + log.Debugf("metricsThrottle: metricsReportGoroutine, got %d reports to send", len(reports)) if len(reports) > 0 { r.messagePipeline.Process(core.NewMessage(core.CommMetrics, reports)) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index 7afb45d7d..e43476635 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -105,6 +105,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.baseDimensions.NginxVersion = stats.NginxInfo.Version c.sendMetrics(ctx, m, c.collectMetrics(stats, c.prevStats)...) + log.Debug("NGINX_plus_Collect: metrics sent") c.prevStats = stats } @@ -115,13 +116,18 @@ func (c *NginxPlus) Update(dimensions *metrics.CommonDim, collectorConf *metrics } func (c *NginxPlus) Stop() { - log.Debugf("Stopping NginxPlus source for nginx id: %v", c.baseDimensions.NginxId) + log.Debugf("Stopping NGINX Plus source for NGINX id: %v", c.baseDimensions.NginxId) } func (c *NginxPlus) sendMetrics(ctx context.Context, m chan<- *metrics.StatsEntityWrapper, entries ...*metrics.StatsEntityWrapper) { for _, entry := range entries { select { case <-ctx.Done(): + err := ctx.Err() + if err != nil { + log.Errorf("sendMetrics: error in done context %v", err) + } + log.Debug("sendMetrics: context done") return case m <- entry: } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index ee0cf6bd5..770bf2733 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -154,6 +154,7 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W return case <-r.ticker.C: reports := r.getAggregatedReports() + log.Debugf("metricsThrottle: metricsReportGoroutine, got %d reports to send", len(reports)) if len(reports) > 0 { r.messagePipeline.Process(core.NewMessage(core.CommMetrics, reports)) } From 17582e727b3de12aad50bccf234b59b2347b225c Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Fri, 26 Jul 2024 14:36:05 +0100 Subject: [PATCH 04/17] parallel metrics calls --- src/core/metrics/sources/nginx_plus.go | 180 ++++++++++++++++++++++++- 1 file changed, 179 insertions(+), 1 deletion(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index e43476635..5d2ee2553 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -10,10 +10,12 @@ package sources import ( "context" "encoding/json" + "errors" "fmt" "io" "math" "net/http" + "slices" "sync" "time" @@ -50,6 +52,12 @@ type NginxPlus struct { logger *MetricSourceLogger } +type ExtendedStats struct { + *plusclient.Stats + endpoints []string + streamEndpoints []string +} + func NewNginxPlus(baseDimensions *metrics.CommonDim, nginxNamespace, plusNamespace, plusAPI string, clientVersion int) *NginxPlus { return &NginxPlus{baseDimensions: baseDimensions, nginxNamespace: nginxNamespace, plusNamespace: plusNamespace, plusAPI: plusAPI, clientVersion: clientVersion, logger: NewMetricSourceLogger()} } @@ -87,7 +95,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } - stats, err := cl.GetStats() + stats, err := c.getStats(cl) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) @@ -110,6 +118,159 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.prevStats = stats } +func (c *NginxPlus) getStats(client *plusclient.NginxClient ) (*plusclient.Stats, error) { + var intialStatsWg sync.WaitGroup + stats := &ExtendedStats{} + + intialStats := []struct { + target interface{} + fetchFunc interface{} + }{ + {&stats.endpoints, client.GetAvailableEndpoints}, + // before, we used error on this, if no stream endpoints we will continue and the if condition + // lower down should cater for it + {&stats.streamEndpoints, client.GetAvailableStreamEndpoints}, + {&stats.NginxInfo, client.GetNginxInfo}, + {&stats.Caches, client.GetCaches}, + {&stats.Processes, client.GetProcesses}, + {&stats.Slabs, client.GetSlabs}, + {&stats.Connections, client.GetConnections}, + {&stats.HTTPRequests, client.GetHTTPRequests}, + {&stats.SSL, client.GetSSL}, + {&stats.ServerZones, client.GetServerZones}, + {&stats.Upstreams, client.GetUpstreams}, + {&stats.LocationZones, client.GetLocationZones}, + {&stats.Resolvers, client.GetResolvers}, + {&stats.HTTPLimitRequests, client.GetHTTPLimitReqs}, + {&stats.HTTPLimitConnections, client.GetHTTPConnectionsLimit}, + {&stats.Workers, client.GetWorkers}, + } + + intialStatsErrChan := make(chan error, len(intialStats)) + + // run these functions in parallel + for _, stats := range intialStats { + intialStatsWg.Add(1) + go fetchAndAssign(&intialStatsWg, intialStatsErrChan, stats.target, stats.fetchFunc) + } + + intialStatsWg.Wait() + close(intialStatsErrChan) + + // only error if all the stats are empty + if len(intialStatsErrChan) == len(intialStats) { + return nil, errors.New("no useful metrics found") + } + + if slices.Contains(stats.endpoints, "stream") { + var streamStatsWg sync.WaitGroup + // check if these can get added to the client GetStats in a different way + endpointStats := []struct { + target interface{} + fetchFunc interface{} + statType string + }{ + {&stats.StreamServerZones, client.GetStreamServerZones, "server_zones"}, + {&stats.StreamUpstreams, client.GetStreamUpstreams, "upstreams"}, + {&stats.StreamLimitConnections, client.GetStreamConnectionsLimit, "limit_conns"}, + {&stats.StreamZoneSync, client.GetStreamZoneSync, "zone_sync"}, + } + + streamStatsErrChan := make(chan error, len(endpointStats)) + + for _, stat := range endpointStats { + streamStatsWg.Add(1) + if slices.Contains(stats.streamEndpoints, stat.statType) { + go fetchAndAssign(&streamStatsWg, streamStatsErrChan, stat.target, stat.fetchFunc) + } + } + close(streamStatsErrChan) + + if len(streamStatsErrChan) == len(endpointStats) { + log.Warnf("no useful metrics found in stream stats") + } + } + + return stats.Stats, nil +} + + +func fetchData[T any](wg *sync.WaitGroup, errChan chan error, target *T, fetchFunc func() (T, error)) { + defer wg.Done() + data, err := fetchFunc() + if err != nil { + errStr := fmt.Errorf("failed to get stats: %w", err) + log.Debug(errStr) + errChan <- errStr + return + } + *target = data +} + +// this function takes the target type and matches it's function signature +func fetchAndAssign(wg *sync.WaitGroup, errChan chan error, target interface{}, fetchFunc interface{}) { + defer wg.Done() + + switch t := target.(type) { + case *plusclient.Upstreams: + f := fetchFunc.(func() (plusclient.Upstreams, error)) + fetchData(wg, errChan, t, f) + case *plusclient.ServerZones: + f := fetchFunc.(func() (plusclient.ServerZones, error)) + fetchData(wg, errChan, t, f) + case *plusclient.StreamServerZones: + f := fetchFunc.(func() (plusclient.StreamServerZones, error)) + fetchData(wg, errChan, t, f) + case *plusclient.StreamUpstreams: + f := fetchFunc.(func() (plusclient.StreamUpstreams, error)) + fetchData(wg, errChan, t, f) + case *plusclient.Slabs: + f := fetchFunc.(func() (plusclient.Slabs, error)) + fetchData(wg, errChan, t, f) + case *plusclient.Caches: + f := fetchFunc.(func() (plusclient.Caches, error)) + fetchData(wg, errChan, t, f) + case *plusclient.HTTPLimitConnections: + f := fetchFunc.(func() (plusclient.HTTPLimitConnections, error)) + fetchData(wg, errChan, t, f) + case *plusclient.StreamLimitConnections: + f := fetchFunc.(func() (plusclient.StreamLimitConnections, error)) + fetchData(wg, errChan, t, f) + case *plusclient.HTTPLimitRequests: + f := fetchFunc.(func() (plusclient.HTTPLimitRequests, error)) + fetchData(wg, errChan, t, f) + case *plusclient.Resolvers: + f := fetchFunc.(func() (plusclient.Resolvers, error)) + fetchData(wg, errChan, t, f) + case *plusclient.LocationZones: + f := fetchFunc.(func() (plusclient.LocationZones, error)) + fetchData(wg, errChan, t, f) + case **plusclient.StreamZoneSync: + f := fetchFunc.(func() (*plusclient.StreamZoneSync, error)) + fetchData(wg, errChan, t, f) + case *[]*Workers: + f := fetchFunc.(func() ([]*Workers, error)) + fetchData(wg, errChan, t, f) + case *plusclient.NginxInfo: + f := fetchFunc.(func() (plusclient.NginxInfo, error)) + fetchData(wg, errChan, t, f) + case *plusclient.SSL: + f := fetchFunc.(func() (plusclient.SSL, error)) + fetchData(wg, errChan, t, f) + case *plusclient.Connections: + f := fetchFunc.(func() (plusclient.Connections, error)) + fetchData(wg, errChan, t, f) + case *plusclient.HTTPRequests: + f := fetchFunc.(func() (plusclient.HTTPRequests, error)) + fetchData(wg, errChan, t, f) + case *plusclient.Processes: + f := fetchFunc.(func() (plusclient.Processes, error)) + fetchData(wg, errChan, t, f) + default: + errChan <- fmt.Errorf("unsupported type: %T", target) + } +} + func (c *NginxPlus) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.baseDimensions = dimensions c.plusAPI = collectorConf.PlusAPI @@ -167,6 +328,7 @@ func (c *NginxPlus) instanceMetrics(stats, prevStats *plusclient.Stats) *metrics }) dims := c.baseDimensions.ToDimensions() + return metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE) } @@ -233,6 +395,7 @@ func (c *NginxPlus) sslMetrics(stats, prevStats *plusclient.Stats) *metrics.Stat func (c *NginxPlus) serverZoneMetrics(stats, prevStats *plusclient.Stats) []*metrics.StatsEntityWrapper { zoneMetrics := make([]*metrics.StatsEntityWrapper, 0) + for name, sz := range stats.ServerZones { l := &namedMetric{namespace: c.plusNamespace, group: "http"} @@ -311,6 +474,8 @@ func (c *NginxPlus) serverZoneMetrics(stats, prevStats *plusclient.Stats) []*met zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("server zone metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -368,6 +533,8 @@ func (c *NginxPlus) streamServerZoneMetrics(stats, prevStats *plusclient.Stats) dims = append(dims, &proto.Dimension{Name: "server_zone", Value: name}) zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("stream server zone metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -434,6 +601,7 @@ func (c *NginxPlus) locationZoneMetrics(stats, prevStats *plusclient.Stats) []*m dims = append(dims, &proto.Dimension{Name: "location_zone", Value: name}) zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("location zone metrics count %d", len(zoneMetrics)) return zoneMetrics } @@ -578,6 +746,7 @@ func (c *NginxPlus) httpUpstreamMetrics(stats, prevStats *plusclient.Stats) []*m upstreamDims = append(upstreamDims, &proto.Dimension{Name: "upstream_zone", Value: u.Zone}) upstreamMetrics = append(upstreamMetrics, metrics.NewStatsEntityWrapper(upstreamDims, simpleMetrics, proto.MetricsReport_UPSTREAMS)) } + log.Debugf("upstream metrics count %d", len(upstreamMetrics)) return upstreamMetrics } @@ -678,6 +847,7 @@ func (c *NginxPlus) streamUpstreamMetrics(stats, prevStats *plusclient.Stats) [] upstreamDims = append(upstreamDims, &proto.Dimension{Name: "upstream_zone", Value: u.Zone}) upstreamMetrics = append(upstreamMetrics, metrics.NewStatsEntityWrapper(upstreamDims, simpleMetrics, proto.MetricsReport_UPSTREAMS)) } + log.Debugf("stream upstream metrics count %d", len(upstreamMetrics)) return upstreamMetrics } @@ -768,6 +938,8 @@ func (c *NginxPlus) cacheMetrics(stats, prevStats *plusclient.Stats) []*metrics. zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_CACHE_ZONE)) } + log.Debugf("cache metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -805,6 +977,7 @@ func (c *NginxPlus) slabMetrics(stats *plusclient.Stats) []*metrics.StatsEntityW slabMetrics = append(slabMetrics, metrics.NewStatsEntityWrapper(dims, slotSimpleMetrics, proto.MetricsReport_INSTANCE)) } } + log.Debugf("slab metrics count %d", len(slabMetrics)) return slabMetrics } @@ -824,6 +997,8 @@ func (c *NginxPlus) httpLimitConnsMetrics(stats, prevStats *plusclient.Stats) [] limitConnsMetrics = append(limitConnsMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("http limit connection metrics count %d", len(limitConnsMetrics)) + return limitConnsMetrics } @@ -844,6 +1019,8 @@ func (c *NginxPlus) httpLimitRequestMetrics(stats, prevStats *plusclient.Stats) limitRequestMetrics = append(limitRequestMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("http limit request metrics count %d", len(limitRequestMetrics)) + return limitRequestMetrics } @@ -877,6 +1054,7 @@ func (c *NginxPlus) workerMetrics(stats, prevStats *plusclient.Stats) []*metrics dims = append(dims, &proto.Dimension{Name: "process_id", Value: fmt.Sprint(w.ProcessID)}) workerMetrics = append(workerMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("worker metrics count %d", len(workerMetrics)) return workerMetrics } From b0063eb4da51d10644e6b29f1625ea4355fca66b Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Fri, 26 Jul 2024 14:36:32 +0100 Subject: [PATCH 05/17] deps --- .../v2/src/core/metrics/sources/nginx_plus.go | 180 +++++++++++++++++- 1 file changed, 179 insertions(+), 1 deletion(-) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index e43476635..5d2ee2553 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -10,10 +10,12 @@ package sources import ( "context" "encoding/json" + "errors" "fmt" "io" "math" "net/http" + "slices" "sync" "time" @@ -50,6 +52,12 @@ type NginxPlus struct { logger *MetricSourceLogger } +type ExtendedStats struct { + *plusclient.Stats + endpoints []string + streamEndpoints []string +} + func NewNginxPlus(baseDimensions *metrics.CommonDim, nginxNamespace, plusNamespace, plusAPI string, clientVersion int) *NginxPlus { return &NginxPlus{baseDimensions: baseDimensions, nginxNamespace: nginxNamespace, plusNamespace: plusNamespace, plusAPI: plusAPI, clientVersion: clientVersion, logger: NewMetricSourceLogger()} } @@ -87,7 +95,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } - stats, err := cl.GetStats() + stats, err := c.getStats(cl) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) @@ -110,6 +118,159 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.prevStats = stats } +func (c *NginxPlus) getStats(client *plusclient.NginxClient ) (*plusclient.Stats, error) { + var intialStatsWg sync.WaitGroup + stats := &ExtendedStats{} + + intialStats := []struct { + target interface{} + fetchFunc interface{} + }{ + {&stats.endpoints, client.GetAvailableEndpoints}, + // before, we used error on this, if no stream endpoints we will continue and the if condition + // lower down should cater for it + {&stats.streamEndpoints, client.GetAvailableStreamEndpoints}, + {&stats.NginxInfo, client.GetNginxInfo}, + {&stats.Caches, client.GetCaches}, + {&stats.Processes, client.GetProcesses}, + {&stats.Slabs, client.GetSlabs}, + {&stats.Connections, client.GetConnections}, + {&stats.HTTPRequests, client.GetHTTPRequests}, + {&stats.SSL, client.GetSSL}, + {&stats.ServerZones, client.GetServerZones}, + {&stats.Upstreams, client.GetUpstreams}, + {&stats.LocationZones, client.GetLocationZones}, + {&stats.Resolvers, client.GetResolvers}, + {&stats.HTTPLimitRequests, client.GetHTTPLimitReqs}, + {&stats.HTTPLimitConnections, client.GetHTTPConnectionsLimit}, + {&stats.Workers, client.GetWorkers}, + } + + intialStatsErrChan := make(chan error, len(intialStats)) + + // run these functions in parallel + for _, stats := range intialStats { + intialStatsWg.Add(1) + go fetchAndAssign(&intialStatsWg, intialStatsErrChan, stats.target, stats.fetchFunc) + } + + intialStatsWg.Wait() + close(intialStatsErrChan) + + // only error if all the stats are empty + if len(intialStatsErrChan) == len(intialStats) { + return nil, errors.New("no useful metrics found") + } + + if slices.Contains(stats.endpoints, "stream") { + var streamStatsWg sync.WaitGroup + // check if these can get added to the client GetStats in a different way + endpointStats := []struct { + target interface{} + fetchFunc interface{} + statType string + }{ + {&stats.StreamServerZones, client.GetStreamServerZones, "server_zones"}, + {&stats.StreamUpstreams, client.GetStreamUpstreams, "upstreams"}, + {&stats.StreamLimitConnections, client.GetStreamConnectionsLimit, "limit_conns"}, + {&stats.StreamZoneSync, client.GetStreamZoneSync, "zone_sync"}, + } + + streamStatsErrChan := make(chan error, len(endpointStats)) + + for _, stat := range endpointStats { + streamStatsWg.Add(1) + if slices.Contains(stats.streamEndpoints, stat.statType) { + go fetchAndAssign(&streamStatsWg, streamStatsErrChan, stat.target, stat.fetchFunc) + } + } + close(streamStatsErrChan) + + if len(streamStatsErrChan) == len(endpointStats) { + log.Warnf("no useful metrics found in stream stats") + } + } + + return stats.Stats, nil +} + + +func fetchData[T any](wg *sync.WaitGroup, errChan chan error, target *T, fetchFunc func() (T, error)) { + defer wg.Done() + data, err := fetchFunc() + if err != nil { + errStr := fmt.Errorf("failed to get stats: %w", err) + log.Debug(errStr) + errChan <- errStr + return + } + *target = data +} + +// this function takes the target type and matches it's function signature +func fetchAndAssign(wg *sync.WaitGroup, errChan chan error, target interface{}, fetchFunc interface{}) { + defer wg.Done() + + switch t := target.(type) { + case *plusclient.Upstreams: + f := fetchFunc.(func() (plusclient.Upstreams, error)) + fetchData(wg, errChan, t, f) + case *plusclient.ServerZones: + f := fetchFunc.(func() (plusclient.ServerZones, error)) + fetchData(wg, errChan, t, f) + case *plusclient.StreamServerZones: + f := fetchFunc.(func() (plusclient.StreamServerZones, error)) + fetchData(wg, errChan, t, f) + case *plusclient.StreamUpstreams: + f := fetchFunc.(func() (plusclient.StreamUpstreams, error)) + fetchData(wg, errChan, t, f) + case *plusclient.Slabs: + f := fetchFunc.(func() (plusclient.Slabs, error)) + fetchData(wg, errChan, t, f) + case *plusclient.Caches: + f := fetchFunc.(func() (plusclient.Caches, error)) + fetchData(wg, errChan, t, f) + case *plusclient.HTTPLimitConnections: + f := fetchFunc.(func() (plusclient.HTTPLimitConnections, error)) + fetchData(wg, errChan, t, f) + case *plusclient.StreamLimitConnections: + f := fetchFunc.(func() (plusclient.StreamLimitConnections, error)) + fetchData(wg, errChan, t, f) + case *plusclient.HTTPLimitRequests: + f := fetchFunc.(func() (plusclient.HTTPLimitRequests, error)) + fetchData(wg, errChan, t, f) + case *plusclient.Resolvers: + f := fetchFunc.(func() (plusclient.Resolvers, error)) + fetchData(wg, errChan, t, f) + case *plusclient.LocationZones: + f := fetchFunc.(func() (plusclient.LocationZones, error)) + fetchData(wg, errChan, t, f) + case **plusclient.StreamZoneSync: + f := fetchFunc.(func() (*plusclient.StreamZoneSync, error)) + fetchData(wg, errChan, t, f) + case *[]*Workers: + f := fetchFunc.(func() ([]*Workers, error)) + fetchData(wg, errChan, t, f) + case *plusclient.NginxInfo: + f := fetchFunc.(func() (plusclient.NginxInfo, error)) + fetchData(wg, errChan, t, f) + case *plusclient.SSL: + f := fetchFunc.(func() (plusclient.SSL, error)) + fetchData(wg, errChan, t, f) + case *plusclient.Connections: + f := fetchFunc.(func() (plusclient.Connections, error)) + fetchData(wg, errChan, t, f) + case *plusclient.HTTPRequests: + f := fetchFunc.(func() (plusclient.HTTPRequests, error)) + fetchData(wg, errChan, t, f) + case *plusclient.Processes: + f := fetchFunc.(func() (plusclient.Processes, error)) + fetchData(wg, errChan, t, f) + default: + errChan <- fmt.Errorf("unsupported type: %T", target) + } +} + func (c *NginxPlus) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.baseDimensions = dimensions c.plusAPI = collectorConf.PlusAPI @@ -167,6 +328,7 @@ func (c *NginxPlus) instanceMetrics(stats, prevStats *plusclient.Stats) *metrics }) dims := c.baseDimensions.ToDimensions() + return metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE) } @@ -233,6 +395,7 @@ func (c *NginxPlus) sslMetrics(stats, prevStats *plusclient.Stats) *metrics.Stat func (c *NginxPlus) serverZoneMetrics(stats, prevStats *plusclient.Stats) []*metrics.StatsEntityWrapper { zoneMetrics := make([]*metrics.StatsEntityWrapper, 0) + for name, sz := range stats.ServerZones { l := &namedMetric{namespace: c.plusNamespace, group: "http"} @@ -311,6 +474,8 @@ func (c *NginxPlus) serverZoneMetrics(stats, prevStats *plusclient.Stats) []*met zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("server zone metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -368,6 +533,8 @@ func (c *NginxPlus) streamServerZoneMetrics(stats, prevStats *plusclient.Stats) dims = append(dims, &proto.Dimension{Name: "server_zone", Value: name}) zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("stream server zone metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -434,6 +601,7 @@ func (c *NginxPlus) locationZoneMetrics(stats, prevStats *plusclient.Stats) []*m dims = append(dims, &proto.Dimension{Name: "location_zone", Value: name}) zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("location zone metrics count %d", len(zoneMetrics)) return zoneMetrics } @@ -578,6 +746,7 @@ func (c *NginxPlus) httpUpstreamMetrics(stats, prevStats *plusclient.Stats) []*m upstreamDims = append(upstreamDims, &proto.Dimension{Name: "upstream_zone", Value: u.Zone}) upstreamMetrics = append(upstreamMetrics, metrics.NewStatsEntityWrapper(upstreamDims, simpleMetrics, proto.MetricsReport_UPSTREAMS)) } + log.Debugf("upstream metrics count %d", len(upstreamMetrics)) return upstreamMetrics } @@ -678,6 +847,7 @@ func (c *NginxPlus) streamUpstreamMetrics(stats, prevStats *plusclient.Stats) [] upstreamDims = append(upstreamDims, &proto.Dimension{Name: "upstream_zone", Value: u.Zone}) upstreamMetrics = append(upstreamMetrics, metrics.NewStatsEntityWrapper(upstreamDims, simpleMetrics, proto.MetricsReport_UPSTREAMS)) } + log.Debugf("stream upstream metrics count %d", len(upstreamMetrics)) return upstreamMetrics } @@ -768,6 +938,8 @@ func (c *NginxPlus) cacheMetrics(stats, prevStats *plusclient.Stats) []*metrics. zoneMetrics = append(zoneMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_CACHE_ZONE)) } + log.Debugf("cache metrics count %d", len(zoneMetrics)) + return zoneMetrics } @@ -805,6 +977,7 @@ func (c *NginxPlus) slabMetrics(stats *plusclient.Stats) []*metrics.StatsEntityW slabMetrics = append(slabMetrics, metrics.NewStatsEntityWrapper(dims, slotSimpleMetrics, proto.MetricsReport_INSTANCE)) } } + log.Debugf("slab metrics count %d", len(slabMetrics)) return slabMetrics } @@ -824,6 +997,8 @@ func (c *NginxPlus) httpLimitConnsMetrics(stats, prevStats *plusclient.Stats) [] limitConnsMetrics = append(limitConnsMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("http limit connection metrics count %d", len(limitConnsMetrics)) + return limitConnsMetrics } @@ -844,6 +1019,8 @@ func (c *NginxPlus) httpLimitRequestMetrics(stats, prevStats *plusclient.Stats) limitRequestMetrics = append(limitRequestMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("http limit request metrics count %d", len(limitRequestMetrics)) + return limitRequestMetrics } @@ -877,6 +1054,7 @@ func (c *NginxPlus) workerMetrics(stats, prevStats *plusclient.Stats) []*metrics dims = append(dims, &proto.Dimension{Name: "process_id", Value: fmt.Sprint(w.ProcessID)}) workerMetrics = append(workerMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE)) } + log.Debugf("worker metrics count %d", len(workerMetrics)) return workerMetrics } From 78e97e88d765229e1b126c4a09b2064269aea4fb Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Fri, 26 Jul 2024 14:37:19 +0100 Subject: [PATCH 06/17] formatting --- src/core/metrics/sources/nginx_plus.go | 7 +++---- .../nginx/agent/v2/src/core/metrics/sources/nginx_plus.go | 7 +++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index 5d2ee2553..9af9a34d2 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -54,7 +54,7 @@ type NginxPlus struct { type ExtendedStats struct { *plusclient.Stats - endpoints []string + endpoints []string streamEndpoints []string } @@ -118,7 +118,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.prevStats = stats } -func (c *NginxPlus) getStats(client *plusclient.NginxClient ) (*plusclient.Stats, error) { +func (c *NginxPlus) getStats(client *plusclient.NginxClient) (*plusclient.Stats, error) { var intialStatsWg sync.WaitGroup stats := &ExtendedStats{} @@ -173,7 +173,7 @@ func (c *NginxPlus) getStats(client *plusclient.NginxClient ) (*plusclient.Stats {&stats.StreamServerZones, client.GetStreamServerZones, "server_zones"}, {&stats.StreamUpstreams, client.GetStreamUpstreams, "upstreams"}, {&stats.StreamLimitConnections, client.GetStreamConnectionsLimit, "limit_conns"}, - {&stats.StreamZoneSync, client.GetStreamZoneSync, "zone_sync"}, + {&stats.StreamZoneSync, client.GetStreamZoneSync, "zone_sync"}, } streamStatsErrChan := make(chan error, len(endpointStats)) @@ -194,7 +194,6 @@ func (c *NginxPlus) getStats(client *plusclient.NginxClient ) (*plusclient.Stats return stats.Stats, nil } - func fetchData[T any](wg *sync.WaitGroup, errChan chan error, target *T, fetchFunc func() (T, error)) { defer wg.Done() data, err := fetchFunc() diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index 5d2ee2553..9af9a34d2 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -54,7 +54,7 @@ type NginxPlus struct { type ExtendedStats struct { *plusclient.Stats - endpoints []string + endpoints []string streamEndpoints []string } @@ -118,7 +118,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.prevStats = stats } -func (c *NginxPlus) getStats(client *plusclient.NginxClient ) (*plusclient.Stats, error) { +func (c *NginxPlus) getStats(client *plusclient.NginxClient) (*plusclient.Stats, error) { var intialStatsWg sync.WaitGroup stats := &ExtendedStats{} @@ -173,7 +173,7 @@ func (c *NginxPlus) getStats(client *plusclient.NginxClient ) (*plusclient.Stats {&stats.StreamServerZones, client.GetStreamServerZones, "server_zones"}, {&stats.StreamUpstreams, client.GetStreamUpstreams, "upstreams"}, {&stats.StreamLimitConnections, client.GetStreamConnectionsLimit, "limit_conns"}, - {&stats.StreamZoneSync, client.GetStreamZoneSync, "zone_sync"}, + {&stats.StreamZoneSync, client.GetStreamZoneSync, "zone_sync"}, } streamStatsErrChan := make(chan error, len(endpointStats)) @@ -194,7 +194,6 @@ func (c *NginxPlus) getStats(client *plusclient.NginxClient ) (*plusclient.Stats return stats.Stats, nil } - func fetchData[T any](wg *sync.WaitGroup, errChan chan error, target *T, fetchFunc func() (T, error)) { defer wg.Done() data, err := fetchFunc() From feea17cb3bbbeafdbf33b7cfa903b93c3edd7497 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Fri, 26 Jul 2024 14:43:55 +0100 Subject: [PATCH 07/17] get stats call --- src/core/metrics/sources/nginx_plus.go | 2 +- .../nginx/agent/v2/src/core/metrics/sources/nginx_plus.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index 9af9a34d2..02a1ba067 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -79,7 +79,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } - c.prevStats, err = cl.GetStats() + c.prevStats, err = c.getStats(cl) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index 9af9a34d2..02a1ba067 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -79,7 +79,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } - c.prevStats, err = cl.GetStats() + c.prevStats, err = c.getStats(cl) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) From ff9e7aa740eebc75f84413ab71e09cdd5095e603 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Mon, 29 Jul 2024 11:58:42 +0100 Subject: [PATCH 08/17] added in unit test for changes made --- Makefile | 2 +- src/core/metrics/sources/nginx_plus.go | 207 ++++++++++++------ src/core/metrics/sources/nginx_plus_test.go | 104 ++++++++- .../v2/src/core/metrics/sources/nginx_plus.go | 207 ++++++++++++------ 4 files changed, 379 insertions(+), 141 deletions(-) diff --git a/Makefile b/Makefile index 391caa93e..ffea503c4 100644 --- a/Makefile +++ b/Makefile @@ -48,7 +48,7 @@ VERSION_WITH_V := v${VERSION} LDFLAGS = "-w -X main.version=${VERSION_WITH_V} -X main.commit=${COMMIT} -X main.date=${DATE}" DEBUG_LDFLAGS = "-X main.version=${VERSION_WITH_V} -X main.commit=${COMMIT} -X main.date=${DATE}" -CERTS_DIR := ./build/certs +CERTS_DIR := ../nginx-certs PACKAGE_PREFIX := nginx-agent OSS_PACKAGES_REPO := "packages.nginx.org" PLUS_PACKAGES_REPO := "pkgs.nginx.com" diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index 02a1ba067..c80131bd5 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -39,6 +39,29 @@ const ( valueFloat64Zero = float64(0) ) +type Client interface { + GetAvailableEndpoints() ([]string, error) + GetAvailableStreamEndpoints() ([]string, error) + GetStreamServerZones() (*plusclient.StreamServerZones, error) + GetStreamUpstreams() (*plusclient.StreamUpstreams, error) + GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) + GetStreamZoneSync() (*plusclient.StreamZoneSync, error) + GetNginxInfo() (*plusclient.NginxInfo, error) + GetCaches() (*plusclient.Caches, error) + GetProcesses() (*plusclient.Processes, error) + GetSlabs() (*plusclient.Slabs, error) + GetConnections() (*plusclient.Connections, error) + GetHTTPRequests() (*plusclient.HTTPRequests, error) + GetSSL() (*plusclient.SSL, error) + GetServerZones() (*plusclient.ServerZones, error) + GetUpstreams() (*plusclient.Upstreams, error) + GetLocationZones() (*plusclient.LocationZones, error) + GetResolvers() (*plusclient.Resolvers, error) + GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) + GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) + GetWorkers() ([]*plusclient.Workers, error) +} + // NginxPlus generates metrics from NGINX Plus API type NginxPlus struct { baseDimensions *metrics.CommonDim @@ -118,11 +141,36 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.prevStats = stats } -func (c *NginxPlus) getStats(client *plusclient.NginxClient) (*plusclient.Stats, error) { - var intialStatsWg sync.WaitGroup - stats := &ExtendedStats{} +func (c *NginxPlus) defatultStats() (*plusclient.Stats) { + return &plusclient.Stats{ + Upstreams: map[string]plusclient.Upstream{}, + ServerZones: map[string]plusclient.ServerZone{}, + StreamServerZones: map[string]plusclient.StreamServerZone{}, + StreamUpstreams: map[string]plusclient.StreamUpstream{}, + Slabs: map[string]plusclient.Slab{}, + Caches: map[string]plusclient.HTTPCache{}, + HTTPLimitConnections: map[string]plusclient.LimitConnection{}, + StreamLimitConnections: map[string]plusclient.LimitConnection{}, + HTTPLimitRequests: map[string]plusclient.HTTPLimitRequest{}, + Resolvers: map[string]plusclient.Resolver{}, + LocationZones: map[string]plusclient.LocationZone{}, + StreamZoneSync: &plusclient.StreamZoneSync{}, + Workers: []*plusclient.Workers{}, + NginxInfo: plusclient.NginxInfo{}, + SSL: plusclient.SSL{}, + Connections: plusclient.Connections{}, + HTTPRequests: plusclient.HTTPRequests{}, + Processes: plusclient.Processes{}, + } +} - intialStats := []struct { +func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { + var initialStatsWg sync.WaitGroup + stats := &ExtendedStats{ + Stats: c.defatultStats(), + } + + initialStats := []struct { target interface{} fetchFunc interface{} }{ @@ -130,35 +178,36 @@ func (c *NginxPlus) getStats(client *plusclient.NginxClient) (*plusclient.Stats, // before, we used error on this, if no stream endpoints we will continue and the if condition // lower down should cater for it {&stats.streamEndpoints, client.GetAvailableStreamEndpoints}, - {&stats.NginxInfo, client.GetNginxInfo}, - {&stats.Caches, client.GetCaches}, - {&stats.Processes, client.GetProcesses}, - {&stats.Slabs, client.GetSlabs}, - {&stats.Connections, client.GetConnections}, - {&stats.HTTPRequests, client.GetHTTPRequests}, - {&stats.SSL, client.GetSSL}, - {&stats.ServerZones, client.GetServerZones}, - {&stats.Upstreams, client.GetUpstreams}, - {&stats.LocationZones, client.GetLocationZones}, - {&stats.Resolvers, client.GetResolvers}, - {&stats.HTTPLimitRequests, client.GetHTTPLimitReqs}, - {&stats.HTTPLimitConnections, client.GetHTTPConnectionsLimit}, - {&stats.Workers, client.GetWorkers}, + {&stats.Stats.NginxInfo, client.GetNginxInfo}, + {&stats.Stats.Caches, client.GetCaches}, + {&stats.Stats.Processes, client.GetProcesses}, + {&stats.Stats.Slabs, client.GetSlabs}, + {&stats.Stats.Connections, client.GetConnections}, + {&stats.Stats.HTTPRequests, client.GetHTTPRequests}, + {&stats.Stats.SSL, client.GetSSL}, + {&stats.Stats.ServerZones, client.GetServerZones}, + {&stats.Stats.Upstreams, client.GetUpstreams}, + {&stats.Stats.LocationZones, client.GetLocationZones}, + {&stats.Stats.Resolvers, client.GetResolvers}, + {&stats.Stats.HTTPLimitRequests, client.GetHTTPLimitReqs}, + {&stats.Stats.HTTPLimitConnections, client.GetHTTPConnectionsLimit}, + {&stats.Stats.Workers, client.GetWorkers}, } - intialStatsErrChan := make(chan error, len(intialStats)) + initialStatsErrChan := make(chan error, len(initialStats)*2) // run these functions in parallel - for _, stats := range intialStats { - intialStatsWg.Add(1) - go fetchAndAssign(&intialStatsWg, intialStatsErrChan, stats.target, stats.fetchFunc) + for idx, stats := range initialStats { + initialStatsWg.Add(1) + go fetchAndAssign(&initialStatsWg, initialStatsErrChan, stats.target, stats.fetchFunc) + log.Print(idx) } - intialStatsWg.Wait() - close(intialStatsErrChan) + initialStatsWg.Wait() + close(initialStatsErrChan) // only error if all the stats are empty - if len(intialStatsErrChan) == len(intialStats) { + if len(initialStatsErrChan) == len(initialStats) { return nil, errors.New("no useful metrics found") } @@ -176,7 +225,7 @@ func (c *NginxPlus) getStats(client *plusclient.NginxClient) (*plusclient.Stats, {&stats.StreamZoneSync, client.GetStreamZoneSync, "zone_sync"}, } - streamStatsErrChan := make(chan error, len(endpointStats)) + streamStatsErrChan := make(chan error, len(endpointStats)*2) for _, stat := range endpointStats { streamStatsWg.Add(1) @@ -194,77 +243,97 @@ func (c *NginxPlus) getStats(client *plusclient.NginxClient) (*plusclient.Stats, return stats.Stats, nil } -func fetchData[T any](wg *sync.WaitGroup, errChan chan error, target *T, fetchFunc func() (T, error)) { - defer wg.Done() +func fetchData[T any]( + errChan chan error, + target *T, + fetchFunc func() (*T, error), +) { data, err := fetchFunc() if err != nil { - errStr := fmt.Errorf("failed to get stats: %w", err) - log.Debug(errStr) - errChan <- errStr + errChan <- fmt.Errorf("failed to get stats: %w", err) return } - *target = data + //nolint:ineffassign + target = data } +func fetchDataVal[T any]( + errChan chan error, + target T, + fetchFunc func() (T, error), +) { + data, err := fetchFunc() + if err != nil { + errChan <- fmt.Errorf("failed to get stats: %w", err) + return + } + //nolint:ineffassign + target = data +} + + // this function takes the target type and matches it's function signature func fetchAndAssign(wg *sync.WaitGroup, errChan chan error, target interface{}, fetchFunc interface{}) { defer wg.Done() switch t := target.(type) { + case []string: + f := fetchFunc.(func() ([]string, error)) + fetchDataVal(errChan, t, f) case *plusclient.Upstreams: - f := fetchFunc.(func() (plusclient.Upstreams, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Upstreams, error)) + fetchData(errChan, t, f) case *plusclient.ServerZones: - f := fetchFunc.(func() (plusclient.ServerZones, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.ServerZones, error)) + fetchData(errChan, t, f) case *plusclient.StreamServerZones: - f := fetchFunc.(func() (plusclient.StreamServerZones, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.StreamServerZones, error)) + fetchData(errChan, t, f) case *plusclient.StreamUpstreams: - f := fetchFunc.(func() (plusclient.StreamUpstreams, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.StreamUpstreams, error)) + fetchData(errChan, t, f) case *plusclient.Slabs: - f := fetchFunc.(func() (plusclient.Slabs, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Slabs, error)) + fetchData(errChan, t, f) case *plusclient.Caches: - f := fetchFunc.(func() (plusclient.Caches, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Caches, error)) + fetchData(errChan, t, f) case *plusclient.HTTPLimitConnections: - f := fetchFunc.(func() (plusclient.HTTPLimitConnections, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.HTTPLimitConnections, error)) + fetchData(errChan, t, f) case *plusclient.StreamLimitConnections: - f := fetchFunc.(func() (plusclient.StreamLimitConnections, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.StreamLimitConnections, error)) + fetchData(errChan, t, f) case *plusclient.HTTPLimitRequests: - f := fetchFunc.(func() (plusclient.HTTPLimitRequests, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.HTTPLimitRequests, error)) + fetchData(errChan, t, f) case *plusclient.Resolvers: - f := fetchFunc.(func() (plusclient.Resolvers, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Resolvers, error)) + fetchData(errChan, t, f) case *plusclient.LocationZones: - f := fetchFunc.(func() (plusclient.LocationZones, error)) - fetchData(wg, errChan, t, f) - case **plusclient.StreamZoneSync: + f := fetchFunc.(func() (*plusclient.LocationZones, error)) + fetchData(errChan, t, f) + case *plusclient.StreamZoneSync: f := fetchFunc.(func() (*plusclient.StreamZoneSync, error)) - fetchData(wg, errChan, t, f) - case *[]*Workers: - f := fetchFunc.(func() ([]*Workers, error)) - fetchData(wg, errChan, t, f) + fetchData(errChan, t, f) + case []*plusclient.Workers: + f := fetchFunc.(func() (*[]*plusclient.Workers, error)) + fetchData(errChan, &t, f) case *plusclient.NginxInfo: - f := fetchFunc.(func() (plusclient.NginxInfo, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.NginxInfo, error)) + fetchData(errChan, t, f) case *plusclient.SSL: - f := fetchFunc.(func() (plusclient.SSL, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.SSL, error)) + fetchData(errChan, t, f) case *plusclient.Connections: - f := fetchFunc.(func() (plusclient.Connections, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Connections, error)) + fetchData(errChan, t, f) case *plusclient.HTTPRequests: - f := fetchFunc.(func() (plusclient.HTTPRequests, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.HTTPRequests, error)) + fetchData(errChan, t, f) case *plusclient.Processes: - f := fetchFunc.(func() (plusclient.Processes, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Processes, error)) + fetchData(errChan, t, f) default: errChan <- fmt.Errorf("unsupported type: %T", target) } diff --git a/src/core/metrics/sources/nginx_plus_test.go b/src/core/metrics/sources/nginx_plus_test.go index dd845411b..e0d41f0a5 100644 --- a/src/core/metrics/sources/nginx_plus_test.go +++ b/src/core/metrics/sources/nginx_plus_test.go @@ -12,6 +12,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "reflect" "sync" "testing" @@ -54,8 +55,6 @@ const ( currentPeer2UpstreamHeaderTime = 80 currentPeer1UpstreamResponseTime = 100 currentPeer2UpstreamResponseTime = 80 - currentUpstreamResponseTime = 100 - currentUpstreamConnectTime = 80 currentUpstreamFirstByteTime = 50 previousUpstreamHeaderTime = 98 previousUpstreamResponseTime = 98 @@ -556,6 +555,107 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan< f.sendMetrics(ctx, m, f.collectMetrics(&stats, &prevStats)...) } +var _ Client = (*MockClient)(nil) + +type MockClient struct{ +} + +func (m *MockClient) GetAvailableEndpoints() ([]string, error) { + return []string{"stream"}, nil +} + +func (m *MockClient) GetAvailableStreamEndpoints() ([]string, error) { + return []string{"server_zones", "upstreams", "limit_conns", "zone_sync"}, nil +} + +func (m *MockClient) GetStreamServerZones() (*plusclient.StreamServerZones, error) { + return &plusclient.StreamServerZones{}, nil +} + +func (m *MockClient) GetStreamUpstreams() (*plusclient.StreamUpstreams, error) { + return &plusclient.StreamUpstreams{}, nil +} + +func (m *MockClient) GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) { + return &plusclient.StreamLimitConnections{}, nil +} + +func (m *MockClient) GetStreamZoneSync() (*plusclient.StreamZoneSync, error) { + return &plusclient.StreamZoneSync{}, nil +} + +func (m *MockClient) GetNginxInfo() (*plusclient.NginxInfo, error) { + return &plusclient.NginxInfo{}, nil +} + +func (m *MockClient) GetCaches() (*plusclient.Caches, error) { + return &plusclient.Caches{}, nil +} + +func (m *MockClient) GetProcesses() (*plusclient.Processes, error) { + return &plusclient.Processes{}, nil +} + +func (m *MockClient) GetSlabs() (*plusclient.Slabs, error) { + return &plusclient.Slabs{}, nil +} + +func (m *MockClient) GetConnections() (*plusclient.Connections, error) { + return &plusclient.Connections{}, nil +} + +func (m *MockClient) GetHTTPRequests() (*plusclient.HTTPRequests, error) { + return &plusclient.HTTPRequests{}, nil +} + +func (m *MockClient) GetSSL() (*plusclient.SSL, error) { + return &plusclient.SSL{}, nil +} + +func (m *MockClient) GetServerZones() (*plusclient.ServerZones, error) { + return &plusclient.ServerZones{}, nil +} + +func (m *MockClient) GetUpstreams() (*plusclient.Upstreams, error) { + return &plusclient.Upstreams{}, nil +} + +func (m *MockClient) GetLocationZones() (*plusclient.LocationZones, error) { + return &plusclient.LocationZones{}, nil +} + +func (m *MockClient) GetResolvers() (*plusclient.Resolvers, error) { + return &plusclient.Resolvers{}, nil +} + +func (m *MockClient) GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) { + return &plusclient.HTTPLimitRequests{}, nil +} + +func (m *MockClient) GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) { + return &plusclient.HTTPLimitConnections{}, nil +} + +func (m *MockClient) GetWorkers() ([]*plusclient.Workers, error) { + return []*plusclient.Workers{}, nil +} + +func TestGetStats(t *testing.T) { + client := &MockClient{} + + source := NewNginxPlus(nil, "", "", "", 9) + expectedStats := source.defatultStats() + + stats, err := source.getStats(client) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + if !reflect.DeepEqual(stats, expectedStats) { + t.Fatalf("expected %v, got %v", expectedStats, stats) + } +} + func TestNginxPlusUpdate(t *testing.T) { nginxPlus := NewNginxPlus(&metrics.CommonDim{}, "test", PlusNamespace, "http://localhost:8080/api", 6) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index 02a1ba067..c80131bd5 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -39,6 +39,29 @@ const ( valueFloat64Zero = float64(0) ) +type Client interface { + GetAvailableEndpoints() ([]string, error) + GetAvailableStreamEndpoints() ([]string, error) + GetStreamServerZones() (*plusclient.StreamServerZones, error) + GetStreamUpstreams() (*plusclient.StreamUpstreams, error) + GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) + GetStreamZoneSync() (*plusclient.StreamZoneSync, error) + GetNginxInfo() (*plusclient.NginxInfo, error) + GetCaches() (*plusclient.Caches, error) + GetProcesses() (*plusclient.Processes, error) + GetSlabs() (*plusclient.Slabs, error) + GetConnections() (*plusclient.Connections, error) + GetHTTPRequests() (*plusclient.HTTPRequests, error) + GetSSL() (*plusclient.SSL, error) + GetServerZones() (*plusclient.ServerZones, error) + GetUpstreams() (*plusclient.Upstreams, error) + GetLocationZones() (*plusclient.LocationZones, error) + GetResolvers() (*plusclient.Resolvers, error) + GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) + GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) + GetWorkers() ([]*plusclient.Workers, error) +} + // NginxPlus generates metrics from NGINX Plus API type NginxPlus struct { baseDimensions *metrics.CommonDim @@ -118,11 +141,36 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.prevStats = stats } -func (c *NginxPlus) getStats(client *plusclient.NginxClient) (*plusclient.Stats, error) { - var intialStatsWg sync.WaitGroup - stats := &ExtendedStats{} +func (c *NginxPlus) defatultStats() (*plusclient.Stats) { + return &plusclient.Stats{ + Upstreams: map[string]plusclient.Upstream{}, + ServerZones: map[string]plusclient.ServerZone{}, + StreamServerZones: map[string]plusclient.StreamServerZone{}, + StreamUpstreams: map[string]plusclient.StreamUpstream{}, + Slabs: map[string]plusclient.Slab{}, + Caches: map[string]plusclient.HTTPCache{}, + HTTPLimitConnections: map[string]plusclient.LimitConnection{}, + StreamLimitConnections: map[string]plusclient.LimitConnection{}, + HTTPLimitRequests: map[string]plusclient.HTTPLimitRequest{}, + Resolvers: map[string]plusclient.Resolver{}, + LocationZones: map[string]plusclient.LocationZone{}, + StreamZoneSync: &plusclient.StreamZoneSync{}, + Workers: []*plusclient.Workers{}, + NginxInfo: plusclient.NginxInfo{}, + SSL: plusclient.SSL{}, + Connections: plusclient.Connections{}, + HTTPRequests: plusclient.HTTPRequests{}, + Processes: plusclient.Processes{}, + } +} - intialStats := []struct { +func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { + var initialStatsWg sync.WaitGroup + stats := &ExtendedStats{ + Stats: c.defatultStats(), + } + + initialStats := []struct { target interface{} fetchFunc interface{} }{ @@ -130,35 +178,36 @@ func (c *NginxPlus) getStats(client *plusclient.NginxClient) (*plusclient.Stats, // before, we used error on this, if no stream endpoints we will continue and the if condition // lower down should cater for it {&stats.streamEndpoints, client.GetAvailableStreamEndpoints}, - {&stats.NginxInfo, client.GetNginxInfo}, - {&stats.Caches, client.GetCaches}, - {&stats.Processes, client.GetProcesses}, - {&stats.Slabs, client.GetSlabs}, - {&stats.Connections, client.GetConnections}, - {&stats.HTTPRequests, client.GetHTTPRequests}, - {&stats.SSL, client.GetSSL}, - {&stats.ServerZones, client.GetServerZones}, - {&stats.Upstreams, client.GetUpstreams}, - {&stats.LocationZones, client.GetLocationZones}, - {&stats.Resolvers, client.GetResolvers}, - {&stats.HTTPLimitRequests, client.GetHTTPLimitReqs}, - {&stats.HTTPLimitConnections, client.GetHTTPConnectionsLimit}, - {&stats.Workers, client.GetWorkers}, + {&stats.Stats.NginxInfo, client.GetNginxInfo}, + {&stats.Stats.Caches, client.GetCaches}, + {&stats.Stats.Processes, client.GetProcesses}, + {&stats.Stats.Slabs, client.GetSlabs}, + {&stats.Stats.Connections, client.GetConnections}, + {&stats.Stats.HTTPRequests, client.GetHTTPRequests}, + {&stats.Stats.SSL, client.GetSSL}, + {&stats.Stats.ServerZones, client.GetServerZones}, + {&stats.Stats.Upstreams, client.GetUpstreams}, + {&stats.Stats.LocationZones, client.GetLocationZones}, + {&stats.Stats.Resolvers, client.GetResolvers}, + {&stats.Stats.HTTPLimitRequests, client.GetHTTPLimitReqs}, + {&stats.Stats.HTTPLimitConnections, client.GetHTTPConnectionsLimit}, + {&stats.Stats.Workers, client.GetWorkers}, } - intialStatsErrChan := make(chan error, len(intialStats)) + initialStatsErrChan := make(chan error, len(initialStats)*2) // run these functions in parallel - for _, stats := range intialStats { - intialStatsWg.Add(1) - go fetchAndAssign(&intialStatsWg, intialStatsErrChan, stats.target, stats.fetchFunc) + for idx, stats := range initialStats { + initialStatsWg.Add(1) + go fetchAndAssign(&initialStatsWg, initialStatsErrChan, stats.target, stats.fetchFunc) + log.Print(idx) } - intialStatsWg.Wait() - close(intialStatsErrChan) + initialStatsWg.Wait() + close(initialStatsErrChan) // only error if all the stats are empty - if len(intialStatsErrChan) == len(intialStats) { + if len(initialStatsErrChan) == len(initialStats) { return nil, errors.New("no useful metrics found") } @@ -176,7 +225,7 @@ func (c *NginxPlus) getStats(client *plusclient.NginxClient) (*plusclient.Stats, {&stats.StreamZoneSync, client.GetStreamZoneSync, "zone_sync"}, } - streamStatsErrChan := make(chan error, len(endpointStats)) + streamStatsErrChan := make(chan error, len(endpointStats)*2) for _, stat := range endpointStats { streamStatsWg.Add(1) @@ -194,77 +243,97 @@ func (c *NginxPlus) getStats(client *plusclient.NginxClient) (*plusclient.Stats, return stats.Stats, nil } -func fetchData[T any](wg *sync.WaitGroup, errChan chan error, target *T, fetchFunc func() (T, error)) { - defer wg.Done() +func fetchData[T any]( + errChan chan error, + target *T, + fetchFunc func() (*T, error), +) { data, err := fetchFunc() if err != nil { - errStr := fmt.Errorf("failed to get stats: %w", err) - log.Debug(errStr) - errChan <- errStr + errChan <- fmt.Errorf("failed to get stats: %w", err) return } - *target = data + //nolint:ineffassign + target = data } +func fetchDataVal[T any]( + errChan chan error, + target T, + fetchFunc func() (T, error), +) { + data, err := fetchFunc() + if err != nil { + errChan <- fmt.Errorf("failed to get stats: %w", err) + return + } + //nolint:ineffassign + target = data +} + + // this function takes the target type and matches it's function signature func fetchAndAssign(wg *sync.WaitGroup, errChan chan error, target interface{}, fetchFunc interface{}) { defer wg.Done() switch t := target.(type) { + case []string: + f := fetchFunc.(func() ([]string, error)) + fetchDataVal(errChan, t, f) case *plusclient.Upstreams: - f := fetchFunc.(func() (plusclient.Upstreams, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Upstreams, error)) + fetchData(errChan, t, f) case *plusclient.ServerZones: - f := fetchFunc.(func() (plusclient.ServerZones, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.ServerZones, error)) + fetchData(errChan, t, f) case *plusclient.StreamServerZones: - f := fetchFunc.(func() (plusclient.StreamServerZones, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.StreamServerZones, error)) + fetchData(errChan, t, f) case *plusclient.StreamUpstreams: - f := fetchFunc.(func() (plusclient.StreamUpstreams, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.StreamUpstreams, error)) + fetchData(errChan, t, f) case *plusclient.Slabs: - f := fetchFunc.(func() (plusclient.Slabs, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Slabs, error)) + fetchData(errChan, t, f) case *plusclient.Caches: - f := fetchFunc.(func() (plusclient.Caches, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Caches, error)) + fetchData(errChan, t, f) case *plusclient.HTTPLimitConnections: - f := fetchFunc.(func() (plusclient.HTTPLimitConnections, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.HTTPLimitConnections, error)) + fetchData(errChan, t, f) case *plusclient.StreamLimitConnections: - f := fetchFunc.(func() (plusclient.StreamLimitConnections, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.StreamLimitConnections, error)) + fetchData(errChan, t, f) case *plusclient.HTTPLimitRequests: - f := fetchFunc.(func() (plusclient.HTTPLimitRequests, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.HTTPLimitRequests, error)) + fetchData(errChan, t, f) case *plusclient.Resolvers: - f := fetchFunc.(func() (plusclient.Resolvers, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Resolvers, error)) + fetchData(errChan, t, f) case *plusclient.LocationZones: - f := fetchFunc.(func() (plusclient.LocationZones, error)) - fetchData(wg, errChan, t, f) - case **plusclient.StreamZoneSync: + f := fetchFunc.(func() (*plusclient.LocationZones, error)) + fetchData(errChan, t, f) + case *plusclient.StreamZoneSync: f := fetchFunc.(func() (*plusclient.StreamZoneSync, error)) - fetchData(wg, errChan, t, f) - case *[]*Workers: - f := fetchFunc.(func() ([]*Workers, error)) - fetchData(wg, errChan, t, f) + fetchData(errChan, t, f) + case []*plusclient.Workers: + f := fetchFunc.(func() (*[]*plusclient.Workers, error)) + fetchData(errChan, &t, f) case *plusclient.NginxInfo: - f := fetchFunc.(func() (plusclient.NginxInfo, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.NginxInfo, error)) + fetchData(errChan, t, f) case *plusclient.SSL: - f := fetchFunc.(func() (plusclient.SSL, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.SSL, error)) + fetchData(errChan, t, f) case *plusclient.Connections: - f := fetchFunc.(func() (plusclient.Connections, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Connections, error)) + fetchData(errChan, t, f) case *plusclient.HTTPRequests: - f := fetchFunc.(func() (plusclient.HTTPRequests, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.HTTPRequests, error)) + fetchData(errChan, t, f) case *plusclient.Processes: - f := fetchFunc.(func() (plusclient.Processes, error)) - fetchData(wg, errChan, t, f) + f := fetchFunc.(func() (*plusclient.Processes, error)) + fetchData(errChan, t, f) default: errChan <- fmt.Errorf("unsupported type: %T", target) } From f668f9aec872601c41913cbfdbcdf18e1a588b86 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Mon, 29 Jul 2024 11:59:38 +0100 Subject: [PATCH 09/17] added in unit test for changes made --- src/core/metrics/sources/nginx_plus.go | 29 +++++++++---------- src/core/metrics/sources/nginx_plus_test.go | 3 +- .../v2/src/core/metrics/sources/nginx_plus.go | 29 +++++++++---------- 3 files changed, 29 insertions(+), 32 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index c80131bd5..e460cabda 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -42,23 +42,23 @@ const ( type Client interface { GetAvailableEndpoints() ([]string, error) GetAvailableStreamEndpoints() ([]string, error) - GetStreamServerZones() (*plusclient.StreamServerZones, error) + GetStreamServerZones() (*plusclient.StreamServerZones, error) GetStreamUpstreams() (*plusclient.StreamUpstreams, error) - GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) - GetStreamZoneSync() (*plusclient.StreamZoneSync, error) - GetNginxInfo() (*plusclient.NginxInfo, error) - GetCaches() (*plusclient.Caches, error) - GetProcesses() (*plusclient.Processes, error) - GetSlabs() (*plusclient.Slabs, error) - GetConnections() (*plusclient.Connections, error) - GetHTTPRequests() (*plusclient.HTTPRequests, error) - GetSSL() (*plusclient.SSL, error) - GetServerZones() (*plusclient.ServerZones, error) + GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) + GetStreamZoneSync() (*plusclient.StreamZoneSync, error) + GetNginxInfo() (*plusclient.NginxInfo, error) + GetCaches() (*plusclient.Caches, error) + GetProcesses() (*plusclient.Processes, error) + GetSlabs() (*plusclient.Slabs, error) + GetConnections() (*plusclient.Connections, error) + GetHTTPRequests() (*plusclient.HTTPRequests, error) + GetSSL() (*plusclient.SSL, error) + GetServerZones() (*plusclient.ServerZones, error) GetUpstreams() (*plusclient.Upstreams, error) GetLocationZones() (*plusclient.LocationZones, error) - GetResolvers() (*plusclient.Resolvers, error) + GetResolvers() (*plusclient.Resolvers, error) GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) - GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) + GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) GetWorkers() ([]*plusclient.Workers, error) } @@ -141,7 +141,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.prevStats = stats } -func (c *NginxPlus) defatultStats() (*plusclient.Stats) { +func (c *NginxPlus) defatultStats() *plusclient.Stats { return &plusclient.Stats{ Upstreams: map[string]plusclient.Upstream{}, ServerZones: map[string]plusclient.ServerZone{}, @@ -271,7 +271,6 @@ func fetchDataVal[T any]( target = data } - // this function takes the target type and matches it's function signature func fetchAndAssign(wg *sync.WaitGroup, errChan chan error, target interface{}, fetchFunc interface{}) { defer wg.Done() diff --git a/src/core/metrics/sources/nginx_plus_test.go b/src/core/metrics/sources/nginx_plus_test.go index e0d41f0a5..19ddbb119 100644 --- a/src/core/metrics/sources/nginx_plus_test.go +++ b/src/core/metrics/sources/nginx_plus_test.go @@ -557,8 +557,7 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan< var _ Client = (*MockClient)(nil) -type MockClient struct{ -} +type MockClient struct{} func (m *MockClient) GetAvailableEndpoints() ([]string, error) { return []string{"stream"}, nil diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index c80131bd5..e460cabda 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -42,23 +42,23 @@ const ( type Client interface { GetAvailableEndpoints() ([]string, error) GetAvailableStreamEndpoints() ([]string, error) - GetStreamServerZones() (*plusclient.StreamServerZones, error) + GetStreamServerZones() (*plusclient.StreamServerZones, error) GetStreamUpstreams() (*plusclient.StreamUpstreams, error) - GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) - GetStreamZoneSync() (*plusclient.StreamZoneSync, error) - GetNginxInfo() (*plusclient.NginxInfo, error) - GetCaches() (*plusclient.Caches, error) - GetProcesses() (*plusclient.Processes, error) - GetSlabs() (*plusclient.Slabs, error) - GetConnections() (*plusclient.Connections, error) - GetHTTPRequests() (*plusclient.HTTPRequests, error) - GetSSL() (*plusclient.SSL, error) - GetServerZones() (*plusclient.ServerZones, error) + GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) + GetStreamZoneSync() (*plusclient.StreamZoneSync, error) + GetNginxInfo() (*plusclient.NginxInfo, error) + GetCaches() (*plusclient.Caches, error) + GetProcesses() (*plusclient.Processes, error) + GetSlabs() (*plusclient.Slabs, error) + GetConnections() (*plusclient.Connections, error) + GetHTTPRequests() (*plusclient.HTTPRequests, error) + GetSSL() (*plusclient.SSL, error) + GetServerZones() (*plusclient.ServerZones, error) GetUpstreams() (*plusclient.Upstreams, error) GetLocationZones() (*plusclient.LocationZones, error) - GetResolvers() (*plusclient.Resolvers, error) + GetResolvers() (*plusclient.Resolvers, error) GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) - GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) + GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) GetWorkers() ([]*plusclient.Workers, error) } @@ -141,7 +141,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.prevStats = stats } -func (c *NginxPlus) defatultStats() (*plusclient.Stats) { +func (c *NginxPlus) defatultStats() *plusclient.Stats { return &plusclient.Stats{ Upstreams: map[string]plusclient.Upstream{}, ServerZones: map[string]plusclient.ServerZone{}, @@ -271,7 +271,6 @@ func fetchDataVal[T any]( target = data } - // this function takes the target type and matches it's function signature func fetchAndAssign(wg *sync.WaitGroup, errChan chan error, target interface{}, fetchFunc interface{}) { defer wg.Done() From bd532ef83b6f879e3c0b6b4803f8af632c50505e Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Mon, 29 Jul 2024 13:15:22 +0100 Subject: [PATCH 10/17] revert makefile change --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index ffea503c4..391caa93e 100644 --- a/Makefile +++ b/Makefile @@ -48,7 +48,7 @@ VERSION_WITH_V := v${VERSION} LDFLAGS = "-w -X main.version=${VERSION_WITH_V} -X main.commit=${COMMIT} -X main.date=${DATE}" DEBUG_LDFLAGS = "-X main.version=${VERSION_WITH_V} -X main.commit=${COMMIT} -X main.date=${DATE}" -CERTS_DIR := ../nginx-certs +CERTS_DIR := ./build/certs PACKAGE_PREFIX := nginx-agent OSS_PACKAGES_REPO := "packages.nginx.org" PLUS_PACKAGES_REPO := "pkgs.nginx.com" From e12c7182f9ae435f7f262fada479973dfc48cab2 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Wed, 31 Jul 2024 14:34:02 +0100 Subject: [PATCH 11/17] wip: issues with the generics --- src/core/metrics/sources/nginx_plus.go | 118 ++++++------ src/core/metrics/sources/nginx_plus_test.go | 201 +++++++++++++++++--- 2 files changed, 235 insertions(+), 84 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index e460cabda..cc32e95c9 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -118,6 +118,8 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } + log.Debug("NGINX_plus_Collect: getting stats") + stats, err := c.getStats(cl) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) @@ -125,6 +127,8 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } + log.Debug("NGINX_plus_Collect: got stats") + if c.prevStats == nil { c.prevStats = stats } @@ -135,6 +139,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.baseDimensions.NginxBuild = stats.NginxInfo.Build c.baseDimensions.NginxVersion = stats.NginxInfo.Version + log.Debugf("NGINX_plus_Collect: collecting stats %v", stats) c.sendMetrics(ctx, m, c.collectMetrics(stats, c.prevStats)...) log.Debug("NGINX_plus_Collect: metrics sent") @@ -197,10 +202,9 @@ func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { initialStatsErrChan := make(chan error, len(initialStats)*2) // run these functions in parallel - for idx, stats := range initialStats { + for _, stats := range initialStats { initialStatsWg.Add(1) go fetchAndAssign(&initialStatsWg, initialStatsErrChan, stats.target, stats.fetchFunc) - log.Print(idx) } initialStatsWg.Wait() @@ -253,8 +257,10 @@ func fetchData[T any]( errChan <- fmt.Errorf("failed to get stats: %w", err) return } - //nolint:ineffassign + // nolint:ineffassign target = data + log.Printf("target %v", target) + log.Printf("data %v", data) } func fetchDataVal[T any]( @@ -267,8 +273,10 @@ func fetchDataVal[T any]( errChan <- fmt.Errorf("failed to get stats: %w", err) return } - //nolint:ineffassign + // nolint:ineffassign target = data + log.Printf("target %v", target) + log.Printf("data %v", data) } // this function takes the target type and matches it's function signature @@ -282,57 +290,57 @@ func fetchAndAssign(wg *sync.WaitGroup, errChan chan error, target interface{}, case *plusclient.Upstreams: f := fetchFunc.(func() (*plusclient.Upstreams, error)) fetchData(errChan, t, f) - case *plusclient.ServerZones: - f := fetchFunc.(func() (*plusclient.ServerZones, error)) - fetchData(errChan, t, f) - case *plusclient.StreamServerZones: - f := fetchFunc.(func() (*plusclient.StreamServerZones, error)) - fetchData(errChan, t, f) - case *plusclient.StreamUpstreams: - f := fetchFunc.(func() (*plusclient.StreamUpstreams, error)) - fetchData(errChan, t, f) - case *plusclient.Slabs: - f := fetchFunc.(func() (*plusclient.Slabs, error)) - fetchData(errChan, t, f) - case *plusclient.Caches: - f := fetchFunc.(func() (*plusclient.Caches, error)) - fetchData(errChan, t, f) - case *plusclient.HTTPLimitConnections: - f := fetchFunc.(func() (*plusclient.HTTPLimitConnections, error)) - fetchData(errChan, t, f) - case *plusclient.StreamLimitConnections: - f := fetchFunc.(func() (*plusclient.StreamLimitConnections, error)) - fetchData(errChan, t, f) - case *plusclient.HTTPLimitRequests: - f := fetchFunc.(func() (*plusclient.HTTPLimitRequests, error)) - fetchData(errChan, t, f) - case *plusclient.Resolvers: - f := fetchFunc.(func() (*plusclient.Resolvers, error)) - fetchData(errChan, t, f) - case *plusclient.LocationZones: - f := fetchFunc.(func() (*plusclient.LocationZones, error)) - fetchData(errChan, t, f) - case *plusclient.StreamZoneSync: - f := fetchFunc.(func() (*plusclient.StreamZoneSync, error)) - fetchData(errChan, t, f) - case []*plusclient.Workers: - f := fetchFunc.(func() (*[]*plusclient.Workers, error)) - fetchData(errChan, &t, f) - case *plusclient.NginxInfo: - f := fetchFunc.(func() (*plusclient.NginxInfo, error)) - fetchData(errChan, t, f) - case *plusclient.SSL: - f := fetchFunc.(func() (*plusclient.SSL, error)) - fetchData(errChan, t, f) - case *plusclient.Connections: - f := fetchFunc.(func() (*plusclient.Connections, error)) - fetchData(errChan, t, f) - case *plusclient.HTTPRequests: - f := fetchFunc.(func() (*plusclient.HTTPRequests, error)) - fetchData(errChan, t, f) - case *plusclient.Processes: - f := fetchFunc.(func() (*plusclient.Processes, error)) - fetchData(errChan, t, f) + // case *plusclient.ServerZones: + // f := fetchFunc.(func() (plusclient.ServerZones, error)) + // fetchData(errChan, t, f) + // case *plusclient.StreamServerZones: + // f := fetchFunc.(func() (plusclient.StreamServerZones, error)) + // fetchData(errChan, t, f) + // case *plusclient.StreamUpstreams: + // f := fetchFunc.(func() (plusclient.StreamUpstreams, error)) + // fetchData(errChan, t, f) + // case *plusclient.Slabs: + // f := fetchFunc.(func() (plusclient.Slabs, error)) + // fetchData(errChan, t, f) + // case *plusclient.Caches: + // f := fetchFunc.(func() (plusclient.Caches, error)) + // fetchData(errChan, t, f) + // case *plusclient.HTTPLimitConnections: + // f := fetchFunc.(func() (plusclient.HTTPLimitConnections, error)) + // fetchData(errChan, t, f) + // case *plusclient.StreamLimitConnections: + // f := fetchFunc.(func() (plusclient.StreamLimitConnections, error)) + // fetchData(errChan, t, f) + // case *plusclient.HTTPLimitRequests: + // f := fetchFunc.(func() (plusclient.HTTPLimitRequests, error)) + // fetchData(errChan, t, f) + // case *plusclient.Resolvers: + // f := fetchFunc.(func() (plusclient.Resolvers, error)) + // fetchData(errChan, t, f) + // case *plusclient.LocationZones: + // f := fetchFunc.(func() (plusclient.LocationZones, error)) + // fetchData(errChan, t, f) + // case *plusclient.StreamZoneSync: + // f := fetchFunc.(func() (plusclient.StreamZoneSync, error)) + // fetchData(errChan, t, f) + // case []*plusclient.Workers: + // f := fetchFunc.(func() ([]*plusclient.Workers, error)) + // fetchData(errChan, &t, f) + // case *plusclient.NginxInfo: + // f := fetchFunc.(func() (plusclient.NginxInfo, error)) + // fetchData(errChan, t, f) + // case *plusclient.SSL: + // f := fetchFunc.(func() (plusclient.SSL, error)) + // fetchData(errChan, t, f) + // case *plusclient.Connections: + // f := fetchFunc.(func() (plusclient.Connections, error)) + // fetchData(errChan, t, f) + // case *plusclient.HTTPRequests: + // f := fetchFunc.(func() (plusclient.HTTPRequests, error)) + // fetchData(errChan, t, f) + // case *plusclient.Processes: + // f := fetchFunc.(func() (plusclient.Processes, error)) + // fetchData(errChan, t, f) default: errChan <- fmt.Errorf("unsupported type: %T", target) } diff --git a/src/core/metrics/sources/nginx_plus_test.go b/src/core/metrics/sources/nginx_plus_test.go index 19ddbb119..78c03f816 100644 --- a/src/core/metrics/sources/nginx_plus_test.go +++ b/src/core/metrics/sources/nginx_plus_test.go @@ -96,14 +96,9 @@ const ( workerProcessID = 12345 ) -type FakeNginxPlus struct { - *NginxPlus -} - -// Collect is fake collector that hard codes a stats struct response to avoid dependency on external NGINX Plus api -func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) { - defer wg.Done() - stats := plusclient.Stats{ +var ( + availableZones = []string{"server_zones", "upstreams", "limit_conns", "zone_sync"} + stats = plusclient.Stats{ HTTPRequests: plusclient.HTTPRequests{ Total: currentHTTPRequestTotal, Current: currentHTTPRequestCurrent, @@ -386,8 +381,7 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan< }, }, } - - prevStats := plusclient.Stats{ + prevStats = plusclient.Stats{ HTTPRequests: plusclient.HTTPRequests{ Total: previousHTTPRequestTotal, Current: previousHTTPRequestCurrent, @@ -546,6 +540,15 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan< }, }, } +) + +type FakeNginxPlus struct { + *NginxPlus +} + +// Collect is fake collector that hard codes a stats struct response to avoid dependency on external NGINX Plus api +func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) { + defer wg.Done() f.baseDimensions.NginxType = "plus" f.baseDimensions.PublishedAPI = f.plusAPI @@ -564,79 +567,82 @@ func (m *MockClient) GetAvailableEndpoints() ([]string, error) { } func (m *MockClient) GetAvailableStreamEndpoints() ([]string, error) { - return []string{"server_zones", "upstreams", "limit_conns", "zone_sync"}, nil + return availableZones, nil } func (m *MockClient) GetStreamServerZones() (*plusclient.StreamServerZones, error) { - return &plusclient.StreamServerZones{}, nil + return &stats.StreamServerZones, nil } func (m *MockClient) GetStreamUpstreams() (*plusclient.StreamUpstreams, error) { - return &plusclient.StreamUpstreams{}, nil + return &stats.StreamUpstreams, nil } func (m *MockClient) GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) { - return &plusclient.StreamLimitConnections{}, nil + return &stats.StreamLimitConnections, nil } func (m *MockClient) GetStreamZoneSync() (*plusclient.StreamZoneSync, error) { - return &plusclient.StreamZoneSync{}, nil + return &plusclient.StreamZoneSync{ + Zones: stats.StreamZoneSync.Zones, + Status: stats.StreamZoneSync.Status, + }, nil } func (m *MockClient) GetNginxInfo() (*plusclient.NginxInfo, error) { - return &plusclient.NginxInfo{}, nil + return &stats.NginxInfo, nil } func (m *MockClient) GetCaches() (*plusclient.Caches, error) { - return &plusclient.Caches{}, nil + return &stats.Caches, nil } func (m *MockClient) GetProcesses() (*plusclient.Processes, error) { - return &plusclient.Processes{}, nil + return &stats.Processes, nil } func (m *MockClient) GetSlabs() (*plusclient.Slabs, error) { - return &plusclient.Slabs{}, nil + return &stats.Slabs, nil } func (m *MockClient) GetConnections() (*plusclient.Connections, error) { - return &plusclient.Connections{}, nil + return &stats.Connections, nil } func (m *MockClient) GetHTTPRequests() (*plusclient.HTTPRequests, error) { - return &plusclient.HTTPRequests{}, nil + return &stats.HTTPRequests, nil } func (m *MockClient) GetSSL() (*plusclient.SSL, error) { - return &plusclient.SSL{}, nil + return &stats.SSL, nil } func (m *MockClient) GetServerZones() (*plusclient.ServerZones, error) { - return &plusclient.ServerZones{}, nil + return &stats.ServerZones, nil } func (m *MockClient) GetUpstreams() (*plusclient.Upstreams, error) { - return &plusclient.Upstreams{}, nil + return &stats.Upstreams, nil } func (m *MockClient) GetLocationZones() (*plusclient.LocationZones, error) { - return &plusclient.LocationZones{}, nil + return &stats.LocationZones, nil } func (m *MockClient) GetResolvers() (*plusclient.Resolvers, error) { - return &plusclient.Resolvers{}, nil + return &stats.Resolvers, nil } func (m *MockClient) GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) { - return &plusclient.HTTPLimitRequests{}, nil + return &stats.HTTPLimitRequests, nil } func (m *MockClient) GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) { - return &plusclient.HTTPLimitConnections{}, nil + return &stats.HTTPLimitConnections, nil } func (m *MockClient) GetWorkers() ([]*plusclient.Workers, error) { - return []*plusclient.Workers{}, nil + return stats.Workers, nil } func TestGetStats(t *testing.T) { @@ -655,6 +661,143 @@ func TestGetStats(t *testing.T) { } } +func TestFetchData(t *testing.T) { + type TestCase[T any] struct { + name string + fetchFunc func() (T, error) + expectedValue T + expectError bool + } + + pointerCases := []TestCase[*plusclient.Upstreams]{ + { + name: "Successful fetch", + fetchFunc: func() (*plusclient.Upstreams, error) { + return &stats.Upstreams, nil + }, + expectedValue: &stats.Upstreams, + expectError: false, + }, + { + name: "Fetch error", + fetchFunc: func() (*plusclient.Upstreams, error) { + return nil, fmt.Errorf("fetch error") + }, + expectedValue: nil, + expectError: true, + }, + } + + stats := &plusclient.Stats{} + for _, tc := range pointerCases { + t.Run(tc.name, func(t *testing.T) { + errChan := make(chan error, 1) + + fetchData(errChan, &stats.Upstreams, tc.fetchFunc) + close(errChan) + + if tc.expectError { + if len(errChan) == 0 { + t.Errorf("Expected an error, but got none") + } + } else { + if len(errChan) != 0 { + t.Errorf("Expected no errors, but got %d", len(errChan)) + } + if len(stats.Upstreams) == 0 { + t.Errorf("Expected target to be '%v', but got '%v'", tc.expectedValue, stats.Upstreams) + } + assert.Equal(t, stats.Upstreams, tc.expectedValue) + } + }) + } +} + +func TestFetchAndAssign(t *testing.T) { + type TestCase struct { + name string + target interface{} + fetchFunc interface{} + expectedValue interface{} + expectError bool + } + client := MockClient{} + // Define the test cases + testCases := []TestCase{ + { + name: "Fetch Upstreams", + target: new(plusclient.Upstreams), + fetchFunc: client.GetUpstreams, + expectedValue: stats.Upstreams, + expectError: false, + }, + // { + // name: "Fetch ServerZones", + // target: new(plusclient.ServerZones), + // fetchFunc: client.GetServerZones, + // expectedValue: stats.ServerZones, + // expectError: false, + // }, + { + name: "Fetch Available Stream Endpoints", + target: []string{}, + fetchFunc: client.GetAvailableStreamEndpoints, + expectedValue: availableZones, + expectError: false, + }, + { + name: "Unsupported Type", + target: new(int), + fetchFunc: func() (int, error) { return 0, nil }, + expectedValue: 0, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var wg sync.WaitGroup + errChan := make(chan error, 1) + + wg.Add(1) + go fetchAndAssign(&wg, errChan, tc.target, tc.fetchFunc) + wg.Wait() + close(errChan) + + if tc.expectError { + if len(errChan) == 0 { + t.Errorf("Expected an error, but got none") + } + } else { + if len(errChan) != 0 { + t.Errorf("Expected no errors, but got %d", len(errChan)) + } + + targetValue := tc.target + expectedValue := tc.expectedValue + + switch target := targetValue.(type) { + case *plusclient.Upstreams: + if *target != nil { + t.Errorf("Expected target to be '%v', but got '%v'", expectedValue, *target) + } + assert.Equal(t, target, tc.expectedValue) + case *plusclient.ServerZones: + if *target != nil { + t.Errorf("Expected target to be '%v', but got '%v'", expectedValue, *target) + } + assert.Equal(t, target, tc.expectedValue) + case []string: + if target != nil { + t.Errorf("Expected target to be '%v', but got '%v'", expectedValue, target) + } + assert.Equal(t, target, tc.expectedValue) + } + } + }) + } +} + func TestNginxPlusUpdate(t *testing.T) { nginxPlus := NewNginxPlus(&metrics.CommonDim{}, "test", PlusNamespace, "http://localhost:8080/api", 6) From 999cec60abc727ec6c0dd7ae713a1ffd609c4479 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Wed, 31 Jul 2024 14:34:59 +0100 Subject: [PATCH 12/17] local changes --- src/core/metrics/sources/nginx_plus_test.go | 18 +-- .../v2/src/core/metrics/sources/nginx_plus.go | 118 ++++++++++-------- 2 files changed, 72 insertions(+), 64 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus_test.go b/src/core/metrics/sources/nginx_plus_test.go index 78c03f816..d43465e69 100644 --- a/src/core/metrics/sources/nginx_plus_test.go +++ b/src/core/metrics/sources/nginx_plus_test.go @@ -98,7 +98,7 @@ const ( var ( availableZones = []string{"server_zones", "upstreams", "limit_conns", "zone_sync"} - stats = plusclient.Stats{ + stats = plusclient.Stats{ HTTPRequests: plusclient.HTTPRequests{ Total: currentHTTPRequestTotal, Current: currentHTTPRequestCurrent, @@ -739,18 +739,18 @@ func TestFetchAndAssign(t *testing.T) { // expectError: false, // }, { - name: "Fetch Available Stream Endpoints", - target: []string{}, - fetchFunc: client.GetAvailableStreamEndpoints, + name: "Fetch Available Stream Endpoints", + target: []string{}, + fetchFunc: client.GetAvailableStreamEndpoints, expectedValue: availableZones, - expectError: false, + expectError: false, }, { - name: "Unsupported Type", - target: new(int), - fetchFunc: func() (int, error) { return 0, nil }, + name: "Unsupported Type", + target: new(int), + fetchFunc: func() (int, error) { return 0, nil }, expectedValue: 0, - expectError: true, + expectError: true, }, } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index e460cabda..cc32e95c9 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -118,6 +118,8 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } + log.Debug("NGINX_plus_Collect: getting stats") + stats, err := c.getStats(cl) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) @@ -125,6 +127,8 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m return } + log.Debug("NGINX_plus_Collect: got stats") + if c.prevStats == nil { c.prevStats = stats } @@ -135,6 +139,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.baseDimensions.NginxBuild = stats.NginxInfo.Build c.baseDimensions.NginxVersion = stats.NginxInfo.Version + log.Debugf("NGINX_plus_Collect: collecting stats %v", stats) c.sendMetrics(ctx, m, c.collectMetrics(stats, c.prevStats)...) log.Debug("NGINX_plus_Collect: metrics sent") @@ -197,10 +202,9 @@ func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { initialStatsErrChan := make(chan error, len(initialStats)*2) // run these functions in parallel - for idx, stats := range initialStats { + for _, stats := range initialStats { initialStatsWg.Add(1) go fetchAndAssign(&initialStatsWg, initialStatsErrChan, stats.target, stats.fetchFunc) - log.Print(idx) } initialStatsWg.Wait() @@ -253,8 +257,10 @@ func fetchData[T any]( errChan <- fmt.Errorf("failed to get stats: %w", err) return } - //nolint:ineffassign + // nolint:ineffassign target = data + log.Printf("target %v", target) + log.Printf("data %v", data) } func fetchDataVal[T any]( @@ -267,8 +273,10 @@ func fetchDataVal[T any]( errChan <- fmt.Errorf("failed to get stats: %w", err) return } - //nolint:ineffassign + // nolint:ineffassign target = data + log.Printf("target %v", target) + log.Printf("data %v", data) } // this function takes the target type and matches it's function signature @@ -282,57 +290,57 @@ func fetchAndAssign(wg *sync.WaitGroup, errChan chan error, target interface{}, case *plusclient.Upstreams: f := fetchFunc.(func() (*plusclient.Upstreams, error)) fetchData(errChan, t, f) - case *plusclient.ServerZones: - f := fetchFunc.(func() (*plusclient.ServerZones, error)) - fetchData(errChan, t, f) - case *plusclient.StreamServerZones: - f := fetchFunc.(func() (*plusclient.StreamServerZones, error)) - fetchData(errChan, t, f) - case *plusclient.StreamUpstreams: - f := fetchFunc.(func() (*plusclient.StreamUpstreams, error)) - fetchData(errChan, t, f) - case *plusclient.Slabs: - f := fetchFunc.(func() (*plusclient.Slabs, error)) - fetchData(errChan, t, f) - case *plusclient.Caches: - f := fetchFunc.(func() (*plusclient.Caches, error)) - fetchData(errChan, t, f) - case *plusclient.HTTPLimitConnections: - f := fetchFunc.(func() (*plusclient.HTTPLimitConnections, error)) - fetchData(errChan, t, f) - case *plusclient.StreamLimitConnections: - f := fetchFunc.(func() (*plusclient.StreamLimitConnections, error)) - fetchData(errChan, t, f) - case *plusclient.HTTPLimitRequests: - f := fetchFunc.(func() (*plusclient.HTTPLimitRequests, error)) - fetchData(errChan, t, f) - case *plusclient.Resolvers: - f := fetchFunc.(func() (*plusclient.Resolvers, error)) - fetchData(errChan, t, f) - case *plusclient.LocationZones: - f := fetchFunc.(func() (*plusclient.LocationZones, error)) - fetchData(errChan, t, f) - case *plusclient.StreamZoneSync: - f := fetchFunc.(func() (*plusclient.StreamZoneSync, error)) - fetchData(errChan, t, f) - case []*plusclient.Workers: - f := fetchFunc.(func() (*[]*plusclient.Workers, error)) - fetchData(errChan, &t, f) - case *plusclient.NginxInfo: - f := fetchFunc.(func() (*plusclient.NginxInfo, error)) - fetchData(errChan, t, f) - case *plusclient.SSL: - f := fetchFunc.(func() (*plusclient.SSL, error)) - fetchData(errChan, t, f) - case *plusclient.Connections: - f := fetchFunc.(func() (*plusclient.Connections, error)) - fetchData(errChan, t, f) - case *plusclient.HTTPRequests: - f := fetchFunc.(func() (*plusclient.HTTPRequests, error)) - fetchData(errChan, t, f) - case *plusclient.Processes: - f := fetchFunc.(func() (*plusclient.Processes, error)) - fetchData(errChan, t, f) + // case *plusclient.ServerZones: + // f := fetchFunc.(func() (plusclient.ServerZones, error)) + // fetchData(errChan, t, f) + // case *plusclient.StreamServerZones: + // f := fetchFunc.(func() (plusclient.StreamServerZones, error)) + // fetchData(errChan, t, f) + // case *plusclient.StreamUpstreams: + // f := fetchFunc.(func() (plusclient.StreamUpstreams, error)) + // fetchData(errChan, t, f) + // case *plusclient.Slabs: + // f := fetchFunc.(func() (plusclient.Slabs, error)) + // fetchData(errChan, t, f) + // case *plusclient.Caches: + // f := fetchFunc.(func() (plusclient.Caches, error)) + // fetchData(errChan, t, f) + // case *plusclient.HTTPLimitConnections: + // f := fetchFunc.(func() (plusclient.HTTPLimitConnections, error)) + // fetchData(errChan, t, f) + // case *plusclient.StreamLimitConnections: + // f := fetchFunc.(func() (plusclient.StreamLimitConnections, error)) + // fetchData(errChan, t, f) + // case *plusclient.HTTPLimitRequests: + // f := fetchFunc.(func() (plusclient.HTTPLimitRequests, error)) + // fetchData(errChan, t, f) + // case *plusclient.Resolvers: + // f := fetchFunc.(func() (plusclient.Resolvers, error)) + // fetchData(errChan, t, f) + // case *plusclient.LocationZones: + // f := fetchFunc.(func() (plusclient.LocationZones, error)) + // fetchData(errChan, t, f) + // case *plusclient.StreamZoneSync: + // f := fetchFunc.(func() (plusclient.StreamZoneSync, error)) + // fetchData(errChan, t, f) + // case []*plusclient.Workers: + // f := fetchFunc.(func() ([]*plusclient.Workers, error)) + // fetchData(errChan, &t, f) + // case *plusclient.NginxInfo: + // f := fetchFunc.(func() (plusclient.NginxInfo, error)) + // fetchData(errChan, t, f) + // case *plusclient.SSL: + // f := fetchFunc.(func() (plusclient.SSL, error)) + // fetchData(errChan, t, f) + // case *plusclient.Connections: + // f := fetchFunc.(func() (plusclient.Connections, error)) + // fetchData(errChan, t, f) + // case *plusclient.HTTPRequests: + // f := fetchFunc.(func() (plusclient.HTTPRequests, error)) + // fetchData(errChan, t, f) + // case *plusclient.Processes: + // f := fetchFunc.(func() (plusclient.Processes, error)) + // fetchData(errChan, t, f) default: errChan <- fmt.Errorf("unsupported type: %T", target) } From 9fb0b6da2238343543d602b46179dc69725f4126 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Wed, 31 Jul 2024 16:16:03 +0100 Subject: [PATCH 13/17] fixed issues with assignment --- src/core/metrics/sources/nginx_plus.go | 379 ++++++++++++-------- src/core/metrics/sources/nginx_plus_test.go | 164 ++------- 2 files changed, 253 insertions(+), 290 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index cc32e95c9..4a802d405 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -146,7 +146,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.prevStats = stats } -func (c *NginxPlus) defatultStats() *plusclient.Stats { +func (c *NginxPlus) defaultStats() *plusclient.Stats { return &plusclient.Stats{ Upstreams: map[string]plusclient.Upstream{}, ServerZones: map[string]plusclient.ServerZone{}, @@ -171,75 +171,259 @@ func (c *NginxPlus) defatultStats() *plusclient.Stats { func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { var initialStatsWg sync.WaitGroup + initialStatsErrChan := make(chan error, 16) stats := &ExtendedStats{ - Stats: c.defatultStats(), + Stats: c.defaultStats(), } - initialStats := []struct { - target interface{} - fetchFunc interface{} - }{ - {&stats.endpoints, client.GetAvailableEndpoints}, - // before, we used error on this, if no stream endpoints we will continue and the if condition - // lower down should cater for it - {&stats.streamEndpoints, client.GetAvailableStreamEndpoints}, - {&stats.Stats.NginxInfo, client.GetNginxInfo}, - {&stats.Stats.Caches, client.GetCaches}, - {&stats.Stats.Processes, client.GetProcesses}, - {&stats.Stats.Slabs, client.GetSlabs}, - {&stats.Stats.Connections, client.GetConnections}, - {&stats.Stats.HTTPRequests, client.GetHTTPRequests}, - {&stats.Stats.SSL, client.GetSSL}, - {&stats.Stats.ServerZones, client.GetServerZones}, - {&stats.Stats.Upstreams, client.GetUpstreams}, - {&stats.Stats.LocationZones, client.GetLocationZones}, - {&stats.Stats.Resolvers, client.GetResolvers}, - {&stats.Stats.HTTPLimitRequests, client.GetHTTPLimitReqs}, - {&stats.Stats.HTTPLimitConnections, client.GetHTTPConnectionsLimit}, - {&stats.Stats.Workers, client.GetWorkers}, - } + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + endpoints, err := client.GetAvailableEndpoints() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Available Endpoints: %w", err) + return + } + stats.endpoints = endpoints + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + streamEndpoints, err := client.GetAvailableStreamEndpoints() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Available Stream Endpoints: %w", err) + return + } + stats.streamEndpoints = streamEndpoints + }() - initialStatsErrChan := make(chan error, len(initialStats)*2) + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + nginxInfo, err := client.GetNginxInfo() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get NGINX info: %w", err) + return + } + stats.NginxInfo = *nginxInfo + }() - // run these functions in parallel - for _, stats := range initialStats { - initialStatsWg.Add(1) - go fetchAndAssign(&initialStatsWg, initialStatsErrChan, stats.target, stats.fetchFunc) - } + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + caches, err := client.GetCaches() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get NGINX info: %w", err) + return + } + stats.Caches = *caches + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + processes, err := client.GetProcesses() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get processes: %w", err) + return + } + stats.Processes = *processes + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + slabs, err := client.GetSlabs() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get processes: %w", err) + return + } + stats.Slabs = *slabs + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + connections, err := client.GetConnections() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get connections: %w", err) + return + } + stats.Connections = *connections + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpRequests, err := client.GetHTTPRequests() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPRequests: %w", err) + return + } + stats.HTTPRequests = *httpRequests + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + ssl, err := client.GetSSL() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get SSL: %w", err) + return + } + stats.SSL = *ssl + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + serverZones, err := client.GetServerZones() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get ServerZones: %w", err) + return + } + stats.ServerZones = *serverZones + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + upstreams, err := client.GetUpstreams() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Upstreams: %w", err) + return + } + stats.Upstreams = *upstreams + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + locationZones, err := client.GetLocationZones() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get LocationZones: %w", err) + return + } + stats.LocationZones = *locationZones + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + resolvers, err := client.GetResolvers() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Resolvers: %w", err) + return + } + stats.Resolvers = *resolvers + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpLimitRequests, err := client.GetHTTPLimitReqs() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPLimitRequests: %w", err) + return + } + stats.HTTPLimitRequests = *httpLimitRequests + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpLimitConnections, err := client.GetHTTPConnectionsLimit() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPLimitConnections: %w", err) + return + } + stats.HTTPLimitConnections = *httpLimitConnections + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + workers, err := client.GetWorkers() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Workers: %w", err) + return + } + stats.Workers = workers + }() initialStatsWg.Wait() close(initialStatsErrChan) // only error if all the stats are empty - if len(initialStatsErrChan) == len(initialStats) { + if len(initialStatsErrChan) == 16 { return nil, errors.New("no useful metrics found") } if slices.Contains(stats.endpoints, "stream") { var streamStatsWg sync.WaitGroup - // check if these can get added to the client GetStats in a different way - endpointStats := []struct { - target interface{} - fetchFunc interface{} - statType string - }{ - {&stats.StreamServerZones, client.GetStreamServerZones, "server_zones"}, - {&stats.StreamUpstreams, client.GetStreamUpstreams, "upstreams"}, - {&stats.StreamLimitConnections, client.GetStreamConnectionsLimit, "limit_conns"}, - {&stats.StreamZoneSync, client.GetStreamZoneSync, "zone_sync"}, + // this may never be 4 depending on the if conditions + streamStatsErrChan := make(chan error, 4) + + if slices.Contains(stats.streamEndpoints, "server_zones") { + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamServerZones, err := client.GetStreamServerZones() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get streamServerZones: %w", err) + return + } + stats.StreamServerZones = *streamServerZones + }() } - streamStatsErrChan := make(chan error, len(endpointStats)*2) + if slices.Contains(stats.streamEndpoints, "upstreams") { + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamUpstreams, err := client.GetStreamUpstreams() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamUpstreams: %w", err) + return + } + stats.StreamUpstreams = *streamUpstreams + }() + } + + if slices.Contains(stats.streamEndpoints, "limit_conns") { - for _, stat := range endpointStats { streamStatsWg.Add(1) - if slices.Contains(stats.streamEndpoints, stat.statType) { - go fetchAndAssign(&streamStatsWg, streamStatsErrChan, stat.target, stat.fetchFunc) - } + go func() { + defer streamStatsWg.Done() + streamConnectionsLimit, err := client.GetStreamConnectionsLimit() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamLimitConnections: %w", err) + return + } + stats.StreamLimitConnections = *streamConnectionsLimit + }() + + } + + if slices.Contains(stats.streamEndpoints, "limit_conns") { + + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamZoneSync, err := client.GetStreamZoneSync() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamZoneSync: %w", err) + return + } + stats.StreamZoneSync = streamZoneSync + }() + } + streamStatsWg.Wait() close(streamStatsErrChan) - if len(streamStatsErrChan) == len(endpointStats) { + if len(streamStatsErrChan) > 0 { log.Warnf("no useful metrics found in stream stats") } } @@ -247,105 +431,6 @@ func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { return stats.Stats, nil } -func fetchData[T any]( - errChan chan error, - target *T, - fetchFunc func() (*T, error), -) { - data, err := fetchFunc() - if err != nil { - errChan <- fmt.Errorf("failed to get stats: %w", err) - return - } - // nolint:ineffassign - target = data - log.Printf("target %v", target) - log.Printf("data %v", data) -} - -func fetchDataVal[T any]( - errChan chan error, - target T, - fetchFunc func() (T, error), -) { - data, err := fetchFunc() - if err != nil { - errChan <- fmt.Errorf("failed to get stats: %w", err) - return - } - // nolint:ineffassign - target = data - log.Printf("target %v", target) - log.Printf("data %v", data) -} - -// this function takes the target type and matches it's function signature -func fetchAndAssign(wg *sync.WaitGroup, errChan chan error, target interface{}, fetchFunc interface{}) { - defer wg.Done() - - switch t := target.(type) { - case []string: - f := fetchFunc.(func() ([]string, error)) - fetchDataVal(errChan, t, f) - case *plusclient.Upstreams: - f := fetchFunc.(func() (*plusclient.Upstreams, error)) - fetchData(errChan, t, f) - // case *plusclient.ServerZones: - // f := fetchFunc.(func() (plusclient.ServerZones, error)) - // fetchData(errChan, t, f) - // case *plusclient.StreamServerZones: - // f := fetchFunc.(func() (plusclient.StreamServerZones, error)) - // fetchData(errChan, t, f) - // case *plusclient.StreamUpstreams: - // f := fetchFunc.(func() (plusclient.StreamUpstreams, error)) - // fetchData(errChan, t, f) - // case *plusclient.Slabs: - // f := fetchFunc.(func() (plusclient.Slabs, error)) - // fetchData(errChan, t, f) - // case *plusclient.Caches: - // f := fetchFunc.(func() (plusclient.Caches, error)) - // fetchData(errChan, t, f) - // case *plusclient.HTTPLimitConnections: - // f := fetchFunc.(func() (plusclient.HTTPLimitConnections, error)) - // fetchData(errChan, t, f) - // case *plusclient.StreamLimitConnections: - // f := fetchFunc.(func() (plusclient.StreamLimitConnections, error)) - // fetchData(errChan, t, f) - // case *plusclient.HTTPLimitRequests: - // f := fetchFunc.(func() (plusclient.HTTPLimitRequests, error)) - // fetchData(errChan, t, f) - // case *plusclient.Resolvers: - // f := fetchFunc.(func() (plusclient.Resolvers, error)) - // fetchData(errChan, t, f) - // case *plusclient.LocationZones: - // f := fetchFunc.(func() (plusclient.LocationZones, error)) - // fetchData(errChan, t, f) - // case *plusclient.StreamZoneSync: - // f := fetchFunc.(func() (plusclient.StreamZoneSync, error)) - // fetchData(errChan, t, f) - // case []*plusclient.Workers: - // f := fetchFunc.(func() ([]*plusclient.Workers, error)) - // fetchData(errChan, &t, f) - // case *plusclient.NginxInfo: - // f := fetchFunc.(func() (plusclient.NginxInfo, error)) - // fetchData(errChan, t, f) - // case *plusclient.SSL: - // f := fetchFunc.(func() (plusclient.SSL, error)) - // fetchData(errChan, t, f) - // case *plusclient.Connections: - // f := fetchFunc.(func() (plusclient.Connections, error)) - // fetchData(errChan, t, f) - // case *plusclient.HTTPRequests: - // f := fetchFunc.(func() (plusclient.HTTPRequests, error)) - // fetchData(errChan, t, f) - // case *plusclient.Processes: - // f := fetchFunc.(func() (plusclient.Processes, error)) - // fetchData(errChan, t, f) - default: - errChan <- fmt.Errorf("unsupported type: %T", target) - } -} - func (c *NginxPlus) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.baseDimensions = dimensions c.plusAPI = collectorConf.PlusAPI diff --git a/src/core/metrics/sources/nginx_plus_test.go b/src/core/metrics/sources/nginx_plus_test.go index d43465e69..51e93ef25 100644 --- a/src/core/metrics/sources/nginx_plus_test.go +++ b/src/core/metrics/sources/nginx_plus_test.go @@ -12,7 +12,6 @@ import ( "fmt" "net/http" "net/http/httptest" - "reflect" "sync" "testing" @@ -99,6 +98,25 @@ const ( var ( availableZones = []string{"server_zones", "upstreams", "limit_conns", "zone_sync"} stats = plusclient.Stats{ + StreamZoneSync: &plusclient.StreamZoneSync{ + Zones: map[string]plusclient.SyncZone{ + "0": { + RecordsPending: 1, + RecordsTotal: 2, + }, + "1": { + RecordsPending: 3, + RecordsTotal: 4, + }, + }, + Status: plusclient.StreamZoneSyncStatus{ + BytesIn: 1, + MsgsIn: 2, + MsgsOut: 3, + BytesOut: 4, + NodesOnline: 5, + }, + }, HTTPRequests: plusclient.HTTPRequests{ Total: currentHTTPRequestTotal, Current: currentHTTPRequestCurrent, @@ -649,153 +667,13 @@ func TestGetStats(t *testing.T) { client := &MockClient{} source := NewNginxPlus(nil, "", "", "", 9) - expectedStats := source.defatultStats() - stats, err := source.getStats(client) + testStats, err := source.getStats(client) if err != nil { t.Fatalf("expected no error, got %v", err) } - if !reflect.DeepEqual(stats, expectedStats) { - t.Fatalf("expected %v, got %v", expectedStats, stats) - } -} - -func TestFetchData(t *testing.T) { - type TestCase[T any] struct { - name string - fetchFunc func() (T, error) - expectedValue T - expectError bool - } - - pointerCases := []TestCase[*plusclient.Upstreams]{ - { - name: "Successful fetch", - fetchFunc: func() (*plusclient.Upstreams, error) { - return &stats.Upstreams, nil - }, - expectedValue: &stats.Upstreams, - expectError: false, - }, - { - name: "Fetch error", - fetchFunc: func() (*plusclient.Upstreams, error) { - return nil, fmt.Errorf("fetch error") - }, - expectedValue: nil, - expectError: true, - }, - } - - stats := &plusclient.Stats{} - for _, tc := range pointerCases { - t.Run(tc.name, func(t *testing.T) { - errChan := make(chan error, 1) - - fetchData(errChan, &stats.Upstreams, tc.fetchFunc) - close(errChan) - - if tc.expectError { - if len(errChan) == 0 { - t.Errorf("Expected an error, but got none") - } - } else { - if len(errChan) != 0 { - t.Errorf("Expected no errors, but got %d", len(errChan)) - } - if len(stats.Upstreams) == 0 { - t.Errorf("Expected target to be '%v', but got '%v'", tc.expectedValue, stats.Upstreams) - } - assert.Equal(t, stats.Upstreams, tc.expectedValue) - } - }) - } -} - -func TestFetchAndAssign(t *testing.T) { - type TestCase struct { - name string - target interface{} - fetchFunc interface{} - expectedValue interface{} - expectError bool - } - client := MockClient{} - // Define the test cases - testCases := []TestCase{ - { - name: "Fetch Upstreams", - target: new(plusclient.Upstreams), - fetchFunc: client.GetUpstreams, - expectedValue: stats.Upstreams, - expectError: false, - }, - // { - // name: "Fetch ServerZones", - // target: new(plusclient.ServerZones), - // fetchFunc: client.GetServerZones, - // expectedValue: stats.ServerZones, - // expectError: false, - // }, - { - name: "Fetch Available Stream Endpoints", - target: []string{}, - fetchFunc: client.GetAvailableStreamEndpoints, - expectedValue: availableZones, - expectError: false, - }, - { - name: "Unsupported Type", - target: new(int), - fetchFunc: func() (int, error) { return 0, nil }, - expectedValue: 0, - expectError: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - var wg sync.WaitGroup - errChan := make(chan error, 1) - - wg.Add(1) - go fetchAndAssign(&wg, errChan, tc.target, tc.fetchFunc) - wg.Wait() - close(errChan) - - if tc.expectError { - if len(errChan) == 0 { - t.Errorf("Expected an error, but got none") - } - } else { - if len(errChan) != 0 { - t.Errorf("Expected no errors, but got %d", len(errChan)) - } - - targetValue := tc.target - expectedValue := tc.expectedValue - - switch target := targetValue.(type) { - case *plusclient.Upstreams: - if *target != nil { - t.Errorf("Expected target to be '%v', but got '%v'", expectedValue, *target) - } - assert.Equal(t, target, tc.expectedValue) - case *plusclient.ServerZones: - if *target != nil { - t.Errorf("Expected target to be '%v', but got '%v'", expectedValue, *target) - } - assert.Equal(t, target, tc.expectedValue) - case []string: - if target != nil { - t.Errorf("Expected target to be '%v', but got '%v'", expectedValue, target) - } - assert.Equal(t, target, tc.expectedValue) - } - } - }) - } + assert.Equal(t, stats, *testStats) } func TestNginxPlusUpdate(t *testing.T) { From e02a8f7d73283bc4cf268c326b1f56d767c72245 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Wed, 31 Jul 2024 16:25:44 +0100 Subject: [PATCH 14/17] updated stats retrieval --- src/core/metrics/sources/nginx_plus.go | 2 +- src/core/metrics/sources/nginx_plus_test.go | 17 +- .../v2/src/core/metrics/sources/nginx_plus.go | 379 +++++++++++------- 3 files changed, 245 insertions(+), 153 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index 4a802d405..6195a7b08 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -364,7 +364,7 @@ func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { var streamStatsWg sync.WaitGroup // this may never be 4 depending on the if conditions streamStatsErrChan := make(chan error, 4) - + if slices.Contains(stats.streamEndpoints, "server_zones") { streamStatsWg.Add(1) go func() { diff --git a/src/core/metrics/sources/nginx_plus_test.go b/src/core/metrics/sources/nginx_plus_test.go index 51e93ef25..b067a987a 100644 --- a/src/core/metrics/sources/nginx_plus_test.go +++ b/src/core/metrics/sources/nginx_plus_test.go @@ -99,7 +99,7 @@ var ( availableZones = []string{"server_zones", "upstreams", "limit_conns", "zone_sync"} stats = plusclient.Stats{ StreamZoneSync: &plusclient.StreamZoneSync{ - Zones: map[string]plusclient.SyncZone{ + Zones: map[string]plusclient.SyncZone{ "0": { RecordsPending: 1, RecordsTotal: 2, @@ -668,12 +668,19 @@ func TestGetStats(t *testing.T) { source := NewNginxPlus(nil, "", "", "", 9) - testStats, err := source.getStats(client) - if err != nil { - t.Fatalf("expected no error, got %v", err) + tests := []struct { + stats plusclient.Stats + }{ + { + stats: stats, + }, } - assert.Equal(t, stats, *testStats) + for _, test := range tests { + resultStats, err := source.getStats(client) + require.NoError(t, err) + assert.Equal(t, test.stats, *resultStats) + } } func TestNginxPlusUpdate(t *testing.T) { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index cc32e95c9..6195a7b08 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -146,7 +146,7 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.prevStats = stats } -func (c *NginxPlus) defatultStats() *plusclient.Stats { +func (c *NginxPlus) defaultStats() *plusclient.Stats { return &plusclient.Stats{ Upstreams: map[string]plusclient.Upstream{}, ServerZones: map[string]plusclient.ServerZone{}, @@ -171,75 +171,259 @@ func (c *NginxPlus) defatultStats() *plusclient.Stats { func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { var initialStatsWg sync.WaitGroup + initialStatsErrChan := make(chan error, 16) stats := &ExtendedStats{ - Stats: c.defatultStats(), + Stats: c.defaultStats(), } - initialStats := []struct { - target interface{} - fetchFunc interface{} - }{ - {&stats.endpoints, client.GetAvailableEndpoints}, - // before, we used error on this, if no stream endpoints we will continue and the if condition - // lower down should cater for it - {&stats.streamEndpoints, client.GetAvailableStreamEndpoints}, - {&stats.Stats.NginxInfo, client.GetNginxInfo}, - {&stats.Stats.Caches, client.GetCaches}, - {&stats.Stats.Processes, client.GetProcesses}, - {&stats.Stats.Slabs, client.GetSlabs}, - {&stats.Stats.Connections, client.GetConnections}, - {&stats.Stats.HTTPRequests, client.GetHTTPRequests}, - {&stats.Stats.SSL, client.GetSSL}, - {&stats.Stats.ServerZones, client.GetServerZones}, - {&stats.Stats.Upstreams, client.GetUpstreams}, - {&stats.Stats.LocationZones, client.GetLocationZones}, - {&stats.Stats.Resolvers, client.GetResolvers}, - {&stats.Stats.HTTPLimitRequests, client.GetHTTPLimitReqs}, - {&stats.Stats.HTTPLimitConnections, client.GetHTTPConnectionsLimit}, - {&stats.Stats.Workers, client.GetWorkers}, - } + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + endpoints, err := client.GetAvailableEndpoints() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Available Endpoints: %w", err) + return + } + stats.endpoints = endpoints + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + streamEndpoints, err := client.GetAvailableStreamEndpoints() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Available Stream Endpoints: %w", err) + return + } + stats.streamEndpoints = streamEndpoints + }() - initialStatsErrChan := make(chan error, len(initialStats)*2) + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + nginxInfo, err := client.GetNginxInfo() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get NGINX info: %w", err) + return + } + stats.NginxInfo = *nginxInfo + }() - // run these functions in parallel - for _, stats := range initialStats { - initialStatsWg.Add(1) - go fetchAndAssign(&initialStatsWg, initialStatsErrChan, stats.target, stats.fetchFunc) - } + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + caches, err := client.GetCaches() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get NGINX info: %w", err) + return + } + stats.Caches = *caches + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + processes, err := client.GetProcesses() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get processes: %w", err) + return + } + stats.Processes = *processes + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + slabs, err := client.GetSlabs() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get processes: %w", err) + return + } + stats.Slabs = *slabs + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + connections, err := client.GetConnections() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get connections: %w", err) + return + } + stats.Connections = *connections + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpRequests, err := client.GetHTTPRequests() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPRequests: %w", err) + return + } + stats.HTTPRequests = *httpRequests + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + ssl, err := client.GetSSL() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get SSL: %w", err) + return + } + stats.SSL = *ssl + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + serverZones, err := client.GetServerZones() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get ServerZones: %w", err) + return + } + stats.ServerZones = *serverZones + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + upstreams, err := client.GetUpstreams() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Upstreams: %w", err) + return + } + stats.Upstreams = *upstreams + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + locationZones, err := client.GetLocationZones() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get LocationZones: %w", err) + return + } + stats.LocationZones = *locationZones + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + resolvers, err := client.GetResolvers() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Resolvers: %w", err) + return + } + stats.Resolvers = *resolvers + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpLimitRequests, err := client.GetHTTPLimitReqs() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPLimitRequests: %w", err) + return + } + stats.HTTPLimitRequests = *httpLimitRequests + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + httpLimitConnections, err := client.GetHTTPConnectionsLimit() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get HTTPLimitConnections: %w", err) + return + } + stats.HTTPLimitConnections = *httpLimitConnections + }() + + initialStatsWg.Add(1) + go func() { + defer initialStatsWg.Done() + workers, err := client.GetWorkers() + if err != nil { + initialStatsErrChan <- fmt.Errorf("failed to get Workers: %w", err) + return + } + stats.Workers = workers + }() initialStatsWg.Wait() close(initialStatsErrChan) // only error if all the stats are empty - if len(initialStatsErrChan) == len(initialStats) { + if len(initialStatsErrChan) == 16 { return nil, errors.New("no useful metrics found") } if slices.Contains(stats.endpoints, "stream") { var streamStatsWg sync.WaitGroup - // check if these can get added to the client GetStats in a different way - endpointStats := []struct { - target interface{} - fetchFunc interface{} - statType string - }{ - {&stats.StreamServerZones, client.GetStreamServerZones, "server_zones"}, - {&stats.StreamUpstreams, client.GetStreamUpstreams, "upstreams"}, - {&stats.StreamLimitConnections, client.GetStreamConnectionsLimit, "limit_conns"}, - {&stats.StreamZoneSync, client.GetStreamZoneSync, "zone_sync"}, + // this may never be 4 depending on the if conditions + streamStatsErrChan := make(chan error, 4) + + if slices.Contains(stats.streamEndpoints, "server_zones") { + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamServerZones, err := client.GetStreamServerZones() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get streamServerZones: %w", err) + return + } + stats.StreamServerZones = *streamServerZones + }() + } + + if slices.Contains(stats.streamEndpoints, "upstreams") { + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamUpstreams, err := client.GetStreamUpstreams() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamUpstreams: %w", err) + return + } + stats.StreamUpstreams = *streamUpstreams + }() } - streamStatsErrChan := make(chan error, len(endpointStats)*2) + if slices.Contains(stats.streamEndpoints, "limit_conns") { - for _, stat := range endpointStats { streamStatsWg.Add(1) - if slices.Contains(stats.streamEndpoints, stat.statType) { - go fetchAndAssign(&streamStatsWg, streamStatsErrChan, stat.target, stat.fetchFunc) - } + go func() { + defer streamStatsWg.Done() + streamConnectionsLimit, err := client.GetStreamConnectionsLimit() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamLimitConnections: %w", err) + return + } + stats.StreamLimitConnections = *streamConnectionsLimit + }() + + } + + if slices.Contains(stats.streamEndpoints, "limit_conns") { + + streamStatsWg.Add(1) + go func() { + defer streamStatsWg.Done() + streamZoneSync, err := client.GetStreamZoneSync() + if err != nil { + streamStatsErrChan <- fmt.Errorf("failed to get StreamZoneSync: %w", err) + return + } + stats.StreamZoneSync = streamZoneSync + }() + } + streamStatsWg.Wait() close(streamStatsErrChan) - if len(streamStatsErrChan) == len(endpointStats) { + if len(streamStatsErrChan) > 0 { log.Warnf("no useful metrics found in stream stats") } } @@ -247,105 +431,6 @@ func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { return stats.Stats, nil } -func fetchData[T any]( - errChan chan error, - target *T, - fetchFunc func() (*T, error), -) { - data, err := fetchFunc() - if err != nil { - errChan <- fmt.Errorf("failed to get stats: %w", err) - return - } - // nolint:ineffassign - target = data - log.Printf("target %v", target) - log.Printf("data %v", data) -} - -func fetchDataVal[T any]( - errChan chan error, - target T, - fetchFunc func() (T, error), -) { - data, err := fetchFunc() - if err != nil { - errChan <- fmt.Errorf("failed to get stats: %w", err) - return - } - // nolint:ineffassign - target = data - log.Printf("target %v", target) - log.Printf("data %v", data) -} - -// this function takes the target type and matches it's function signature -func fetchAndAssign(wg *sync.WaitGroup, errChan chan error, target interface{}, fetchFunc interface{}) { - defer wg.Done() - - switch t := target.(type) { - case []string: - f := fetchFunc.(func() ([]string, error)) - fetchDataVal(errChan, t, f) - case *plusclient.Upstreams: - f := fetchFunc.(func() (*plusclient.Upstreams, error)) - fetchData(errChan, t, f) - // case *plusclient.ServerZones: - // f := fetchFunc.(func() (plusclient.ServerZones, error)) - // fetchData(errChan, t, f) - // case *plusclient.StreamServerZones: - // f := fetchFunc.(func() (plusclient.StreamServerZones, error)) - // fetchData(errChan, t, f) - // case *plusclient.StreamUpstreams: - // f := fetchFunc.(func() (plusclient.StreamUpstreams, error)) - // fetchData(errChan, t, f) - // case *plusclient.Slabs: - // f := fetchFunc.(func() (plusclient.Slabs, error)) - // fetchData(errChan, t, f) - // case *plusclient.Caches: - // f := fetchFunc.(func() (plusclient.Caches, error)) - // fetchData(errChan, t, f) - // case *plusclient.HTTPLimitConnections: - // f := fetchFunc.(func() (plusclient.HTTPLimitConnections, error)) - // fetchData(errChan, t, f) - // case *plusclient.StreamLimitConnections: - // f := fetchFunc.(func() (plusclient.StreamLimitConnections, error)) - // fetchData(errChan, t, f) - // case *plusclient.HTTPLimitRequests: - // f := fetchFunc.(func() (plusclient.HTTPLimitRequests, error)) - // fetchData(errChan, t, f) - // case *plusclient.Resolvers: - // f := fetchFunc.(func() (plusclient.Resolvers, error)) - // fetchData(errChan, t, f) - // case *plusclient.LocationZones: - // f := fetchFunc.(func() (plusclient.LocationZones, error)) - // fetchData(errChan, t, f) - // case *plusclient.StreamZoneSync: - // f := fetchFunc.(func() (plusclient.StreamZoneSync, error)) - // fetchData(errChan, t, f) - // case []*plusclient.Workers: - // f := fetchFunc.(func() ([]*plusclient.Workers, error)) - // fetchData(errChan, &t, f) - // case *plusclient.NginxInfo: - // f := fetchFunc.(func() (plusclient.NginxInfo, error)) - // fetchData(errChan, t, f) - // case *plusclient.SSL: - // f := fetchFunc.(func() (plusclient.SSL, error)) - // fetchData(errChan, t, f) - // case *plusclient.Connections: - // f := fetchFunc.(func() (plusclient.Connections, error)) - // fetchData(errChan, t, f) - // case *plusclient.HTTPRequests: - // f := fetchFunc.(func() (plusclient.HTTPRequests, error)) - // fetchData(errChan, t, f) - // case *plusclient.Processes: - // f := fetchFunc.(func() (plusclient.Processes, error)) - // fetchData(errChan, t, f) - default: - errChan <- fmt.Errorf("unsupported type: %T", target) - } -} - func (c *NginxPlus) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.baseDimensions = dimensions c.plusAPI = collectorConf.PlusAPI From 2ac416b4d36aab9362e8c431e4c9d98208483ff7 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Wed, 31 Jul 2024 17:46:14 +0100 Subject: [PATCH 15/17] removed debug --- src/core/metrics/sources/nginx_plus.go | 1 - .../nginx/agent/v2/src/core/metrics/sources/nginx_plus.go | 1 - 2 files changed, 2 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index 6195a7b08..981459dc8 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -139,7 +139,6 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.baseDimensions.NginxBuild = stats.NginxInfo.Build c.baseDimensions.NginxVersion = stats.NginxInfo.Version - log.Debugf("NGINX_plus_Collect: collecting stats %v", stats) c.sendMetrics(ctx, m, c.collectMetrics(stats, c.prevStats)...) log.Debug("NGINX_plus_Collect: metrics sent") diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index 6195a7b08..981459dc8 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -139,7 +139,6 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m c.baseDimensions.NginxBuild = stats.NginxInfo.Build c.baseDimensions.NginxVersion = stats.NginxInfo.Version - log.Debugf("NGINX_plus_Collect: collecting stats %v", stats) c.sendMetrics(ctx, m, c.collectMetrics(stats, c.prevStats)...) log.Debug("NGINX_plus_Collect: metrics sent") From 0877f74a9ac56d88eb4d91b7535a391d3edcbfa6 Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Thu, 1 Aug 2024 13:47:31 +0100 Subject: [PATCH 16/17] typo --- src/core/nginx.go | 2 +- .../vendor/github.com/nginx/agent/v2/src/core/nginx.go | 2 +- .../vendor/github.com/nginx/agent/v2/src/core/nginx.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/nginx.go b/src/core/nginx.go index f29f410ee..b85451dcf 100644 --- a/src/core/nginx.go +++ b/src/core/nginx.go @@ -218,7 +218,7 @@ func (n *NginxBinaryType) GetNginxDetailsFromProcess(nginxProcess *Process) *pro if urlsLength == 0 || nginxStatus == "" { stubStatusApiUrl, err := sdk.GetStubStatusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) if err != nil { - log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure aStub Status API to get NGINX OSS metrics: %v", err) + log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure a Stub Status API to get NGINX OSS metrics: %v", err) } nginxPlusApiUrl, err := sdk.GetNginxPlusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go index f29f410ee..b85451dcf 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -218,7 +218,7 @@ func (n *NginxBinaryType) GetNginxDetailsFromProcess(nginxProcess *Process) *pro if urlsLength == 0 || nginxStatus == "" { stubStatusApiUrl, err := sdk.GetStubStatusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) if err != nil { - log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure aStub Status API to get NGINX OSS metrics: %v", err) + log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure a Stub Status API to get NGINX OSS metrics: %v", err) } nginxPlusApiUrl, err := sdk.GetNginxPlusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go index f29f410ee..b85451dcf 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -218,7 +218,7 @@ func (n *NginxBinaryType) GetNginxDetailsFromProcess(nginxProcess *Process) *pro if urlsLength == 0 || nginxStatus == "" { stubStatusApiUrl, err := sdk.GetStubStatusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) if err != nil { - log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure aStub Status API to get NGINX OSS metrics: %v", err) + log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure a Stub Status API to get NGINX OSS metrics: %v", err) } nginxPlusApiUrl, err := sdk.GetNginxPlusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives) From 4ff1c98ebf94ebedbe6e9e10bc3b53acbe5c8dde Mon Sep 17 00:00:00 2001 From: Oliver O'Mahony Date: Fri, 2 Aug 2024 15:42:34 +0100 Subject: [PATCH 17/17] pushed the connections call to the end --- src/core/metrics/sources/nginx_plus.go | 35 ++++++++++++------- .../v2/src/core/metrics/sources/nginx_plus.go | 35 ++++++++++++------- 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/src/core/metrics/sources/nginx_plus.go b/src/core/metrics/sources/nginx_plus.go index 981459dc8..3077083ba 100644 --- a/src/core/metrics/sources/nginx_plus.go +++ b/src/core/metrics/sources/nginx_plus.go @@ -170,7 +170,7 @@ func (c *NginxPlus) defaultStats() *plusclient.Stats { func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { var initialStatsWg sync.WaitGroup - initialStatsErrChan := make(chan error, 16) + initialStatsErrChan := make(chan error, 15) stats := &ExtendedStats{ Stats: c.defaultStats(), } @@ -241,17 +241,6 @@ func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { stats.Slabs = *slabs }() - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - connections, err := client.GetConnections() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get connections: %w", err) - return - } - stats.Connections = *connections - }() - initialStatsWg.Add(1) go func() { defer initialStatsWg.Done() @@ -427,6 +416,28 @@ func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { } } + // report connection metrics separately so it does not influence the results (all http requests show in the metrics) + var connectionsWg sync.WaitGroup + connectionsErrChan := make(chan error, 1) + + connectionsWg.Add(1) + go func() { + defer connectionsWg.Done() + connections, err := client.GetConnections() + if err != nil { + connectionsErrChan <- fmt.Errorf("failed to get connections: %w", err) + return + } + stats.Connections = *connections + }() + + connectionsWg.Wait() + close(connectionsErrChan) + + if len(connectionsErrChan) > 0 { + log.Warnf("connections metrics not found") + } + return stats.Stats, nil } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index 981459dc8..3077083ba 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -170,7 +170,7 @@ func (c *NginxPlus) defaultStats() *plusclient.Stats { func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { var initialStatsWg sync.WaitGroup - initialStatsErrChan := make(chan error, 16) + initialStatsErrChan := make(chan error, 15) stats := &ExtendedStats{ Stats: c.defaultStats(), } @@ -241,17 +241,6 @@ func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { stats.Slabs = *slabs }() - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - connections, err := client.GetConnections() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get connections: %w", err) - return - } - stats.Connections = *connections - }() - initialStatsWg.Add(1) go func() { defer initialStatsWg.Done() @@ -427,6 +416,28 @@ func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { } } + // report connection metrics separately so it does not influence the results (all http requests show in the metrics) + var connectionsWg sync.WaitGroup + connectionsErrChan := make(chan error, 1) + + connectionsWg.Add(1) + go func() { + defer connectionsWg.Done() + connections, err := client.GetConnections() + if err != nil { + connectionsErrChan <- fmt.Errorf("failed to get connections: %w", err) + return + } + stats.Connections = *connections + }() + + connectionsWg.Wait() + close(connectionsErrChan) + + if len(connectionsErrChan) > 0 { + log.Warnf("connections metrics not found") + } + return stats.Stats, nil }