Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase timeout period for collecting metrics #755

Merged
merged 19 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
360 changes: 357 additions & 3 deletions src/core/metrics/sources/nginx_plus.go

Large diffs are not rendered by default.

151 changes: 139 additions & 12 deletions src/core/metrics/sources/nginx_plus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ const (
currentPeer2UpstreamHeaderTime = 80
currentPeer1UpstreamResponseTime = 100
currentPeer2UpstreamResponseTime = 80
currentUpstreamResponseTime = 100
currentUpstreamConnectTime = 80
currentUpstreamFirstByteTime = 50
previousUpstreamHeaderTime = 98
previousUpstreamResponseTime = 98
Expand Down Expand Up @@ -97,14 +95,28 @@ const (
workerProcessID = 12345
)

type FakeNginxPlus struct {
*NginxPlus
}

// Collect is fake collector that hard codes a stats struct response to avoid dependency on external NGINX Plus api
func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) {
defer wg.Done()
stats := plusclient.Stats{
var (
availableZones = []string{"server_zones", "upstreams", "limit_conns", "zone_sync"}
stats = plusclient.Stats{
StreamZoneSync: &plusclient.StreamZoneSync{
Zones: map[string]plusclient.SyncZone{
"0": {
RecordsPending: 1,
RecordsTotal: 2,
},
"1": {
RecordsPending: 3,
RecordsTotal: 4,
},
},
Status: plusclient.StreamZoneSyncStatus{
BytesIn: 1,
MsgsIn: 2,
MsgsOut: 3,
BytesOut: 4,
NodesOnline: 5,
},
},
HTTPRequests: plusclient.HTTPRequests{
Total: currentHTTPRequestTotal,
Current: currentHTTPRequestCurrent,
Expand Down Expand Up @@ -387,8 +399,7 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<
},
},
}

prevStats := plusclient.Stats{
prevStats = plusclient.Stats{
HTTPRequests: plusclient.HTTPRequests{
Total: previousHTTPRequestTotal,
Current: previousHTTPRequestCurrent,
Expand Down Expand Up @@ -547,6 +558,15 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<
},
},
}
)

type FakeNginxPlus struct {
*NginxPlus
}

// Collect is fake collector that hard codes a stats struct response to avoid dependency on external NGINX Plus api
func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) {
defer wg.Done()

f.baseDimensions.NginxType = "plus"
f.baseDimensions.PublishedAPI = f.plusAPI
Expand All @@ -556,6 +576,113 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<
f.sendMetrics(ctx, m, f.collectMetrics(&stats, &prevStats)...)
}

var _ Client = (*MockClient)(nil)

type MockClient struct{}

func (m *MockClient) GetAvailableEndpoints() ([]string, error) {
return []string{"stream"}, nil
}

func (m *MockClient) GetAvailableStreamEndpoints() ([]string, error) {
return availableZones, nil
}

func (m *MockClient) GetStreamServerZones() (*plusclient.StreamServerZones, error) {
return &stats.StreamServerZones, nil
}

func (m *MockClient) GetStreamUpstreams() (*plusclient.StreamUpstreams, error) {
return &stats.StreamUpstreams, nil
}

func (m *MockClient) GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) {
return &stats.StreamLimitConnections, nil
}

func (m *MockClient) GetStreamZoneSync() (*plusclient.StreamZoneSync, error) {
return &plusclient.StreamZoneSync{
Zones: stats.StreamZoneSync.Zones,
Status: stats.StreamZoneSync.Status,
}, nil
}

func (m *MockClient) GetNginxInfo() (*plusclient.NginxInfo, error) {
return &stats.NginxInfo, nil
}

func (m *MockClient) GetCaches() (*plusclient.Caches, error) {
return &stats.Caches, nil
}

func (m *MockClient) GetProcesses() (*plusclient.Processes, error) {
return &stats.Processes, nil
}

func (m *MockClient) GetSlabs() (*plusclient.Slabs, error) {
return &stats.Slabs, nil
}

func (m *MockClient) GetConnections() (*plusclient.Connections, error) {
return &stats.Connections, nil
}

func (m *MockClient) GetHTTPRequests() (*plusclient.HTTPRequests, error) {
return &stats.HTTPRequests, nil
}

func (m *MockClient) GetSSL() (*plusclient.SSL, error) {
return &stats.SSL, nil
}

func (m *MockClient) GetServerZones() (*plusclient.ServerZones, error) {
return &stats.ServerZones, nil
}

func (m *MockClient) GetUpstreams() (*plusclient.Upstreams, error) {
return &stats.Upstreams, nil
}

func (m *MockClient) GetLocationZones() (*plusclient.LocationZones, error) {
return &stats.LocationZones, nil
}

func (m *MockClient) GetResolvers() (*plusclient.Resolvers, error) {
return &stats.Resolvers, nil
}

func (m *MockClient) GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) {
return &stats.HTTPLimitRequests, nil
}

func (m *MockClient) GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) {
return &stats.HTTPLimitConnections, nil
}

func (m *MockClient) GetWorkers() ([]*plusclient.Workers, error) {
return stats.Workers, nil
}

func TestGetStats(t *testing.T) {
client := &MockClient{}

source := NewNginxPlus(nil, "", "", "", 9)

tests := []struct {
stats plusclient.Stats
}{
{
stats: stats,
},
}

for _, test := range tests {
resultStats, err := source.getStats(client)
require.NoError(t, err)
assert.Equal(t, test.stats, *resultStats)
}
}

func TestNginxPlusUpdate(t *testing.T) {
nginxPlus := NewNginxPlus(&metrics.CommonDim{}, "test", PlusNamespace, "http://localhost:8080/api", 6)

Expand Down
2 changes: 1 addition & 1 deletion src/core/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (n *NginxBinaryType) GetNginxDetailsFromProcess(nginxProcess *Process) *pro
if urlsLength == 0 || nginxStatus == "" {
stubStatusApiUrl, err := sdk.GetStubStatusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives)
if err != nil {
log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure aStub Status API to get NGINX OSS metrics: %v", err)
log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure a Stub Status API to get NGINX OSS metrics: %v", err)
}

nginxPlusApiUrl, err := sdk.GetNginxPlusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives)
Expand Down
8 changes: 5 additions & 3 deletions src/plugins/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ func (m *Metrics) metricsGoroutine() {
}

func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) {
// setups a collect duration of half-time of the poll interval
ctx, cancel := context.WithTimeout(m.ctx, m.interval/2)
// set a timeout for a millisecond less than the collection interval
ctx, cancel := context.WithTimeout(m.ctx, (m.interval - 1*time.Millisecond))
defer cancel()
// locks the m.collectors to make sure it doesn't get deleted in the middle
// of collection, as we will delete the old one if config changes.
Expand All @@ -232,7 +232,9 @@ func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) {
// drain the buf, since our sources/collectors are all done, we can rely on buffer length
select {
case <-ctx.Done():
err := m.ctx.Err()
log.Debugf("context done in %s collectStats", time.Since(start))

err := ctx.Err()
if err != nil {
log.Errorf("error in done context collectStats %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions src/plugins/metrics_throlling.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W
return
case <-r.ticker.C:
reports := r.getAggregatedReports()
log.Debugf("metricsThrottle: metricsReportGoroutine, got %d reports to send", len(reports))
if len(reports) > 0 {
r.messagePipeline.Process(core.NewMessage(core.CommMetrics, reports))
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading