From b8fc5c485da19969e101f5731e67be4e46460d60 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Thu, 13 Jul 2023 15:26:52 +0100 Subject: [PATCH 01/22] Lint imports --- src/plugins/features.go | 3 ++- src/plugins/features_test.go | 1 + src/plugins/metrics.go | 7 +++---- src/plugins/metrics_sender.go | 6 +++--- src/plugins/metrics_sender_test.go | 8 ++++---- src/plugins/metrics_test.go | 6 +++--- src/plugins/metrics_throlling.go | 9 ++++----- src/plugins/metrics_throlling_test.go | 7 +++---- .../github.com/nginx/agent/v2/src/plugins/features.go | 3 ++- .../github.com/nginx/agent/v2/src/plugins/metrics.go | 7 +++---- .../nginx/agent/v2/src/plugins/metrics_sender.go | 6 +++--- .../nginx/agent/v2/src/plugins/metrics_throlling.go | 9 ++++----- 12 files changed, 35 insertions(+), 37 deletions(-) diff --git a/src/plugins/features.go b/src/plugins/features.go index a4195302e..cb0d26b26 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -8,12 +8,13 @@ package plugins import ( - "github.com/google/uuid" agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/client" sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" + + "github.com/google/uuid" log "github.com/sirupsen/logrus" ) diff --git a/src/plugins/features_test.go b/src/plugins/features_test.go index b7db6d567..fd7ab15a4 100644 --- a/src/plugins/features_test.go +++ b/src/plugins/features_test.go @@ -17,6 +17,7 @@ import ( "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" tutils "github.com/nginx/agent/v2/test/utils" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 6a8825114..0cfc1cda1 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -12,16 +12,15 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - "github.com/nginx/agent/sdk/v2" + agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" "github.com/nginx/agent/v2/src/core/metrics/collectors" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type Metrics struct { diff --git a/src/plugins/metrics_sender.go b/src/plugins/metrics_sender.go index ab78a1216..7f0a90538 100644 --- a/src/plugins/metrics_sender.go +++ b/src/plugins/metrics_sender.go @@ -11,15 +11,15 @@ import ( "context" "strings" - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - "github.com/nginx/agent/sdk/v2" agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/client" "github.com/nginx/agent/sdk/v2/proto" models "github.com/nginx/agent/sdk/v2/proto/events" "github.com/nginx/agent/v2/src/core" + + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type MetricsSender struct { diff --git a/src/plugins/metrics_sender_test.go b/src/plugins/metrics_sender_test.go index 274c8612c..db4b5598f 100644 --- a/src/plugins/metrics_sender_test.go +++ b/src/plugins/metrics_sender_test.go @@ -14,14 +14,14 @@ import ( "testing" "time" - "github.com/gogo/protobuf/types" "github.com/nginx/agent/sdk/v2/backoff" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" tutils "github.com/nginx/agent/v2/test/utils" + + "github.com/gogo/protobuf/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestMetricsSenderSendMetrics(t *testing.T) { diff --git a/src/plugins/metrics_test.go b/src/plugins/metrics_test.go index 58353fff4..5f5b0df09 100644 --- a/src/plugins/metrics_test.go +++ b/src/plugins/metrics_test.go @@ -12,15 +12,15 @@ import ( "sort" "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" "github.com/nginx/agent/v2/src/core/metrics/collectors" tutils "github.com/nginx/agent/v2/test/utils" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) var ( diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 9c6e90fd1..8d56d5840 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -12,16 +12,15 @@ import ( "sync" "time" - "github.com/gogo/protobuf/types" - - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" + + "github.com/gogo/protobuf/types" + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) const ( diff --git a/src/plugins/metrics_throlling_test.go b/src/plugins/metrics_throlling_test.go index 2e7496ef7..cfbd2055b 100644 --- a/src/plugins/metrics_throlling_test.go +++ b/src/plugins/metrics_throlling_test.go @@ -11,14 +11,13 @@ import ( "context" "testing" - "github.com/nginx/agent/v2/src/core/metrics" - "github.com/nginx/agent/sdk/v2/proto" - "github.com/stretchr/testify/assert" - "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" + "github.com/nginx/agent/v2/src/core/metrics" tutils "github.com/nginx/agent/v2/test/utils" + + "github.com/stretchr/testify/assert" ) func TestMetricsThrottle_Process(t *testing.T) { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go index a4195302e..cb0d26b26 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go @@ -8,12 +8,13 @@ package plugins import ( - "github.com/google/uuid" agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/client" sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" + + "github.com/google/uuid" log "github.com/sirupsen/logrus" ) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 6a8825114..0cfc1cda1 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -12,16 +12,15 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - "github.com/nginx/agent/sdk/v2" + agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" "github.com/nginx/agent/v2/src/core/metrics/collectors" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type Metrics struct { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go index ab78a1216..7f0a90538 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go @@ -11,15 +11,15 @@ import ( "context" "strings" - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - "github.com/nginx/agent/sdk/v2" agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/client" "github.com/nginx/agent/sdk/v2/proto" models "github.com/nginx/agent/sdk/v2/proto/events" "github.com/nginx/agent/v2/src/core" + + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type MetricsSender struct { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 9c6e90fd1..8d56d5840 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -12,16 +12,15 @@ import ( "sync" "time" - "github.com/gogo/protobuf/types" - - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" + + "github.com/gogo/protobuf/types" + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) const ( From 4eb5253af39cb9c9ea1ef263980ffb9382cffd0c Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Mon, 17 Jul 2023 16:55:15 +0100 Subject: [PATCH 02/22] Wip --- sdk/agent/config/config_helpers.go | 3 ++- src/core/topics.go | 5 +++-- src/plugins/features.go | 14 +++++++++----- src/plugins/features_test.go | 18 +++++++++--------- src/plugins/metrics_sender.go | 2 +- src/plugins/metrics_throlling.go | 16 +++++++++++++++- src/plugins/metrics_throlling_test.go | 2 +- 7 files changed, 40 insertions(+), 20 deletions(-) diff --git a/sdk/agent/config/config_helpers.go b/sdk/agent/config/config_helpers.go index 8257c81a8..772ecc0c5 100644 --- a/sdk/agent/config/config_helpers.go +++ b/sdk/agent/config/config_helpers.go @@ -24,6 +24,8 @@ const ( FeatureNginxSSLConfig = "nginx-ssl-config" FeatureNginxCounting = "nginx-counting" FeatureMetrics = "metrics" + FeatureMetricsCollection = "metrics-collection" + FeatureMetricsSender = "metrics-sender" FeatureMetricsThrottle = "metrics-throttle" FeatureDataPlaneStatus = "dataplane-status" FeatureProcessWatcher = "process-watcher" @@ -31,7 +33,6 @@ const ( FeatureFileWatcherThrottle = "file-watch-throttle" FeatureActivityEvents = "activity-events" FeatureAgentAPI = "agent-api" - FeatureMetricSender = "metric-sender" CommanderPlugin = "commander" ConfigReaderPlugin = "config-reader-plugin" diff --git a/src/core/topics.go b/src/core/topics.go index 3c9f2fdb0..6973392ff 100644 --- a/src/core/topics.go +++ b/src/core/topics.go @@ -41,9 +41,10 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" + MetricReportStream = "metrics.report.stream" LoggerPrefix = "logger." - LoggerLevel = LoggerPrefix + "level" - LoggerPath = LoggerPrefix + "path" + LoggerLevel = LoggerPrefix + "level" // TODO: Never set/used? + LoggerPath = LoggerPrefix + "path" // TODO: Never set/used? DataplaneChanged = "dataplane.changed" DataplaneFilesChanged = "dataplane.fileschanged" Events = "events" diff --git a/src/plugins/features.go b/src/plugins/features.go index cb0d26b26..4b8c0848b 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -101,12 +101,12 @@ func (f *Features) Process(msg *core.Message) { if initFeature, ok := f.featureMap[feature]; ok { featurePlugins := initFeature(feature) plugins = append(plugins, featurePlugins...) - } } + err := f.pipeline.Register(agent_config.DefaultPluginSize, plugins, nil) if err != nil { - log.Warnf("Unable to register feature, %v", err) + log.Warnf("Unable to register features: %v", err) } for _, plugin := range plugins { @@ -131,6 +131,10 @@ func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { func (f *Features) enableMetricsFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { + + // metrics-sender + // metrics-throttle becomes just throttling + // metrics-collection just collecting conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) @@ -264,13 +268,13 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { metrics := NewMetrics(f.conf, f.env, f.binary) countingPlugins = append(countingPlugins, metrics) } - nginxCounting := NewNginxCounter(f.conf, f.binary, f.env) + nginxCounting := NewNginxCounter(f.conf, f.binary, f.env) countingPlugins = append(countingPlugins, nginxCounting) return countingPlugins } - return countingPlugins + return []core.Plugin{} } - return countingPlugins + return []core.Plugin{} } diff --git a/src/plugins/features_test.go b/src/plugins/features_test.go index fd7ab15a4..df47d67dd 100644 --- a/src/plugins/features_test.go +++ b/src/plugins/features_test.go @@ -23,15 +23,6 @@ import ( ) func TestFeatures_Process(t *testing.T) { - processID := "12345" - - processes := []core.Process{ - { - Name: processID, - IsMaster: true, - }, - } - testCases := []struct { testName string featureKey string @@ -64,6 +55,15 @@ func TestFeatures_Process(t *testing.T) { }, } + processID := "12345" + + processes := []core.Process{ + { + Name: processID, + IsMaster: true, + }, + } + detailsMap := map[string]*proto.NginxDetails{ processID: { ProcessPath: "/path/to/nginx", diff --git a/src/plugins/metrics_sender.go b/src/plugins/metrics_sender.go index 7f0a90538..551c80eb6 100644 --- a/src/plugins/metrics_sender.go +++ b/src/plugins/metrics_sender.go @@ -55,7 +55,7 @@ func (r *MetricsSender) Close() { } func (r *MetricsSender) Info() *core.Info { - return core.NewInfo(agent_config.FeatureMetricSender, "v0.0.1") + return core.NewInfo(agent_config.FeatureMetricsSender, "v0.0.1") } func (r *MetricsSender) Process(msg *core.Message) { diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 8d56d5840..77966894b 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -87,6 +87,20 @@ func (r *MetricsThrottle) Process(msg *core.Message) { r.syncAgentConfigChange() r.collectorsUpdate.Store(true) return + case msg.Exact(core.MetricReportStream): + switch bundle := msg.Data().(type) { + case *metrics.MetricsReportBundle: + if len(bundle.Data) > 0 { + for _, report := range bundle.Data { + if len(report.Data) > 0 { + r.metricBuffer = append(r.metricBuffer, report) + } + } + } + } + r.messagePipeline.Process(core.NewMessage(core.CommMetrics, r.metricBuffer)) + r.metricBuffer = make([]core.Payload, 0) + case msg.Exact(core.MetricReport): if r.metricsAggregation { switch bundle := msg.Data().(type) { @@ -132,7 +146,7 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } func (r *MetricsThrottle) Subscriptions() []string { - return []string{core.MetricReport, core.AgentConfigChanged, core.LoggerLevel} + return []string{core.MetricReport, core.MetricReportStream, core.AgentConfigChanged} } func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.WaitGroup) { diff --git a/src/plugins/metrics_throlling_test.go b/src/plugins/metrics_throlling_test.go index cfbd2055b..96d411b51 100644 --- a/src/plugins/metrics_throlling_test.go +++ b/src/plugins/metrics_throlling_test.go @@ -132,7 +132,7 @@ func TestMetricsThrottle_Process(t *testing.T) { } func TestMetricsThrottle_Subscriptions(t *testing.T) { - subs := []string{core.MetricReport, core.AgentConfigChanged, core.LoggerLevel} + subs := []string{core.MetricReport, core.AgentConfigChanged} pluginUnderTest := NewMetricsThrottle(tutils.GetMockAgentConfig(), &tutils.MockEnvironment{}) assert.Equal(t, subs, pluginUnderTest.Subscriptions()) From a0427bcaceb7c55117dec795b8b7173af4e58ee6 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Wed, 9 Aug 2023 11:16:34 +0100 Subject: [PATCH 03/22] Wip send stream directly to sender --- main.go | 6 +- sdk/agent/config/config_helpers.go | 2 - src/plugins/features.go | 76 ++++++++++++++----- src/plugins/metrics.go | 44 ++++++++++- src/plugins/metrics_test.go | 14 ++-- src/plugins/metrics_throlling.go | 3 +- src/plugins/metrics_throlling_test.go | 19 ++++- .../sdk/v2/agent/config/config_helpers.go | 3 +- .../sdk/v2/agent/config/config_helpers.go | 3 +- .../nginx/agent/v2/src/core/topics.go | 5 +- .../nginx/agent/v2/src/plugins/features.go | 14 ++-- .../agent/v2/src/plugins/metrics_sender.go | 2 +- .../agent/v2/src/plugins/metrics_throlling.go | 16 +++- .../sdk/v2/agent/config/config_helpers.go | 3 +- 14 files changed, 161 insertions(+), 49 deletions(-) diff --git a/main.go b/main.go index 32df876ec..8080a6a24 100644 --- a/main.go +++ b/main.go @@ -240,11 +240,7 @@ func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env * } if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) { - corePlugins = append(corePlugins, plugins.NewMetrics(loadedConfig, env, binary)) - } - - if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { - corePlugins = append(corePlugins, plugins.NewMetricsThrottle(loadedConfig, env)) + corePlugins = append(corePlugins, plugins.NewMetrics(loadedConfig, env, binary, false)) } if loadedConfig.IsFeatureEnabled(agent_config.FeatureDataPlaneStatus) { diff --git a/sdk/agent/config/config_helpers.go b/sdk/agent/config/config_helpers.go index 772ecc0c5..4d4e39520 100644 --- a/sdk/agent/config/config_helpers.go +++ b/sdk/agent/config/config_helpers.go @@ -77,7 +77,6 @@ func GetDefaultFeatures() []string { FeatureNginxSSLConfig, FeatureNginxCounting, FeatureMetrics, - FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, @@ -97,7 +96,6 @@ func DecodeConfig[T interface{}](input interface{}) (output T, err error) { } err = decoder.Decode(input) - if err != nil { return output, err } diff --git a/src/plugins/features.go b/src/plugins/features.go index 4b8c0848b..3a4e7da65 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -55,6 +55,12 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { agent_config.FeatureMetricsThrottle: func(data string) []core.Plugin { return f.enableMetricsThrottleFeature(data) }, + agent_config.FeatureMetricsSender: func(data string) []core.Plugin { + return f.enableMetricsSenderFeature(data) + }, + agent_config.FeatureMetricsCollection: func(data string) []core.Plugin { + return f.enableMetricsCollectionFeature(data) + }, agent_config.FeatureDataPlaneStatus: func(data string) []core.Plugin { return f.enableDataPlaneStatusFeature(data) }, @@ -132,63 +138,96 @@ func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { func (f *Features) enableMetricsFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { - // metrics-sender - // metrics-throttle becomes just throttling - // metrics-collection just collecting conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - metrics := NewMetrics(f.conf, f.env, f.binary) + metrics := NewMetrics(f.conf, f.env, f.binary, false) + metricsThrottle := NewMetricsThrottle(f.conf, f.env) + + return []core.Plugin{metrics, metricsThrottle} + } + return []core.Plugin{} +} + +func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { + + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf + + metrics := NewMetrics(f.conf, f.env, f.binary, false) return []core.Plugin{metrics} } return []core.Plugin{} } -func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { +func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { + conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - api := NewAgentAPI(f.conf, f.env, f.binary) + metricsThrottle := NewMetricsThrottle(f.conf, f.env) - return []core.Plugin{api} + return []core.Plugin{metricsThrottle} } return []core.Plugin{} } -func (f *Features) enableRegistrationFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) { +func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { + conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version) + metricsSender := NewMetricsSender(f.commander) - return []core.Plugin{registration} + return []core.Plugin{metricsSender} } return []core.Plugin{} } -func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { +func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - metricsThrottle := NewMetricsThrottle(f.conf, f.env) + api := NewAgentAPI(f.conf, f.env, f.binary) - return []core.Plugin{metricsThrottle} + return []core.Plugin{api} + } + return []core.Plugin{} +} + +func (f *Features) enableRegistrationFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) { + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf + + registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version) + + return []core.Plugin{registration} } return []core.Plugin{} } @@ -264,8 +303,9 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { } f.conf = conf - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { - metrics := NewMetrics(f.conf, f.env, f.binary) + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { + metrics := NewMetrics(f.conf, f.env, f.binary, false) countingPlugins = append(countingPlugins, metrics) } diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 0cfc1cda1..a4566bb2b 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -39,9 +39,10 @@ type Metrics struct { env core.Environment conf *config.Config binary core.NginxBinary + passthrough bool } -func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary) *Metrics { +func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary, passthrough bool) *Metrics { collectorConfigsMap := createCollectorConfigsMap(config, env, binary) return &Metrics{ collectorsUpdate: atomic.NewBool(false), @@ -56,6 +57,7 @@ func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBi env: env, conf: config, binary: binary, + passthrough: passthrough, } } @@ -158,12 +160,28 @@ func (m *Metrics) Subscriptions() []string { } } +// switch bundle := msg.Data().(type) { +// case *metrics.MetricsReportBundle: +// if len(bundle.Data) > 0 { +// for _, report := range bundle.Data { +// if len(report.Data) > 0 { +// r.metricBuffer = append(r.metricBuffer, report) +// } +// } +// } +// } +// r.messagePipeline.Process(core.NewMessage(core.CommMetrics, r.metricBuffer)) +// r.metricBuffer = make([]core.Payload, 0) + func (m *Metrics) metricsGoroutine() { m.wg.Add(1) defer m.ticker.Stop() defer m.wg.Done() log.Info("Metrics waiting for handshake to be completed") m.registerStatsSources() + + metricBuffer := make([]core.Payload, 0) + for { select { case <-m.ctx.Done(): @@ -174,8 +192,28 @@ func (m *Metrics) metricsGoroutine() { return case <-m.ticker.C: stats := m.collectStats() - if bundle := metrics.GenerateMetricsReportBundle(stats); bundle != nil { - m.pipeline.Process(core.NewMessage(core.MetricReport, bundle)) + if bundlePayload := metrics.GenerateMetricsReportBundle(stats); bundlePayload != nil { + // topic := core.MetricReport + // if m.passthrough { + // topic = core.MetricReportStream + // } + // m.pipeline.Process(core.NewMessage(topic, bundle)) + + metricBuffer = make([]core.Payload, 0) + + switch bundle := bundlePayload.(type) { + case *metrics.MetricsReportBundle: + if len(bundle.Data) > 0 { + for _, report := range bundle.Data { + if len(report.Data) > 0 { + metricBuffer = append(metricBuffer, report) + } + } + } + default: + log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) + } + m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) } if m.collectorsUpdate.Load() { m.ticker = time.NewTicker(m.conf.AgentMetrics.CollectionInterval) diff --git a/src/plugins/metrics_test.go b/src/plugins/metrics_test.go index 5f5b0df09..47be8bd61 100644 --- a/src/plugins/metrics_test.go +++ b/src/plugins/metrics_test.go @@ -163,7 +163,7 @@ func TestMetricsProcessNginxDetailProcUpdate(t *testing.T) { }, }) - metricsPlugin := NewMetrics(config, env, binary) + metricsPlugin := NewMetrics(config, env, binary, false) metricsPlugin.collectors = []metrics.Collector{ collectors.NewNginxCollector(config, env, metricsPlugin.collectorConfigsMap[firstNginxId], binary), } @@ -215,7 +215,7 @@ func TestMetrics_Process_AgentConfigChanged(t *testing.T) { updatedTags: true, }, { - testName: "NoValuesToUpate", + testName: "NoValuesToUpdate", config: &config.Config{ ClientID: "12345", Tags: tutils.InitialConfTags, @@ -263,7 +263,7 @@ func TestMetrics_Process_AgentConfigChanged(t *testing.T) { }) // Setup metrics and mock pipeline - metricsPlugin := NewMetrics(tc.config, env, binary) + metricsPlugin := NewMetrics(tc.config, env, binary, false) messagePipe := core.SetupMockMessagePipe(t, context.TODO(), []core.Plugin{metricsPlugin}, []core.ExtensionPlugin{}) messagePipe.RunWithoutInit() @@ -294,7 +294,7 @@ func TestMetrics_Process_AgentCollectorsUpdate(t *testing.T) { env := tutils.GetMockEnvWithProcess() env.On("IsContainer").Return(false) - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, tutils.GetMockNginxBinary()) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, tutils.GetMockNginxBinary(), false) pluginUnderTest.Process(core.NewMessage(core.AgentCollectorsUpdate, nil)) assert.True(t, pluginUnderTest.collectorsUpdate.Load()) @@ -307,7 +307,7 @@ func TestMetrics_Process_NginxStatusAPIUpdate_AgentConfigChanged(t *testing.T) { env := tutils.GetMockEnvWithHostAndProcess() env.On("IsContainer").Return(false) - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, binary) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, binary, false) pluginUnderTest.Process(core.NewMessage(core.NginxPluginConfigured, nil)) conf := pluginUnderTest.collectorConfigsMap[secondNginxId] @@ -329,7 +329,7 @@ func TestMetrics_Process_NginxStatusAPIUpdate_AgentConfigChanged(t *testing.T) { } func TestMetrics_Info(t *testing.T) { - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary()) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary(), false) assert.Equal(t, "metrics", pluginUnderTest.Info().Name()) } @@ -343,6 +343,6 @@ func TestMetrics_Subscriptions(t *testing.T) { core.NginxDetailProcUpdate, core.NginxConfigApplySucceeded, } - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary()) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary(), false) assert.Equal(t, subs, pluginUnderTest.Subscriptions()) } diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 77966894b..7c1e5159d 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -88,6 +88,7 @@ func (r *MetricsThrottle) Process(msg *core.Message) { r.collectorsUpdate.Store(true) return case msg.Exact(core.MetricReportStream): + // TODO: Is r.metricsAggregation ineffectual in this case? switch bundle := msg.Data().(type) { case *metrics.MetricsReportBundle: if len(bundle.Data) > 0 { @@ -217,5 +218,5 @@ func (r *MetricsThrottle) getAggregatedReports() (reports []core.Payload) { } } - return + return reports } diff --git a/src/plugins/metrics_throlling_test.go b/src/plugins/metrics_throlling_test.go index 96d411b51..b3b031047 100644 --- a/src/plugins/metrics_throlling_test.go +++ b/src/plugins/metrics_throlling_test.go @@ -101,6 +101,23 @@ func TestMetricsThrottle_Process(t *testing.T) { }, config: tutils.GetMockAgentConfig(), }, + { + name: "metrics passthrough", + msgs: []*core.Message{ + core.NewMessage(core.MetricReportStream, &metrics.MetricsReportBundle{Data: []*proto.MetricsReport{{}}}), + core.NewMessage(core.MetricReportStream, &metrics.MetricsReportBundle{Data: []*proto.MetricsReport{{}}}), + core.NewMessage(core.MetricReportStream, &metrics.MetricsReportBundle{Data: []*proto.MetricsReport{{}}}), + }, + msgTopics: []string{ + core.MetricReportStream, + core.MetricReportStream, + core.MetricReportStream, + core.CommMetrics, + core.CommMetrics, + core.CommMetrics, + }, + config: tutils.GetMockAgentConfig(), + }, { name: "config changed", msgs: []*core.Message{ @@ -132,7 +149,7 @@ func TestMetricsThrottle_Process(t *testing.T) { } func TestMetricsThrottle_Subscriptions(t *testing.T) { - subs := []string{core.MetricReport, core.AgentConfigChanged} + subs := []string{core.MetricReport, core.MetricReportStream, core.AgentConfigChanged} pluginUnderTest := NewMetricsThrottle(tutils.GetMockAgentConfig(), &tutils.MockEnvironment{}) assert.Equal(t, subs, pluginUnderTest.Subscriptions()) diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 8257c81a8..772ecc0c5 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -24,6 +24,8 @@ const ( FeatureNginxSSLConfig = "nginx-ssl-config" FeatureNginxCounting = "nginx-counting" FeatureMetrics = "metrics" + FeatureMetricsCollection = "metrics-collection" + FeatureMetricsSender = "metrics-sender" FeatureMetricsThrottle = "metrics-throttle" FeatureDataPlaneStatus = "dataplane-status" FeatureProcessWatcher = "process-watcher" @@ -31,7 +33,6 @@ const ( FeatureFileWatcherThrottle = "file-watch-throttle" FeatureActivityEvents = "activity-events" FeatureAgentAPI = "agent-api" - FeatureMetricSender = "metric-sender" CommanderPlugin = "commander" ConfigReaderPlugin = "config-reader-plugin" diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 8257c81a8..772ecc0c5 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -24,6 +24,8 @@ const ( FeatureNginxSSLConfig = "nginx-ssl-config" FeatureNginxCounting = "nginx-counting" FeatureMetrics = "metrics" + FeatureMetricsCollection = "metrics-collection" + FeatureMetricsSender = "metrics-sender" FeatureMetricsThrottle = "metrics-throttle" FeatureDataPlaneStatus = "dataplane-status" FeatureProcessWatcher = "process-watcher" @@ -31,7 +33,6 @@ const ( FeatureFileWatcherThrottle = "file-watch-throttle" FeatureActivityEvents = "activity-events" FeatureAgentAPI = "agent-api" - FeatureMetricSender = "metric-sender" CommanderPlugin = "commander" ConfigReaderPlugin = "config-reader-plugin" diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go index 3c9f2fdb0..6973392ff 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go @@ -41,9 +41,10 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" + MetricReportStream = "metrics.report.stream" LoggerPrefix = "logger." - LoggerLevel = LoggerPrefix + "level" - LoggerPath = LoggerPrefix + "path" + LoggerLevel = LoggerPrefix + "level" // TODO: Never set/used? + LoggerPath = LoggerPrefix + "path" // TODO: Never set/used? DataplaneChanged = "dataplane.changed" DataplaneFilesChanged = "dataplane.fileschanged" Events = "events" diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go index cb0d26b26..4b8c0848b 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go @@ -101,12 +101,12 @@ func (f *Features) Process(msg *core.Message) { if initFeature, ok := f.featureMap[feature]; ok { featurePlugins := initFeature(feature) plugins = append(plugins, featurePlugins...) - } } + err := f.pipeline.Register(agent_config.DefaultPluginSize, plugins, nil) if err != nil { - log.Warnf("Unable to register feature, %v", err) + log.Warnf("Unable to register features: %v", err) } for _, plugin := range plugins { @@ -131,6 +131,10 @@ func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { func (f *Features) enableMetricsFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { + + // metrics-sender + // metrics-throttle becomes just throttling + // metrics-collection just collecting conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) @@ -264,13 +268,13 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { metrics := NewMetrics(f.conf, f.env, f.binary) countingPlugins = append(countingPlugins, metrics) } - nginxCounting := NewNginxCounter(f.conf, f.binary, f.env) + nginxCounting := NewNginxCounter(f.conf, f.binary, f.env) countingPlugins = append(countingPlugins, nginxCounting) return countingPlugins } - return countingPlugins + return []core.Plugin{} } - return countingPlugins + return []core.Plugin{} } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go index 7f0a90538..551c80eb6 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go @@ -55,7 +55,7 @@ func (r *MetricsSender) Close() { } func (r *MetricsSender) Info() *core.Info { - return core.NewInfo(agent_config.FeatureMetricSender, "v0.0.1") + return core.NewInfo(agent_config.FeatureMetricsSender, "v0.0.1") } func (r *MetricsSender) Process(msg *core.Message) { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 8d56d5840..77966894b 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -87,6 +87,20 @@ func (r *MetricsThrottle) Process(msg *core.Message) { r.syncAgentConfigChange() r.collectorsUpdate.Store(true) return + case msg.Exact(core.MetricReportStream): + switch bundle := msg.Data().(type) { + case *metrics.MetricsReportBundle: + if len(bundle.Data) > 0 { + for _, report := range bundle.Data { + if len(report.Data) > 0 { + r.metricBuffer = append(r.metricBuffer, report) + } + } + } + } + r.messagePipeline.Process(core.NewMessage(core.CommMetrics, r.metricBuffer)) + r.metricBuffer = make([]core.Payload, 0) + case msg.Exact(core.MetricReport): if r.metricsAggregation { switch bundle := msg.Data().(type) { @@ -132,7 +146,7 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } func (r *MetricsThrottle) Subscriptions() []string { - return []string{core.MetricReport, core.AgentConfigChanged, core.LoggerLevel} + return []string{core.MetricReport, core.MetricReportStream, core.AgentConfigChanged} } func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.WaitGroup) { diff --git a/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 8257c81a8..772ecc0c5 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -24,6 +24,8 @@ const ( FeatureNginxSSLConfig = "nginx-ssl-config" FeatureNginxCounting = "nginx-counting" FeatureMetrics = "metrics" + FeatureMetricsCollection = "metrics-collection" + FeatureMetricsSender = "metrics-sender" FeatureMetricsThrottle = "metrics-throttle" FeatureDataPlaneStatus = "dataplane-status" FeatureProcessWatcher = "process-watcher" @@ -31,7 +33,6 @@ const ( FeatureFileWatcherThrottle = "file-watch-throttle" FeatureActivityEvents = "activity-events" FeatureAgentAPI = "agent-api" - FeatureMetricSender = "metric-sender" CommanderPlugin = "commander" ConfigReaderPlugin = "config-reader-plugin" From de0dfb413c57d3d9e84775820f6e367c89707841 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Wed, 9 Aug 2023 11:17:57 +0100 Subject: [PATCH 04/22] Vendor --- .../sdk/v2/agent/config/config_helpers.go | 2 - .../sdk/v2/agent/config/config_helpers.go | 2 - .../nginx/agent/v2/src/plugins/features.go | 76 ++++++++++++++----- .../nginx/agent/v2/src/plugins/metrics.go | 44 ++++++++++- .../agent/v2/src/plugins/metrics_throlling.go | 3 +- .../sdk/v2/agent/config/config_helpers.go | 2 - 6 files changed, 101 insertions(+), 28 deletions(-) diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 772ecc0c5..4d4e39520 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -77,7 +77,6 @@ func GetDefaultFeatures() []string { FeatureNginxSSLConfig, FeatureNginxCounting, FeatureMetrics, - FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, @@ -97,7 +96,6 @@ func DecodeConfig[T interface{}](input interface{}) (output T, err error) { } err = decoder.Decode(input) - if err != nil { return output, err } diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 772ecc0c5..4d4e39520 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -77,7 +77,6 @@ func GetDefaultFeatures() []string { FeatureNginxSSLConfig, FeatureNginxCounting, FeatureMetrics, - FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, @@ -97,7 +96,6 @@ func DecodeConfig[T interface{}](input interface{}) (output T, err error) { } err = decoder.Decode(input) - if err != nil { return output, err } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go index 4b8c0848b..3a4e7da65 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go @@ -55,6 +55,12 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { agent_config.FeatureMetricsThrottle: func(data string) []core.Plugin { return f.enableMetricsThrottleFeature(data) }, + agent_config.FeatureMetricsSender: func(data string) []core.Plugin { + return f.enableMetricsSenderFeature(data) + }, + agent_config.FeatureMetricsCollection: func(data string) []core.Plugin { + return f.enableMetricsCollectionFeature(data) + }, agent_config.FeatureDataPlaneStatus: func(data string) []core.Plugin { return f.enableDataPlaneStatusFeature(data) }, @@ -132,63 +138,96 @@ func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { func (f *Features) enableMetricsFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { - // metrics-sender - // metrics-throttle becomes just throttling - // metrics-collection just collecting conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - metrics := NewMetrics(f.conf, f.env, f.binary) + metrics := NewMetrics(f.conf, f.env, f.binary, false) + metricsThrottle := NewMetricsThrottle(f.conf, f.env) + + return []core.Plugin{metrics, metricsThrottle} + } + return []core.Plugin{} +} + +func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { + + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf + + metrics := NewMetrics(f.conf, f.env, f.binary, false) return []core.Plugin{metrics} } return []core.Plugin{} } -func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { +func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { + conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - api := NewAgentAPI(f.conf, f.env, f.binary) + metricsThrottle := NewMetricsThrottle(f.conf, f.env) - return []core.Plugin{api} + return []core.Plugin{metricsThrottle} } return []core.Plugin{} } -func (f *Features) enableRegistrationFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) { +func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { + conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version) + metricsSender := NewMetricsSender(f.commander) - return []core.Plugin{registration} + return []core.Plugin{metricsSender} } return []core.Plugin{} } -func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { +func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - metricsThrottle := NewMetricsThrottle(f.conf, f.env) + api := NewAgentAPI(f.conf, f.env, f.binary) - return []core.Plugin{metricsThrottle} + return []core.Plugin{api} + } + return []core.Plugin{} +} + +func (f *Features) enableRegistrationFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) { + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf + + registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version) + + return []core.Plugin{registration} } return []core.Plugin{} } @@ -264,8 +303,9 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { } f.conf = conf - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { - metrics := NewMetrics(f.conf, f.env, f.binary) + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { + metrics := NewMetrics(f.conf, f.env, f.binary, false) countingPlugins = append(countingPlugins, metrics) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 0cfc1cda1..a4566bb2b 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -39,9 +39,10 @@ type Metrics struct { env core.Environment conf *config.Config binary core.NginxBinary + passthrough bool } -func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary) *Metrics { +func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary, passthrough bool) *Metrics { collectorConfigsMap := createCollectorConfigsMap(config, env, binary) return &Metrics{ collectorsUpdate: atomic.NewBool(false), @@ -56,6 +57,7 @@ func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBi env: env, conf: config, binary: binary, + passthrough: passthrough, } } @@ -158,12 +160,28 @@ func (m *Metrics) Subscriptions() []string { } } +// switch bundle := msg.Data().(type) { +// case *metrics.MetricsReportBundle: +// if len(bundle.Data) > 0 { +// for _, report := range bundle.Data { +// if len(report.Data) > 0 { +// r.metricBuffer = append(r.metricBuffer, report) +// } +// } +// } +// } +// r.messagePipeline.Process(core.NewMessage(core.CommMetrics, r.metricBuffer)) +// r.metricBuffer = make([]core.Payload, 0) + func (m *Metrics) metricsGoroutine() { m.wg.Add(1) defer m.ticker.Stop() defer m.wg.Done() log.Info("Metrics waiting for handshake to be completed") m.registerStatsSources() + + metricBuffer := make([]core.Payload, 0) + for { select { case <-m.ctx.Done(): @@ -174,8 +192,28 @@ func (m *Metrics) metricsGoroutine() { return case <-m.ticker.C: stats := m.collectStats() - if bundle := metrics.GenerateMetricsReportBundle(stats); bundle != nil { - m.pipeline.Process(core.NewMessage(core.MetricReport, bundle)) + if bundlePayload := metrics.GenerateMetricsReportBundle(stats); bundlePayload != nil { + // topic := core.MetricReport + // if m.passthrough { + // topic = core.MetricReportStream + // } + // m.pipeline.Process(core.NewMessage(topic, bundle)) + + metricBuffer = make([]core.Payload, 0) + + switch bundle := bundlePayload.(type) { + case *metrics.MetricsReportBundle: + if len(bundle.Data) > 0 { + for _, report := range bundle.Data { + if len(report.Data) > 0 { + metricBuffer = append(metricBuffer, report) + } + } + } + default: + log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) + } + m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) } if m.collectorsUpdate.Load() { m.ticker = time.NewTicker(m.conf.AgentMetrics.CollectionInterval) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 77966894b..7c1e5159d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -88,6 +88,7 @@ func (r *MetricsThrottle) Process(msg *core.Message) { r.collectorsUpdate.Store(true) return case msg.Exact(core.MetricReportStream): + // TODO: Is r.metricsAggregation ineffectual in this case? switch bundle := msg.Data().(type) { case *metrics.MetricsReportBundle: if len(bundle.Data) > 0 { @@ -217,5 +218,5 @@ func (r *MetricsThrottle) getAggregatedReports() (reports []core.Payload) { } } - return + return reports } diff --git a/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 772ecc0c5..4d4e39520 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -77,7 +77,6 @@ func GetDefaultFeatures() []string { FeatureNginxSSLConfig, FeatureNginxCounting, FeatureMetrics, - FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, @@ -97,7 +96,6 @@ func DecodeConfig[T interface{}](input interface{}) (output T, err error) { } err = decoder.Decode(input) - if err != nil { return output, err } From c7a726d43ad9878c8cdd48de5906907463e82244 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Wed, 9 Aug 2023 11:26:20 +0100 Subject: [PATCH 05/22] wip: fix passthrough toggle --- src/plugins/metrics.go | 48 +++++++------------ .../nginx/agent/v2/src/plugins/metrics.go | 48 +++++++------------ 2 files changed, 32 insertions(+), 64 deletions(-) diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index a4566bb2b..df904e961 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -160,19 +160,6 @@ func (m *Metrics) Subscriptions() []string { } } -// switch bundle := msg.Data().(type) { -// case *metrics.MetricsReportBundle: -// if len(bundle.Data) > 0 { -// for _, report := range bundle.Data { -// if len(report.Data) > 0 { -// r.metricBuffer = append(r.metricBuffer, report) -// } -// } -// } -// } -// r.messagePipeline.Process(core.NewMessage(core.CommMetrics, r.metricBuffer)) -// r.metricBuffer = make([]core.Payload, 0) - func (m *Metrics) metricsGoroutine() { m.wg.Add(1) defer m.ticker.Stop() @@ -180,8 +167,6 @@ func (m *Metrics) metricsGoroutine() { log.Info("Metrics waiting for handshake to be completed") m.registerStatsSources() - metricBuffer := make([]core.Payload, 0) - for { select { case <-m.ctx.Done(): @@ -193,27 +178,26 @@ func (m *Metrics) metricsGoroutine() { case <-m.ticker.C: stats := m.collectStats() if bundlePayload := metrics.GenerateMetricsReportBundle(stats); bundlePayload != nil { - // topic := core.MetricReport - // if m.passthrough { - // topic = core.MetricReportStream - // } - // m.pipeline.Process(core.NewMessage(topic, bundle)) - - metricBuffer = make([]core.Payload, 0) - - switch bundle := bundlePayload.(type) { - case *metrics.MetricsReportBundle: - if len(bundle.Data) > 0 { - for _, report := range bundle.Data { - if len(report.Data) > 0 { - metricBuffer = append(metricBuffer, report) + if m.passthrough { + metricBuffer := make([]core.Payload, 0) + + switch bundle := bundlePayload.(type) { + case *metrics.MetricsReportBundle: + if len(bundle.Data) > 0 { + for _, report := range bundle.Data { + if len(report.Data) > 0 { + metricBuffer = append(metricBuffer, report) + } } } + default: + log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) } - default: - log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) + + m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) + } else { + m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) } - m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) } if m.collectorsUpdate.Load() { m.ticker = time.NewTicker(m.conf.AgentMetrics.CollectionInterval) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index a4566bb2b..df904e961 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -160,19 +160,6 @@ func (m *Metrics) Subscriptions() []string { } } -// switch bundle := msg.Data().(type) { -// case *metrics.MetricsReportBundle: -// if len(bundle.Data) > 0 { -// for _, report := range bundle.Data { -// if len(report.Data) > 0 { -// r.metricBuffer = append(r.metricBuffer, report) -// } -// } -// } -// } -// r.messagePipeline.Process(core.NewMessage(core.CommMetrics, r.metricBuffer)) -// r.metricBuffer = make([]core.Payload, 0) - func (m *Metrics) metricsGoroutine() { m.wg.Add(1) defer m.ticker.Stop() @@ -180,8 +167,6 @@ func (m *Metrics) metricsGoroutine() { log.Info("Metrics waiting for handshake to be completed") m.registerStatsSources() - metricBuffer := make([]core.Payload, 0) - for { select { case <-m.ctx.Done(): @@ -193,27 +178,26 @@ func (m *Metrics) metricsGoroutine() { case <-m.ticker.C: stats := m.collectStats() if bundlePayload := metrics.GenerateMetricsReportBundle(stats); bundlePayload != nil { - // topic := core.MetricReport - // if m.passthrough { - // topic = core.MetricReportStream - // } - // m.pipeline.Process(core.NewMessage(topic, bundle)) - - metricBuffer = make([]core.Payload, 0) - - switch bundle := bundlePayload.(type) { - case *metrics.MetricsReportBundle: - if len(bundle.Data) > 0 { - for _, report := range bundle.Data { - if len(report.Data) > 0 { - metricBuffer = append(metricBuffer, report) + if m.passthrough { + metricBuffer := make([]core.Payload, 0) + + switch bundle := bundlePayload.(type) { + case *metrics.MetricsReportBundle: + if len(bundle.Data) > 0 { + for _, report := range bundle.Data { + if len(report.Data) > 0 { + metricBuffer = append(metricBuffer, report) + } } } + default: + log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) } - default: - log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) + + m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) + } else { + m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) } - m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) } if m.collectorsUpdate.Load() { m.ticker = time.NewTicker(m.conf.AgentMetrics.CollectionInterval) From b815154091c5e5db81d6102b2ce0fdadcd715dd6 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Wed, 9 Aug 2023 11:52:53 +0100 Subject: [PATCH 06/22] vendor --- .../vendor/github.com/nginx/agent/v2/src/core/topics.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go index 3c9f2fdb0..6973392ff 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go @@ -41,9 +41,10 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" + MetricReportStream = "metrics.report.stream" LoggerPrefix = "logger." - LoggerLevel = LoggerPrefix + "level" - LoggerPath = LoggerPrefix + "path" + LoggerLevel = LoggerPrefix + "level" // TODO: Never set/used? + LoggerPath = LoggerPrefix + "path" // TODO: Never set/used? DataplaneChanged = "dataplane.changed" DataplaneFilesChanged = "dataplane.fileschanged" Events = "events" From b6cccd8050010e7cb70630c7773a3c5522a2ab6e Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Thu, 10 Aug 2023 09:51:17 +0100 Subject: [PATCH 07/22] wip --- src/plugins/features_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/plugins/features_test.go b/src/plugins/features_test.go index 17853bf32..792b4b6e8 100644 --- a/src/plugins/features_test.go +++ b/src/plugins/features_test.go @@ -94,14 +94,15 @@ func TestFeatures_Process(t *testing.T) { cmdr := tutils.NewMockCommandClient() - configuration, _ := config.GetConfig("1234") + configuration, err := config.GetConfig("1234") + assert.NoError(t, err) pluginUnderTest := NewFeatures(cmdr, configuration, env, binary, "agentVersion") for _, tc := range testCases { messagePipe := core.SetupMockMessagePipe(t, ctx, []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{}) - assert.Equal(t, 1, len(messagePipe.GetPlugins())) + assert.Len(t, messagePipe.GetPlugins(), 1) assert.Equal(t, agent_config.FeaturesPlugin, messagePipe.GetPlugins()[0].Info().Name()) messagePipe.Process(core.NewMessage(core.EnableFeature, []string{tc.featureKey})) From 23bb76ddb8cdbf50ad99d9508105dc8445fd896d Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Thu, 10 Aug 2023 14:10:57 +0100 Subject: [PATCH 08/22] Debug --- main.go | 4 ++++ sdk/agent/config/config_helpers.go | 4 +++- src/plugins/features.go | 32 +++++++++++++++--------------- src/plugins/metrics.go | 3 ++- 4 files changed, 25 insertions(+), 18 deletions(-) diff --git a/main.go b/main.go index 6a0941dfc..23f6fc4e5 100644 --- a/main.go +++ b/main.go @@ -253,6 +253,10 @@ func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env * corePlugins = append(corePlugins, plugins.NewMetrics(loadedConfig, env, binary, false)) } + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + corePlugins = append(corePlugins, plugins.NewMetricsThrottle(loadedConfig, env)) + } + if loadedConfig.IsFeatureEnabled(agent_config.FeatureDataPlaneStatus) { corePlugins = append(corePlugins, plugins.NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.NewString()), binary, env, version)) } diff --git a/sdk/agent/config/config_helpers.go b/sdk/agent/config/config_helpers.go index 4d4e39520..0d89065b9 100644 --- a/sdk/agent/config/config_helpers.go +++ b/sdk/agent/config/config_helpers.go @@ -76,7 +76,9 @@ func GetDefaultFeatures() []string { FeatureNginxConfigAsync, FeatureNginxSSLConfig, FeatureNginxCounting, - FeatureMetrics, + // FeatureMetrics, + FeatureMetricsCollection, + FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, diff --git a/src/plugins/features.go b/src/plugins/features.go index 3a4e7da65..9b8517be0 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -43,9 +43,9 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { f.pipeline = pipeline f.featureMap = map[string]func(data string) []core.Plugin{ - agent_config.FeatureMetrics: func(data string) []core.Plugin { - return f.enableMetricsFeature(data) - }, + // agent_config.FeatureMetrics: func(data string) []core.Plugin { + // return f.enableMetricsFeature(data) + // }, agent_config.FeatureAgentAPI: func(data string) []core.Plugin { return f.enableAgentAPIFeature(data) }, @@ -135,22 +135,22 @@ func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { return []core.Plugin{} } -func (f *Features) enableMetricsFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { +// func (f *Features) enableMetricsFeature(data string) []core.Plugin { +// if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { - conf, err := config.GetConfig(f.conf.ClientID) - if err != nil { - log.Warnf("Unable to get agent config, %v", err) - } - f.conf = conf +// conf, err := config.GetConfig(f.conf.ClientID) +// if err != nil { +// log.Warnf("Unable to get agent config, %v", err) +// } +// f.conf = conf - metrics := NewMetrics(f.conf, f.env, f.binary, false) - metricsThrottle := NewMetricsThrottle(f.conf, f.env) +// metrics := NewMetrics(f.conf, f.env, f.binary, false) +// metricsThrottle := NewMetricsThrottle(f.conf, f.env) - return []core.Plugin{metrics, metricsThrottle} - } - return []core.Plugin{} -} +// return []core.Plugin{metrics, metricsThrottle} +// } +// return []core.Plugin{} +// } func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index df904e961..b1fee3a78 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -190,11 +190,12 @@ func (m *Metrics) metricsGoroutine() { } } } + + m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) default: log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) } - m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) } else { m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) } From 76844359bbcfbae6f020bce53fe20a313ad9d347 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Thu, 10 Aug 2023 14:58:41 +0100 Subject: [PATCH 09/22] Debug unit tests --- src/core/metrics/collectors/nginx.go | 4 +- src/plugins/features.go | 38 +++++----- src/plugins/metrics_throlling.go | 14 ---- .../sdk/v2/agent/config/config_helpers.go | 4 +- test/performance/metrics_test.go | 2 +- .../sdk/v2/agent/config/config_helpers.go | 4 +- .../v2/src/core/metrics/collectors/nginx.go | 4 +- .../nginx/agent/v2/src/plugins/features.go | 70 ++++++++++--------- .../nginx/agent/v2/src/plugins/metrics.go | 3 +- .../agent/v2/src/plugins/metrics_throlling.go | 14 ---- .../sdk/v2/agent/config/config_helpers.go | 4 +- 11 files changed, 76 insertions(+), 85 deletions(-) diff --git a/src/core/metrics/collectors/nginx.go b/src/core/metrics/collectors/nginx.go index 803822ff5..84c5c15fc 100644 --- a/src/core/metrics/collectors/nginx.go +++ b/src/core/metrics/collectors/nginx.go @@ -11,13 +11,13 @@ import ( "context" "sync" + agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" "github.com/nginx/agent/v2/src/core/metrics/sources" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" + log "github.com/sirupsen/logrus" ) type NginxCollector struct { diff --git a/src/plugins/features.go b/src/plugins/features.go index 9b8517be0..d7db307f6 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -55,9 +55,10 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { agent_config.FeatureMetricsThrottle: func(data string) []core.Plugin { return f.enableMetricsThrottleFeature(data) }, - agent_config.FeatureMetricsSender: func(data string) []core.Plugin { - return f.enableMetricsSenderFeature(data) - }, + // TODO: Not required? + // agent_config.FeatureMetricsSender: func(data string) []core.Plugin { + // return f.enableMetricsSenderFeature(data) + // }, agent_config.FeatureMetricsCollection: func(data string) []core.Plugin { return f.enableMetricsCollectionFeature(data) }, @@ -162,7 +163,11 @@ func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { } f.conf = conf - metrics := NewMetrics(f.conf, f.env, f.binary, false) + passthrough := false + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { + passthrough = true + } + metrics := NewMetrics(f.conf, f.env, f.binary, passthrough) return []core.Plugin{metrics} } @@ -186,21 +191,22 @@ func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { return []core.Plugin{} } -func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { +// TODO: Not required? +// func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { +// if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { - conf, err := config.GetConfig(f.conf.ClientID) - if err != nil { - log.Warnf("Unable to get agent config, %v", err) - } - f.conf = conf +// conf, err := config.GetConfig(f.conf.ClientID) +// if err != nil { +// log.Warnf("Unable to get agent config, %v", err) +// } +// f.conf = conf - metricsSender := NewMetricsSender(f.commander) +// metricsSender := NewMetricsSender(f.commander) - return []core.Plugin{metricsSender} - } - return []core.Plugin{} -} +// return []core.Plugin{metricsSender} +// } +// return []core.Plugin{} +// } func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 7c1e5159d..f037f559d 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -87,20 +87,6 @@ func (r *MetricsThrottle) Process(msg *core.Message) { r.syncAgentConfigChange() r.collectorsUpdate.Store(true) return - case msg.Exact(core.MetricReportStream): - // TODO: Is r.metricsAggregation ineffectual in this case? - switch bundle := msg.Data().(type) { - case *metrics.MetricsReportBundle: - if len(bundle.Data) > 0 { - for _, report := range bundle.Data { - if len(report.Data) > 0 { - r.metricBuffer = append(r.metricBuffer, report) - } - } - } - } - r.messagePipeline.Process(core.NewMessage(core.CommMetrics, r.metricBuffer)) - r.metricBuffer = make([]core.Payload, 0) case msg.Exact(core.MetricReport): if r.metricsAggregation { diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 4d4e39520..0d89065b9 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -76,7 +76,9 @@ func GetDefaultFeatures() []string { FeatureNginxConfigAsync, FeatureNginxSSLConfig, FeatureNginxCounting, - FeatureMetrics, + // FeatureMetrics, + FeatureMetricsCollection, + FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, diff --git a/test/performance/metrics_test.go b/test/performance/metrics_test.go index 5debaa095..42509b140 100644 --- a/test/performance/metrics_test.go +++ b/test/performance/metrics_test.go @@ -325,7 +325,7 @@ func startNginxAgent(b *testing.B) { plugins.NewCommander(commander, loadedConfig), plugins.NewMetricsSender(reporter), plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.New().String()), "1.0.0"), - plugins.NewMetrics(loadedConfig, env, binary), + plugins.NewMetrics(loadedConfig, env, binary, false), plugins.NewMetricsThrottle(loadedConfig, env), plugins.NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.New().String()), binary, env, "1.0.0"), ) diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 4d4e39520..0d89065b9 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -76,7 +76,9 @@ func GetDefaultFeatures() []string { FeatureNginxConfigAsync, FeatureNginxSSLConfig, FeatureNginxCounting, - FeatureMetrics, + // FeatureMetrics, + FeatureMetricsCollection, + FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go index 803822ff5..84c5c15fc 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go @@ -11,13 +11,13 @@ import ( "context" "sync" + agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" "github.com/nginx/agent/v2/src/core/metrics/sources" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" + log "github.com/sirupsen/logrus" ) type NginxCollector struct { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go index 3a4e7da65..d7db307f6 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go @@ -43,9 +43,9 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { f.pipeline = pipeline f.featureMap = map[string]func(data string) []core.Plugin{ - agent_config.FeatureMetrics: func(data string) []core.Plugin { - return f.enableMetricsFeature(data) - }, + // agent_config.FeatureMetrics: func(data string) []core.Plugin { + // return f.enableMetricsFeature(data) + // }, agent_config.FeatureAgentAPI: func(data string) []core.Plugin { return f.enableAgentAPIFeature(data) }, @@ -55,9 +55,10 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { agent_config.FeatureMetricsThrottle: func(data string) []core.Plugin { return f.enableMetricsThrottleFeature(data) }, - agent_config.FeatureMetricsSender: func(data string) []core.Plugin { - return f.enableMetricsSenderFeature(data) - }, + // TODO: Not required? + // agent_config.FeatureMetricsSender: func(data string) []core.Plugin { + // return f.enableMetricsSenderFeature(data) + // }, agent_config.FeatureMetricsCollection: func(data string) []core.Plugin { return f.enableMetricsCollectionFeature(data) }, @@ -135,22 +136,22 @@ func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { return []core.Plugin{} } -func (f *Features) enableMetricsFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { +// func (f *Features) enableMetricsFeature(data string) []core.Plugin { +// if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { - conf, err := config.GetConfig(f.conf.ClientID) - if err != nil { - log.Warnf("Unable to get agent config, %v", err) - } - f.conf = conf +// conf, err := config.GetConfig(f.conf.ClientID) +// if err != nil { +// log.Warnf("Unable to get agent config, %v", err) +// } +// f.conf = conf - metrics := NewMetrics(f.conf, f.env, f.binary, false) - metricsThrottle := NewMetricsThrottle(f.conf, f.env) +// metrics := NewMetrics(f.conf, f.env, f.binary, false) +// metricsThrottle := NewMetricsThrottle(f.conf, f.env) - return []core.Plugin{metrics, metricsThrottle} - } - return []core.Plugin{} -} +// return []core.Plugin{metrics, metricsThrottle} +// } +// return []core.Plugin{} +// } func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || @@ -162,7 +163,11 @@ func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { } f.conf = conf - metrics := NewMetrics(f.conf, f.env, f.binary, false) + passthrough := false + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { + passthrough = true + } + metrics := NewMetrics(f.conf, f.env, f.binary, passthrough) return []core.Plugin{metrics} } @@ -186,21 +191,22 @@ func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { return []core.Plugin{} } -func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { +// TODO: Not required? +// func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { +// if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { - conf, err := config.GetConfig(f.conf.ClientID) - if err != nil { - log.Warnf("Unable to get agent config, %v", err) - } - f.conf = conf +// conf, err := config.GetConfig(f.conf.ClientID) +// if err != nil { +// log.Warnf("Unable to get agent config, %v", err) +// } +// f.conf = conf - metricsSender := NewMetricsSender(f.commander) +// metricsSender := NewMetricsSender(f.commander) - return []core.Plugin{metricsSender} - } - return []core.Plugin{} -} +// return []core.Plugin{metricsSender} +// } +// return []core.Plugin{} +// } func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index df904e961..b1fee3a78 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -190,11 +190,12 @@ func (m *Metrics) metricsGoroutine() { } } } + + m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) default: log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) } - m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) } else { m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 7c1e5159d..f037f559d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -87,20 +87,6 @@ func (r *MetricsThrottle) Process(msg *core.Message) { r.syncAgentConfigChange() r.collectorsUpdate.Store(true) return - case msg.Exact(core.MetricReportStream): - // TODO: Is r.metricsAggregation ineffectual in this case? - switch bundle := msg.Data().(type) { - case *metrics.MetricsReportBundle: - if len(bundle.Data) > 0 { - for _, report := range bundle.Data { - if len(report.Data) > 0 { - r.metricBuffer = append(r.metricBuffer, report) - } - } - } - } - r.messagePipeline.Process(core.NewMessage(core.CommMetrics, r.metricBuffer)) - r.metricBuffer = make([]core.Payload, 0) case msg.Exact(core.MetricReport): if r.metricsAggregation { diff --git a/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 4d4e39520..0d89065b9 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -76,7 +76,9 @@ func GetDefaultFeatures() []string { FeatureNginxConfigAsync, FeatureNginxSSLConfig, FeatureNginxCounting, - FeatureMetrics, + // FeatureMetrics, + FeatureMetricsCollection, + FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, From 4c84640ae9863e1a80766a9b1aa094e5dc50c4e0 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Thu, 10 Aug 2023 15:33:09 +0100 Subject: [PATCH 10/22] Cleanup --- src/plugins/features.go | 14 ++++++++++---- src/plugins/metrics_throlling.go | 2 +- src/plugins/metrics_throlling_test.go | 19 +------------------ .../nginx/agent/v2/src/plugins/features.go | 14 ++++++++++---- .../agent/v2/src/plugins/metrics_throlling.go | 2 +- 5 files changed, 23 insertions(+), 28 deletions(-) diff --git a/src/plugins/features.go b/src/plugins/features.go index d7db307f6..5e9ad22ab 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -154,7 +154,7 @@ func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { // } func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { conf, err := config.GetConfig(f.conf.ClientID) @@ -175,7 +175,7 @@ func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { } func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { conf, err := config.GetConfig(f.conf.ClientID) @@ -309,9 +309,15 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { } f.conf = conf - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { - metrics := NewMetrics(f.conf, f.env, f.binary, false) + + passthrough := false + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { + passthrough = true + } + metrics := NewMetrics(f.conf, f.env, f.binary, passthrough) + countingPlugins = append(countingPlugins, metrics) } diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index f037f559d..ee0cf6bd5 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -133,7 +133,7 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } func (r *MetricsThrottle) Subscriptions() []string { - return []string{core.MetricReport, core.MetricReportStream, core.AgentConfigChanged} + return []string{core.MetricReport, core.AgentConfigChanged} } func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.WaitGroup) { diff --git a/src/plugins/metrics_throlling_test.go b/src/plugins/metrics_throlling_test.go index b3b031047..96d411b51 100644 --- a/src/plugins/metrics_throlling_test.go +++ b/src/plugins/metrics_throlling_test.go @@ -101,23 +101,6 @@ func TestMetricsThrottle_Process(t *testing.T) { }, config: tutils.GetMockAgentConfig(), }, - { - name: "metrics passthrough", - msgs: []*core.Message{ - core.NewMessage(core.MetricReportStream, &metrics.MetricsReportBundle{Data: []*proto.MetricsReport{{}}}), - core.NewMessage(core.MetricReportStream, &metrics.MetricsReportBundle{Data: []*proto.MetricsReport{{}}}), - core.NewMessage(core.MetricReportStream, &metrics.MetricsReportBundle{Data: []*proto.MetricsReport{{}}}), - }, - msgTopics: []string{ - core.MetricReportStream, - core.MetricReportStream, - core.MetricReportStream, - core.CommMetrics, - core.CommMetrics, - core.CommMetrics, - }, - config: tutils.GetMockAgentConfig(), - }, { name: "config changed", msgs: []*core.Message{ @@ -149,7 +132,7 @@ func TestMetricsThrottle_Process(t *testing.T) { } func TestMetricsThrottle_Subscriptions(t *testing.T) { - subs := []string{core.MetricReport, core.MetricReportStream, core.AgentConfigChanged} + subs := []string{core.MetricReport, core.AgentConfigChanged} pluginUnderTest := NewMetricsThrottle(tutils.GetMockAgentConfig(), &tutils.MockEnvironment{}) assert.Equal(t, subs, pluginUnderTest.Subscriptions()) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go index d7db307f6..5e9ad22ab 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go @@ -154,7 +154,7 @@ func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { // } func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { conf, err := config.GetConfig(f.conf.ClientID) @@ -175,7 +175,7 @@ func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { } func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { conf, err := config.GetConfig(f.conf.ClientID) @@ -309,9 +309,15 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { } f.conf = conf - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) || + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { - metrics := NewMetrics(f.conf, f.env, f.binary, false) + + passthrough := false + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { + passthrough = true + } + metrics := NewMetrics(f.conf, f.env, f.binary, passthrough) + countingPlugins = append(countingPlugins, metrics) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index f037f559d..ee0cf6bd5 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -133,7 +133,7 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } func (r *MetricsThrottle) Subscriptions() []string { - return []string{core.MetricReport, core.MetricReportStream, core.AgentConfigChanged} + return []string{core.MetricReport, core.AgentConfigChanged} } func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.WaitGroup) { From 71f32ea0b45f699d5b35e7510420f3daa9adf437 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Fri, 11 Aug 2023 15:16:20 +0100 Subject: [PATCH 11/22] Revert test change --- src/plugins/features_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/plugins/features_test.go b/src/plugins/features_test.go index 792b4b6e8..804f3fb8d 100644 --- a/src/plugins/features_test.go +++ b/src/plugins/features_test.go @@ -94,8 +94,7 @@ func TestFeatures_Process(t *testing.T) { cmdr := tutils.NewMockCommandClient() - configuration, err := config.GetConfig("1234") - assert.NoError(t, err) + configuration, _ := config.GetConfig("1234") pluginUnderTest := NewFeatures(cmdr, configuration, env, binary, "agentVersion") From 712a25cc702d1aa44ccfe67e04b801ac1d7c7f4c Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Mon, 21 Aug 2023 11:28:46 +0100 Subject: [PATCH 12/22] Remove trailing topic --- src/core/topics.go | 1 - .../vendor/github.com/nginx/agent/v2/src/core/topics.go | 1 - .../vendor/github.com/nginx/agent/v2/src/core/topics.go | 1 - 3 files changed, 3 deletions(-) diff --git a/src/core/topics.go b/src/core/topics.go index 6973392ff..76caa00bb 100644 --- a/src/core/topics.go +++ b/src/core/topics.go @@ -41,7 +41,6 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" - MetricReportStream = "metrics.report.stream" LoggerPrefix = "logger." LoggerLevel = LoggerPrefix + "level" // TODO: Never set/used? LoggerPath = LoggerPrefix + "path" // TODO: Never set/used? diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go index 6973392ff..76caa00bb 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go @@ -41,7 +41,6 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" - MetricReportStream = "metrics.report.stream" LoggerPrefix = "logger." LoggerLevel = LoggerPrefix + "level" // TODO: Never set/used? LoggerPath = LoggerPrefix + "path" // TODO: Never set/used? diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go index 6973392ff..76caa00bb 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go @@ -41,7 +41,6 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" - MetricReportStream = "metrics.report.stream" LoggerPrefix = "logger." LoggerLevel = LoggerPrefix + "level" // TODO: Never set/used? LoggerPath = LoggerPrefix + "path" // TODO: Never set/used? From cc0264ce73df905e435f89ebc827c7a6bb214d20 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Fri, 25 Aug 2023 15:04:58 +0100 Subject: [PATCH 13/22] vendor --- vendor/github.com/nginx/agent/sdk/v2/config_helpers.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 1a68ea0b6..0e3205ae4 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -23,13 +23,14 @@ import ( "strings" "time" + log "github.com/sirupsen/logrus" + "github.com/nginx/agent/sdk/v2/backoff" filesSDK "github.com/nginx/agent/sdk/v2/files" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/sdk/v2/zip" crossplane "github.com/nginxinc/nginx-go-crossplane" - log "github.com/sirupsen/logrus" ) const ( @@ -298,8 +299,8 @@ func updateNginxConfigWithCert( directoryMap *DirectoryMap, allowedDirectories map[string]struct{}, ) error { - if strings.HasPrefix("$", file) { - // variable loading, not actual cert file + if strings.Contains(file, "$") { + // cannot process any filepath with variables return nil } From 5220beb29ea2089c475c39a2ea3ef08196e64e87 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Fri, 25 Aug 2023 15:56:36 +0100 Subject: [PATCH 14/22] Internal feature checks. Fix standalone load --- main.go | 9 ++- sdk/agent/config/config_helpers.go | 4 +- sdk/config_helpers.go | 3 +- src/plugins/features.go | 78 +++++++++---------- src/plugins/metrics.go | 12 ++- .../sdk/v2/agent/config/config_helpers.go | 4 +- .../nginx/agent/sdk/v2/config_helpers.go | 3 +- .../sdk/v2/agent/config/config_helpers.go | 4 +- .../nginx/agent/sdk/v2/config_helpers.go | 3 +- .../nginx/agent/v2/src/plugins/features.go | 78 +++++++++---------- .../nginx/agent/v2/src/plugins/metrics.go | 12 ++- .../sdk/v2/agent/config/config_helpers.go | 4 +- .../nginx/agent/sdk/v2/config_helpers.go | 3 +- 13 files changed, 93 insertions(+), 124 deletions(-) diff --git a/main.go b/main.go index 4096bf1bc..aedae9f34 100644 --- a/main.go +++ b/main.go @@ -222,7 +222,7 @@ func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env * } } - if reporter != nil { + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender) && reporter != nil { corePlugins = append(corePlugins, plugins.NewMetricsSender(reporter), ) @@ -239,11 +239,12 @@ func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env * corePlugins = append(corePlugins, plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()), version)) } - if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) { - corePlugins = append(corePlugins, plugins.NewMetrics(loadedConfig, env, binary, false)) + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsCollection) || + (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) { + corePlugins = append(corePlugins, plugins.NewMetrics(loadedConfig, env, binary)) } - if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { corePlugins = append(corePlugins, plugins.NewMetricsThrottle(loadedConfig, env)) } diff --git a/sdk/agent/config/config_helpers.go b/sdk/agent/config/config_helpers.go index 0d89065b9..4d4e39520 100644 --- a/sdk/agent/config/config_helpers.go +++ b/sdk/agent/config/config_helpers.go @@ -76,9 +76,7 @@ func GetDefaultFeatures() []string { FeatureNginxConfigAsync, FeatureNginxSSLConfig, FeatureNginxCounting, - // FeatureMetrics, - FeatureMetricsCollection, - FeatureMetricsThrottle, + FeatureMetrics, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, diff --git a/sdk/config_helpers.go b/sdk/config_helpers.go index 0e3205ae4..eccc5b749 100644 --- a/sdk/config_helpers.go +++ b/sdk/config_helpers.go @@ -23,14 +23,13 @@ import ( "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/nginx/agent/sdk/v2/backoff" filesSDK "github.com/nginx/agent/sdk/v2/files" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/sdk/v2/zip" crossplane "github.com/nginxinc/nginx-go-crossplane" + log "github.com/sirupsen/logrus" ) const ( diff --git a/src/plugins/features.go b/src/plugins/features.go index 50cf80933..64b00626a 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -43,9 +43,9 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { f.pipeline = pipeline f.featureMap = map[string]func(data string) []core.Plugin{ - // agent_config.FeatureMetrics: func(data string) []core.Plugin { - // return f.enableMetricsFeature(data) - // }, + agent_config.FeatureMetrics: func(data string) []core.Plugin { + return f.enableMetricsFeature(data) + }, agent_config.FeatureAgentAPI: func(data string) []core.Plugin { return f.enableAgentAPIFeature(data) }, @@ -55,10 +55,9 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { agent_config.FeatureMetricsThrottle: func(data string) []core.Plugin { return f.enableMetricsThrottleFeature(data) }, - // TODO: Not required? - // agent_config.FeatureMetricsSender: func(data string) []core.Plugin { - // return f.enableMetricsSenderFeature(data) - // }, + agent_config.FeatureMetricsSender: func(data string) []core.Plugin { + return f.enableMetricsSenderFeature(data) + }, agent_config.FeatureMetricsCollection: func(data string) []core.Plugin { return f.enableMetricsCollectionFeature(data) }, @@ -133,22 +132,23 @@ func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { return []core.Plugin{} } -// func (f *Features) enableMetricsFeature(data string) []core.Plugin { -// if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { +func (f *Features) enableMetricsFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { -// conf, err := config.GetConfig(f.conf.ClientID) -// if err != nil { -// log.Warnf("Unable to get agent config, %v", err) -// } -// f.conf = conf + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf -// metrics := NewMetrics(f.conf, f.env, f.binary, false) -// metricsThrottle := NewMetricsThrottle(f.conf, f.env) + metrics := NewMetrics(f.conf, f.env, f.binary) + metricsThrottle := NewMetricsThrottle(f.conf, f.env) + metricsSender := NewMetricsSender(f.commander) -// return []core.Plugin{metrics, metricsThrottle} -// } -// return []core.Plugin{} -// } + return []core.Plugin{metrics, metricsThrottle, metricsSender} + } + return []core.Plugin{} +} func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && @@ -160,11 +160,7 @@ func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { } f.conf = conf - passthrough := false - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { - passthrough = true - } - metrics := NewMetrics(f.conf, f.env, f.binary, passthrough) + metrics := NewMetrics(f.conf, f.env, f.binary) return []core.Plugin{metrics} } @@ -188,22 +184,22 @@ func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { return []core.Plugin{} } -// TODO: Not required? -// func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { -// if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { +func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { -// conf, err := config.GetConfig(f.conf.ClientID) -// if err != nil { -// log.Warnf("Unable to get agent config, %v", err) -// } -// f.conf = conf + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf -// metricsSender := NewMetricsSender(f.commander) + metricsSender := NewMetricsSender(f.commander) -// return []core.Plugin{metricsSender} -// } -// return []core.Plugin{} -// } + return []core.Plugin{metricsSender} + } + return []core.Plugin{} +} func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { @@ -309,11 +305,7 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { - passthrough := false - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { - passthrough = true - } - metrics := NewMetrics(f.conf, f.env, f.binary, passthrough) + metrics := NewMetrics(f.conf, f.env, f.binary) countingPlugins = append(countingPlugins, metrics) } diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 01edb4b92..182ea1e1a 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -39,10 +39,9 @@ type Metrics struct { env core.Environment conf *config.Config binary core.NginxBinary - passthrough bool } -func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary, passthrough bool) *Metrics { +func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary) *Metrics { collectorConfigsMap := createCollectorConfigsMap(config, env, binary) return &Metrics{ collectorsUpdate: atomic.NewBool(false), @@ -57,7 +56,6 @@ func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBi env: env, conf: config, binary: binary, - passthrough: passthrough, } } @@ -177,7 +175,9 @@ func (m *Metrics) metricsGoroutine() { case <-m.ticker.C: stats := m.collectStats() if bundlePayload := metrics.GenerateMetricsReportBundle(stats); bundlePayload != nil { - if m.passthrough { + if m.conf.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) + } else { metricBuffer := make([]core.Payload, 0) switch bundle := bundlePayload.(type) { @@ -194,11 +194,9 @@ func (m *Metrics) metricsGoroutine() { default: log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) } - - } else { - m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) } } + if m.collectorsUpdate.Load() { m.ticker = time.NewTicker(m.conf.AgentMetrics.CollectionInterval) m.collectorsUpdate.Store(false) diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 0d89065b9..4d4e39520 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -76,9 +76,7 @@ func GetDefaultFeatures() []string { FeatureNginxConfigAsync, FeatureNginxSSLConfig, FeatureNginxCounting, - // FeatureMetrics, - FeatureMetricsCollection, - FeatureMetricsThrottle, + FeatureMetrics, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0e3205ae4..eccc5b749 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -23,14 +23,13 @@ import ( "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/nginx/agent/sdk/v2/backoff" filesSDK "github.com/nginx/agent/sdk/v2/files" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/sdk/v2/zip" crossplane "github.com/nginxinc/nginx-go-crossplane" + log "github.com/sirupsen/logrus" ) const ( diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 0d89065b9..4d4e39520 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -76,9 +76,7 @@ func GetDefaultFeatures() []string { FeatureNginxConfigAsync, FeatureNginxSSLConfig, FeatureNginxCounting, - // FeatureMetrics, - FeatureMetricsCollection, - FeatureMetricsThrottle, + FeatureMetrics, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0e3205ae4..eccc5b749 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -23,14 +23,13 @@ import ( "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/nginx/agent/sdk/v2/backoff" filesSDK "github.com/nginx/agent/sdk/v2/files" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/sdk/v2/zip" crossplane "github.com/nginxinc/nginx-go-crossplane" + log "github.com/sirupsen/logrus" ) const ( diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go index 50cf80933..64b00626a 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go @@ -43,9 +43,9 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { f.pipeline = pipeline f.featureMap = map[string]func(data string) []core.Plugin{ - // agent_config.FeatureMetrics: func(data string) []core.Plugin { - // return f.enableMetricsFeature(data) - // }, + agent_config.FeatureMetrics: func(data string) []core.Plugin { + return f.enableMetricsFeature(data) + }, agent_config.FeatureAgentAPI: func(data string) []core.Plugin { return f.enableAgentAPIFeature(data) }, @@ -55,10 +55,9 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { agent_config.FeatureMetricsThrottle: func(data string) []core.Plugin { return f.enableMetricsThrottleFeature(data) }, - // TODO: Not required? - // agent_config.FeatureMetricsSender: func(data string) []core.Plugin { - // return f.enableMetricsSenderFeature(data) - // }, + agent_config.FeatureMetricsSender: func(data string) []core.Plugin { + return f.enableMetricsSenderFeature(data) + }, agent_config.FeatureMetricsCollection: func(data string) []core.Plugin { return f.enableMetricsCollectionFeature(data) }, @@ -133,22 +132,23 @@ func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { return []core.Plugin{} } -// func (f *Features) enableMetricsFeature(data string) []core.Plugin { -// if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { +func (f *Features) enableMetricsFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { -// conf, err := config.GetConfig(f.conf.ClientID) -// if err != nil { -// log.Warnf("Unable to get agent config, %v", err) -// } -// f.conf = conf + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf -// metrics := NewMetrics(f.conf, f.env, f.binary, false) -// metricsThrottle := NewMetricsThrottle(f.conf, f.env) + metrics := NewMetrics(f.conf, f.env, f.binary) + metricsThrottle := NewMetricsThrottle(f.conf, f.env) + metricsSender := NewMetricsSender(f.commander) -// return []core.Plugin{metrics, metricsThrottle} -// } -// return []core.Plugin{} -// } + return []core.Plugin{metrics, metricsThrottle, metricsSender} + } + return []core.Plugin{} +} func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && @@ -160,11 +160,7 @@ func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { } f.conf = conf - passthrough := false - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { - passthrough = true - } - metrics := NewMetrics(f.conf, f.env, f.binary, passthrough) + metrics := NewMetrics(f.conf, f.env, f.binary) return []core.Plugin{metrics} } @@ -188,22 +184,22 @@ func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { return []core.Plugin{} } -// TODO: Not required? -// func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { -// if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { +func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { -// conf, err := config.GetConfig(f.conf.ClientID) -// if err != nil { -// log.Warnf("Unable to get agent config, %v", err) -// } -// f.conf = conf + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf -// metricsSender := NewMetricsSender(f.commander) + metricsSender := NewMetricsSender(f.commander) -// return []core.Plugin{metricsSender} -// } -// return []core.Plugin{} -// } + return []core.Plugin{metricsSender} + } + return []core.Plugin{} +} func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { @@ -309,11 +305,7 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { - passthrough := false - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { - passthrough = true - } - metrics := NewMetrics(f.conf, f.env, f.binary, passthrough) + metrics := NewMetrics(f.conf, f.env, f.binary) countingPlugins = append(countingPlugins, metrics) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 01edb4b92..182ea1e1a 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -39,10 +39,9 @@ type Metrics struct { env core.Environment conf *config.Config binary core.NginxBinary - passthrough bool } -func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary, passthrough bool) *Metrics { +func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary) *Metrics { collectorConfigsMap := createCollectorConfigsMap(config, env, binary) return &Metrics{ collectorsUpdate: atomic.NewBool(false), @@ -57,7 +56,6 @@ func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBi env: env, conf: config, binary: binary, - passthrough: passthrough, } } @@ -177,7 +175,9 @@ func (m *Metrics) metricsGoroutine() { case <-m.ticker.C: stats := m.collectStats() if bundlePayload := metrics.GenerateMetricsReportBundle(stats); bundlePayload != nil { - if m.passthrough { + if m.conf.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) + } else { metricBuffer := make([]core.Payload, 0) switch bundle := bundlePayload.(type) { @@ -194,11 +194,9 @@ func (m *Metrics) metricsGoroutine() { default: log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) } - - } else { - m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) } } + if m.collectorsUpdate.Load() { m.ticker = time.NewTicker(m.conf.AgentMetrics.CollectionInterval) m.collectorsUpdate.Store(false) diff --git a/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 0d89065b9..4d4e39520 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -76,9 +76,7 @@ func GetDefaultFeatures() []string { FeatureNginxConfigAsync, FeatureNginxSSLConfig, FeatureNginxCounting, - // FeatureMetrics, - FeatureMetricsCollection, - FeatureMetricsThrottle, + FeatureMetrics, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, diff --git a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0e3205ae4..eccc5b749 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -23,14 +23,13 @@ import ( "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/nginx/agent/sdk/v2/backoff" filesSDK "github.com/nginx/agent/sdk/v2/files" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/sdk/v2/zip" crossplane "github.com/nginxinc/nginx-go-crossplane" + log "github.com/sirupsen/logrus" ) const ( From 42fe6d569e40cd894ede3fbf82077fbfb89f4680 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Fri, 25 Aug 2023 16:05:03 +0100 Subject: [PATCH 15/22] Fix test --- src/plugins/metrics_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/plugins/metrics_test.go b/src/plugins/metrics_test.go index 38492194d..64c943933 100644 --- a/src/plugins/metrics_test.go +++ b/src/plugins/metrics_test.go @@ -163,7 +163,7 @@ func TestMetricsProcessNginxDetailProcUpdate(t *testing.T) { }, }) - metricsPlugin := NewMetrics(config, env, binary, false) + metricsPlugin := NewMetrics(config, env, binary) metricsPlugin.collectors = []metrics.Collector{ collectors.NewNginxCollector(config, env, metricsPlugin.collectorConfigsMap[firstNginxId], binary), } @@ -263,7 +263,7 @@ func TestMetrics_Process_AgentConfigChanged(t *testing.T) { }) // Setup metrics and mock pipeline - metricsPlugin := NewMetrics(tc.config, env, binary, false) + metricsPlugin := NewMetrics(tc.config, env, binary) messagePipe := core.SetupMockMessagePipe(t, context.TODO(), []core.Plugin{metricsPlugin}, []core.ExtensionPlugin{}) messagePipe.RunWithoutInit() @@ -294,7 +294,7 @@ func TestMetrics_Process_AgentCollectorsUpdate(t *testing.T) { env := tutils.GetMockEnvWithProcess() env.On("IsContainer").Return(false) - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, tutils.GetMockNginxBinary(), false) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, tutils.GetMockNginxBinary()) pluginUnderTest.Process(core.NewMessage(core.AgentCollectorsUpdate, nil)) assert.True(t, pluginUnderTest.collectorsUpdate.Load()) @@ -307,7 +307,7 @@ func TestMetrics_Process_NginxConfigApplySucceeded_AgentConfigChanged(t *testing env := tutils.GetMockEnvWithHostAndProcess() env.On("IsContainer").Return(false) - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, binary, false) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, binary) pluginUnderTest.Process(core.NewMessage(core.NginxPluginConfigured, nil)) conf := pluginUnderTest.collectorConfigsMap[secondNginxId] @@ -329,7 +329,7 @@ func TestMetrics_Process_NginxConfigApplySucceeded_AgentConfigChanged(t *testing } func TestMetrics_Info(t *testing.T) { - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary(), false) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary()) assert.Equal(t, "metrics", pluginUnderTest.Info().Name()) } @@ -342,6 +342,6 @@ func TestMetrics_Subscriptions(t *testing.T) { core.NginxDetailProcUpdate, core.NginxConfigApplySucceeded, } - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary(), false) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary()) assert.Equal(t, subs, pluginUnderTest.Subscriptions()) } From d9d367fbc13a9f75ce2a6c96f7d24a9032e47033 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Fri, 25 Aug 2023 16:26:18 +0100 Subject: [PATCH 16/22] Fix integration tests --- test/performance/metrics_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/performance/metrics_test.go b/test/performance/metrics_test.go index 42509b140..786f40a09 100644 --- a/test/performance/metrics_test.go +++ b/test/performance/metrics_test.go @@ -317,18 +317,17 @@ func startNginxAgent(b *testing.B) { fmt.Printf("Unable to connect to control plane: %v", err) return } - var corePlugins []core.Plugin - corePlugins = append(corePlugins, + corePlugins := []core.Plugin{ plugins.NewConfigReader(loadedConfig), plugins.NewNginx(commander, binary, env, &config.Config{}), plugins.NewCommander(commander, loadedConfig), plugins.NewMetricsSender(reporter), plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.New().String()), "1.0.0"), - plugins.NewMetrics(loadedConfig, env, binary, false), + plugins.NewMetrics(loadedConfig, env, binary), plugins.NewMetricsThrottle(loadedConfig, env), plugins.NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.New().String()), binary, env, "1.0.0"), - ) + } messagePipe := core.NewMessagePipe(ctx) err = messagePipe.Register(100, corePlugins, []core.ExtensionPlugin{}) From f7efc7118c87fb0866e9397c0abbdb4ce1253d1c Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Fri, 25 Aug 2023 17:08:49 +0100 Subject: [PATCH 17/22] Fix collection of metrics in stream mode --- src/core/metrics/collectors/nginx.go | 2 +- src/plugins/metrics.go | 5 ++--- .../nginx/agent/v2/src/core/metrics/collectors/nginx.go | 2 +- .../vendor/github.com/nginx/agent/v2/src/plugins/metrics.go | 5 ++--- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/core/metrics/collectors/nginx.go b/src/core/metrics/collectors/nginx.go index 84c5c15fc..f2e58ae50 100644 --- a/src/core/metrics/collectors/nginx.go +++ b/src/core/metrics/collectors/nginx.go @@ -52,7 +52,7 @@ func buildSources(dimensions *metrics.CommonDim, binary core.NginxBinary, collec nginxSources = append(nginxSources, sources.NewNginxProcess(dimensions, sources.OSSNamespace, binary)) } - if conf.IsFeatureEnabled(agent_config.FeatureMetrics) { + if conf.IsFeatureEnabled(agent_config.FeatureMetrics) || conf.IsFeatureEnabled(agent_config.FeatureMetricsCollection) { nginxSources = append(nginxSources, sources.NewNginxWorker(dimensions, sources.OSSNamespace, binary, sources.NewNginxWorkerClient(env))) if collectorConf.StubStatus != "" { diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 182ea1e1a..48e210f3e 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -189,10 +189,9 @@ func (m *Metrics) metricsGoroutine() { } } } - m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) default: - log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) + log.Errorf("Error converting metric report: %T", bundlePayload) } } } @@ -247,7 +246,7 @@ func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { func (m *Metrics) registerStatsSources() { tempCollectors := make([]metrics.Collector, 0) - if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) { + if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) || m.conf.IsFeatureEnabled(agent_config.FeatureMetricsCollection) { tempCollectors = append(tempCollectors, collectors.NewSystemCollector(m.env, m.conf), ) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go index 84c5c15fc..f2e58ae50 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go @@ -52,7 +52,7 @@ func buildSources(dimensions *metrics.CommonDim, binary core.NginxBinary, collec nginxSources = append(nginxSources, sources.NewNginxProcess(dimensions, sources.OSSNamespace, binary)) } - if conf.IsFeatureEnabled(agent_config.FeatureMetrics) { + if conf.IsFeatureEnabled(agent_config.FeatureMetrics) || conf.IsFeatureEnabled(agent_config.FeatureMetricsCollection) { nginxSources = append(nginxSources, sources.NewNginxWorker(dimensions, sources.OSSNamespace, binary, sources.NewNginxWorkerClient(env))) if collectorConf.StubStatus != "" { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 182ea1e1a..48e210f3e 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -189,10 +189,9 @@ func (m *Metrics) metricsGoroutine() { } } } - m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) default: - log.Errorf("BUNDLE TYPE COERCION FAILED: %T", bundlePayload) + log.Errorf("Error converting metric report: %T", bundlePayload) } } } @@ -247,7 +246,7 @@ func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { func (m *Metrics) registerStatsSources() { tempCollectors := make([]metrics.Collector, 0) - if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) { + if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) || m.conf.IsFeatureEnabled(agent_config.FeatureMetricsCollection) { tempCollectors = append(tempCollectors, collectors.NewSystemCollector(m.env, m.conf), ) From 01261107bfce515ad120e51dc21bc0621466f17a Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Fri, 25 Aug 2023 17:13:05 +0100 Subject: [PATCH 18/22] Fix throttling feature toggle --- src/plugins/metrics.go | 2 +- .../vendor/github.com/nginx/agent/v2/src/plugins/metrics.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 48e210f3e..77d5d7309 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -175,7 +175,7 @@ func (m *Metrics) metricsGoroutine() { case <-m.ticker.C: stats := m.collectStats() if bundlePayload := metrics.GenerateMetricsReportBundle(stats); bundlePayload != nil { - if m.conf.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) || m.conf.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) } else { metricBuffer := make([]core.Payload, 0) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 48e210f3e..77d5d7309 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -175,7 +175,7 @@ func (m *Metrics) metricsGoroutine() { case <-m.ticker.C: stats := m.collectStats() if bundlePayload := metrics.GenerateMetricsReportBundle(stats); bundlePayload != nil { - if m.conf.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) || m.conf.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) } else { metricBuffer := make([]core.Payload, 0) From 7f686a987ab20dec8889dcf3689ba5e700024f76 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Fri, 25 Aug 2023 17:34:39 +0100 Subject: [PATCH 19/22] Fix features test --- src/plugins/features_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/plugins/features_test.go b/src/plugins/features_test.go index 2980be02b..9a1e841b0 100644 --- a/src/plugins/features_test.go +++ b/src/plugins/features_test.go @@ -36,9 +36,9 @@ func TestFeatures_Process(t *testing.T) { numPlugins: 2, }, { - testName: "Metrics", - featureKey: agent_config.FeatureMetrics, - pluginName: agent_config.FeatureMetrics, + testName: "Metrics collection", + featureKey: agent_config.FeatureMetricsCollection, + pluginName: agent_config.FeatureMetricsCollection, numPlugins: 2, }, { @@ -51,7 +51,7 @@ func TestFeatures_Process(t *testing.T) { processID := "12345" - processes := []core.Process{ + processes := []*core.Process{ { Name: processID, IsMaster: true, From 3da6c7fd63b57b418cde06e4f541718e3f7c35ba Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Fri, 25 Aug 2023 17:43:26 +0100 Subject: [PATCH 20/22] Fix test and extend coverage --- src/plugins/features_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/plugins/features_test.go b/src/plugins/features_test.go index 9a1e841b0..323b52734 100644 --- a/src/plugins/features_test.go +++ b/src/plugins/features_test.go @@ -35,10 +35,16 @@ func TestFeatures_Process(t *testing.T) { pluginName: agent_config.FeatureAgentAPI, numPlugins: 2, }, + { + testName: "Metrics", + featureKey: agent_config.FeatureMetrics, + pluginName: agent_config.FeatureMetrics, + numPlugins: 4, + }, { testName: "Metrics collection", featureKey: agent_config.FeatureMetricsCollection, - pluginName: agent_config.FeatureMetricsCollection, + pluginName: agent_config.FeatureMetrics, numPlugins: 2, }, { From b0ea6a742113d6e83d4037016377761f047daf7b Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Mon, 28 Aug 2023 10:18:46 +0100 Subject: [PATCH 21/22] Remove unused logger topics --- src/core/topics.go | 3 --- .../vendor/github.com/nginx/agent/v2/src/core/topics.go | 3 --- .../vendor/github.com/nginx/agent/v2/src/core/topics.go | 3 --- 3 files changed, 9 deletions(-) diff --git a/src/core/topics.go b/src/core/topics.go index e77853886..6f6308c7c 100644 --- a/src/core/topics.go +++ b/src/core/topics.go @@ -40,9 +40,6 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" - LoggerPrefix = "logger." - LoggerLevel = LoggerPrefix + "level" // TODO: Never set/used? - LoggerPath = LoggerPrefix + "path" // TODO: Never set/used? DataplaneChanged = "dataplane.changed" DataplaneFilesChanged = "dataplane.fileschanged" Events = "events" diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go index e77853886..6f6308c7c 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go @@ -40,9 +40,6 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" - LoggerPrefix = "logger." - LoggerLevel = LoggerPrefix + "level" // TODO: Never set/used? - LoggerPath = LoggerPrefix + "path" // TODO: Never set/used? DataplaneChanged = "dataplane.changed" DataplaneFilesChanged = "dataplane.fileschanged" Events = "events" diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go index e77853886..6f6308c7c 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go @@ -40,9 +40,6 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" - LoggerPrefix = "logger." - LoggerLevel = LoggerPrefix + "level" // TODO: Never set/used? - LoggerPath = LoggerPrefix + "path" // TODO: Never set/used? DataplaneChanged = "dataplane.changed" DataplaneFilesChanged = "dataplane.fileschanged" Events = "events" From 1625b769879dc6f3dc307db44ddf8fadbd6e74c1 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Mon, 28 Aug 2023 10:49:31 +0100 Subject: [PATCH 22/22] Remove trailing async config feature --- src/plugins/features.go | 14 -------------- .../nginx/agent/v2/src/plugins/features.go | 14 -------------- 2 files changed, 28 deletions(-) diff --git a/src/plugins/features.go b/src/plugins/features.go index 64b00626a..7a17686a3 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -118,20 +118,6 @@ func (f *Features) Process(msg *core.Message) { } } -func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureNginxConfigAsync) { - conf, err := config.GetConfig(f.conf.ClientID) - if err != nil { - log.Warnf("Unable to get agent config, %v", err) - } - f.conf = conf - - nginx := NewNginx(f.commander, f.binary, f.env, f.conf) - return []core.Plugin{nginx} - } - return []core.Plugin{} -} - func (f *Features) enableMetricsFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go index 64b00626a..7a17686a3 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go @@ -118,20 +118,6 @@ func (f *Features) Process(msg *core.Message) { } } -func (f *Features) enableNginxConfigAsyncFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureNginxConfigAsync) { - conf, err := config.GetConfig(f.conf.ClientID) - if err != nil { - log.Warnf("Unable to get agent config, %v", err) - } - f.conf = conf - - nginx := NewNginx(f.commander, f.binary, f.env, f.conf) - return []core.Plugin{nginx} - } - return []core.Plugin{} -} - func (f *Features) enableMetricsFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) {