Skip to content

Commit

Permalink
Add worker conn metrics (#461)
Browse files Browse the repository at this point in the history
* Update nginx plus go client and default API version from 6 to 7. 
* Query the plus API to get latest version available
* Report worker metrics of NGINX Plus 30+
  • Loading branch information
Dean-Coakley authored Sep 13, 2023
1 parent cd6c484 commit 556f51e
Show file tree
Hide file tree
Showing 17 changed files with 480 additions and 233 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/maxbrunsfeld/counterfeiter/v6 v6.6.2
github.com/mitchellh/mapstructure v1.5.0
github.com/mwitkow/go-proto-validators v0.3.2
github.com/nginxinc/nginx-plus-go-client v0.10.0
github.com/nginxinc/nginx-plus-go-client v0.11.0
github.com/nginxinc/nginx-prometheus-exporter v0.11.0
github.com/nxadm/tail v1.4.8
github.com/orcaman/concurrent-map v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354 h1:4kuARK6Y6Fx
github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354/go.mod h1:KSVJerMDfblTH7p5MZaTt+8zaT2iEk3AkVb9PQdZuE8=
github.com/nginxinc/nginx-go-crossplane v0.4.24 h1:6gwZ8Boh8FS/kiZlfwSImO6UFnCjPeZ2uleHGSx4b+4=
github.com/nginxinc/nginx-go-crossplane v0.4.24/go.mod h1:UzbZnyFv0vPlt1Urbnp/mrFCzBL4tYCReFuNBpFQEfI=
github.com/nginxinc/nginx-plus-go-client v0.10.0 h1:3zsMMkPvRDo8D7ZSprXtbAEW/SDmezZWzxdyS+6oAlc=
github.com/nginxinc/nginx-plus-go-client v0.10.0/go.mod h1:0v3RsQCvRn/IyrMtW+DK6CNkz+PxEsXDJPjQ3yUMBF0=
github.com/nginxinc/nginx-plus-go-client v0.11.0 h1:XxKag3dlcF1gA0HgGsrJktbzBW4W+s369PIgT3lSR2c=
github.com/nginxinc/nginx-plus-go-client v0.11.0/go.mod h1:UvrcgWbUWEJzvbstNnPfq8Ogz9BTi1gHzlQ2ebAJisM=
github.com/nginxinc/nginx-prometheus-exporter v0.11.0 h1:21xjnqNgxtni2jDgAQ90bl15uDnrTreO9sIlu1YsX/U=
github.com/nginxinc/nginx-prometheus-exporter v0.11.0/go.mod h1:GdyHnWAb8q8OW1Pssrrqbcqra0SH0Vn6UXICMmyWkw8=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
Expand Down
2 changes: 1 addition & 1 deletion src/core/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
Nginx: Nginx{
Debug: false,
NginxCountingSocket: "unix:/var/run/nginx-agent/nginx.sock",
NginxClientVersion: 6,
NginxClientVersion: 7, // NGINX Plus R25+
ConfigReloadMonitoringPeriod: 10 * time.Second,
TreatWarningsAsErrors: false,
},
Expand Down
16 changes: 9 additions & 7 deletions src/core/metrics/collectors/nginx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
"reflect"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"
"github.com/nginx/agent/v2/src/core/metrics"
tutils "github.com/nginx/agent/v2/test/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

var (
Expand Down Expand Up @@ -49,10 +48,13 @@ var (
},
Features: config.Defaults.Features,
Nginx: config.Nginx{
Debug: false,
NginxCountingSocket: "unix:/var/run/nginx-agent/nginx.sock",
Debug: false,
NginxCountingSocket: "unix:/var/run/nginx-agent/nginx.sock",
NginxClientVersion: 9,
TreatWarningsAsErrors: false,
},
}

collectorConfigNoApi = &metrics.NginxCollectorConfig{
BinPath: "/path/to/nginx",
NginxId: nginxId,
Expand Down Expand Up @@ -174,8 +176,8 @@ func TestNginxCollector_Collect(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go nginxCollector.Collect(ctx, wg, make(chan<- *metrics.StatsEntityWrapper))
wg.Wait()

time.Sleep(10 * time.Millisecond)
mockNginxSource1.AssertExpectations(t)
mockNginxSource2.AssertExpectations(t)
}
Expand Down
6 changes: 6 additions & 0 deletions src/core/metrics/metrics_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ func GetCalculationMap() map[string]string {
"plus.slab.pages.free": "avg",
"plus.slab.pages.total": "avg",
"plus.slab.pages.pct_used": "avg",
"plus.worker.conn.accepted": "sum",
"plus.worker.conn.dropped": "sum",
"plus.worker.conn.active": "avg",
"plus.worker.conn.idle": "avg",
"plus.worker.http.request.total": "sum",
"plus.worker.http.request.current": "avg",
"plus.instance.count": "avg",
"container.cpu.cores": "avg",
"container.cpu.period": "avg",
Expand Down
110 changes: 100 additions & 10 deletions src/core/metrics/sources/nginx_plus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ package sources

import (
"context"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"sync"
"time"

"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core/metrics"
Expand All @@ -29,8 +32,9 @@ const (
peerStateUnavail = "unavail"
peerStateChecking = "checking"
peerStateUnhealthy = "unhealthy"
valueFloat64One = float64(1)
valueFloat64Zero = float64(0)

valueFloat64One = float64(1)
valueFloat64Zero = float64(0)
)

// NginxPlus generates metrics from NGINX Plus API
Expand All @@ -53,15 +57,23 @@ func NewNginxPlus(baseDimensions *metrics.CommonDim, nginxNamespace, plusNamespa
func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) {
defer wg.Done()
c.init.Do(func() {
latestAPIVersion, err := c.getLatestAPIVersion(ctx, c.plusAPI)
if err != nil {
c.logger.Log(fmt.Sprintf("Failed to check available api versions: %v", err))
} else {
c.clientVersion = latestAPIVersion
}

cl, err := plusclient.NewNginxClientWithVersion(&http.Client{}, c.plusAPI, c.clientVersion)
if err != nil {
c.logger.Log(fmt.Sprintf("Failed to create plus metrics client, %v", err))
c.logger.Log(fmt.Sprintf("Failed to create plus metrics client: %v", err))
SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m)
return
}

c.prevStats, err = cl.GetStats()
if err != nil {
c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics, %v", err))
c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err))
SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m)
c.prevStats = nil
return
Expand All @@ -70,14 +82,14 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m

cl, err := plusclient.NewNginxClientWithVersion(&http.Client{}, c.plusAPI, c.clientVersion)
if err != nil {
c.logger.Log(fmt.Sprintf("Failed to create plus metrics client, %v", err))
c.logger.Log(fmt.Sprintf("Failed to create plus metrics client: %v", err))
SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m)
return
}

stats, err := cl.GetStats()
if err != nil {
c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics, %v", err))
c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err))
SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m)
return
}
Expand Down Expand Up @@ -124,12 +136,13 @@ func (c *NginxPlus) collectMetrics(stats, prevStats *plusclient.Stats) (entries
entries = append(entries, c.serverZoneMetrics(stats, prevStats)...)
entries = append(entries, c.streamServerZoneMetrics(stats, prevStats)...)
entries = append(entries, c.locationZoneMetrics(stats, prevStats)...)
entries = append(entries, c.slabMetrics(stats, prevStats)...)
entries = append(entries, c.slabMetrics(stats)...)
entries = append(entries, c.httpLimitConnsMetrics(stats, prevStats)...)
entries = append(entries, c.httpLimitRequestMetrics(stats, prevStats)...)
entries = append(entries, c.cacheMetrics(stats, prevStats)...)
entries = append(entries, c.httpUpstreamMetrics(stats, prevStats)...)
entries = append(entries, c.streamUpstreamMetrics(stats, prevStats)...)
entries = append(entries, c.workerMetrics(stats, prevStats)...)

return
}
Expand Down Expand Up @@ -752,7 +765,7 @@ func (c *NginxPlus) cacheMetrics(stats, prevStats *plusclient.Stats) []*metrics.
return zoneMetrics
}

func (c *NginxPlus) slabMetrics(stats, prevStats *plusclient.Stats) []*metrics.StatsEntityWrapper {
func (c *NginxPlus) slabMetrics(stats *plusclient.Stats) []*metrics.StatsEntityWrapper {
l := &namedMetric{namespace: c.plusNamespace, group: ""}
slabMetrics := make([]*metrics.StatsEntityWrapper, 0)

Expand Down Expand Up @@ -828,6 +841,43 @@ func (c *NginxPlus) httpLimitRequestMetrics(stats, prevStats *plusclient.Stats)
return limitRequestMetrics
}

func (c *NginxPlus) workerMetrics(stats, prevStats *plusclient.Stats) []*metrics.StatsEntityWrapper {
workerMetrics := make([]*metrics.StatsEntityWrapper, 0)
prevWorkerProcs := make(map[uint64]*plusclient.Workers)

for _, pw := range prevStats.Workers {
prevWorkerProcs[pw.ProcessID] = pw
}

for _, w := range stats.Workers {
l := &namedMetric{namespace: c.plusNamespace, group: "worker"}

if _, exists := prevWorkerProcs[w.ProcessID]; exists {
w.Connections.Accepted = w.Connections.Accepted - prevWorkerProcs[w.ProcessID].Connections.Accepted
w.Connections.Dropped = w.Connections.Dropped - prevWorkerProcs[w.ProcessID].Connections.Dropped
w.Connections.Active = w.Connections.Active - prevWorkerProcs[w.ProcessID].Connections.Active
w.Connections.Idle = w.Connections.Idle - prevWorkerProcs[w.ProcessID].Connections.Idle
w.HTTP.HTTPRequests.Total = w.HTTP.HTTPRequests.Total - prevWorkerProcs[w.ProcessID].HTTP.HTTPRequests.Total
w.HTTP.HTTPRequests.Current = w.HTTP.HTTPRequests.Current - prevWorkerProcs[w.ProcessID].HTTP.HTTPRequests.Current
}

simpleMetrics := l.convertSamplesToSimpleMetrics(map[string]float64{
"conn.accepted": float64(w.Connections.Accepted),
"conn.dropped": float64(w.Connections.Dropped),
"conn.active": float64(w.Connections.Active),
"conn.idle": float64(w.Connections.Idle),
"http.request.total": float64(w.HTTP.HTTPRequests.Total),
"http.request.current": float64(w.HTTP.HTTPRequests.Current),
})

dims := c.baseDimensions.ToDimensions()
dims = append(dims, &proto.Dimension{Name: "process_id", Value: fmt.Sprint(w.ProcessID)})
workerMetrics = append(workerMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE))
}

return workerMetrics
}

func getHttpUpstreamPeerKey(peer plusclient.Peer) (key string) {
key = fmt.Sprintf("%s-%s-%s", peer.Server, peer.Service, peer.Name)
return
Expand All @@ -854,10 +904,50 @@ func createStreamPeerMap(peers []plusclient.StreamPeer) map[string]plusclient.St
return m
}

func boolToFloat64(mybool bool) float64 {
if mybool {
func boolToFloat64(myBool bool) float64 {
if myBool {
return valueFloat64One
} else {
return valueFloat64Zero
}
}

func (c *NginxPlus) getLatestAPIVersion(ctx context.Context, endpoint string) (int, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return 0, fmt.Errorf("failed to create a get request: %w", err)
}

httpClient := &http.Client{}

resp, err := httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("%v is not accessible: %w", endpoint, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("%v is not accessible: expected %v response, got %v", endpoint, http.StatusOK, resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("error while reading body of the response: %w", err)
}

var vers []int
err = json.Unmarshal(body, &vers)
if err != nil {
return 0, fmt.Errorf("error unmarshalling versions, got %q response: %w", string(body), err)
}

latestAPIVer := vers[len(vers)-1]
if latestAPIVer < c.clientVersion {
return 0, fmt.Errorf("%s/%v does not have a supported api version. Must be at least version %v", endpoint, latestAPIVer, c.clientVersion)
}

return latestAPIVer, nil
}
Loading

0 comments on commit 556f51e

Please sign in to comment.