Skip to content

Commit

Permalink
Create dedicated cache and upstream metrics reports (#189)
Browse files Browse the repository at this point in the history
* Create dedicated cache and upstream metrics reports

Split Nginx Plus MetricsReport into three groups of reports of the following types: CACHE_ZONE, UPSTREAMS and SYSTEM.
A CACHE_ZONE report is created for each distinct value of the dimension "cache_zone".
An UPSTREAMS report is created for each distinct value of the dimension "upstream".
A SYSTEM report contains all other metrics.
  • Loading branch information
karlsassenberg authored Mar 27, 2023
1 parent d5bc91f commit 01d2002
Show file tree
Hide file tree
Showing 96 changed files with 944 additions and 693 deletions.
2 changes: 2 additions & 0 deletions docs/proto/proto.md
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,8 @@ Metric type enum
| SYSTEM | 0 | System metric type |
| INSTANCE | 1 | NGINX instance metric type |
| AGENT | 2 | Agent metric type |
| CACHE_ZONE | 3 | Cache zone metric type |
| UPSTREAMS | 4 | Upstreams metric type |



Expand Down
2 changes: 1 addition & 1 deletion sdk/client/metric_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error {
return r.handleGrpcError("Metric Reporter Channel Send", err)
}

log.Tracef("MetricReporter sent metrics report %v", report)
log.Tracef("MetricReporter sent metrics report [Type: %d] %+v", report.Type, report)

return nil
})
Expand Down
76 changes: 43 additions & 33 deletions sdk/proto/metrics.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions sdk/proto/metrics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ message MetricsReport {
INSTANCE = 1;
// Agent metric type
AGENT = 2;
// Cache zone metric type
CACHE_ZONE = 3;
// Upstreams metric type
UPSTREAMS = 4;
}
// Provides meta information about the metrics
Metadata meta = 1 [(gogoproto.jsontag) = "meta"];
Expand Down
2 changes: 1 addition & 1 deletion src/core/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
BulkSize: 20,
ReportInterval: 1 * time.Minute,
CollectionInterval: 15 * time.Second,
Mode: "aggregation",
Mode: "aggregated",
},
Features: agent_config.GetDefaultFeatures(),
}
Expand Down
10 changes: 3 additions & 7 deletions src/core/metrics/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func dimChecksum(stats *proto.StatsEntity) string {
// SaveCollections loops through one or more reports and get all the raw metrics for the Collections
// Note this function operate on the Collections struct data directly.
func SaveCollections(metricsCollections Collections, reports ...*proto.MetricsReport) Collections {
// could be multiple reports
// could be multiple reports, however they must all be of the same type.
for _, report := range reports {
metricsCollections.Count++
for _, stats := range report.GetData() {
Expand All @@ -65,7 +65,7 @@ func SaveCollections(metricsCollections Collections, reports ...*proto.MetricsRe
return metricsCollections
}

func GenerateMetricsReport(metricsCollections Collections) *proto.MetricsReport {
func GenerateMetrics(metricsCollections Collections) []*proto.StatsEntity {

results := make([]*proto.StatsEntity, 0, 200)

Expand All @@ -77,11 +77,7 @@ func GenerateMetricsReport(metricsCollections Collections) *proto.MetricsReport
))
}

return &proto.MetricsReport{
Meta: &proto.Metadata{},
Type: 0,
Data: results,
}
return results
}

func getAggregatedSimpleMetric(count int, internalMap map[string]float64) (simpleMetrics []*proto.SimpleMetric) {
Expand Down
8 changes: 4 additions & 4 deletions src/core/metrics/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ func TestGenerateAggregationReport(t *testing.T) {
metricsCollections.Data[csum].RunningSumMap["system.cpu.system"] = 200.2
metricsCollections.Data[csum].RunningSumMap["system.undefined_method"] = 1000

report := GenerateMetricsReport(metricsCollections)
log.Info(report)
results := GenerateMetrics(metricsCollections)
log.Info(results)

assert.NotNil(t, report)
for _, stats := range report.GetData() {
assert.NotEmpty(t, results)
for _, stats := range results {
simplemetrics := stats.GetSimplemetrics()
for _, v := range simplemetrics {
switch {
Expand Down
9 changes: 4 additions & 5 deletions src/core/metrics/collectors/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

log "github.com/sirupsen/logrus"

"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"
"github.com/nginx/agent/v2/src/core/metrics"
Expand All @@ -27,7 +26,7 @@ var (

type ContainerCollector struct {
sources []metrics.Source
buf chan *proto.StatsEntity
buf chan *metrics.StatsEntityWrapper
dim *metrics.CommonDim
env core.Environment
}
Expand All @@ -42,7 +41,7 @@ func NewContainerCollector(env core.Environment, conf *config.Config) *Container

return &ContainerCollector{
sources: containerSources,
buf: make(chan *proto.StatsEntity, 65535),
buf: make(chan *metrics.StatsEntityWrapper, 65535),
dim: metrics.NewCommonDim(env.NewHostInfo("agentVersion", &conf.Tags, conf.ConfigDirs, false), conf, ""),
env: env,
}
Expand All @@ -59,7 +58,7 @@ func (c *ContainerCollector) collectMetrics(ctx context.Context) {
wg.Wait()
}

func (c *ContainerCollector) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *proto.StatsEntity) {
func (c *ContainerCollector) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) {
defer wg.Done()
c.collectMetrics(ctx)

Expand All @@ -69,7 +68,7 @@ func (c *ContainerCollector) Collect(ctx context.Context, wg *sync.WaitGroup, m
case <-ctx.Done():
return
case sample := <-c.buf:
sample.Dimensions = append(commonDims, sample.Dimensions...)
sample.Data.Dimensions = append(commonDims, sample.Data.Dimensions...)

select {
case <-ctx.Done():
Expand Down
8 changes: 4 additions & 4 deletions src/core/metrics/collectors/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,18 @@ func TestContainerCollector_Collect(t *testing.T) {
mockSource1,
mockSource2,
},
buf: make(chan *proto.StatsEntity),
buf: make(chan *metrics.StatsEntityWrapper),
dim: &metrics.CommonDim{},
}

ctx := context.TODO()
wg := &sync.WaitGroup{}
wg.Add(1)

channel := make(chan *proto.StatsEntity)
channel := make(chan *metrics.StatsEntityWrapper)
go containerCollector.Collect(ctx, wg, channel)

containerCollector.buf <- &proto.StatsEntity{Dimensions: []*proto.Dimension{{Name: "new_dim", Value: "123"}}}
containerCollector.buf <- &metrics.StatsEntityWrapper{Type: proto.MetricsReport_SYSTEM, Data: &proto.StatsEntity{Dimensions: []*proto.Dimension{{Name: "new_dim", Value: "123"}}}}
actual := <-channel

mockSource1.AssertExpectations(t)
Expand All @@ -83,7 +83,7 @@ func TestContainerCollector_Collect(t *testing.T) {
{Name: "nginx_id", Value: ""},
{Name: "new_dim", Value: "123"},
}
assert.Equal(t, expectedDimensions, actual.Dimensions)
assert.Equal(t, expectedDimensions, actual.Data.Dimensions)
}

func TestContainerCollector_UpdateConfig(t *testing.T) {
Expand Down
7 changes: 3 additions & 4 deletions src/core/metrics/collectors/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"context"
"sync"

"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"
"github.com/nginx/agent/v2/src/core/metrics"
Expand All @@ -25,7 +24,7 @@ var (

type NginxCollector struct {
sources []metrics.NginxSource
buf chan *proto.StatsEntity
buf chan *metrics.StatsEntityWrapper
dimensions *metrics.CommonDim
collectorConf *metrics.NginxCollectorConfig
env core.Environment
Expand All @@ -40,7 +39,7 @@ func NewNginxCollector(conf *config.Config, env core.Environment, collectorConf

return &NginxCollector{
sources: buildSources(dimensions, binary, collectorConf),
buf: make(chan *proto.StatsEntity, 65535),
buf: make(chan *metrics.StatsEntityWrapper, 65535),
dimensions: dimensions,
collectorConf: collectorConf,
env: env,
Expand Down Expand Up @@ -81,7 +80,7 @@ func (c *NginxCollector) collectMetrics(ctx context.Context) {
wg.Wait()
}

func (c *NginxCollector) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *proto.StatsEntity) {
func (c *NginxCollector) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) {
defer wg.Done()
c.collectMetrics(ctx)
for {
Expand Down
2 changes: 1 addition & 1 deletion src/core/metrics/collectors/nginx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestNginxCollector_Collect(t *testing.T) {
ctx := context.TODO()
wg := &sync.WaitGroup{}
wg.Add(1)
go nginxCollector.Collect(ctx, wg, make(chan<- *proto.StatsEntity))
go nginxCollector.Collect(ctx, wg, make(chan<- *metrics.StatsEntityWrapper))

time.Sleep(10 * time.Millisecond)
mockNginxSource1.AssertExpectations(t)
Expand Down
5 changes: 2 additions & 3 deletions src/core/metrics/collectors/source_mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"context"
"sync"

"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core/metrics"
"github.com/stretchr/testify/mock"
)
Expand All @@ -26,7 +25,7 @@ type NginxSourceMock struct {
mock.Mock
}

func (m *NginxSourceMock) Collect(ctx context.Context, wg *sync.WaitGroup, statsChannel chan<- *proto.StatsEntity) {
func (m *NginxSourceMock) Collect(ctx context.Context, wg *sync.WaitGroup, statsChannel chan<- *metrics.StatsEntityWrapper) {
m.Called(ctx, wg, statsChannel)
wg.Done()
}
Expand All @@ -43,7 +42,7 @@ type SourceMock struct {
mock.Mock
}

func (m *SourceMock) Collect(ctx context.Context, wg *sync.WaitGroup, statsChannel chan<- *proto.StatsEntity) {
func (m *SourceMock) Collect(ctx context.Context, wg *sync.WaitGroup, statsChannel chan<- *metrics.StatsEntityWrapper) {
m.Called(ctx, wg, statsChannel)
wg.Done()
}
9 changes: 4 additions & 5 deletions src/core/metrics/collectors/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"context"
"sync"

"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"
"github.com/nginx/agent/v2/src/core/metrics"
Expand All @@ -24,7 +23,7 @@ var (

type SystemCollector struct {
sources []metrics.Source
buf chan *proto.StatsEntity
buf chan *metrics.StatsEntityWrapper
dim *metrics.CommonDim
env core.Environment
}
Expand Down Expand Up @@ -53,7 +52,7 @@ func NewSystemCollector(env core.Environment, conf *config.Config) *SystemCollec

return &SystemCollector{
sources: systemSources,
buf: make(chan *proto.StatsEntity, 65535),
buf: make(chan *metrics.StatsEntityWrapper, 65535),
dim: metrics.NewCommonDim(env.NewHostInfo("agentVersion", &conf.Tags, conf.ConfigDirs, false), conf, ""),
env: env,
}
Expand All @@ -70,7 +69,7 @@ func (c *SystemCollector) collectMetrics(ctx context.Context) {
wg.Wait()
}

func (c *SystemCollector) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *proto.StatsEntity) {
func (c *SystemCollector) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) {
defer wg.Done()
c.collectMetrics(ctx)

Expand All @@ -80,7 +79,7 @@ func (c *SystemCollector) Collect(ctx context.Context, wg *sync.WaitGroup, m cha
case <-ctx.Done():
return
case sample := <-c.buf:
sample.Dimensions = append(commonDims, sample.Dimensions...)
sample.Data.Dimensions = append(commonDims, sample.Data.Dimensions...)

select {
case <-ctx.Done():
Expand Down
Loading

0 comments on commit 01d2002

Please sign in to comment.