Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add worker conn metrics #461

Merged
merged 14 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/maxbrunsfeld/counterfeiter/v6 v6.6.2
github.com/mitchellh/mapstructure v1.5.0
github.com/mwitkow/go-proto-validators v0.3.2
github.com/nginxinc/nginx-plus-go-client v0.10.0
github.com/nginxinc/nginx-plus-go-client v0.11.0
github.com/nginxinc/nginx-prometheus-exporter v0.11.0
github.com/nxadm/tail v1.4.8
github.com/orcaman/concurrent-map v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354 h1:4kuARK6Y6Fx
github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354/go.mod h1:KSVJerMDfblTH7p5MZaTt+8zaT2iEk3AkVb9PQdZuE8=
github.com/nginxinc/nginx-go-crossplane v0.4.24 h1:6gwZ8Boh8FS/kiZlfwSImO6UFnCjPeZ2uleHGSx4b+4=
github.com/nginxinc/nginx-go-crossplane v0.4.24/go.mod h1:UzbZnyFv0vPlt1Urbnp/mrFCzBL4tYCReFuNBpFQEfI=
github.com/nginxinc/nginx-plus-go-client v0.10.0 h1:3zsMMkPvRDo8D7ZSprXtbAEW/SDmezZWzxdyS+6oAlc=
github.com/nginxinc/nginx-plus-go-client v0.10.0/go.mod h1:0v3RsQCvRn/IyrMtW+DK6CNkz+PxEsXDJPjQ3yUMBF0=
github.com/nginxinc/nginx-plus-go-client v0.11.0 h1:XxKag3dlcF1gA0HgGsrJktbzBW4W+s369PIgT3lSR2c=
github.com/nginxinc/nginx-plus-go-client v0.11.0/go.mod h1:UvrcgWbUWEJzvbstNnPfq8Ogz9BTi1gHzlQ2ebAJisM=
github.com/nginxinc/nginx-prometheus-exporter v0.11.0 h1:21xjnqNgxtni2jDgAQ90bl15uDnrTreO9sIlu1YsX/U=
github.com/nginxinc/nginx-prometheus-exporter v0.11.0/go.mod h1:GdyHnWAb8q8OW1Pssrrqbcqra0SH0Vn6UXICMmyWkw8=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
Expand Down
2 changes: 1 addition & 1 deletion src/core/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
Nginx: Nginx{
Debug: false,
NginxCountingSocket: "unix:/var/run/nginx-agent/nginx.sock",
NginxClientVersion: 6,
NginxClientVersion: 7, // NGINX Plus R25+
ConfigReloadMonitoringPeriod: 10 * time.Second,
TreatWarningsAsErrors: false,
},
Expand Down
16 changes: 9 additions & 7 deletions src/core/metrics/collectors/nginx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
"reflect"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"
"github.com/nginx/agent/v2/src/core/metrics"
tutils "github.com/nginx/agent/v2/test/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

var (
Expand Down Expand Up @@ -49,10 +48,13 @@ var (
},
Features: config.Defaults.Features,
Nginx: config.Nginx{
Debug: false,
NginxCountingSocket: "unix:/var/run/nginx-agent/nginx.sock",
Debug: false,
NginxCountingSocket: "unix:/var/run/nginx-agent/nginx.sock",
NginxClientVersion: 9,
TreatWarningsAsErrors: false,
},
}

collectorConfigNoApi = &metrics.NginxCollectorConfig{
BinPath: "/path/to/nginx",
NginxId: nginxId,
Expand Down Expand Up @@ -174,8 +176,8 @@ func TestNginxCollector_Collect(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go nginxCollector.Collect(ctx, wg, make(chan<- *metrics.StatsEntityWrapper))
wg.Wait()

time.Sleep(10 * time.Millisecond)
mockNginxSource1.AssertExpectations(t)
mockNginxSource2.AssertExpectations(t)
}
Expand Down
6 changes: 6 additions & 0 deletions src/core/metrics/metrics_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ func GetCalculationMap() map[string]string {
"plus.slab.pages.free": "avg",
"plus.slab.pages.total": "avg",
"plus.slab.pages.pct_used": "avg",
"plus.worker.conn.accepted": "sum",
"plus.worker.conn.dropped": "sum",
"plus.worker.conn.active": "avg",
"plus.worker.conn.idle": "avg",
"plus.worker.http.request.total": "sum",
"plus.worker.http.request.current": "avg",
"plus.instance.count": "avg",
"container.cpu.cores": "avg",
"container.cpu.period": "avg",
Expand Down
110 changes: 100 additions & 10 deletions src/core/metrics/sources/nginx_plus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ package sources

import (
"context"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"sync"
"time"

"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core/metrics"
Expand All @@ -29,8 +32,9 @@ const (
peerStateUnavail = "unavail"
peerStateChecking = "checking"
peerStateUnhealthy = "unhealthy"
valueFloat64One = float64(1)
valueFloat64Zero = float64(0)

valueFloat64One = float64(1)
valueFloat64Zero = float64(0)
)

// NginxPlus generates metrics from NGINX Plus API
Expand All @@ -53,15 +57,23 @@ func NewNginxPlus(baseDimensions *metrics.CommonDim, nginxNamespace, plusNamespa
func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) {
defer wg.Done()
c.init.Do(func() {
latestAPIVersion, err := c.getLatestAPIVersion(ctx, c.plusAPI)
if err != nil {
c.logger.Log(fmt.Sprintf("Failed to check available api versions: %v", err))
} else {
c.clientVersion = latestAPIVersion
}

cl, err := plusclient.NewNginxClientWithVersion(&http.Client{}, c.plusAPI, c.clientVersion)
if err != nil {
c.logger.Log(fmt.Sprintf("Failed to create plus metrics client, %v", err))
c.logger.Log(fmt.Sprintf("Failed to create plus metrics client: %v", err))
SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m)
return
}

c.prevStats, err = cl.GetStats()
if err != nil {
c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics, %v", err))
c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err))
SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m)
c.prevStats = nil
return
Expand All @@ -70,14 +82,14 @@ func (c *NginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *m

cl, err := plusclient.NewNginxClientWithVersion(&http.Client{}, c.plusAPI, c.clientVersion)
if err != nil {
c.logger.Log(fmt.Sprintf("Failed to create plus metrics client, %v", err))
c.logger.Log(fmt.Sprintf("Failed to create plus metrics client: %v", err))
SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m)
return
}

stats, err := cl.GetStats()
if err != nil {
c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics, %v", err))
c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err))
SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m)
return
}
Expand Down Expand Up @@ -124,12 +136,13 @@ func (c *NginxPlus) collectMetrics(stats, prevStats *plusclient.Stats) (entries
entries = append(entries, c.serverZoneMetrics(stats, prevStats)...)
entries = append(entries, c.streamServerZoneMetrics(stats, prevStats)...)
entries = append(entries, c.locationZoneMetrics(stats, prevStats)...)
entries = append(entries, c.slabMetrics(stats, prevStats)...)
entries = append(entries, c.slabMetrics(stats)...)
entries = append(entries, c.httpLimitConnsMetrics(stats, prevStats)...)
entries = append(entries, c.httpLimitRequestMetrics(stats, prevStats)...)
entries = append(entries, c.cacheMetrics(stats, prevStats)...)
entries = append(entries, c.httpUpstreamMetrics(stats, prevStats)...)
entries = append(entries, c.streamUpstreamMetrics(stats, prevStats)...)
entries = append(entries, c.workerMetrics(stats, prevStats)...)

return
}
Expand Down Expand Up @@ -752,7 +765,7 @@ func (c *NginxPlus) cacheMetrics(stats, prevStats *plusclient.Stats) []*metrics.
return zoneMetrics
}

func (c *NginxPlus) slabMetrics(stats, prevStats *plusclient.Stats) []*metrics.StatsEntityWrapper {
func (c *NginxPlus) slabMetrics(stats *plusclient.Stats) []*metrics.StatsEntityWrapper {
l := &namedMetric{namespace: c.plusNamespace, group: ""}
slabMetrics := make([]*metrics.StatsEntityWrapper, 0)

Expand Down Expand Up @@ -828,6 +841,43 @@ func (c *NginxPlus) httpLimitRequestMetrics(stats, prevStats *plusclient.Stats)
return limitRequestMetrics
}

func (c *NginxPlus) workerMetrics(stats, prevStats *plusclient.Stats) []*metrics.StatsEntityWrapper {
workerMetrics := make([]*metrics.StatsEntityWrapper, 0)
prevWorkerProcs := make(map[uint64]*plusclient.Workers)

for _, pw := range prevStats.Workers {
prevWorkerProcs[pw.ProcessID] = pw
}

for _, w := range stats.Workers {
l := &namedMetric{namespace: c.plusNamespace, group: "worker"}

if _, exists := prevWorkerProcs[w.ProcessID]; exists {
w.Connections.Accepted = w.Connections.Accepted - prevWorkerProcs[w.ProcessID].Connections.Accepted
w.Connections.Dropped = w.Connections.Dropped - prevWorkerProcs[w.ProcessID].Connections.Dropped
w.Connections.Active = w.Connections.Active - prevWorkerProcs[w.ProcessID].Connections.Active
w.Connections.Idle = w.Connections.Idle - prevWorkerProcs[w.ProcessID].Connections.Idle
w.HTTP.HTTPRequests.Total = w.HTTP.HTTPRequests.Total - prevWorkerProcs[w.ProcessID].HTTP.HTTPRequests.Total
w.HTTP.HTTPRequests.Current = w.HTTP.HTTPRequests.Current - prevWorkerProcs[w.ProcessID].HTTP.HTTPRequests.Current
}

simpleMetrics := l.convertSamplesToSimpleMetrics(map[string]float64{
"conn.accepted": float64(w.Connections.Accepted),
"conn.dropped": float64(w.Connections.Dropped),
"conn.active": float64(w.Connections.Active),
"conn.idle": float64(w.Connections.Idle),
"http.request.total": float64(w.HTTP.HTTPRequests.Total),
"http.request.current": float64(w.HTTP.HTTPRequests.Current),
})

dims := c.baseDimensions.ToDimensions()
dims = append(dims, &proto.Dimension{Name: "process_id", Value: fmt.Sprint(w.ProcessID)})
workerMetrics = append(workerMetrics, metrics.NewStatsEntityWrapper(dims, simpleMetrics, proto.MetricsReport_INSTANCE))
}

return workerMetrics
}

func getHttpUpstreamPeerKey(peer plusclient.Peer) (key string) {
key = fmt.Sprintf("%s-%s-%s", peer.Server, peer.Service, peer.Name)
return
Expand All @@ -854,10 +904,50 @@ func createStreamPeerMap(peers []plusclient.StreamPeer) map[string]plusclient.St
return m
}

func boolToFloat64(mybool bool) float64 {
if mybool {
func boolToFloat64(myBool bool) float64 {
if myBool {
return valueFloat64One
} else {
return valueFloat64Zero
}
}

func (c *NginxPlus) getLatestAPIVersion(ctx context.Context, endpoint string) (int, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return 0, fmt.Errorf("failed to create a get request: %w", err)
}

httpClient := &http.Client{}

resp, err := httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("%v is not accessible: %w", endpoint, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("%v is not accessible: expected %v response, got %v", endpoint, http.StatusOK, resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("error while reading body of the response: %w", err)
}

var vers []int
err = json.Unmarshal(body, &vers)
if err != nil {
return 0, fmt.Errorf("error unmarshalling versions, got %q response: %w", string(body), err)
}

latestAPIVer := vers[len(vers)-1]
if latestAPIVer < c.clientVersion {
return 0, fmt.Errorf("%s/%v does not have a supported api version. Must be at least version %v", endpoint, latestAPIVer, c.clientVersion)
}

return latestAPIVer, nil
}
Loading