From cd7c63e2851fbcbfa94198ed1f107623a763d82f Mon Sep 17 00:00:00 2001 From: Donal Hurley Date: Mon, 7 Oct 2024 15:45:03 +0100 Subject: [PATCH] Bump NGINX plus go client version from v1 to v2 (#879) --- .DS_Store | Bin 0 -> 6148 bytes go.mod | 2 +- go.sum | 4 +- go.work.sum | 1 + src/core/metrics/sources/nginx_plus.go | 359 +------- src/core/metrics/sources/nginx_plus_test.go | 189 +---- test/performance/go.mod | 2 +- test/performance/go.sum | 4 +- .../v2/src/core/metrics/sources/nginx_plus.go | 359 +------- .../nginx-plus-go-client/{ => v2}/LICENSE | 0 .../{ => v2}/client/nginx.go | 769 +++++++++++------- test/performance/vendor/modules.txt | 6 +- .../nginx-plus-go-client/{ => v2}/LICENSE | 0 .../{ => v2}/client/nginx.go | 769 +++++++++++------- vendor/modules.txt | 6 +- 15 files changed, 1016 insertions(+), 1454 deletions(-) create mode 100644 .DS_Store rename test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/{ => v2}/LICENSE (100%) rename test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/{ => v2}/client/nginx.go (65%) rename vendor/github.com/nginxinc/nginx-plus-go-client/{ => v2}/LICENSE (100%) rename vendor/github.com/nginxinc/nginx-plus-go-client/{ => v2}/client/nginx.go (65%) diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6 GIT binary patch literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T0 0 { - log.Warnf("no useful metrics found in stream stats") - } - } - - // report connection metrics separately so it does not influence the results (all http requests show in the metrics) - var connectionsWg sync.WaitGroup - connectionsErrChan := make(chan error, 1) - - connectionsWg.Add(1) - go func() { - defer connectionsWg.Done() - connections, err := client.GetConnections() - if err != nil { - connectionsErrChan <- fmt.Errorf("failed to get connections: %w", err) - return - } - stats.Connections = *connections - }() - - connectionsWg.Wait() - close(connectionsErrChan) - - if len(connectionsErrChan) > 0 { - log.Warnf("connections metrics not found") - } - - return stats.Stats, nil -} - func (c *NginxPlus) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.baseDimensions = dimensions c.plusAPI = collectorConf.PlusAPI @@ -1265,43 +956,3 @@ func boolToFloat64(myBool bool) float64 { return valueFloat64Zero } } - -func (c *NginxPlus) getLatestAPIVersion(ctx context.Context, endpoint string) (int, error) { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) - if err != nil { - return 0, fmt.Errorf("failed to create a get request: %w", err) - } - - httpClient := &http.Client{} - - resp, err := httpClient.Do(req) - if err != nil { - return 0, fmt.Errorf("%v is not accessible: %w", endpoint, err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return 0, fmt.Errorf("%v is not accessible: expected %v response, got %v", endpoint, http.StatusOK, resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return 0, fmt.Errorf("error while reading body of the response: %w", err) - } - - var vers []int - err = json.Unmarshal(body, &vers) - if err != nil { - return 0, fmt.Errorf("error unmarshalling versions, got %q response: %w", string(body), err) - } - - latestAPIVer := vers[len(vers)-1] - if latestAPIVer < c.clientVersion { - return 0, fmt.Errorf("%s/%v does not have a supported api version. Must be at least version %v", endpoint, latestAPIVer, c.clientVersion) - } - - return latestAPIVer, nil -} diff --git a/src/core/metrics/sources/nginx_plus_test.go b/src/core/metrics/sources/nginx_plus_test.go index bc9d65049..28224cfa5 100644 --- a/src/core/metrics/sources/nginx_plus_test.go +++ b/src/core/metrics/sources/nginx_plus_test.go @@ -9,18 +9,14 @@ package sources import ( "context" - "fmt" - "net/http" - "net/http/httptest" "testing" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" - plusclient "github.com/nginxinc/nginx-plus-go-client/client" + plusclient "github.com/nginxinc/nginx-plus-go-client/v2/client" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) const ( @@ -573,113 +569,6 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, m chan<- *metrics.StatsEnti f.sendMetrics(ctx, m, f.collectMetrics(&stats, &prevStats)...) } -var _ Client = (*MockClient)(nil) - -type MockClient struct{} - -func (m *MockClient) GetAvailableEndpoints() ([]string, error) { - return []string{"stream"}, nil -} - -func (m *MockClient) GetAvailableStreamEndpoints() ([]string, error) { - return availableZones, nil -} - -func (m *MockClient) GetStreamServerZones() (*plusclient.StreamServerZones, error) { - return &stats.StreamServerZones, nil -} - -func (m *MockClient) GetStreamUpstreams() (*plusclient.StreamUpstreams, error) { - return &stats.StreamUpstreams, nil -} - -func (m *MockClient) GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) { - return &stats.StreamLimitConnections, nil -} - -func (m *MockClient) GetStreamZoneSync() (*plusclient.StreamZoneSync, error) { - return &plusclient.StreamZoneSync{ - Zones: stats.StreamZoneSync.Zones, - Status: stats.StreamZoneSync.Status, - }, nil -} - -func (m *MockClient) GetNginxInfo() (*plusclient.NginxInfo, error) { - return &stats.NginxInfo, nil -} - -func (m *MockClient) GetCaches() (*plusclient.Caches, error) { - return &stats.Caches, nil -} - -func (m *MockClient) GetProcesses() (*plusclient.Processes, error) { - return &stats.Processes, nil -} - -func (m *MockClient) GetSlabs() (*plusclient.Slabs, error) { - return &stats.Slabs, nil -} - -func (m *MockClient) GetConnections() (*plusclient.Connections, error) { - return &stats.Connections, nil -} - -func (m *MockClient) GetHTTPRequests() (*plusclient.HTTPRequests, error) { - return &stats.HTTPRequests, nil -} - -func (m *MockClient) GetSSL() (*plusclient.SSL, error) { - return &stats.SSL, nil -} - -func (m *MockClient) GetServerZones() (*plusclient.ServerZones, error) { - return &stats.ServerZones, nil -} - -func (m *MockClient) GetUpstreams() (*plusclient.Upstreams, error) { - return &stats.Upstreams, nil -} - -func (m *MockClient) GetLocationZones() (*plusclient.LocationZones, error) { - return &stats.LocationZones, nil -} - -func (m *MockClient) GetResolvers() (*plusclient.Resolvers, error) { - return &stats.Resolvers, nil -} - -func (m *MockClient) GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) { - return &stats.HTTPLimitRequests, nil -} - -func (m *MockClient) GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) { - return &stats.HTTPLimitConnections, nil -} - -func (m *MockClient) GetWorkers() ([]*plusclient.Workers, error) { - return stats.Workers, nil -} - -func TestGetStats(t *testing.T) { - client := &MockClient{} - - source := NewNginxPlus(nil, "", "", "", 9) - - tests := []struct { - stats plusclient.Stats - }{ - { - stats: stats, - }, - } - - for _, test := range tests { - resultStats, err := source.getStats(client) - require.NoError(t, err) - assert.Equal(t, test.stats, *resultStats) - } -} - func TestNginxPlusUpdate(t *testing.T) { nginxPlus := NewNginxPlus(&metrics.CommonDim{}, "test", PlusNamespace, "http://localhost:8080/api", 6) @@ -1244,79 +1133,3 @@ func TestNginxPlus_Collect(t *testing.T) { assert.Len(t, extraMetrics, 0, "metrics returned but not tested") } } - -func TestGetLatestAPIVersion(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - switch req.URL.String() { - case "/api": - data := []byte("[1,2,3,4,5,6,7,8,9]") - _, err := rw.Write(data) - require.NoError(t, err) - case "/oldapi": - data := []byte("[1,2,3,4,5]") - _, err := rw.Write(data) - require.NoError(t, err) - case "/api25": - data := []byte("[1,2,3,4,5,6,7]") - _, err := rw.Write(data) - require.NoError(t, err) - default: - rw.WriteHeader(http.StatusInternalServerError) - data := []byte("") - _, err := rw.Write(data) - require.NoError(t, err) - } - })) - defer server.Close() - - tests := []struct { - name string - clientVersion int - endpoint string - expectedAPIVersion int - expectErr bool - }{ - { - name: "NGINX Plus R30", - clientVersion: 7, - endpoint: "/api", - expectedAPIVersion: 9, - expectErr: false, - }, - { - name: "NGINX Plus R25", - clientVersion: 7, - endpoint: "/api25", - expectedAPIVersion: 7, - expectErr: false, - }, - { - name: "old nginx plus", - clientVersion: 7, - endpoint: "/oldapi", - expectedAPIVersion: 0, - expectErr: true, - }, - { - name: "invalid path", - clientVersion: 7, - endpoint: "/notexisting", - expectedAPIVersion: 0, - expectErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &NginxPlus{ - clientVersion: tt.clientVersion, - } - got, err := c.getLatestAPIVersion(context.Background(), fmt.Sprintf("%s%s", server.URL, tt.endpoint)) - if tt.expectErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - assert.Equal(t, tt.expectedAPIVersion, got) - }) - } -} diff --git a/test/performance/go.mod b/test/performance/go.mod index 6af355816..0ad3f6408 100644 --- a/test/performance/go.mod +++ b/test/performance/go.mod @@ -46,7 +46,7 @@ require ( github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/nginxinc/nginx-go-crossplane v0.4.48 // indirect - github.com/nginxinc/nginx-plus-go-client v1.2.2 // indirect + github.com/nginxinc/nginx-plus-go-client/v2 v2.0.1 // indirect github.com/nginxinc/nginx-prometheus-exporter v1.2.0 // indirect github.com/nxadm/tail v1.4.11 // indirect github.com/orcaman/concurrent-map v1.0.0 // indirect diff --git a/test/performance/go.sum b/test/performance/go.sum index 1245a2bca..60726c12a 100644 --- a/test/performance/go.sum +++ b/test/performance/go.sum @@ -98,8 +98,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nginxinc/nginx-go-crossplane v0.4.48 h1:Cf8sn8dTLJevtvNjGsla/wwa+rPTjm8eTfwSMqWisoI= github.com/nginxinc/nginx-go-crossplane v0.4.48/go.mod h1:fgSibLM12jGRsh7QHpgL8wTKMEbfc594vSLK9ovwM6U= -github.com/nginxinc/nginx-plus-go-client v1.2.2 h1:sl7HqNDDZq2EVu0eQQVoZ6PKYGa4h2dB/Qr5Ib0YKGw= -github.com/nginxinc/nginx-plus-go-client v1.2.2/go.mod h1:n8OFLzrJulJ2fur28Cwa1Qp5DZNS2VicLV+Adt30LQ4= +github.com/nginxinc/nginx-plus-go-client/v2 v2.0.1 h1:5VVK38bnELMDWnwfF6dSv57ResXh9AUzeDa72ENj94o= +github.com/nginxinc/nginx-plus-go-client/v2 v2.0.1/go.mod h1:He+1izxYxVVO5/C9ZTukwOpvkAx5eS19nRQgKXDhX5I= github.com/nginxinc/nginx-prometheus-exporter v1.2.0 h1:jmu63tEm3hcupIpaIH72NEdwuNgGaQ/9q0IrnNqaPto= github.com/nginxinc/nginx-prometheus-exporter v1.2.0/go.mod h1:y8KANg4+7e/9HxT3vZDetjETLR5YhU0zv2WhZquo7C0= github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go index 105b063df..bae80365e 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_plus.go @@ -9,20 +9,14 @@ package sources import ( "context" - "encoding/json" - "errors" "fmt" - "io" "math" - "net/http" - "slices" "sync" - "time" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core/metrics" - plusclient "github.com/nginxinc/nginx-plus-go-client/client" + plusclient "github.com/nginxinc/nginx-plus-go-client/v2/client" log "github.com/sirupsen/logrus" ) @@ -87,21 +81,14 @@ func NewNginxPlus(baseDimensions *metrics.CommonDim, nginxNamespace, plusNamespa func (c *NginxPlus) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { c.init.Do(func() { - latestAPIVersion, err := c.getLatestAPIVersion(ctx, c.plusAPI) - if err != nil { - c.logger.Log(fmt.Sprintf("Failed to check available api versions: %v", err)) - } else { - c.clientVersion = latestAPIVersion - } - - cl, err := plusclient.NewNginxClient(c.plusAPI, plusclient.WithAPIVersion(c.clientVersion)) + cl, err := plusclient.NewNginxClient(c.plusAPI, plusclient.WithMaxAPIVersion()) if err != nil { c.logger.Log(fmt.Sprintf("Failed to create plus metrics client: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) return } - c.prevStats, err = c.getStats(cl) + c.prevStats, err = cl.GetStats(ctx) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) @@ -110,7 +97,7 @@ func (c *NginxPlus) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWr } }) - cl, err := plusclient.NewNginxClient(c.plusAPI, plusclient.WithAPIVersion(c.clientVersion)) + cl, err := plusclient.NewNginxClient(c.plusAPI, plusclient.WithMaxAPIVersion()) if err != nil { c.logger.Log(fmt.Sprintf("Failed to create plus metrics client: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) @@ -119,7 +106,7 @@ func (c *NginxPlus) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWr log.Debug("NGINX_plus_Collect: getting stats") - stats, err := c.getStats(cl) + stats, err := cl.GetStats(ctx) if err != nil { c.logger.Log(fmt.Sprintf("Failed to retrieve plus metrics: %v", err)) SendNginxDownStatus(ctx, c.baseDimensions.ToDimensions(), m) @@ -144,302 +131,6 @@ func (c *NginxPlus) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWr c.prevStats = stats } -func (c *NginxPlus) defaultStats() *plusclient.Stats { - return &plusclient.Stats{ - Upstreams: map[string]plusclient.Upstream{}, - ServerZones: map[string]plusclient.ServerZone{}, - StreamServerZones: map[string]plusclient.StreamServerZone{}, - StreamUpstreams: map[string]plusclient.StreamUpstream{}, - Slabs: map[string]plusclient.Slab{}, - Caches: map[string]plusclient.HTTPCache{}, - HTTPLimitConnections: map[string]plusclient.LimitConnection{}, - StreamLimitConnections: map[string]plusclient.LimitConnection{}, - HTTPLimitRequests: map[string]plusclient.HTTPLimitRequest{}, - Resolvers: map[string]plusclient.Resolver{}, - LocationZones: map[string]plusclient.LocationZone{}, - StreamZoneSync: &plusclient.StreamZoneSync{}, - Workers: []*plusclient.Workers{}, - NginxInfo: plusclient.NginxInfo{}, - SSL: plusclient.SSL{}, - Connections: plusclient.Connections{}, - HTTPRequests: plusclient.HTTPRequests{}, - Processes: plusclient.Processes{}, - } -} - -func (c *NginxPlus) getStats(client Client) (*plusclient.Stats, error) { - var initialStatsWg sync.WaitGroup - initialStatsErrChan := make(chan error, 15) - stats := &ExtendedStats{ - Stats: c.defaultStats(), - } - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - endpoints, err := client.GetAvailableEndpoints() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get Available Endpoints: %w", err) - return - } - stats.endpoints = endpoints - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - streamEndpoints, err := client.GetAvailableStreamEndpoints() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get Available Stream Endpoints: %w", err) - return - } - stats.streamEndpoints = streamEndpoints - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - nginxInfo, err := client.GetNginxInfo() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get NGINX info: %w", err) - return - } - stats.NginxInfo = *nginxInfo - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - caches, err := client.GetCaches() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get NGINX info: %w", err) - return - } - stats.Caches = *caches - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - processes, err := client.GetProcesses() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get processes: %w", err) - return - } - stats.Processes = *processes - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - slabs, err := client.GetSlabs() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get processes: %w", err) - return - } - stats.Slabs = *slabs - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - httpRequests, err := client.GetHTTPRequests() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get HTTPRequests: %w", err) - return - } - stats.HTTPRequests = *httpRequests - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - ssl, err := client.GetSSL() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get SSL: %w", err) - return - } - stats.SSL = *ssl - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - serverZones, err := client.GetServerZones() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get ServerZones: %w", err) - return - } - stats.ServerZones = *serverZones - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - upstreams, err := client.GetUpstreams() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get Upstreams: %w", err) - return - } - stats.Upstreams = *upstreams - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - locationZones, err := client.GetLocationZones() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get LocationZones: %w", err) - return - } - stats.LocationZones = *locationZones - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - resolvers, err := client.GetResolvers() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get Resolvers: %w", err) - return - } - stats.Resolvers = *resolvers - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - httpLimitRequests, err := client.GetHTTPLimitReqs() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get HTTPLimitRequests: %w", err) - return - } - stats.HTTPLimitRequests = *httpLimitRequests - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - httpLimitConnections, err := client.GetHTTPConnectionsLimit() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get HTTPLimitConnections: %w", err) - return - } - stats.HTTPLimitConnections = *httpLimitConnections - }() - - initialStatsWg.Add(1) - go func() { - defer initialStatsWg.Done() - workers, err := client.GetWorkers() - if err != nil { - initialStatsErrChan <- fmt.Errorf("failed to get Workers: %w", err) - return - } - stats.Workers = workers - }() - - initialStatsWg.Wait() - close(initialStatsErrChan) - - // only error if all the stats are empty - if len(initialStatsErrChan) == 16 { - return nil, errors.New("no useful metrics found") - } - - if slices.Contains(stats.endpoints, "stream") { - var streamStatsWg sync.WaitGroup - // this may never be 4 depending on the if conditions - streamStatsErrChan := make(chan error, 4) - - if slices.Contains(stats.streamEndpoints, "server_zones") { - streamStatsWg.Add(1) - go func() { - defer streamStatsWg.Done() - streamServerZones, err := client.GetStreamServerZones() - if err != nil { - streamStatsErrChan <- fmt.Errorf("failed to get streamServerZones: %w", err) - return - } - stats.StreamServerZones = *streamServerZones - }() - } - - if slices.Contains(stats.streamEndpoints, "upstreams") { - streamStatsWg.Add(1) - go func() { - defer streamStatsWg.Done() - streamUpstreams, err := client.GetStreamUpstreams() - if err != nil { - streamStatsErrChan <- fmt.Errorf("failed to get StreamUpstreams: %w", err) - return - } - stats.StreamUpstreams = *streamUpstreams - }() - } - - if slices.Contains(stats.streamEndpoints, "limit_conns") { - - streamStatsWg.Add(1) - go func() { - defer streamStatsWg.Done() - streamConnectionsLimit, err := client.GetStreamConnectionsLimit() - if err != nil { - streamStatsErrChan <- fmt.Errorf("failed to get StreamLimitConnections: %w", err) - return - } - stats.StreamLimitConnections = *streamConnectionsLimit - }() - - } - - if slices.Contains(stats.streamEndpoints, "limit_conns") { - - streamStatsWg.Add(1) - go func() { - defer streamStatsWg.Done() - streamZoneSync, err := client.GetStreamZoneSync() - if err != nil { - streamStatsErrChan <- fmt.Errorf("failed to get StreamZoneSync: %w", err) - return - } - stats.StreamZoneSync = streamZoneSync - }() - - } - streamStatsWg.Wait() - close(streamStatsErrChan) - - if len(streamStatsErrChan) > 0 { - log.Warnf("no useful metrics found in stream stats") - } - } - - // report connection metrics separately so it does not influence the results (all http requests show in the metrics) - var connectionsWg sync.WaitGroup - connectionsErrChan := make(chan error, 1) - - connectionsWg.Add(1) - go func() { - defer connectionsWg.Done() - connections, err := client.GetConnections() - if err != nil { - connectionsErrChan <- fmt.Errorf("failed to get connections: %w", err) - return - } - stats.Connections = *connections - }() - - connectionsWg.Wait() - close(connectionsErrChan) - - if len(connectionsErrChan) > 0 { - log.Warnf("connections metrics not found") - } - - return stats.Stats, nil -} - func (c *NginxPlus) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.baseDimensions = dimensions c.plusAPI = collectorConf.PlusAPI @@ -1265,43 +956,3 @@ func boolToFloat64(myBool bool) float64 { return valueFloat64Zero } } - -func (c *NginxPlus) getLatestAPIVersion(ctx context.Context, endpoint string) (int, error) { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) - if err != nil { - return 0, fmt.Errorf("failed to create a get request: %w", err) - } - - httpClient := &http.Client{} - - resp, err := httpClient.Do(req) - if err != nil { - return 0, fmt.Errorf("%v is not accessible: %w", endpoint, err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return 0, fmt.Errorf("%v is not accessible: expected %v response, got %v", endpoint, http.StatusOK, resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return 0, fmt.Errorf("error while reading body of the response: %w", err) - } - - var vers []int - err = json.Unmarshal(body, &vers) - if err != nil { - return 0, fmt.Errorf("error unmarshalling versions, got %q response: %w", string(body), err) - } - - latestAPIVer := vers[len(vers)-1] - if latestAPIVer < c.clientVersion { - return 0, fmt.Errorf("%s/%v does not have a supported api version. Must be at least version %v", endpoint, latestAPIVer, c.clientVersion) - } - - return latestAPIVer, nil -} diff --git a/test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/LICENSE b/test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/v2/LICENSE similarity index 100% rename from test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/LICENSE rename to test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/v2/LICENSE diff --git a/test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/client/nginx.go b/test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/v2/client/nginx.go similarity index 65% rename from test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/client/nginx.go rename to test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/v2/client/nginx.go index 87503ecae..7d16420ac 100644 --- a/test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/client/nginx.go +++ b/test/performance/vendor/github.com/nginxinc/nginx-plus-go-client/v2/client/nginx.go @@ -11,7 +11,10 @@ import ( "reflect" "slices" "strings" + "sync" "time" + + "golang.org/x/sync/errgroup" ) const ( @@ -37,8 +40,13 @@ var ( defaultWeight = 1 ) -// ErrUnsupportedVer means that client's API version is not supported by NGINX plus API. -var ErrUnsupportedVer = errors.New("API version of the client is not supported by running NGINX Plus") +var ( + ErrParameterRequired = errors.New("parameter is required") + ErrServerNotFound = errors.New("server not found") + ErrServerExists = errors.New("server already exists") + ErrNotSupported = errors.New("not supported") + ErrInvalidTimeout = errors.New("invalid timeout") +) // NginxClient lets you access NGINX Plus API. type NginxClient struct { @@ -116,6 +124,40 @@ func (internalError *internalError) Wrap(err string) *internalError { return internalError } +// this is an internal representation of the Stats object including endpoint and streamEndpoint lists. +type extendedStats struct { + endpoints []string + streamEndpoints []string + Stats +} + +func defaultStats() *extendedStats { + return &extendedStats{ + endpoints: []string{}, + streamEndpoints: []string{}, + Stats: Stats{ + Upstreams: map[string]Upstream{}, + ServerZones: map[string]ServerZone{}, + StreamServerZones: map[string]StreamServerZone{}, + StreamUpstreams: map[string]StreamUpstream{}, + Slabs: map[string]Slab{}, + Caches: map[string]HTTPCache{}, + HTTPLimitConnections: map[string]LimitConnection{}, + StreamLimitConnections: map[string]LimitConnection{}, + HTTPLimitRequests: map[string]HTTPLimitRequest{}, + Resolvers: map[string]Resolver{}, + LocationZones: map[string]LocationZone{}, + StreamZoneSync: nil, + Workers: []*Workers{}, + NginxInfo: NginxInfo{}, + SSL: SSL{}, + Connections: Connections{}, + HTTPRequests: HTTPRequests{}, + Processes: Processes{}, + }, + } +} + // Stats represents NGINX Plus stats fetched from the NGINX Plus API. // https://nginx.org/en/docs/http/ngx_http_api_module.html type Stats struct { @@ -546,6 +588,19 @@ func WithCheckAPI() Option { } } +// WithMaxAPIVersion sets the API version to the max API version. +func WithMaxAPIVersion() Option { + return func(o *NginxClient) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + version, err := o.GetMaxAPIVersion(ctx) + if err != nil { + return + } + o.apiVersion = version + } +} + // NewNginxClient creates a new NginxClient. func NewNginxClient(apiEndpoint string, opts ...Option) (*NginxClient, error) { c := &NginxClient{ @@ -560,15 +615,17 @@ func NewNginxClient(apiEndpoint string, opts ...Option) (*NginxClient, error) { } if c.httpClient == nil { - return nil, errors.New("http client is not set") + return nil, fmt.Errorf("http client: %w", ErrParameterRequired) } if !versionSupported(c.apiVersion) { - return nil, fmt.Errorf("API version %v is not supported by the client", c.apiVersion) + return nil, fmt.Errorf("API version %v: %w by the client", c.apiVersion, ErrNotSupported) } if c.checkAPI { - versions, err := getAPIVersions(c.httpClient, apiEndpoint) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + versions, err := c.getAPIVersions(ctx, c.httpClient, apiEndpoint) if err != nil { return nil, fmt.Errorf("error accessing the API: %w", err) } @@ -580,7 +637,7 @@ func NewNginxClient(apiEndpoint string, opts ...Option) (*NginxClient, error) { } } if !found { - return nil, fmt.Errorf("API version %v is not supported by the server", c.apiVersion) + return nil, fmt.Errorf("API version %v: %w by the server", c.apiVersion, ErrNotSupported) } } @@ -596,10 +653,24 @@ func versionSupported(n int) bool { return false } -func getAPIVersions(httpClient *http.Client, endpoint string) (*versions, error) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() +// GetMaxAPIVersion returns the maximum API version supported by the server and the client. +func (client *NginxClient) GetMaxAPIVersion(ctx context.Context) (int, error) { + serverVersions, err := client.getAPIVersions(ctx, client.httpClient, client.apiEndpoint) + if err != nil { + return 0, fmt.Errorf("failed to get max API version: %w", err) + } + + maxServerVersion := slices.Max(*serverVersions) + maxClientVersion := slices.Max(supportedAPIVersions) + + if maxServerVersion > maxClientVersion { + return maxClientVersion, nil + } + + return maxServerVersion, nil +} +func (client *NginxClient) getAPIVersions(ctx context.Context, httpClient *http.Client, endpoint string) (*versions, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) if err != nil { return nil, fmt.Errorf("failed to create a get request: %w", err) @@ -611,7 +682,9 @@ func getAPIVersions(httpClient *http.Client, endpoint string) (*versions, error) defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("%v is not accessible: expected %v response, got %v", endpoint, http.StatusOK, resp.StatusCode) + return nil, createResponseMismatchError(resp.Body).Wrap(fmt.Sprintf( + "failed to get endpoint %q, expected %v response, got %v", + endpoint, http.StatusOK, resp.StatusCode)) } body, err := io.ReadAll(resp.Body) @@ -658,17 +731,17 @@ func readAPIErrorResponse(respBody io.ReadCloser) (*apiErrorResponse, error) { } // CheckIfUpstreamExists checks if the upstream exists in NGINX. If the upstream doesn't exist, it returns the error. -func (client *NginxClient) CheckIfUpstreamExists(upstream string) error { - _, err := client.GetHTTPServers(upstream) +func (client *NginxClient) CheckIfUpstreamExists(ctx context.Context, upstream string) error { + _, err := client.GetHTTPServers(ctx, upstream) return err } // GetHTTPServers returns the servers of the upstream from NGINX. -func (client *NginxClient) GetHTTPServers(upstream string) ([]UpstreamServer, error) { +func (client *NginxClient) GetHTTPServers(ctx context.Context, upstream string) ([]UpstreamServer, error) { path := fmt.Sprintf("http/upstreams/%v/servers", upstream) var servers []UpstreamServer - err := client.get(path, &servers) + err := client.get(ctx, path, &servers) if err != nil { return nil, fmt.Errorf("failed to get the HTTP servers of upstream %v: %w", upstream, err) } @@ -677,17 +750,17 @@ func (client *NginxClient) GetHTTPServers(upstream string) ([]UpstreamServer, er } // AddHTTPServer adds the server to the upstream. -func (client *NginxClient) AddHTTPServer(upstream string, server UpstreamServer) error { - id, err := client.getIDOfHTTPServer(upstream, server.Server) +func (client *NginxClient) AddHTTPServer(ctx context.Context, upstream string, server UpstreamServer) error { + id, err := client.getIDOfHTTPServer(ctx, upstream, server.Server) if err != nil { return fmt.Errorf("failed to add %v server to %v upstream: %w", server.Server, upstream, err) } if id != -1 { - return fmt.Errorf("failed to add %v server to %v upstream: server already exists", server.Server, upstream) + return fmt.Errorf("failed to add %v server to %v upstream: %w", server.Server, upstream, ErrServerExists) } path := fmt.Sprintf("http/upstreams/%v/servers/", upstream) - err = client.post(path, &server) + err = client.post(ctx, path, &server) if err != nil { return fmt.Errorf("failed to add %v server to %v upstream: %w", server.Server, upstream, err) } @@ -696,17 +769,17 @@ func (client *NginxClient) AddHTTPServer(upstream string, server UpstreamServer) } // DeleteHTTPServer the server from the upstream. -func (client *NginxClient) DeleteHTTPServer(upstream string, server string) error { - id, err := client.getIDOfHTTPServer(upstream, server) +func (client *NginxClient) DeleteHTTPServer(ctx context.Context, upstream string, server string) error { + id, err := client.getIDOfHTTPServer(ctx, upstream, server) if err != nil { return fmt.Errorf("failed to remove %v server from %v upstream: %w", server, upstream, err) } if id == -1 { - return fmt.Errorf("failed to remove %v server from %v upstream: server doesn't exist", server, upstream) + return fmt.Errorf("failed to remove %v server from %v upstream: %w", server, upstream, ErrServerNotFound) } path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, id) - err = client.delete(path, http.StatusOK) + err = client.delete(ctx, path, http.StatusOK) if err != nil { return fmt.Errorf("failed to remove %v server from %v upstream: %w", server, upstream, err) } @@ -718,8 +791,8 @@ func (client *NginxClient) DeleteHTTPServer(upstream string, server string) erro // Servers that are in the slice, but don't exist in NGINX will be added to NGINX. // Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX. // Servers that are in the slice and exist in NGINX, but have different parameters, will be updated. -func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) (added []UpstreamServer, deleted []UpstreamServer, updated []UpstreamServer, err error) { - serversInNginx, err := client.GetHTTPServers(upstream) +func (client *NginxClient) UpdateHTTPServers(ctx context.Context, upstream string, servers []UpstreamServer) (added []UpstreamServer, deleted []UpstreamServer, updated []UpstreamServer, err error) { + serversInNginx, err := client.GetHTTPServers(ctx, upstream) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %w", upstream, err) } @@ -734,21 +807,21 @@ func (client *NginxClient) UpdateHTTPServers(upstream string, servers []Upstream toAdd, toDelete, toUpdate := determineUpdates(formattedServers, serversInNginx) for _, server := range toAdd { - err := client.AddHTTPServer(upstream, server) + err := client.AddHTTPServer(ctx, upstream, server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %w", upstream, err) } } for _, server := range toDelete { - err := client.DeleteHTTPServer(upstream, server.Server) + err := client.DeleteHTTPServer(ctx, upstream, server.Server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %w", upstream, err) } } for _, server := range toUpdate { - err := client.UpdateHTTPServer(upstream, server) + err := client.UpdateHTTPServer(ctx, upstream, server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %w", upstream, err) } @@ -836,8 +909,8 @@ func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamSe return } -func (client *NginxClient) getIDOfHTTPServer(upstream string, name string) (int, error) { - servers, err := client.GetHTTPServers(upstream) +func (client *NginxClient) getIDOfHTTPServer(ctx context.Context, upstream string, name string) (int, error) { + servers, err := client.GetHTTPServers(ctx, upstream) if err != nil { return -1, fmt.Errorf("error getting id of server %v of upstream %v: %w", name, upstream, err) } @@ -851,10 +924,7 @@ func (client *NginxClient) getIDOfHTTPServer(upstream string, name string) (int, return -1, nil } -func (client *NginxClient) get(path string, data interface{}) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - +func (client *NginxClient) get(ctx context.Context, path string, data interface{}) error { url := fmt.Sprintf("%v/%v/%v", client.apiEndpoint, client.apiVersion, path) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) @@ -885,10 +955,7 @@ func (client *NginxClient) get(path string, data interface{}) error { return nil } -func (client *NginxClient) post(path string, input interface{}) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - +func (client *NginxClient) post(ctx context.Context, path string, input interface{}) error { url := fmt.Sprintf("%v/%v/%v", client.apiEndpoint, client.apiVersion, path) jsonInput, err := json.Marshal(input) @@ -917,10 +984,7 @@ func (client *NginxClient) post(path string, input interface{}) error { return nil } -func (client *NginxClient) delete(path string, expectedStatusCode int) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - +func (client *NginxClient) delete(ctx context.Context, path string, expectedStatusCode int) error { path = fmt.Sprintf("%v/%v/%v/", client.apiEndpoint, client.apiVersion, path) req, err := http.NewRequestWithContext(ctx, http.MethodDelete, path, nil) @@ -942,10 +1006,7 @@ func (client *NginxClient) delete(path string, expectedStatusCode int) error { return nil } -func (client *NginxClient) patch(path string, input interface{}, expectedStatusCode int) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - +func (client *NginxClient) patch(ctx context.Context, path string, input interface{}, expectedStatusCode int) error { path = fmt.Sprintf("%v/%v/%v/", client.apiEndpoint, client.apiVersion, path) jsonInput, err := json.Marshal(input) @@ -973,17 +1034,17 @@ func (client *NginxClient) patch(path string, input interface{}, expectedStatusC } // CheckIfStreamUpstreamExists checks if the stream upstream exists in NGINX. If the upstream doesn't exist, it returns the error. -func (client *NginxClient) CheckIfStreamUpstreamExists(upstream string) error { - _, err := client.GetStreamServers(upstream) +func (client *NginxClient) CheckIfStreamUpstreamExists(ctx context.Context, upstream string) error { + _, err := client.GetStreamServers(ctx, upstream) return err } // GetStreamServers returns the stream servers of the upstream from NGINX. -func (client *NginxClient) GetStreamServers(upstream string) ([]StreamUpstreamServer, error) { +func (client *NginxClient) GetStreamServers(ctx context.Context, upstream string) ([]StreamUpstreamServer, error) { path := fmt.Sprintf("stream/upstreams/%v/servers", upstream) var servers []StreamUpstreamServer - err := client.get(path, &servers) + err := client.get(ctx, path, &servers) if err != nil { return nil, fmt.Errorf("failed to get stream servers of upstream server %v: %w", upstream, err) } @@ -991,17 +1052,17 @@ func (client *NginxClient) GetStreamServers(upstream string) ([]StreamUpstreamSe } // AddStreamServer adds the stream server to the upstream. -func (client *NginxClient) AddStreamServer(upstream string, server StreamUpstreamServer) error { - id, err := client.getIDOfStreamServer(upstream, server.Server) +func (client *NginxClient) AddStreamServer(ctx context.Context, upstream string, server StreamUpstreamServer) error { + id, err := client.getIDOfStreamServer(ctx, upstream, server.Server) if err != nil { return fmt.Errorf("failed to add %v stream server to %v upstream: %w", server.Server, upstream, err) } if id != -1 { - return fmt.Errorf("failed to add %v stream server to %v upstream: server already exists", server.Server, upstream) + return fmt.Errorf("failed to add %v stream server to %v upstream: %w", server.Server, upstream, ErrServerExists) } path := fmt.Sprintf("stream/upstreams/%v/servers/", upstream) - err = client.post(path, &server) + err = client.post(ctx, path, &server) if err != nil { return fmt.Errorf("failed to add %v stream server to %v upstream: %w", server.Server, upstream, err) } @@ -1009,17 +1070,17 @@ func (client *NginxClient) AddStreamServer(upstream string, server StreamUpstrea } // DeleteStreamServer the server from the upstream. -func (client *NginxClient) DeleteStreamServer(upstream string, server string) error { - id, err := client.getIDOfStreamServer(upstream, server) +func (client *NginxClient) DeleteStreamServer(ctx context.Context, upstream string, server string) error { + id, err := client.getIDOfStreamServer(ctx, upstream, server) if err != nil { return fmt.Errorf("failed to remove %v stream server from %v upstream: %w", server, upstream, err) } if id == -1 { - return fmt.Errorf("failed to remove %v stream server from %v upstream: server doesn't exist", server, upstream) + return fmt.Errorf("failed to remove %v stream server from %v upstream: %w", server, upstream, ErrServerNotFound) } path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, id) - err = client.delete(path, http.StatusOK) + err = client.delete(ctx, path, http.StatusOK) if err != nil { return fmt.Errorf("failed to remove %v stream server from %v upstream: %w", server, upstream, err) } @@ -1030,8 +1091,8 @@ func (client *NginxClient) DeleteStreamServer(upstream string, server string) er // Servers that are in the slice, but don't exist in NGINX will be added to NGINX. // Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX. // Servers that are in the slice and exist in NGINX, but have different parameters, will be updated. -func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) (added []StreamUpstreamServer, deleted []StreamUpstreamServer, updated []StreamUpstreamServer, err error) { - serversInNginx, err := client.GetStreamServers(upstream) +func (client *NginxClient) UpdateStreamServers(ctx context.Context, upstream string, servers []StreamUpstreamServer) (added []StreamUpstreamServer, deleted []StreamUpstreamServer, updated []StreamUpstreamServer, err error) { + serversInNginx, err := client.GetStreamServers(ctx, upstream) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %w", upstream, err) } @@ -1045,21 +1106,21 @@ func (client *NginxClient) UpdateStreamServers(upstream string, servers []Stream toAdd, toDelete, toUpdate := determineStreamUpdates(formattedServers, serversInNginx) for _, server := range toAdd { - err := client.AddStreamServer(upstream, server) + err := client.AddStreamServer(ctx, upstream, server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %w", upstream, err) } } for _, server := range toDelete { - err := client.DeleteStreamServer(upstream, server.Server) + err := client.DeleteStreamServer(ctx, upstream, server.Server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %w", upstream, err) } } for _, server := range toUpdate { - err := client.UpdateStreamServer(upstream, server) + err := client.UpdateStreamServer(ctx, upstream, server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %w", upstream, err) } @@ -1068,8 +1129,8 @@ func (client *NginxClient) UpdateStreamServers(upstream string, servers []Stream return toAdd, toDelete, toUpdate, nil } -func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (int, error) { - servers, err := client.GetStreamServers(upstream) +func (client *NginxClient) getIDOfStreamServer(ctx context.Context, upstream string, name string) (int, error) { + servers, err := client.GetStreamServers(ctx, upstream) if err != nil { return -1, fmt.Errorf("error getting id of stream server %v of upstream %v: %w", name, upstream, err) } @@ -1162,238 +1223,400 @@ func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers } // GetStats gets process, slab, connection, request, ssl, zone, stream zone, upstream and stream upstream related stats from the NGINX Plus API. -func (client *NginxClient) GetStats() (*Stats, error) { - endpoints, err := client.GetAvailableEndpoints() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } +func (client *NginxClient) GetStats(ctx context.Context) (*Stats, error) { + initialGroup, initialCtx := errgroup.WithContext(ctx) + var mu sync.Mutex + stats := defaultStats() + // Collecting initial stats + initialGroup.Go(func() error { + endpoints, err := client.GetAvailableEndpoints(initialCtx) + if err != nil { + return fmt.Errorf("failed to get available Endpoints: %w", err) + } - info, err := client.GetNginxInfo() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.endpoints = endpoints + mu.Unlock() + return nil + }) - caches, err := client.GetCaches() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + initialGroup.Go(func() error { + nginxInfo, err := client.GetNginxInfo(initialCtx) + if err != nil { + return fmt.Errorf("failed to get NGINX info: %w", err) + } - processes, err := client.GetProcesses() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.NginxInfo = *nginxInfo + mu.Unlock() - slabs, err := client.GetSlabs() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + return nil + }) - cons, err := client.GetConnections() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + initialGroup.Go(func() error { + caches, err := client.GetCaches(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Caches: %w", err) + } - requests, err := client.GetHTTPRequests() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.Caches = *caches + mu.Unlock() - ssl, err := client.GetSSL() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + return nil + }) - zones, err := client.GetServerZones() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + initialGroup.Go(func() error { + processes, err := client.GetProcesses(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Process information: %w", err) + } - upstreams, err := client.GetUpstreams() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.Processes = *processes + mu.Unlock() - locationZones, err := client.GetLocationZones() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + return nil + }) - resolvers, err := client.GetResolvers() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + initialGroup.Go(func() error { + slabs, err := client.GetSlabs(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Slabs: %w", err) + } - limitReqs, err := client.GetHTTPLimitReqs() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.Slabs = *slabs + mu.Unlock() - limitConnsHTTP, err := client.GetHTTPConnectionsLimit() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + return nil + }) - workers, err := client.GetWorkers() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + initialGroup.Go(func() error { + httpRequests, err := client.GetHTTPRequests(initialCtx) + if err != nil { + return fmt.Errorf("failed to get HTTP Requests: %w", err) + } + + mu.Lock() + stats.HTTPRequests = *httpRequests + mu.Unlock() - streamZones := &StreamServerZones{} - streamUpstreams := &StreamUpstreams{} - limitConnsStream := &StreamLimitConnections{} - var streamZoneSync *StreamZoneSync + return nil + }) - if slices.Contains(endpoints, "stream") { - streamEndpoints, err := client.GetAvailableStreamEndpoints() + initialGroup.Go(func() error { + ssl, err := client.GetSSL(initialCtx) if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) + return fmt.Errorf("failed to get SSL: %w", err) } - if slices.Contains(streamEndpoints, "server_zones") { - streamZones, err = client.GetStreamServerZones() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.SSL = *ssl + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + serverZones, err := client.GetServerZones(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Server Zones: %w", err) } - if slices.Contains(streamEndpoints, "upstreams") { - streamUpstreams, err = client.GetStreamUpstreams() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.ServerZones = *serverZones + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + upstreams, err := client.GetUpstreams(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Upstreams: %w", err) } - if slices.Contains(streamEndpoints, "limit_conns") { - limitConnsStream, err = client.GetStreamConnectionsLimit() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.Upstreams = *upstreams + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + locationZones, err := client.GetLocationZones(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Location Zones: %w", err) + } + + mu.Lock() + stats.LocationZones = *locationZones + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + resolvers, err := client.GetResolvers(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Resolvers: %w", err) } - if slices.Contains(streamEndpoints, "zone_sync") { - streamZoneSync, err = client.GetStreamZoneSync() + mu.Lock() + stats.Resolvers = *resolvers + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + httpLimitRequests, err := client.GetHTTPLimitReqs(initialCtx) + if err != nil { + return fmt.Errorf("failed to get HTTPLimitRequests: %w", err) + } + + mu.Lock() + stats.HTTPLimitRequests = *httpLimitRequests + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + httpLimitConnections, err := client.GetHTTPConnectionsLimit(initialCtx) + if err != nil { + return fmt.Errorf("failed to get HTTPLimitConnections: %w", err) + } + + mu.Lock() + stats.HTTPLimitConnections = *httpLimitConnections + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + workers, err := client.GetWorkers(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Workers: %w", err) + } + + mu.Lock() + stats.Workers = workers + mu.Unlock() + + return nil + }) + + if err := initialGroup.Wait(); err != nil { + return nil, fmt.Errorf("error returned from contacting Plus API: %w", err) + } + + // Process stream endpoints if they exist + if slices.Contains(stats.endpoints, "stream") { + availableStreamGroup, asgCtx := errgroup.WithContext(ctx) + + availableStreamGroup.Go(func() error { + streamEndpoints, err := client.GetAvailableStreamEndpoints(asgCtx) if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) + return fmt.Errorf("failed to get available Stream Endpoints: %w", err) } + + mu.Lock() + stats.streamEndpoints = streamEndpoints + mu.Unlock() + + return nil + }) + + if err := availableStreamGroup.Wait(); err != nil { + return nil, fmt.Errorf("no useful metrics found in stream stats: %w", err) } + + streamGroup, sgCtx := errgroup.WithContext(ctx) + + if slices.Contains(stats.streamEndpoints, "server_zones") { + streamGroup.Go(func() error { + streamServerZones, err := client.GetStreamServerZones(sgCtx) + if err != nil { + return fmt.Errorf("failed to get streamServerZones: %w", err) + } + + mu.Lock() + stats.StreamServerZones = *streamServerZones + mu.Unlock() + + return nil + }) + } + + if slices.Contains(stats.streamEndpoints, "upstreams") { + streamGroup.Go(func() error { + streamUpstreams, err := client.GetStreamUpstreams(sgCtx) + if err != nil { + return fmt.Errorf("failed to get StreamUpstreams: %w", err) + } + + mu.Lock() + stats.StreamUpstreams = *streamUpstreams + mu.Unlock() + + return nil + }) + } + + if slices.Contains(stats.streamEndpoints, "limit_conns") { + streamGroup.Go(func() error { + streamConnectionsLimit, err := client.GetStreamConnectionsLimit(sgCtx) + if err != nil { + return fmt.Errorf("failed to get StreamLimitConnections: %w", err) + } + + mu.Lock() + stats.StreamLimitConnections = *streamConnectionsLimit + mu.Unlock() + + return nil + }) + + streamGroup.Go(func() error { + streamZoneSync, err := client.GetStreamZoneSync(sgCtx) + if err != nil { + return fmt.Errorf("failed to get StreamZoneSync: %w", err) + } + + mu.Lock() + stats.StreamZoneSync = streamZoneSync + mu.Unlock() + + return nil + }) + } + + if err := streamGroup.Wait(); err != nil { + return nil, fmt.Errorf("no useful metrics found in stream stats: %w", err) + } + } + + // Report connection metrics separately so it does not influence the results + connectionsGroup, cgCtx := errgroup.WithContext(ctx) + + connectionsGroup.Go(func() error { + // replace this call with a context specific call + connections, err := client.GetConnections(cgCtx) + if err != nil { + return fmt.Errorf("failed to get connections: %w", err) + } + + mu.Lock() + stats.Connections = *connections + mu.Unlock() + + return nil + }) + + if err := connectionsGroup.Wait(); err != nil { + return nil, fmt.Errorf("connections metrics not found: %w", err) } - return &Stats{ - NginxInfo: *info, - Caches: *caches, - Processes: *processes, - Slabs: *slabs, - Connections: *cons, - HTTPRequests: *requests, - SSL: *ssl, - ServerZones: *zones, - StreamServerZones: *streamZones, - Upstreams: *upstreams, - StreamUpstreams: *streamUpstreams, - StreamZoneSync: streamZoneSync, - LocationZones: *locationZones, - Resolvers: *resolvers, - HTTPLimitRequests: *limitReqs, - HTTPLimitConnections: *limitConnsHTTP, - StreamLimitConnections: *limitConnsStream, - Workers: workers, - }, nil + return &stats.Stats, nil } // GetAvailableEndpoints returns available endpoints in the API. -func (client *NginxClient) GetAvailableEndpoints() ([]string, error) { +func (client *NginxClient) GetAvailableEndpoints(ctx context.Context) ([]string, error) { var endpoints []string - err := client.get("", &endpoints) + err := client.get(ctx, "", &endpoints) if err != nil { return nil, fmt.Errorf("failed to get endpoints: %w", err) } return endpoints, nil } -// GetAvailableStreamEndpoints returns available stream endpoints in the API. -func (client *NginxClient) GetAvailableStreamEndpoints() ([]string, error) { +// GetAvailableStreamEndpoints returns available stream endpoints in the API with a context. +func (client *NginxClient) GetAvailableStreamEndpoints(ctx context.Context) ([]string, error) { var endpoints []string - err := client.get("stream", &endpoints) + err := client.get(ctx, "stream", &endpoints) if err != nil { return nil, fmt.Errorf("failed to get endpoints: %w", err) } return endpoints, nil } -// GetNginxInfo returns Nginx stats. -func (client *NginxClient) GetNginxInfo() (*NginxInfo, error) { +// GetNginxInfo returns Nginx stats with a context. +func (client *NginxClient) GetNginxInfo(ctx context.Context) (*NginxInfo, error) { var info NginxInfo - err := client.get("nginx", &info) + err := client.get(ctx, "nginx", &info) if err != nil { return nil, fmt.Errorf("failed to get info: %w", err) } return &info, nil } -// GetCaches returns Cache stats. -func (client *NginxClient) GetCaches() (*Caches, error) { +// GetCaches returns Cache stats with a context. +func (client *NginxClient) GetCaches(ctx context.Context) (*Caches, error) { var caches Caches - err := client.get("http/caches", &caches) + err := client.get(ctx, "http/caches", &caches) if err != nil { return nil, fmt.Errorf("failed to get caches: %w", err) } return &caches, nil } -// GetSlabs returns Slabs stats. -func (client *NginxClient) GetSlabs() (*Slabs, error) { +// GetSlabs returns Slabs stats with a context. +func (client *NginxClient) GetSlabs(ctx context.Context) (*Slabs, error) { var slabs Slabs - err := client.get("slabs", &slabs) + err := client.get(ctx, "slabs", &slabs) if err != nil { return nil, fmt.Errorf("failed to get slabs: %w", err) } return &slabs, nil } -// GetConnections returns Connections stats. -func (client *NginxClient) GetConnections() (*Connections, error) { +// GetConnections returns Connections stats with a context. +func (client *NginxClient) GetConnections(ctx context.Context) (*Connections, error) { var cons Connections - err := client.get("connections", &cons) + err := client.get(ctx, "connections", &cons) if err != nil { return nil, fmt.Errorf("failed to get connections: %w", err) } return &cons, nil } -// GetHTTPRequests returns http/requests stats. -func (client *NginxClient) GetHTTPRequests() (*HTTPRequests, error) { +// GetHTTPRequests returns http/requests stats with a context. +func (client *NginxClient) GetHTTPRequests(ctx context.Context) (*HTTPRequests, error) { var requests HTTPRequests - err := client.get("http/requests", &requests) + err := client.get(ctx, "http/requests", &requests) if err != nil { return nil, fmt.Errorf("failed to get http requests: %w", err) } return &requests, nil } -// GetSSL returns SSL stats. -func (client *NginxClient) GetSSL() (*SSL, error) { +// GetSSL returns SSL stats with a context. +func (client *NginxClient) GetSSL(ctx context.Context) (*SSL, error) { var ssl SSL - err := client.get("ssl", &ssl) + err := client.get(ctx, "ssl", &ssl) if err != nil { return nil, fmt.Errorf("failed to get ssl: %w", err) } return &ssl, nil } -// GetServerZones returns http/server_zones stats. -func (client *NginxClient) GetServerZones() (*ServerZones, error) { +// GetServerZones returns http/server_zones stats with a context. +func (client *NginxClient) GetServerZones(ctx context.Context) (*ServerZones, error) { var zones ServerZones - err := client.get("http/server_zones", &zones) + err := client.get(ctx, "http/server_zones", &zones) if err != nil { return nil, fmt.Errorf("failed to get server zones: %w", err) } return &zones, err } -// GetStreamServerZones returns stream/server_zones stats. -func (client *NginxClient) GetStreamServerZones() (*StreamServerZones, error) { +// GetStreamServerZones returns stream/server_zones stats with a context. +func (client *NginxClient) GetStreamServerZones(ctx context.Context) (*StreamServerZones, error) { var zones StreamServerZones - err := client.get("stream/server_zones", &zones) + err := client.get(ctx, "stream/server_zones", &zones) if err != nil { var ie *internalError if errors.As(err, &ie) { @@ -1406,20 +1629,20 @@ func (client *NginxClient) GetStreamServerZones() (*StreamServerZones, error) { return &zones, err } -// GetUpstreams returns http/upstreams stats. -func (client *NginxClient) GetUpstreams() (*Upstreams, error) { +// GetUpstreams returns http/upstreams stats with a context. +func (client *NginxClient) GetUpstreams(ctx context.Context) (*Upstreams, error) { var upstreams Upstreams - err := client.get("http/upstreams", &upstreams) + err := client.get(ctx, "http/upstreams", &upstreams) if err != nil { return nil, fmt.Errorf("failed to get upstreams: %w", err) } return &upstreams, nil } -// GetStreamUpstreams returns stream/upstreams stats. -func (client *NginxClient) GetStreamUpstreams() (*StreamUpstreams, error) { +// GetStreamUpstreams returns stream/upstreams stats with a context. +func (client *NginxClient) GetStreamUpstreams(ctx context.Context) (*StreamUpstreams, error) { var upstreams StreamUpstreams - err := client.get("stream/upstreams", &upstreams) + err := client.get(ctx, "stream/upstreams", &upstreams) if err != nil { var ie *internalError if errors.As(err, &ie) { @@ -1432,10 +1655,10 @@ func (client *NginxClient) GetStreamUpstreams() (*StreamUpstreams, error) { return &upstreams, nil } -// GetStreamZoneSync returns stream/zone_sync stats. -func (client *NginxClient) GetStreamZoneSync() (*StreamZoneSync, error) { +// GetStreamZoneSync returns stream/zone_sync stats with a context. +func (client *NginxClient) GetStreamZoneSync(ctx context.Context) (*StreamZoneSync, error) { var streamZoneSync StreamZoneSync - err := client.get("stream/zone_sync", &streamZoneSync) + err := client.get(ctx, "stream/zone_sync", &streamZoneSync) if err != nil { var ie *internalError if errors.As(err, &ie) { @@ -1449,13 +1672,13 @@ func (client *NginxClient) GetStreamZoneSync() (*StreamZoneSync, error) { return &streamZoneSync, err } -// GetLocationZones returns http/location_zones stats. -func (client *NginxClient) GetLocationZones() (*LocationZones, error) { +// GetLocationZones returns http/location_zones stats with a context. +func (client *NginxClient) GetLocationZones(ctx context.Context) (*LocationZones, error) { var locationZones LocationZones if client.apiVersion < 5 { return &locationZones, nil } - err := client.get("http/location_zones", &locationZones) + err := client.get(ctx, "http/location_zones", &locationZones) if err != nil { return nil, fmt.Errorf("failed to get location zones: %w", err) } @@ -1463,13 +1686,13 @@ func (client *NginxClient) GetLocationZones() (*LocationZones, error) { return &locationZones, err } -// GetResolvers returns Resolvers stats. -func (client *NginxClient) GetResolvers() (*Resolvers, error) { +// GetResolvers returns Resolvers stats with a context. +func (client *NginxClient) GetResolvers(ctx context.Context) (*Resolvers, error) { var resolvers Resolvers if client.apiVersion < 5 { return &resolvers, nil } - err := client.get("resolvers", &resolvers) + err := client.get(ctx, "resolvers", &resolvers) if err != nil { return nil, fmt.Errorf("failed to get resolvers: %w", err) } @@ -1477,10 +1700,10 @@ func (client *NginxClient) GetResolvers() (*Resolvers, error) { return &resolvers, err } -// GetProcesses returns Processes stats. -func (client *NginxClient) GetProcesses() (*Processes, error) { +// GetProcesses returns Processes stats with a context. +func (client *NginxClient) GetProcesses(ctx context.Context) (*Processes, error) { var processes Processes - err := client.get("processes", &processes) + err := client.get(ctx, "processes", &processes) if err != nil { return nil, fmt.Errorf("failed to get processes: %w", err) } @@ -1495,27 +1718,27 @@ type KeyValPairs map[string]string type KeyValPairsByZone map[string]KeyValPairs // GetKeyValPairs fetches key/value pairs for a given HTTP zone. -func (client *NginxClient) GetKeyValPairs(zone string) (KeyValPairs, error) { - return client.getKeyValPairs(zone, httpContext) +func (client *NginxClient) GetKeyValPairs(ctx context.Context, zone string) (KeyValPairs, error) { + return client.getKeyValPairs(ctx, zone, httpContext) } // GetStreamKeyValPairs fetches key/value pairs for a given Stream zone. -func (client *NginxClient) GetStreamKeyValPairs(zone string) (KeyValPairs, error) { - return client.getKeyValPairs(zone, streamContext) +func (client *NginxClient) GetStreamKeyValPairs(ctx context.Context, zone string) (KeyValPairs, error) { + return client.getKeyValPairs(ctx, zone, streamContext) } -func (client *NginxClient) getKeyValPairs(zone string, stream bool) (KeyValPairs, error) { +func (client *NginxClient) getKeyValPairs(ctx context.Context, zone string, stream bool) (KeyValPairs, error) { base := "http" if stream { base = "stream" } if zone == "" { - return nil, errors.New("zone required") + return nil, fmt.Errorf("zone: %w", ErrParameterRequired) } path := fmt.Sprintf("%v/keyvals/%v", base, zone) var keyValPairs KeyValPairs - err := client.get(path, &keyValPairs) + err := client.get(ctx, path, &keyValPairs) if err != nil { return nil, fmt.Errorf("failed to get keyvals for %v/%v zone: %w", base, zone, err) } @@ -1523,16 +1746,16 @@ func (client *NginxClient) getKeyValPairs(zone string, stream bool) (KeyValPairs } // GetAllKeyValPairs fetches all key/value pairs for all HTTP zones. -func (client *NginxClient) GetAllKeyValPairs() (KeyValPairsByZone, error) { - return client.getAllKeyValPairs(httpContext) +func (client *NginxClient) GetAllKeyValPairs(ctx context.Context) (KeyValPairsByZone, error) { + return client.getAllKeyValPairs(ctx, httpContext) } // GetAllStreamKeyValPairs fetches all key/value pairs for all Stream zones. -func (client *NginxClient) GetAllStreamKeyValPairs() (KeyValPairsByZone, error) { - return client.getAllKeyValPairs(streamContext) +func (client *NginxClient) GetAllStreamKeyValPairs(ctx context.Context) (KeyValPairsByZone, error) { + return client.getAllKeyValPairs(ctx, streamContext) } -func (client *NginxClient) getAllKeyValPairs(stream bool) (KeyValPairsByZone, error) { +func (client *NginxClient) getAllKeyValPairs(ctx context.Context, stream bool) (KeyValPairsByZone, error) { base := "http" if stream { base = "stream" @@ -1540,7 +1763,7 @@ func (client *NginxClient) getAllKeyValPairs(stream bool) (KeyValPairsByZone, er path := fmt.Sprintf("%v/keyvals", base) var keyValPairsByZone KeyValPairsByZone - err := client.get(path, &keyValPairsByZone) + err := client.get(ctx, path, &keyValPairsByZone) if err != nil { return nil, fmt.Errorf("failed to get keyvals for all %v zones: %w", base, err) } @@ -1548,27 +1771,27 @@ func (client *NginxClient) getAllKeyValPairs(stream bool) (KeyValPairsByZone, er } // AddKeyValPair adds a new key/value pair to a given HTTP zone. -func (client *NginxClient) AddKeyValPair(zone string, key string, val string) error { - return client.addKeyValPair(zone, key, val, httpContext) +func (client *NginxClient) AddKeyValPair(ctx context.Context, zone string, key string, val string) error { + return client.addKeyValPair(ctx, zone, key, val, httpContext) } // AddStreamKeyValPair adds a new key/value pair to a given Stream zone. -func (client *NginxClient) AddStreamKeyValPair(zone string, key string, val string) error { - return client.addKeyValPair(zone, key, val, streamContext) +func (client *NginxClient) AddStreamKeyValPair(ctx context.Context, zone string, key string, val string) error { + return client.addKeyValPair(ctx, zone, key, val, streamContext) } -func (client *NginxClient) addKeyValPair(zone string, key string, val string, stream bool) error { +func (client *NginxClient) addKeyValPair(ctx context.Context, zone string, key string, val string, stream bool) error { base := "http" if stream { base = "stream" } if zone == "" { - return errors.New("zone required") + return fmt.Errorf("zone: %w", ErrParameterRequired) } path := fmt.Sprintf("%v/keyvals/%v", base, zone) input := KeyValPairs{key: val} - err := client.post(path, &input) + err := client.post(ctx, path, &input) if err != nil { return fmt.Errorf("failed to add key value pair for %v/%v zone: %w", base, zone, err) } @@ -1576,27 +1799,27 @@ func (client *NginxClient) addKeyValPair(zone string, key string, val string, st } // ModifyKeyValPair modifies the value of an existing key in a given HTTP zone. -func (client *NginxClient) ModifyKeyValPair(zone string, key string, val string) error { - return client.modifyKeyValPair(zone, key, val, httpContext) +func (client *NginxClient) ModifyKeyValPair(ctx context.Context, zone string, key string, val string) error { + return client.modifyKeyValPair(ctx, zone, key, val, httpContext) } // ModifyStreamKeyValPair modifies the value of an existing key in a given Stream zone. -func (client *NginxClient) ModifyStreamKeyValPair(zone string, key string, val string) error { - return client.modifyKeyValPair(zone, key, val, streamContext) +func (client *NginxClient) ModifyStreamKeyValPair(ctx context.Context, zone string, key string, val string) error { + return client.modifyKeyValPair(ctx, zone, key, val, streamContext) } -func (client *NginxClient) modifyKeyValPair(zone string, key string, val string, stream bool) error { +func (client *NginxClient) modifyKeyValPair(ctx context.Context, zone string, key string, val string, stream bool) error { base := "http" if stream { base = "stream" } if zone == "" { - return errors.New("zone required") + return fmt.Errorf("zone: %w", ErrParameterRequired) } path := fmt.Sprintf("%v/keyvals/%v", base, zone) input := KeyValPairs{key: val} - err := client.patch(path, &input, http.StatusNoContent) + err := client.patch(ctx, path, &input, http.StatusNoContent) if err != nil { return fmt.Errorf("failed to update key value pair for %v/%v zone: %w", base, zone, err) } @@ -1604,24 +1827,24 @@ func (client *NginxClient) modifyKeyValPair(zone string, key string, val string, } // DeleteKeyValuePair deletes the key/value pair for a key in a given HTTP zone. -func (client *NginxClient) DeleteKeyValuePair(zone string, key string) error { - return client.deleteKeyValuePair(zone, key, httpContext) +func (client *NginxClient) DeleteKeyValuePair(ctx context.Context, zone string, key string) error { + return client.deleteKeyValuePair(ctx, zone, key, httpContext) } // DeleteStreamKeyValuePair deletes the key/value pair for a key in a given Stream zone. -func (client *NginxClient) DeleteStreamKeyValuePair(zone string, key string) error { - return client.deleteKeyValuePair(zone, key, streamContext) +func (client *NginxClient) DeleteStreamKeyValuePair(ctx context.Context, zone string, key string) error { + return client.deleteKeyValuePair(ctx, zone, key, streamContext) } // To delete a key/value pair you set the value to null via the API, // then NGINX+ will delete the key. -func (client *NginxClient) deleteKeyValuePair(zone string, key string, stream bool) error { +func (client *NginxClient) deleteKeyValuePair(ctx context.Context, zone string, key string, stream bool) error { base := "http" if stream { base = "stream" } if zone == "" { - return errors.New("zone required") + return fmt.Errorf("zone: %w", ErrParameterRequired) } // map[string]string can't have a nil value so we use a different type here. @@ -1629,7 +1852,7 @@ func (client *NginxClient) deleteKeyValuePair(zone string, key string, stream bo keyval[key] = nil path := fmt.Sprintf("%v/keyvals/%v", base, zone) - err := client.patch(path, &keyval, http.StatusNoContent) + err := client.patch(ctx, path, &keyval, http.StatusNoContent) if err != nil { return fmt.Errorf("failed to remove key values pair for %v/%v zone: %w", base, zone, err) } @@ -1637,26 +1860,26 @@ func (client *NginxClient) deleteKeyValuePair(zone string, key string, stream bo } // DeleteKeyValPairs deletes all the key-value pairs in a given HTTP zone. -func (client *NginxClient) DeleteKeyValPairs(zone string) error { - return client.deleteKeyValPairs(zone, httpContext) +func (client *NginxClient) DeleteKeyValPairs(ctx context.Context, zone string) error { + return client.deleteKeyValPairs(ctx, zone, httpContext) } // DeleteStreamKeyValPairs deletes all the key-value pairs in a given Stream zone. -func (client *NginxClient) DeleteStreamKeyValPairs(zone string) error { - return client.deleteKeyValPairs(zone, streamContext) +func (client *NginxClient) DeleteStreamKeyValPairs(ctx context.Context, zone string) error { + return client.deleteKeyValPairs(ctx, zone, streamContext) } -func (client *NginxClient) deleteKeyValPairs(zone string, stream bool) error { +func (client *NginxClient) deleteKeyValPairs(ctx context.Context, zone string, stream bool) error { base := "http" if stream { base = "stream" } if zone == "" { - return errors.New("zone required") + return fmt.Errorf("zone: %w", ErrParameterRequired) } path := fmt.Sprintf("%v/keyvals/%v", base, zone) - err := client.delete(path, http.StatusNoContent) + err := client.delete(ctx, path, http.StatusNoContent) if err != nil { return fmt.Errorf("failed to remove all key value pairs for %v/%v zone: %w", base, zone, err) } @@ -1664,10 +1887,10 @@ func (client *NginxClient) deleteKeyValPairs(zone string, stream bool) error { } // UpdateHTTPServer updates the server of the upstream. -func (client *NginxClient) UpdateHTTPServer(upstream string, server UpstreamServer) error { +func (client *NginxClient) UpdateHTTPServer(ctx context.Context, upstream string, server UpstreamServer) error { path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, server.ID) server.ID = 0 - err := client.patch(path, &server, http.StatusOK) + err := client.patch(ctx, path, &server, http.StatusOK) if err != nil { return fmt.Errorf("failed to update %v server to %v upstream: %w", server.Server, upstream, err) } @@ -1676,10 +1899,10 @@ func (client *NginxClient) UpdateHTTPServer(upstream string, server UpstreamServ } // UpdateStreamServer updates the stream server of the upstream. -func (client *NginxClient) UpdateStreamServer(upstream string, server StreamUpstreamServer) error { +func (client *NginxClient) UpdateStreamServer(ctx context.Context, upstream string, server StreamUpstreamServer) error { path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, server.ID) server.ID = 0 - err := client.patch(path, &server, http.StatusOK) + err := client.patch(ctx, path, &server, http.StatusOK) if err != nil { return fmt.Errorf("failed to update %v stream server to %v upstream: %w", server.Server, upstream, err) } @@ -1708,39 +1931,39 @@ func addPortToServer(server string) string { return fmt.Sprintf("%v:%v", server, defaultServerPort) } -// GetHTTPLimitReqs returns http/limit_reqs stats. -func (client *NginxClient) GetHTTPLimitReqs() (*HTTPLimitRequests, error) { +// GetHTTPLimitReqs returns http/limit_reqs stats with a context. +func (client *NginxClient) GetHTTPLimitReqs(ctx context.Context) (*HTTPLimitRequests, error) { var limitReqs HTTPLimitRequests if client.apiVersion < 6 { return &limitReqs, nil } - err := client.get("http/limit_reqs", &limitReqs) + err := client.get(ctx, "http/limit_reqs", &limitReqs) if err != nil { return nil, fmt.Errorf("failed to get http limit requests: %w", err) } return &limitReqs, nil } -// GetHTTPConnectionsLimit returns http/limit_conns stats. -func (client *NginxClient) GetHTTPConnectionsLimit() (*HTTPLimitConnections, error) { +// GetHTTPConnectionsLimit returns http/limit_conns stats with a context. +func (client *NginxClient) GetHTTPConnectionsLimit(ctx context.Context) (*HTTPLimitConnections, error) { var limitConns HTTPLimitConnections if client.apiVersion < 6 { return &limitConns, nil } - err := client.get("http/limit_conns", &limitConns) + err := client.get(ctx, "http/limit_conns", &limitConns) if err != nil { return nil, fmt.Errorf("failed to get http connections limit: %w", err) } return &limitConns, nil } -// GetStreamConnectionsLimit returns stream/limit_conns stats. -func (client *NginxClient) GetStreamConnectionsLimit() (*StreamLimitConnections, error) { +// GetStreamConnectionsLimit returns stream/limit_conns stats with a context. +func (client *NginxClient) GetStreamConnectionsLimit(ctx context.Context) (*StreamLimitConnections, error) { var limitConns StreamLimitConnections if client.apiVersion < 6 { return &limitConns, nil } - err := client.get("stream/limit_conns", &limitConns) + err := client.get(ctx, "stream/limit_conns", &limitConns) if err != nil { var ie *internalError if errors.As(err, &ie) { @@ -1754,12 +1977,12 @@ func (client *NginxClient) GetStreamConnectionsLimit() (*StreamLimitConnections, } // GetWorkers returns workers stats. -func (client *NginxClient) GetWorkers() ([]*Workers, error) { +func (client *NginxClient) GetWorkers(ctx context.Context) ([]*Workers, error) { var workers []*Workers if client.apiVersion < 9 { return workers, nil } - err := client.get("workers", &workers) + err := client.get(ctx, "workers", &workers) if err != nil { return nil, fmt.Errorf("failed to get workers: %w", err) } diff --git a/test/performance/vendor/modules.txt b/test/performance/vendor/modules.txt index 900513cfb..aa7c8def2 100644 --- a/test/performance/vendor/modules.txt +++ b/test/performance/vendor/modules.txt @@ -182,9 +182,9 @@ github.com/nginx/agent/v2/test/utils/system # github.com/nginxinc/nginx-go-crossplane v0.4.48 ## explicit; go 1.19 github.com/nginxinc/nginx-go-crossplane -# github.com/nginxinc/nginx-plus-go-client v1.2.2 -## explicit; go 1.21.2 -github.com/nginxinc/nginx-plus-go-client/client +# github.com/nginxinc/nginx-plus-go-client/v2 v2.0.1 +## explicit; go 1.22.6 +github.com/nginxinc/nginx-plus-go-client/v2/client # github.com/nginxinc/nginx-prometheus-exporter v1.2.0 ## explicit; go 1.21.3 github.com/nginxinc/nginx-prometheus-exporter/client diff --git a/vendor/github.com/nginxinc/nginx-plus-go-client/LICENSE b/vendor/github.com/nginxinc/nginx-plus-go-client/v2/LICENSE similarity index 100% rename from vendor/github.com/nginxinc/nginx-plus-go-client/LICENSE rename to vendor/github.com/nginxinc/nginx-plus-go-client/v2/LICENSE diff --git a/vendor/github.com/nginxinc/nginx-plus-go-client/client/nginx.go b/vendor/github.com/nginxinc/nginx-plus-go-client/v2/client/nginx.go similarity index 65% rename from vendor/github.com/nginxinc/nginx-plus-go-client/client/nginx.go rename to vendor/github.com/nginxinc/nginx-plus-go-client/v2/client/nginx.go index 87503ecae..7d16420ac 100644 --- a/vendor/github.com/nginxinc/nginx-plus-go-client/client/nginx.go +++ b/vendor/github.com/nginxinc/nginx-plus-go-client/v2/client/nginx.go @@ -11,7 +11,10 @@ import ( "reflect" "slices" "strings" + "sync" "time" + + "golang.org/x/sync/errgroup" ) const ( @@ -37,8 +40,13 @@ var ( defaultWeight = 1 ) -// ErrUnsupportedVer means that client's API version is not supported by NGINX plus API. -var ErrUnsupportedVer = errors.New("API version of the client is not supported by running NGINX Plus") +var ( + ErrParameterRequired = errors.New("parameter is required") + ErrServerNotFound = errors.New("server not found") + ErrServerExists = errors.New("server already exists") + ErrNotSupported = errors.New("not supported") + ErrInvalidTimeout = errors.New("invalid timeout") +) // NginxClient lets you access NGINX Plus API. type NginxClient struct { @@ -116,6 +124,40 @@ func (internalError *internalError) Wrap(err string) *internalError { return internalError } +// this is an internal representation of the Stats object including endpoint and streamEndpoint lists. +type extendedStats struct { + endpoints []string + streamEndpoints []string + Stats +} + +func defaultStats() *extendedStats { + return &extendedStats{ + endpoints: []string{}, + streamEndpoints: []string{}, + Stats: Stats{ + Upstreams: map[string]Upstream{}, + ServerZones: map[string]ServerZone{}, + StreamServerZones: map[string]StreamServerZone{}, + StreamUpstreams: map[string]StreamUpstream{}, + Slabs: map[string]Slab{}, + Caches: map[string]HTTPCache{}, + HTTPLimitConnections: map[string]LimitConnection{}, + StreamLimitConnections: map[string]LimitConnection{}, + HTTPLimitRequests: map[string]HTTPLimitRequest{}, + Resolvers: map[string]Resolver{}, + LocationZones: map[string]LocationZone{}, + StreamZoneSync: nil, + Workers: []*Workers{}, + NginxInfo: NginxInfo{}, + SSL: SSL{}, + Connections: Connections{}, + HTTPRequests: HTTPRequests{}, + Processes: Processes{}, + }, + } +} + // Stats represents NGINX Plus stats fetched from the NGINX Plus API. // https://nginx.org/en/docs/http/ngx_http_api_module.html type Stats struct { @@ -546,6 +588,19 @@ func WithCheckAPI() Option { } } +// WithMaxAPIVersion sets the API version to the max API version. +func WithMaxAPIVersion() Option { + return func(o *NginxClient) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + version, err := o.GetMaxAPIVersion(ctx) + if err != nil { + return + } + o.apiVersion = version + } +} + // NewNginxClient creates a new NginxClient. func NewNginxClient(apiEndpoint string, opts ...Option) (*NginxClient, error) { c := &NginxClient{ @@ -560,15 +615,17 @@ func NewNginxClient(apiEndpoint string, opts ...Option) (*NginxClient, error) { } if c.httpClient == nil { - return nil, errors.New("http client is not set") + return nil, fmt.Errorf("http client: %w", ErrParameterRequired) } if !versionSupported(c.apiVersion) { - return nil, fmt.Errorf("API version %v is not supported by the client", c.apiVersion) + return nil, fmt.Errorf("API version %v: %w by the client", c.apiVersion, ErrNotSupported) } if c.checkAPI { - versions, err := getAPIVersions(c.httpClient, apiEndpoint) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + versions, err := c.getAPIVersions(ctx, c.httpClient, apiEndpoint) if err != nil { return nil, fmt.Errorf("error accessing the API: %w", err) } @@ -580,7 +637,7 @@ func NewNginxClient(apiEndpoint string, opts ...Option) (*NginxClient, error) { } } if !found { - return nil, fmt.Errorf("API version %v is not supported by the server", c.apiVersion) + return nil, fmt.Errorf("API version %v: %w by the server", c.apiVersion, ErrNotSupported) } } @@ -596,10 +653,24 @@ func versionSupported(n int) bool { return false } -func getAPIVersions(httpClient *http.Client, endpoint string) (*versions, error) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() +// GetMaxAPIVersion returns the maximum API version supported by the server and the client. +func (client *NginxClient) GetMaxAPIVersion(ctx context.Context) (int, error) { + serverVersions, err := client.getAPIVersions(ctx, client.httpClient, client.apiEndpoint) + if err != nil { + return 0, fmt.Errorf("failed to get max API version: %w", err) + } + + maxServerVersion := slices.Max(*serverVersions) + maxClientVersion := slices.Max(supportedAPIVersions) + + if maxServerVersion > maxClientVersion { + return maxClientVersion, nil + } + + return maxServerVersion, nil +} +func (client *NginxClient) getAPIVersions(ctx context.Context, httpClient *http.Client, endpoint string) (*versions, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) if err != nil { return nil, fmt.Errorf("failed to create a get request: %w", err) @@ -611,7 +682,9 @@ func getAPIVersions(httpClient *http.Client, endpoint string) (*versions, error) defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("%v is not accessible: expected %v response, got %v", endpoint, http.StatusOK, resp.StatusCode) + return nil, createResponseMismatchError(resp.Body).Wrap(fmt.Sprintf( + "failed to get endpoint %q, expected %v response, got %v", + endpoint, http.StatusOK, resp.StatusCode)) } body, err := io.ReadAll(resp.Body) @@ -658,17 +731,17 @@ func readAPIErrorResponse(respBody io.ReadCloser) (*apiErrorResponse, error) { } // CheckIfUpstreamExists checks if the upstream exists in NGINX. If the upstream doesn't exist, it returns the error. -func (client *NginxClient) CheckIfUpstreamExists(upstream string) error { - _, err := client.GetHTTPServers(upstream) +func (client *NginxClient) CheckIfUpstreamExists(ctx context.Context, upstream string) error { + _, err := client.GetHTTPServers(ctx, upstream) return err } // GetHTTPServers returns the servers of the upstream from NGINX. -func (client *NginxClient) GetHTTPServers(upstream string) ([]UpstreamServer, error) { +func (client *NginxClient) GetHTTPServers(ctx context.Context, upstream string) ([]UpstreamServer, error) { path := fmt.Sprintf("http/upstreams/%v/servers", upstream) var servers []UpstreamServer - err := client.get(path, &servers) + err := client.get(ctx, path, &servers) if err != nil { return nil, fmt.Errorf("failed to get the HTTP servers of upstream %v: %w", upstream, err) } @@ -677,17 +750,17 @@ func (client *NginxClient) GetHTTPServers(upstream string) ([]UpstreamServer, er } // AddHTTPServer adds the server to the upstream. -func (client *NginxClient) AddHTTPServer(upstream string, server UpstreamServer) error { - id, err := client.getIDOfHTTPServer(upstream, server.Server) +func (client *NginxClient) AddHTTPServer(ctx context.Context, upstream string, server UpstreamServer) error { + id, err := client.getIDOfHTTPServer(ctx, upstream, server.Server) if err != nil { return fmt.Errorf("failed to add %v server to %v upstream: %w", server.Server, upstream, err) } if id != -1 { - return fmt.Errorf("failed to add %v server to %v upstream: server already exists", server.Server, upstream) + return fmt.Errorf("failed to add %v server to %v upstream: %w", server.Server, upstream, ErrServerExists) } path := fmt.Sprintf("http/upstreams/%v/servers/", upstream) - err = client.post(path, &server) + err = client.post(ctx, path, &server) if err != nil { return fmt.Errorf("failed to add %v server to %v upstream: %w", server.Server, upstream, err) } @@ -696,17 +769,17 @@ func (client *NginxClient) AddHTTPServer(upstream string, server UpstreamServer) } // DeleteHTTPServer the server from the upstream. -func (client *NginxClient) DeleteHTTPServer(upstream string, server string) error { - id, err := client.getIDOfHTTPServer(upstream, server) +func (client *NginxClient) DeleteHTTPServer(ctx context.Context, upstream string, server string) error { + id, err := client.getIDOfHTTPServer(ctx, upstream, server) if err != nil { return fmt.Errorf("failed to remove %v server from %v upstream: %w", server, upstream, err) } if id == -1 { - return fmt.Errorf("failed to remove %v server from %v upstream: server doesn't exist", server, upstream) + return fmt.Errorf("failed to remove %v server from %v upstream: %w", server, upstream, ErrServerNotFound) } path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, id) - err = client.delete(path, http.StatusOK) + err = client.delete(ctx, path, http.StatusOK) if err != nil { return fmt.Errorf("failed to remove %v server from %v upstream: %w", server, upstream, err) } @@ -718,8 +791,8 @@ func (client *NginxClient) DeleteHTTPServer(upstream string, server string) erro // Servers that are in the slice, but don't exist in NGINX will be added to NGINX. // Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX. // Servers that are in the slice and exist in NGINX, but have different parameters, will be updated. -func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) (added []UpstreamServer, deleted []UpstreamServer, updated []UpstreamServer, err error) { - serversInNginx, err := client.GetHTTPServers(upstream) +func (client *NginxClient) UpdateHTTPServers(ctx context.Context, upstream string, servers []UpstreamServer) (added []UpstreamServer, deleted []UpstreamServer, updated []UpstreamServer, err error) { + serversInNginx, err := client.GetHTTPServers(ctx, upstream) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %w", upstream, err) } @@ -734,21 +807,21 @@ func (client *NginxClient) UpdateHTTPServers(upstream string, servers []Upstream toAdd, toDelete, toUpdate := determineUpdates(formattedServers, serversInNginx) for _, server := range toAdd { - err := client.AddHTTPServer(upstream, server) + err := client.AddHTTPServer(ctx, upstream, server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %w", upstream, err) } } for _, server := range toDelete { - err := client.DeleteHTTPServer(upstream, server.Server) + err := client.DeleteHTTPServer(ctx, upstream, server.Server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %w", upstream, err) } } for _, server := range toUpdate { - err := client.UpdateHTTPServer(upstream, server) + err := client.UpdateHTTPServer(ctx, upstream, server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %w", upstream, err) } @@ -836,8 +909,8 @@ func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamSe return } -func (client *NginxClient) getIDOfHTTPServer(upstream string, name string) (int, error) { - servers, err := client.GetHTTPServers(upstream) +func (client *NginxClient) getIDOfHTTPServer(ctx context.Context, upstream string, name string) (int, error) { + servers, err := client.GetHTTPServers(ctx, upstream) if err != nil { return -1, fmt.Errorf("error getting id of server %v of upstream %v: %w", name, upstream, err) } @@ -851,10 +924,7 @@ func (client *NginxClient) getIDOfHTTPServer(upstream string, name string) (int, return -1, nil } -func (client *NginxClient) get(path string, data interface{}) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - +func (client *NginxClient) get(ctx context.Context, path string, data interface{}) error { url := fmt.Sprintf("%v/%v/%v", client.apiEndpoint, client.apiVersion, path) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) @@ -885,10 +955,7 @@ func (client *NginxClient) get(path string, data interface{}) error { return nil } -func (client *NginxClient) post(path string, input interface{}) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - +func (client *NginxClient) post(ctx context.Context, path string, input interface{}) error { url := fmt.Sprintf("%v/%v/%v", client.apiEndpoint, client.apiVersion, path) jsonInput, err := json.Marshal(input) @@ -917,10 +984,7 @@ func (client *NginxClient) post(path string, input interface{}) error { return nil } -func (client *NginxClient) delete(path string, expectedStatusCode int) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - +func (client *NginxClient) delete(ctx context.Context, path string, expectedStatusCode int) error { path = fmt.Sprintf("%v/%v/%v/", client.apiEndpoint, client.apiVersion, path) req, err := http.NewRequestWithContext(ctx, http.MethodDelete, path, nil) @@ -942,10 +1006,7 @@ func (client *NginxClient) delete(path string, expectedStatusCode int) error { return nil } -func (client *NginxClient) patch(path string, input interface{}, expectedStatusCode int) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - +func (client *NginxClient) patch(ctx context.Context, path string, input interface{}, expectedStatusCode int) error { path = fmt.Sprintf("%v/%v/%v/", client.apiEndpoint, client.apiVersion, path) jsonInput, err := json.Marshal(input) @@ -973,17 +1034,17 @@ func (client *NginxClient) patch(path string, input interface{}, expectedStatusC } // CheckIfStreamUpstreamExists checks if the stream upstream exists in NGINX. If the upstream doesn't exist, it returns the error. -func (client *NginxClient) CheckIfStreamUpstreamExists(upstream string) error { - _, err := client.GetStreamServers(upstream) +func (client *NginxClient) CheckIfStreamUpstreamExists(ctx context.Context, upstream string) error { + _, err := client.GetStreamServers(ctx, upstream) return err } // GetStreamServers returns the stream servers of the upstream from NGINX. -func (client *NginxClient) GetStreamServers(upstream string) ([]StreamUpstreamServer, error) { +func (client *NginxClient) GetStreamServers(ctx context.Context, upstream string) ([]StreamUpstreamServer, error) { path := fmt.Sprintf("stream/upstreams/%v/servers", upstream) var servers []StreamUpstreamServer - err := client.get(path, &servers) + err := client.get(ctx, path, &servers) if err != nil { return nil, fmt.Errorf("failed to get stream servers of upstream server %v: %w", upstream, err) } @@ -991,17 +1052,17 @@ func (client *NginxClient) GetStreamServers(upstream string) ([]StreamUpstreamSe } // AddStreamServer adds the stream server to the upstream. -func (client *NginxClient) AddStreamServer(upstream string, server StreamUpstreamServer) error { - id, err := client.getIDOfStreamServer(upstream, server.Server) +func (client *NginxClient) AddStreamServer(ctx context.Context, upstream string, server StreamUpstreamServer) error { + id, err := client.getIDOfStreamServer(ctx, upstream, server.Server) if err != nil { return fmt.Errorf("failed to add %v stream server to %v upstream: %w", server.Server, upstream, err) } if id != -1 { - return fmt.Errorf("failed to add %v stream server to %v upstream: server already exists", server.Server, upstream) + return fmt.Errorf("failed to add %v stream server to %v upstream: %w", server.Server, upstream, ErrServerExists) } path := fmt.Sprintf("stream/upstreams/%v/servers/", upstream) - err = client.post(path, &server) + err = client.post(ctx, path, &server) if err != nil { return fmt.Errorf("failed to add %v stream server to %v upstream: %w", server.Server, upstream, err) } @@ -1009,17 +1070,17 @@ func (client *NginxClient) AddStreamServer(upstream string, server StreamUpstrea } // DeleteStreamServer the server from the upstream. -func (client *NginxClient) DeleteStreamServer(upstream string, server string) error { - id, err := client.getIDOfStreamServer(upstream, server) +func (client *NginxClient) DeleteStreamServer(ctx context.Context, upstream string, server string) error { + id, err := client.getIDOfStreamServer(ctx, upstream, server) if err != nil { return fmt.Errorf("failed to remove %v stream server from %v upstream: %w", server, upstream, err) } if id == -1 { - return fmt.Errorf("failed to remove %v stream server from %v upstream: server doesn't exist", server, upstream) + return fmt.Errorf("failed to remove %v stream server from %v upstream: %w", server, upstream, ErrServerNotFound) } path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, id) - err = client.delete(path, http.StatusOK) + err = client.delete(ctx, path, http.StatusOK) if err != nil { return fmt.Errorf("failed to remove %v stream server from %v upstream: %w", server, upstream, err) } @@ -1030,8 +1091,8 @@ func (client *NginxClient) DeleteStreamServer(upstream string, server string) er // Servers that are in the slice, but don't exist in NGINX will be added to NGINX. // Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX. // Servers that are in the slice and exist in NGINX, but have different parameters, will be updated. -func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) (added []StreamUpstreamServer, deleted []StreamUpstreamServer, updated []StreamUpstreamServer, err error) { - serversInNginx, err := client.GetStreamServers(upstream) +func (client *NginxClient) UpdateStreamServers(ctx context.Context, upstream string, servers []StreamUpstreamServer) (added []StreamUpstreamServer, deleted []StreamUpstreamServer, updated []StreamUpstreamServer, err error) { + serversInNginx, err := client.GetStreamServers(ctx, upstream) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %w", upstream, err) } @@ -1045,21 +1106,21 @@ func (client *NginxClient) UpdateStreamServers(upstream string, servers []Stream toAdd, toDelete, toUpdate := determineStreamUpdates(formattedServers, serversInNginx) for _, server := range toAdd { - err := client.AddStreamServer(upstream, server) + err := client.AddStreamServer(ctx, upstream, server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %w", upstream, err) } } for _, server := range toDelete { - err := client.DeleteStreamServer(upstream, server.Server) + err := client.DeleteStreamServer(ctx, upstream, server.Server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %w", upstream, err) } } for _, server := range toUpdate { - err := client.UpdateStreamServer(upstream, server) + err := client.UpdateStreamServer(ctx, upstream, server) if err != nil { return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %w", upstream, err) } @@ -1068,8 +1129,8 @@ func (client *NginxClient) UpdateStreamServers(upstream string, servers []Stream return toAdd, toDelete, toUpdate, nil } -func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (int, error) { - servers, err := client.GetStreamServers(upstream) +func (client *NginxClient) getIDOfStreamServer(ctx context.Context, upstream string, name string) (int, error) { + servers, err := client.GetStreamServers(ctx, upstream) if err != nil { return -1, fmt.Errorf("error getting id of stream server %v of upstream %v: %w", name, upstream, err) } @@ -1162,238 +1223,400 @@ func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers } // GetStats gets process, slab, connection, request, ssl, zone, stream zone, upstream and stream upstream related stats from the NGINX Plus API. -func (client *NginxClient) GetStats() (*Stats, error) { - endpoints, err := client.GetAvailableEndpoints() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } +func (client *NginxClient) GetStats(ctx context.Context) (*Stats, error) { + initialGroup, initialCtx := errgroup.WithContext(ctx) + var mu sync.Mutex + stats := defaultStats() + // Collecting initial stats + initialGroup.Go(func() error { + endpoints, err := client.GetAvailableEndpoints(initialCtx) + if err != nil { + return fmt.Errorf("failed to get available Endpoints: %w", err) + } - info, err := client.GetNginxInfo() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.endpoints = endpoints + mu.Unlock() + return nil + }) - caches, err := client.GetCaches() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + initialGroup.Go(func() error { + nginxInfo, err := client.GetNginxInfo(initialCtx) + if err != nil { + return fmt.Errorf("failed to get NGINX info: %w", err) + } - processes, err := client.GetProcesses() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.NginxInfo = *nginxInfo + mu.Unlock() - slabs, err := client.GetSlabs() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + return nil + }) - cons, err := client.GetConnections() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + initialGroup.Go(func() error { + caches, err := client.GetCaches(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Caches: %w", err) + } - requests, err := client.GetHTTPRequests() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.Caches = *caches + mu.Unlock() - ssl, err := client.GetSSL() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + return nil + }) - zones, err := client.GetServerZones() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + initialGroup.Go(func() error { + processes, err := client.GetProcesses(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Process information: %w", err) + } - upstreams, err := client.GetUpstreams() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.Processes = *processes + mu.Unlock() - locationZones, err := client.GetLocationZones() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + return nil + }) - resolvers, err := client.GetResolvers() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + initialGroup.Go(func() error { + slabs, err := client.GetSlabs(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Slabs: %w", err) + } - limitReqs, err := client.GetHTTPLimitReqs() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.Slabs = *slabs + mu.Unlock() - limitConnsHTTP, err := client.GetHTTPConnectionsLimit() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + return nil + }) - workers, err := client.GetWorkers() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + initialGroup.Go(func() error { + httpRequests, err := client.GetHTTPRequests(initialCtx) + if err != nil { + return fmt.Errorf("failed to get HTTP Requests: %w", err) + } + + mu.Lock() + stats.HTTPRequests = *httpRequests + mu.Unlock() - streamZones := &StreamServerZones{} - streamUpstreams := &StreamUpstreams{} - limitConnsStream := &StreamLimitConnections{} - var streamZoneSync *StreamZoneSync + return nil + }) - if slices.Contains(endpoints, "stream") { - streamEndpoints, err := client.GetAvailableStreamEndpoints() + initialGroup.Go(func() error { + ssl, err := client.GetSSL(initialCtx) if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) + return fmt.Errorf("failed to get SSL: %w", err) } - if slices.Contains(streamEndpoints, "server_zones") { - streamZones, err = client.GetStreamServerZones() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.SSL = *ssl + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + serverZones, err := client.GetServerZones(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Server Zones: %w", err) } - if slices.Contains(streamEndpoints, "upstreams") { - streamUpstreams, err = client.GetStreamUpstreams() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.ServerZones = *serverZones + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + upstreams, err := client.GetUpstreams(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Upstreams: %w", err) } - if slices.Contains(streamEndpoints, "limit_conns") { - limitConnsStream, err = client.GetStreamConnectionsLimit() - if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) - } + mu.Lock() + stats.Upstreams = *upstreams + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + locationZones, err := client.GetLocationZones(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Location Zones: %w", err) + } + + mu.Lock() + stats.LocationZones = *locationZones + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + resolvers, err := client.GetResolvers(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Resolvers: %w", err) } - if slices.Contains(streamEndpoints, "zone_sync") { - streamZoneSync, err = client.GetStreamZoneSync() + mu.Lock() + stats.Resolvers = *resolvers + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + httpLimitRequests, err := client.GetHTTPLimitReqs(initialCtx) + if err != nil { + return fmt.Errorf("failed to get HTTPLimitRequests: %w", err) + } + + mu.Lock() + stats.HTTPLimitRequests = *httpLimitRequests + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + httpLimitConnections, err := client.GetHTTPConnectionsLimit(initialCtx) + if err != nil { + return fmt.Errorf("failed to get HTTPLimitConnections: %w", err) + } + + mu.Lock() + stats.HTTPLimitConnections = *httpLimitConnections + mu.Unlock() + + return nil + }) + + initialGroup.Go(func() error { + workers, err := client.GetWorkers(initialCtx) + if err != nil { + return fmt.Errorf("failed to get Workers: %w", err) + } + + mu.Lock() + stats.Workers = workers + mu.Unlock() + + return nil + }) + + if err := initialGroup.Wait(); err != nil { + return nil, fmt.Errorf("error returned from contacting Plus API: %w", err) + } + + // Process stream endpoints if they exist + if slices.Contains(stats.endpoints, "stream") { + availableStreamGroup, asgCtx := errgroup.WithContext(ctx) + + availableStreamGroup.Go(func() error { + streamEndpoints, err := client.GetAvailableStreamEndpoints(asgCtx) if err != nil { - return nil, fmt.Errorf("failed to get stats: %w", err) + return fmt.Errorf("failed to get available Stream Endpoints: %w", err) } + + mu.Lock() + stats.streamEndpoints = streamEndpoints + mu.Unlock() + + return nil + }) + + if err := availableStreamGroup.Wait(); err != nil { + return nil, fmt.Errorf("no useful metrics found in stream stats: %w", err) } + + streamGroup, sgCtx := errgroup.WithContext(ctx) + + if slices.Contains(stats.streamEndpoints, "server_zones") { + streamGroup.Go(func() error { + streamServerZones, err := client.GetStreamServerZones(sgCtx) + if err != nil { + return fmt.Errorf("failed to get streamServerZones: %w", err) + } + + mu.Lock() + stats.StreamServerZones = *streamServerZones + mu.Unlock() + + return nil + }) + } + + if slices.Contains(stats.streamEndpoints, "upstreams") { + streamGroup.Go(func() error { + streamUpstreams, err := client.GetStreamUpstreams(sgCtx) + if err != nil { + return fmt.Errorf("failed to get StreamUpstreams: %w", err) + } + + mu.Lock() + stats.StreamUpstreams = *streamUpstreams + mu.Unlock() + + return nil + }) + } + + if slices.Contains(stats.streamEndpoints, "limit_conns") { + streamGroup.Go(func() error { + streamConnectionsLimit, err := client.GetStreamConnectionsLimit(sgCtx) + if err != nil { + return fmt.Errorf("failed to get StreamLimitConnections: %w", err) + } + + mu.Lock() + stats.StreamLimitConnections = *streamConnectionsLimit + mu.Unlock() + + return nil + }) + + streamGroup.Go(func() error { + streamZoneSync, err := client.GetStreamZoneSync(sgCtx) + if err != nil { + return fmt.Errorf("failed to get StreamZoneSync: %w", err) + } + + mu.Lock() + stats.StreamZoneSync = streamZoneSync + mu.Unlock() + + return nil + }) + } + + if err := streamGroup.Wait(); err != nil { + return nil, fmt.Errorf("no useful metrics found in stream stats: %w", err) + } + } + + // Report connection metrics separately so it does not influence the results + connectionsGroup, cgCtx := errgroup.WithContext(ctx) + + connectionsGroup.Go(func() error { + // replace this call with a context specific call + connections, err := client.GetConnections(cgCtx) + if err != nil { + return fmt.Errorf("failed to get connections: %w", err) + } + + mu.Lock() + stats.Connections = *connections + mu.Unlock() + + return nil + }) + + if err := connectionsGroup.Wait(); err != nil { + return nil, fmt.Errorf("connections metrics not found: %w", err) } - return &Stats{ - NginxInfo: *info, - Caches: *caches, - Processes: *processes, - Slabs: *slabs, - Connections: *cons, - HTTPRequests: *requests, - SSL: *ssl, - ServerZones: *zones, - StreamServerZones: *streamZones, - Upstreams: *upstreams, - StreamUpstreams: *streamUpstreams, - StreamZoneSync: streamZoneSync, - LocationZones: *locationZones, - Resolvers: *resolvers, - HTTPLimitRequests: *limitReqs, - HTTPLimitConnections: *limitConnsHTTP, - StreamLimitConnections: *limitConnsStream, - Workers: workers, - }, nil + return &stats.Stats, nil } // GetAvailableEndpoints returns available endpoints in the API. -func (client *NginxClient) GetAvailableEndpoints() ([]string, error) { +func (client *NginxClient) GetAvailableEndpoints(ctx context.Context) ([]string, error) { var endpoints []string - err := client.get("", &endpoints) + err := client.get(ctx, "", &endpoints) if err != nil { return nil, fmt.Errorf("failed to get endpoints: %w", err) } return endpoints, nil } -// GetAvailableStreamEndpoints returns available stream endpoints in the API. -func (client *NginxClient) GetAvailableStreamEndpoints() ([]string, error) { +// GetAvailableStreamEndpoints returns available stream endpoints in the API with a context. +func (client *NginxClient) GetAvailableStreamEndpoints(ctx context.Context) ([]string, error) { var endpoints []string - err := client.get("stream", &endpoints) + err := client.get(ctx, "stream", &endpoints) if err != nil { return nil, fmt.Errorf("failed to get endpoints: %w", err) } return endpoints, nil } -// GetNginxInfo returns Nginx stats. -func (client *NginxClient) GetNginxInfo() (*NginxInfo, error) { +// GetNginxInfo returns Nginx stats with a context. +func (client *NginxClient) GetNginxInfo(ctx context.Context) (*NginxInfo, error) { var info NginxInfo - err := client.get("nginx", &info) + err := client.get(ctx, "nginx", &info) if err != nil { return nil, fmt.Errorf("failed to get info: %w", err) } return &info, nil } -// GetCaches returns Cache stats. -func (client *NginxClient) GetCaches() (*Caches, error) { +// GetCaches returns Cache stats with a context. +func (client *NginxClient) GetCaches(ctx context.Context) (*Caches, error) { var caches Caches - err := client.get("http/caches", &caches) + err := client.get(ctx, "http/caches", &caches) if err != nil { return nil, fmt.Errorf("failed to get caches: %w", err) } return &caches, nil } -// GetSlabs returns Slabs stats. -func (client *NginxClient) GetSlabs() (*Slabs, error) { +// GetSlabs returns Slabs stats with a context. +func (client *NginxClient) GetSlabs(ctx context.Context) (*Slabs, error) { var slabs Slabs - err := client.get("slabs", &slabs) + err := client.get(ctx, "slabs", &slabs) if err != nil { return nil, fmt.Errorf("failed to get slabs: %w", err) } return &slabs, nil } -// GetConnections returns Connections stats. -func (client *NginxClient) GetConnections() (*Connections, error) { +// GetConnections returns Connections stats with a context. +func (client *NginxClient) GetConnections(ctx context.Context) (*Connections, error) { var cons Connections - err := client.get("connections", &cons) + err := client.get(ctx, "connections", &cons) if err != nil { return nil, fmt.Errorf("failed to get connections: %w", err) } return &cons, nil } -// GetHTTPRequests returns http/requests stats. -func (client *NginxClient) GetHTTPRequests() (*HTTPRequests, error) { +// GetHTTPRequests returns http/requests stats with a context. +func (client *NginxClient) GetHTTPRequests(ctx context.Context) (*HTTPRequests, error) { var requests HTTPRequests - err := client.get("http/requests", &requests) + err := client.get(ctx, "http/requests", &requests) if err != nil { return nil, fmt.Errorf("failed to get http requests: %w", err) } return &requests, nil } -// GetSSL returns SSL stats. -func (client *NginxClient) GetSSL() (*SSL, error) { +// GetSSL returns SSL stats with a context. +func (client *NginxClient) GetSSL(ctx context.Context) (*SSL, error) { var ssl SSL - err := client.get("ssl", &ssl) + err := client.get(ctx, "ssl", &ssl) if err != nil { return nil, fmt.Errorf("failed to get ssl: %w", err) } return &ssl, nil } -// GetServerZones returns http/server_zones stats. -func (client *NginxClient) GetServerZones() (*ServerZones, error) { +// GetServerZones returns http/server_zones stats with a context. +func (client *NginxClient) GetServerZones(ctx context.Context) (*ServerZones, error) { var zones ServerZones - err := client.get("http/server_zones", &zones) + err := client.get(ctx, "http/server_zones", &zones) if err != nil { return nil, fmt.Errorf("failed to get server zones: %w", err) } return &zones, err } -// GetStreamServerZones returns stream/server_zones stats. -func (client *NginxClient) GetStreamServerZones() (*StreamServerZones, error) { +// GetStreamServerZones returns stream/server_zones stats with a context. +func (client *NginxClient) GetStreamServerZones(ctx context.Context) (*StreamServerZones, error) { var zones StreamServerZones - err := client.get("stream/server_zones", &zones) + err := client.get(ctx, "stream/server_zones", &zones) if err != nil { var ie *internalError if errors.As(err, &ie) { @@ -1406,20 +1629,20 @@ func (client *NginxClient) GetStreamServerZones() (*StreamServerZones, error) { return &zones, err } -// GetUpstreams returns http/upstreams stats. -func (client *NginxClient) GetUpstreams() (*Upstreams, error) { +// GetUpstreams returns http/upstreams stats with a context. +func (client *NginxClient) GetUpstreams(ctx context.Context) (*Upstreams, error) { var upstreams Upstreams - err := client.get("http/upstreams", &upstreams) + err := client.get(ctx, "http/upstreams", &upstreams) if err != nil { return nil, fmt.Errorf("failed to get upstreams: %w", err) } return &upstreams, nil } -// GetStreamUpstreams returns stream/upstreams stats. -func (client *NginxClient) GetStreamUpstreams() (*StreamUpstreams, error) { +// GetStreamUpstreams returns stream/upstreams stats with a context. +func (client *NginxClient) GetStreamUpstreams(ctx context.Context) (*StreamUpstreams, error) { var upstreams StreamUpstreams - err := client.get("stream/upstreams", &upstreams) + err := client.get(ctx, "stream/upstreams", &upstreams) if err != nil { var ie *internalError if errors.As(err, &ie) { @@ -1432,10 +1655,10 @@ func (client *NginxClient) GetStreamUpstreams() (*StreamUpstreams, error) { return &upstreams, nil } -// GetStreamZoneSync returns stream/zone_sync stats. -func (client *NginxClient) GetStreamZoneSync() (*StreamZoneSync, error) { +// GetStreamZoneSync returns stream/zone_sync stats with a context. +func (client *NginxClient) GetStreamZoneSync(ctx context.Context) (*StreamZoneSync, error) { var streamZoneSync StreamZoneSync - err := client.get("stream/zone_sync", &streamZoneSync) + err := client.get(ctx, "stream/zone_sync", &streamZoneSync) if err != nil { var ie *internalError if errors.As(err, &ie) { @@ -1449,13 +1672,13 @@ func (client *NginxClient) GetStreamZoneSync() (*StreamZoneSync, error) { return &streamZoneSync, err } -// GetLocationZones returns http/location_zones stats. -func (client *NginxClient) GetLocationZones() (*LocationZones, error) { +// GetLocationZones returns http/location_zones stats with a context. +func (client *NginxClient) GetLocationZones(ctx context.Context) (*LocationZones, error) { var locationZones LocationZones if client.apiVersion < 5 { return &locationZones, nil } - err := client.get("http/location_zones", &locationZones) + err := client.get(ctx, "http/location_zones", &locationZones) if err != nil { return nil, fmt.Errorf("failed to get location zones: %w", err) } @@ -1463,13 +1686,13 @@ func (client *NginxClient) GetLocationZones() (*LocationZones, error) { return &locationZones, err } -// GetResolvers returns Resolvers stats. -func (client *NginxClient) GetResolvers() (*Resolvers, error) { +// GetResolvers returns Resolvers stats with a context. +func (client *NginxClient) GetResolvers(ctx context.Context) (*Resolvers, error) { var resolvers Resolvers if client.apiVersion < 5 { return &resolvers, nil } - err := client.get("resolvers", &resolvers) + err := client.get(ctx, "resolvers", &resolvers) if err != nil { return nil, fmt.Errorf("failed to get resolvers: %w", err) } @@ -1477,10 +1700,10 @@ func (client *NginxClient) GetResolvers() (*Resolvers, error) { return &resolvers, err } -// GetProcesses returns Processes stats. -func (client *NginxClient) GetProcesses() (*Processes, error) { +// GetProcesses returns Processes stats with a context. +func (client *NginxClient) GetProcesses(ctx context.Context) (*Processes, error) { var processes Processes - err := client.get("processes", &processes) + err := client.get(ctx, "processes", &processes) if err != nil { return nil, fmt.Errorf("failed to get processes: %w", err) } @@ -1495,27 +1718,27 @@ type KeyValPairs map[string]string type KeyValPairsByZone map[string]KeyValPairs // GetKeyValPairs fetches key/value pairs for a given HTTP zone. -func (client *NginxClient) GetKeyValPairs(zone string) (KeyValPairs, error) { - return client.getKeyValPairs(zone, httpContext) +func (client *NginxClient) GetKeyValPairs(ctx context.Context, zone string) (KeyValPairs, error) { + return client.getKeyValPairs(ctx, zone, httpContext) } // GetStreamKeyValPairs fetches key/value pairs for a given Stream zone. -func (client *NginxClient) GetStreamKeyValPairs(zone string) (KeyValPairs, error) { - return client.getKeyValPairs(zone, streamContext) +func (client *NginxClient) GetStreamKeyValPairs(ctx context.Context, zone string) (KeyValPairs, error) { + return client.getKeyValPairs(ctx, zone, streamContext) } -func (client *NginxClient) getKeyValPairs(zone string, stream bool) (KeyValPairs, error) { +func (client *NginxClient) getKeyValPairs(ctx context.Context, zone string, stream bool) (KeyValPairs, error) { base := "http" if stream { base = "stream" } if zone == "" { - return nil, errors.New("zone required") + return nil, fmt.Errorf("zone: %w", ErrParameterRequired) } path := fmt.Sprintf("%v/keyvals/%v", base, zone) var keyValPairs KeyValPairs - err := client.get(path, &keyValPairs) + err := client.get(ctx, path, &keyValPairs) if err != nil { return nil, fmt.Errorf("failed to get keyvals for %v/%v zone: %w", base, zone, err) } @@ -1523,16 +1746,16 @@ func (client *NginxClient) getKeyValPairs(zone string, stream bool) (KeyValPairs } // GetAllKeyValPairs fetches all key/value pairs for all HTTP zones. -func (client *NginxClient) GetAllKeyValPairs() (KeyValPairsByZone, error) { - return client.getAllKeyValPairs(httpContext) +func (client *NginxClient) GetAllKeyValPairs(ctx context.Context) (KeyValPairsByZone, error) { + return client.getAllKeyValPairs(ctx, httpContext) } // GetAllStreamKeyValPairs fetches all key/value pairs for all Stream zones. -func (client *NginxClient) GetAllStreamKeyValPairs() (KeyValPairsByZone, error) { - return client.getAllKeyValPairs(streamContext) +func (client *NginxClient) GetAllStreamKeyValPairs(ctx context.Context) (KeyValPairsByZone, error) { + return client.getAllKeyValPairs(ctx, streamContext) } -func (client *NginxClient) getAllKeyValPairs(stream bool) (KeyValPairsByZone, error) { +func (client *NginxClient) getAllKeyValPairs(ctx context.Context, stream bool) (KeyValPairsByZone, error) { base := "http" if stream { base = "stream" @@ -1540,7 +1763,7 @@ func (client *NginxClient) getAllKeyValPairs(stream bool) (KeyValPairsByZone, er path := fmt.Sprintf("%v/keyvals", base) var keyValPairsByZone KeyValPairsByZone - err := client.get(path, &keyValPairsByZone) + err := client.get(ctx, path, &keyValPairsByZone) if err != nil { return nil, fmt.Errorf("failed to get keyvals for all %v zones: %w", base, err) } @@ -1548,27 +1771,27 @@ func (client *NginxClient) getAllKeyValPairs(stream bool) (KeyValPairsByZone, er } // AddKeyValPair adds a new key/value pair to a given HTTP zone. -func (client *NginxClient) AddKeyValPair(zone string, key string, val string) error { - return client.addKeyValPair(zone, key, val, httpContext) +func (client *NginxClient) AddKeyValPair(ctx context.Context, zone string, key string, val string) error { + return client.addKeyValPair(ctx, zone, key, val, httpContext) } // AddStreamKeyValPair adds a new key/value pair to a given Stream zone. -func (client *NginxClient) AddStreamKeyValPair(zone string, key string, val string) error { - return client.addKeyValPair(zone, key, val, streamContext) +func (client *NginxClient) AddStreamKeyValPair(ctx context.Context, zone string, key string, val string) error { + return client.addKeyValPair(ctx, zone, key, val, streamContext) } -func (client *NginxClient) addKeyValPair(zone string, key string, val string, stream bool) error { +func (client *NginxClient) addKeyValPair(ctx context.Context, zone string, key string, val string, stream bool) error { base := "http" if stream { base = "stream" } if zone == "" { - return errors.New("zone required") + return fmt.Errorf("zone: %w", ErrParameterRequired) } path := fmt.Sprintf("%v/keyvals/%v", base, zone) input := KeyValPairs{key: val} - err := client.post(path, &input) + err := client.post(ctx, path, &input) if err != nil { return fmt.Errorf("failed to add key value pair for %v/%v zone: %w", base, zone, err) } @@ -1576,27 +1799,27 @@ func (client *NginxClient) addKeyValPair(zone string, key string, val string, st } // ModifyKeyValPair modifies the value of an existing key in a given HTTP zone. -func (client *NginxClient) ModifyKeyValPair(zone string, key string, val string) error { - return client.modifyKeyValPair(zone, key, val, httpContext) +func (client *NginxClient) ModifyKeyValPair(ctx context.Context, zone string, key string, val string) error { + return client.modifyKeyValPair(ctx, zone, key, val, httpContext) } // ModifyStreamKeyValPair modifies the value of an existing key in a given Stream zone. -func (client *NginxClient) ModifyStreamKeyValPair(zone string, key string, val string) error { - return client.modifyKeyValPair(zone, key, val, streamContext) +func (client *NginxClient) ModifyStreamKeyValPair(ctx context.Context, zone string, key string, val string) error { + return client.modifyKeyValPair(ctx, zone, key, val, streamContext) } -func (client *NginxClient) modifyKeyValPair(zone string, key string, val string, stream bool) error { +func (client *NginxClient) modifyKeyValPair(ctx context.Context, zone string, key string, val string, stream bool) error { base := "http" if stream { base = "stream" } if zone == "" { - return errors.New("zone required") + return fmt.Errorf("zone: %w", ErrParameterRequired) } path := fmt.Sprintf("%v/keyvals/%v", base, zone) input := KeyValPairs{key: val} - err := client.patch(path, &input, http.StatusNoContent) + err := client.patch(ctx, path, &input, http.StatusNoContent) if err != nil { return fmt.Errorf("failed to update key value pair for %v/%v zone: %w", base, zone, err) } @@ -1604,24 +1827,24 @@ func (client *NginxClient) modifyKeyValPair(zone string, key string, val string, } // DeleteKeyValuePair deletes the key/value pair for a key in a given HTTP zone. -func (client *NginxClient) DeleteKeyValuePair(zone string, key string) error { - return client.deleteKeyValuePair(zone, key, httpContext) +func (client *NginxClient) DeleteKeyValuePair(ctx context.Context, zone string, key string) error { + return client.deleteKeyValuePair(ctx, zone, key, httpContext) } // DeleteStreamKeyValuePair deletes the key/value pair for a key in a given Stream zone. -func (client *NginxClient) DeleteStreamKeyValuePair(zone string, key string) error { - return client.deleteKeyValuePair(zone, key, streamContext) +func (client *NginxClient) DeleteStreamKeyValuePair(ctx context.Context, zone string, key string) error { + return client.deleteKeyValuePair(ctx, zone, key, streamContext) } // To delete a key/value pair you set the value to null via the API, // then NGINX+ will delete the key. -func (client *NginxClient) deleteKeyValuePair(zone string, key string, stream bool) error { +func (client *NginxClient) deleteKeyValuePair(ctx context.Context, zone string, key string, stream bool) error { base := "http" if stream { base = "stream" } if zone == "" { - return errors.New("zone required") + return fmt.Errorf("zone: %w", ErrParameterRequired) } // map[string]string can't have a nil value so we use a different type here. @@ -1629,7 +1852,7 @@ func (client *NginxClient) deleteKeyValuePair(zone string, key string, stream bo keyval[key] = nil path := fmt.Sprintf("%v/keyvals/%v", base, zone) - err := client.patch(path, &keyval, http.StatusNoContent) + err := client.patch(ctx, path, &keyval, http.StatusNoContent) if err != nil { return fmt.Errorf("failed to remove key values pair for %v/%v zone: %w", base, zone, err) } @@ -1637,26 +1860,26 @@ func (client *NginxClient) deleteKeyValuePair(zone string, key string, stream bo } // DeleteKeyValPairs deletes all the key-value pairs in a given HTTP zone. -func (client *NginxClient) DeleteKeyValPairs(zone string) error { - return client.deleteKeyValPairs(zone, httpContext) +func (client *NginxClient) DeleteKeyValPairs(ctx context.Context, zone string) error { + return client.deleteKeyValPairs(ctx, zone, httpContext) } // DeleteStreamKeyValPairs deletes all the key-value pairs in a given Stream zone. -func (client *NginxClient) DeleteStreamKeyValPairs(zone string) error { - return client.deleteKeyValPairs(zone, streamContext) +func (client *NginxClient) DeleteStreamKeyValPairs(ctx context.Context, zone string) error { + return client.deleteKeyValPairs(ctx, zone, streamContext) } -func (client *NginxClient) deleteKeyValPairs(zone string, stream bool) error { +func (client *NginxClient) deleteKeyValPairs(ctx context.Context, zone string, stream bool) error { base := "http" if stream { base = "stream" } if zone == "" { - return errors.New("zone required") + return fmt.Errorf("zone: %w", ErrParameterRequired) } path := fmt.Sprintf("%v/keyvals/%v", base, zone) - err := client.delete(path, http.StatusNoContent) + err := client.delete(ctx, path, http.StatusNoContent) if err != nil { return fmt.Errorf("failed to remove all key value pairs for %v/%v zone: %w", base, zone, err) } @@ -1664,10 +1887,10 @@ func (client *NginxClient) deleteKeyValPairs(zone string, stream bool) error { } // UpdateHTTPServer updates the server of the upstream. -func (client *NginxClient) UpdateHTTPServer(upstream string, server UpstreamServer) error { +func (client *NginxClient) UpdateHTTPServer(ctx context.Context, upstream string, server UpstreamServer) error { path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, server.ID) server.ID = 0 - err := client.patch(path, &server, http.StatusOK) + err := client.patch(ctx, path, &server, http.StatusOK) if err != nil { return fmt.Errorf("failed to update %v server to %v upstream: %w", server.Server, upstream, err) } @@ -1676,10 +1899,10 @@ func (client *NginxClient) UpdateHTTPServer(upstream string, server UpstreamServ } // UpdateStreamServer updates the stream server of the upstream. -func (client *NginxClient) UpdateStreamServer(upstream string, server StreamUpstreamServer) error { +func (client *NginxClient) UpdateStreamServer(ctx context.Context, upstream string, server StreamUpstreamServer) error { path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, server.ID) server.ID = 0 - err := client.patch(path, &server, http.StatusOK) + err := client.patch(ctx, path, &server, http.StatusOK) if err != nil { return fmt.Errorf("failed to update %v stream server to %v upstream: %w", server.Server, upstream, err) } @@ -1708,39 +1931,39 @@ func addPortToServer(server string) string { return fmt.Sprintf("%v:%v", server, defaultServerPort) } -// GetHTTPLimitReqs returns http/limit_reqs stats. -func (client *NginxClient) GetHTTPLimitReqs() (*HTTPLimitRequests, error) { +// GetHTTPLimitReqs returns http/limit_reqs stats with a context. +func (client *NginxClient) GetHTTPLimitReqs(ctx context.Context) (*HTTPLimitRequests, error) { var limitReqs HTTPLimitRequests if client.apiVersion < 6 { return &limitReqs, nil } - err := client.get("http/limit_reqs", &limitReqs) + err := client.get(ctx, "http/limit_reqs", &limitReqs) if err != nil { return nil, fmt.Errorf("failed to get http limit requests: %w", err) } return &limitReqs, nil } -// GetHTTPConnectionsLimit returns http/limit_conns stats. -func (client *NginxClient) GetHTTPConnectionsLimit() (*HTTPLimitConnections, error) { +// GetHTTPConnectionsLimit returns http/limit_conns stats with a context. +func (client *NginxClient) GetHTTPConnectionsLimit(ctx context.Context) (*HTTPLimitConnections, error) { var limitConns HTTPLimitConnections if client.apiVersion < 6 { return &limitConns, nil } - err := client.get("http/limit_conns", &limitConns) + err := client.get(ctx, "http/limit_conns", &limitConns) if err != nil { return nil, fmt.Errorf("failed to get http connections limit: %w", err) } return &limitConns, nil } -// GetStreamConnectionsLimit returns stream/limit_conns stats. -func (client *NginxClient) GetStreamConnectionsLimit() (*StreamLimitConnections, error) { +// GetStreamConnectionsLimit returns stream/limit_conns stats with a context. +func (client *NginxClient) GetStreamConnectionsLimit(ctx context.Context) (*StreamLimitConnections, error) { var limitConns StreamLimitConnections if client.apiVersion < 6 { return &limitConns, nil } - err := client.get("stream/limit_conns", &limitConns) + err := client.get(ctx, "stream/limit_conns", &limitConns) if err != nil { var ie *internalError if errors.As(err, &ie) { @@ -1754,12 +1977,12 @@ func (client *NginxClient) GetStreamConnectionsLimit() (*StreamLimitConnections, } // GetWorkers returns workers stats. -func (client *NginxClient) GetWorkers() ([]*Workers, error) { +func (client *NginxClient) GetWorkers(ctx context.Context) ([]*Workers, error) { var workers []*Workers if client.apiVersion < 9 { return workers, nil } - err := client.get("workers", &workers) + err := client.get(ctx, "workers", &workers) if err != nil { return nil, fmt.Errorf("failed to get workers: %w", err) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 3ccf11596..50b41ee8d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -110,9 +110,9 @@ github.com/nginx/agent/sdk/v2/zip # github.com/nginxinc/nginx-go-crossplane v0.4.48 ## explicit; go 1.19 github.com/nginxinc/nginx-go-crossplane -# github.com/nginxinc/nginx-plus-go-client v1.2.2 -## explicit; go 1.21.2 -github.com/nginxinc/nginx-plus-go-client/client +# github.com/nginxinc/nginx-plus-go-client/v2 v2.0.1 +## explicit; go 1.22.6 +github.com/nginxinc/nginx-plus-go-client/v2/client # github.com/nginxinc/nginx-prometheus-exporter v1.2.0 ## explicit; go 1.21.3 github.com/nginxinc/nginx-prometheus-exporter/client