diff --git a/main.go b/main.go index 32df876ec..aedae9f34 100644 --- a/main.go +++ b/main.go @@ -222,7 +222,7 @@ func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env * } } - if reporter != nil { + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender) && reporter != nil { corePlugins = append(corePlugins, plugins.NewMetricsSender(reporter), ) @@ -239,11 +239,12 @@ func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env * corePlugins = append(corePlugins, plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()), version)) } - if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) { + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsCollection) || + (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) { corePlugins = append(corePlugins, plugins.NewMetrics(loadedConfig, env, binary)) } - if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { corePlugins = append(corePlugins, plugins.NewMetricsThrottle(loadedConfig, env)) } diff --git a/sdk/agent/config/config_helpers.go b/sdk/agent/config/config_helpers.go index 8257c81a8..4d4e39520 100644 --- a/sdk/agent/config/config_helpers.go +++ b/sdk/agent/config/config_helpers.go @@ -24,6 +24,8 @@ const ( FeatureNginxSSLConfig = "nginx-ssl-config" FeatureNginxCounting = "nginx-counting" FeatureMetrics = "metrics" + FeatureMetricsCollection = "metrics-collection" + FeatureMetricsSender = "metrics-sender" FeatureMetricsThrottle = "metrics-throttle" FeatureDataPlaneStatus = "dataplane-status" FeatureProcessWatcher = "process-watcher" @@ -31,7 +33,6 @@ const ( FeatureFileWatcherThrottle = "file-watch-throttle" FeatureActivityEvents = "activity-events" FeatureAgentAPI = "agent-api" - FeatureMetricSender = "metric-sender" CommanderPlugin = "commander" ConfigReaderPlugin = "config-reader-plugin" @@ -76,7 +77,6 @@ func GetDefaultFeatures() []string { FeatureNginxSSLConfig, FeatureNginxCounting, FeatureMetrics, - FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, @@ -96,7 +96,6 @@ func DecodeConfig[T interface{}](input interface{}) (output T, err error) { } err = decoder.Decode(input) - if err != nil { return output, err } diff --git a/sdk/config_helpers.go b/sdk/config_helpers.go index 0e3205ae4..eccc5b749 100644 --- a/sdk/config_helpers.go +++ b/sdk/config_helpers.go @@ -23,14 +23,13 @@ import ( "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/nginx/agent/sdk/v2/backoff" filesSDK "github.com/nginx/agent/sdk/v2/files" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/sdk/v2/zip" crossplane "github.com/nginxinc/nginx-go-crossplane" + log "github.com/sirupsen/logrus" ) const ( diff --git a/src/core/metrics/collectors/nginx.go b/src/core/metrics/collectors/nginx.go index 803822ff5..f2e58ae50 100644 --- a/src/core/metrics/collectors/nginx.go +++ b/src/core/metrics/collectors/nginx.go @@ -11,13 +11,13 @@ import ( "context" "sync" + agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" "github.com/nginx/agent/v2/src/core/metrics/sources" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" + log "github.com/sirupsen/logrus" ) type NginxCollector struct { @@ -52,7 +52,7 @@ func buildSources(dimensions *metrics.CommonDim, binary core.NginxBinary, collec nginxSources = append(nginxSources, sources.NewNginxProcess(dimensions, sources.OSSNamespace, binary)) } - if conf.IsFeatureEnabled(agent_config.FeatureMetrics) { + if conf.IsFeatureEnabled(agent_config.FeatureMetrics) || conf.IsFeatureEnabled(agent_config.FeatureMetricsCollection) { nginxSources = append(nginxSources, sources.NewNginxWorker(dimensions, sources.OSSNamespace, binary, sources.NewNginxWorkerClient(env))) if collectorConf.StubStatus != "" { diff --git a/src/core/topics.go b/src/core/topics.go index 52c1b5e15..6f6308c7c 100644 --- a/src/core/topics.go +++ b/src/core/topics.go @@ -40,9 +40,6 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" - LoggerPrefix = "logger." - LoggerLevel = LoggerPrefix + "level" - LoggerPath = LoggerPrefix + "path" DataplaneChanged = "dataplane.changed" DataplaneFilesChanged = "dataplane.fileschanged" Events = "events" diff --git a/src/plugins/features.go b/src/plugins/features.go index db644b838..7a17686a3 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -8,12 +8,13 @@ package plugins import ( - "github.com/google/uuid" agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/client" sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" + + "github.com/google/uuid" log "github.com/sirupsen/logrus" ) @@ -54,6 +55,12 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { agent_config.FeatureMetricsThrottle: func(data string) []core.Plugin { return f.enableMetricsThrottleFeature(data) }, + agent_config.FeatureMetricsSender: func(data string) []core.Plugin { + return f.enableMetricsSenderFeature(data) + }, + agent_config.FeatureMetricsCollection: func(data string) []core.Plugin { + return f.enableMetricsCollectionFeature(data) + }, agent_config.FeatureDataPlaneStatus: func(data string) []core.Plugin { return f.enableDataPlaneStatusFeature(data) }, @@ -97,12 +104,12 @@ func (f *Features) Process(msg *core.Message) { if initFeature, ok := f.featureMap[feature]; ok { featurePlugins := initFeature(feature) plugins = append(plugins, featurePlugins...) - } } + err := f.pipeline.Register(agent_config.DefaultPluginSize, plugins, nil) if err != nil { - log.Warnf("Unable to register feature, %v", err) + log.Warnf("Unable to register features: %v", err) } for _, plugin := range plugins { @@ -113,6 +120,26 @@ func (f *Features) Process(msg *core.Message) { func (f *Features) enableMetricsFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { + + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf + + metrics := NewMetrics(f.conf, f.env, f.binary) + metricsThrottle := NewMetricsThrottle(f.conf, f.env) + metricsSender := NewMetricsSender(f.commander) + + return []core.Plugin{metrics, metricsThrottle, metricsSender} + } + return []core.Plugin{} +} + +func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { + conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) @@ -126,47 +153,66 @@ func (f *Features) enableMetricsFeature(data string) []core.Plugin { return []core.Plugin{} } -func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { +func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { + conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - api := NewAgentAPI(f.conf, f.env, f.binary) + metricsThrottle := NewMetricsThrottle(f.conf, f.env) - return []core.Plugin{api} + return []core.Plugin{metricsThrottle} } return []core.Plugin{} } -func (f *Features) enableRegistrationFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) { +func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { + conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version) + metricsSender := NewMetricsSender(f.commander) - return []core.Plugin{registration} + return []core.Plugin{metricsSender} } return []core.Plugin{} } -func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { +func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - metricsThrottle := NewMetricsThrottle(f.conf, f.env) + api := NewAgentAPI(f.conf, f.env, f.binary) - return []core.Plugin{metricsThrottle} + return []core.Plugin{api} + } + return []core.Plugin{} +} + +func (f *Features) enableRegistrationFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) { + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf + + registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version) + + return []core.Plugin{registration} } return []core.Plugin{} } @@ -242,17 +288,20 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { } f.conf = conf - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { + metrics := NewMetrics(f.conf, f.env, f.binary) + countingPlugins = append(countingPlugins, metrics) } - nginxCounting := NewNginxCounter(f.conf, f.binary, f.env) + nginxCounting := NewNginxCounter(f.conf, f.binary, f.env) countingPlugins = append(countingPlugins, nginxCounting) return countingPlugins } - return countingPlugins + return []core.Plugin{} } - return countingPlugins + return []core.Plugin{} } diff --git a/src/plugins/features_test.go b/src/plugins/features_test.go index 0569758bb..323b52734 100644 --- a/src/plugins/features_test.go +++ b/src/plugins/features_test.go @@ -17,20 +17,12 @@ import ( "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" tutils "github.com/nginx/agent/v2/test/utils" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) func TestFeatures_Process(t *testing.T) { - processID := "12345" - - processes := []*core.Process{ - { - Name: processID, - IsMaster: true, - }, - } - testCases := []struct { testName string featureKey string @@ -47,6 +39,12 @@ func TestFeatures_Process(t *testing.T) { testName: "Metrics", featureKey: agent_config.FeatureMetrics, pluginName: agent_config.FeatureMetrics, + numPlugins: 4, + }, + { + testName: "Metrics collection", + featureKey: agent_config.FeatureMetricsCollection, + pluginName: agent_config.FeatureMetrics, numPlugins: 2, }, { @@ -57,6 +55,15 @@ func TestFeatures_Process(t *testing.T) { }, } + processID := "12345" + + processes := []*core.Process{ + { + Name: processID, + IsMaster: true, + }, + } + detailsMap := map[string]*proto.NginxDetails{ processID: { ProcessPath: "/path/to/nginx", @@ -94,7 +101,7 @@ func TestFeatures_Process(t *testing.T) { for _, tc := range testCases { messagePipe := core.SetupMockMessagePipe(t, ctx, []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{}) - assert.Equal(t, 1, len(messagePipe.GetPlugins())) + assert.Len(t, messagePipe.GetPlugins(), 1) assert.Equal(t, agent_config.FeaturesPlugin, messagePipe.GetPlugins()[0].Info().Name()) messagePipe.Process(core.NewMessage(core.EnableFeature, []string{tc.featureKey})) diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 098bf9e88..77d5d7309 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -12,16 +12,15 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - "github.com/nginx/agent/sdk/v2" + agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" "github.com/nginx/agent/v2/src/core/metrics/collectors" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type Metrics struct { @@ -164,6 +163,7 @@ func (m *Metrics) metricsGoroutine() { defer m.wg.Done() log.Info("Metrics waiting for handshake to be completed") m.registerStatsSources() + for { select { case <-m.ctx.Done(): @@ -174,9 +174,28 @@ func (m *Metrics) metricsGoroutine() { return case <-m.ticker.C: stats := m.collectStats() - if bundle := metrics.GenerateMetricsReportBundle(stats); bundle != nil { - m.pipeline.Process(core.NewMessage(core.MetricReport, bundle)) + if bundlePayload := metrics.GenerateMetricsReportBundle(stats); bundlePayload != nil { + if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) || m.conf.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) + } else { + metricBuffer := make([]core.Payload, 0) + + switch bundle := bundlePayload.(type) { + case *metrics.MetricsReportBundle: + if len(bundle.Data) > 0 { + for _, report := range bundle.Data { + if len(report.Data) > 0 { + metricBuffer = append(metricBuffer, report) + } + } + } + m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) + default: + log.Errorf("Error converting metric report: %T", bundlePayload) + } + } } + if m.collectorsUpdate.Load() { m.ticker = time.NewTicker(m.conf.AgentMetrics.CollectionInterval) m.collectorsUpdate.Store(false) @@ -227,7 +246,7 @@ func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { func (m *Metrics) registerStatsSources() { tempCollectors := make([]metrics.Collector, 0) - if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) { + if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) || m.conf.IsFeatureEnabled(agent_config.FeatureMetricsCollection) { tempCollectors = append(tempCollectors, collectors.NewSystemCollector(m.env, m.conf), ) diff --git a/src/plugins/metrics_sender.go b/src/plugins/metrics_sender.go index a5933f24c..38bb2af93 100644 --- a/src/plugins/metrics_sender.go +++ b/src/plugins/metrics_sender.go @@ -11,15 +11,15 @@ import ( "context" "strings" - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - "github.com/nginx/agent/sdk/v2" agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/client" "github.com/nginx/agent/sdk/v2/proto" models "github.com/nginx/agent/sdk/v2/proto/events" "github.com/nginx/agent/v2/src/core" + + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type MetricsSender struct { @@ -55,7 +55,7 @@ func (r *MetricsSender) Close() { } func (r *MetricsSender) Info() *core.Info { - return core.NewInfo(agent_config.FeatureMetricSender, "v0.0.1") + return core.NewInfo(agent_config.FeatureMetricsSender, "v0.0.1") } func (r *MetricsSender) Process(msg *core.Message) { diff --git a/src/plugins/metrics_sender_test.go b/src/plugins/metrics_sender_test.go index 156444601..3ecf51874 100644 --- a/src/plugins/metrics_sender_test.go +++ b/src/plugins/metrics_sender_test.go @@ -14,14 +14,14 @@ import ( "testing" "time" - "github.com/gogo/protobuf/types" "github.com/nginx/agent/sdk/v2/backoff" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" tutils "github.com/nginx/agent/v2/test/utils" + + "github.com/gogo/protobuf/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestMetricsSenderSendMetrics(t *testing.T) { diff --git a/src/plugins/metrics_test.go b/src/plugins/metrics_test.go index bde794c13..64c943933 100644 --- a/src/plugins/metrics_test.go +++ b/src/plugins/metrics_test.go @@ -12,15 +12,15 @@ import ( "sort" "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" "github.com/nginx/agent/v2/src/core/metrics/collectors" tutils "github.com/nginx/agent/v2/test/utils" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) var ( @@ -215,7 +215,7 @@ func TestMetrics_Process_AgentConfigChanged(t *testing.T) { updatedTags: true, }, { - testName: "NoValuesToUpate", + testName: "NoValuesToUpdate", config: &config.Config{ ClientID: "12345", Tags: tutils.InitialConfTags, diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 9c6e90fd1..ee0cf6bd5 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -12,16 +12,15 @@ import ( "sync" "time" - "github.com/gogo/protobuf/types" - - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" + + "github.com/gogo/protobuf/types" + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) const ( @@ -88,6 +87,7 @@ func (r *MetricsThrottle) Process(msg *core.Message) { r.syncAgentConfigChange() r.collectorsUpdate.Store(true) return + case msg.Exact(core.MetricReport): if r.metricsAggregation { switch bundle := msg.Data().(type) { @@ -133,7 +133,7 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } func (r *MetricsThrottle) Subscriptions() []string { - return []string{core.MetricReport, core.AgentConfigChanged, core.LoggerLevel} + return []string{core.MetricReport, core.AgentConfigChanged} } func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.WaitGroup) { @@ -204,5 +204,5 @@ func (r *MetricsThrottle) getAggregatedReports() (reports []core.Payload) { } } - return + return reports } diff --git a/src/plugins/metrics_throlling_test.go b/src/plugins/metrics_throlling_test.go index 2e7496ef7..96d411b51 100644 --- a/src/plugins/metrics_throlling_test.go +++ b/src/plugins/metrics_throlling_test.go @@ -11,14 +11,13 @@ import ( "context" "testing" - "github.com/nginx/agent/v2/src/core/metrics" - "github.com/nginx/agent/sdk/v2/proto" - "github.com/stretchr/testify/assert" - "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" + "github.com/nginx/agent/v2/src/core/metrics" tutils "github.com/nginx/agent/v2/test/utils" + + "github.com/stretchr/testify/assert" ) func TestMetricsThrottle_Process(t *testing.T) { @@ -133,7 +132,7 @@ func TestMetricsThrottle_Process(t *testing.T) { } func TestMetricsThrottle_Subscriptions(t *testing.T) { - subs := []string{core.MetricReport, core.AgentConfigChanged, core.LoggerLevel} + subs := []string{core.MetricReport, core.AgentConfigChanged} pluginUnderTest := NewMetricsThrottle(tutils.GetMockAgentConfig(), &tutils.MockEnvironment{}) assert.Equal(t, subs, pluginUnderTest.Subscriptions()) diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 8257c81a8..4d4e39520 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -24,6 +24,8 @@ const ( FeatureNginxSSLConfig = "nginx-ssl-config" FeatureNginxCounting = "nginx-counting" FeatureMetrics = "metrics" + FeatureMetricsCollection = "metrics-collection" + FeatureMetricsSender = "metrics-sender" FeatureMetricsThrottle = "metrics-throttle" FeatureDataPlaneStatus = "dataplane-status" FeatureProcessWatcher = "process-watcher" @@ -31,7 +33,6 @@ const ( FeatureFileWatcherThrottle = "file-watch-throttle" FeatureActivityEvents = "activity-events" FeatureAgentAPI = "agent-api" - FeatureMetricSender = "metric-sender" CommanderPlugin = "commander" ConfigReaderPlugin = "config-reader-plugin" @@ -76,7 +77,6 @@ func GetDefaultFeatures() []string { FeatureNginxSSLConfig, FeatureNginxCounting, FeatureMetrics, - FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, @@ -96,7 +96,6 @@ func DecodeConfig[T interface{}](input interface{}) (output T, err error) { } err = decoder.Decode(input) - if err != nil { return output, err } diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0e3205ae4..eccc5b749 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -23,14 +23,13 @@ import ( "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/nginx/agent/sdk/v2/backoff" filesSDK "github.com/nginx/agent/sdk/v2/files" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/sdk/v2/zip" crossplane "github.com/nginxinc/nginx-go-crossplane" + log "github.com/sirupsen/logrus" ) const ( diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go index 52c1b5e15..6f6308c7c 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go @@ -40,9 +40,6 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" - LoggerPrefix = "logger." - LoggerLevel = LoggerPrefix + "level" - LoggerPath = LoggerPrefix + "path" DataplaneChanged = "dataplane.changed" DataplaneFilesChanged = "dataplane.fileschanged" Events = "events" diff --git a/test/performance/metrics_test.go b/test/performance/metrics_test.go index 5debaa095..786f40a09 100644 --- a/test/performance/metrics_test.go +++ b/test/performance/metrics_test.go @@ -317,9 +317,8 @@ func startNginxAgent(b *testing.B) { fmt.Printf("Unable to connect to control plane: %v", err) return } - var corePlugins []core.Plugin - corePlugins = append(corePlugins, + corePlugins := []core.Plugin{ plugins.NewConfigReader(loadedConfig), plugins.NewNginx(commander, binary, env, &config.Config{}), plugins.NewCommander(commander, loadedConfig), @@ -328,7 +327,7 @@ func startNginxAgent(b *testing.B) { plugins.NewMetrics(loadedConfig, env, binary), plugins.NewMetricsThrottle(loadedConfig, env), plugins.NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.New().String()), binary, env, "1.0.0"), - ) + } messagePipe := core.NewMessagePipe(ctx) err = messagePipe.Register(100, corePlugins, []core.ExtensionPlugin{}) diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 8257c81a8..4d4e39520 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -24,6 +24,8 @@ const ( FeatureNginxSSLConfig = "nginx-ssl-config" FeatureNginxCounting = "nginx-counting" FeatureMetrics = "metrics" + FeatureMetricsCollection = "metrics-collection" + FeatureMetricsSender = "metrics-sender" FeatureMetricsThrottle = "metrics-throttle" FeatureDataPlaneStatus = "dataplane-status" FeatureProcessWatcher = "process-watcher" @@ -31,7 +33,6 @@ const ( FeatureFileWatcherThrottle = "file-watch-throttle" FeatureActivityEvents = "activity-events" FeatureAgentAPI = "agent-api" - FeatureMetricSender = "metric-sender" CommanderPlugin = "commander" ConfigReaderPlugin = "config-reader-plugin" @@ -76,7 +77,6 @@ func GetDefaultFeatures() []string { FeatureNginxSSLConfig, FeatureNginxCounting, FeatureMetrics, - FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, @@ -96,7 +96,6 @@ func DecodeConfig[T interface{}](input interface{}) (output T, err error) { } err = decoder.Decode(input) - if err != nil { return output, err } diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0e3205ae4..eccc5b749 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -23,14 +23,13 @@ import ( "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/nginx/agent/sdk/v2/backoff" filesSDK "github.com/nginx/agent/sdk/v2/files" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/sdk/v2/zip" crossplane "github.com/nginxinc/nginx-go-crossplane" + log "github.com/sirupsen/logrus" ) const ( diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go index 803822ff5..f2e58ae50 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/nginx.go @@ -11,13 +11,13 @@ import ( "context" "sync" + agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" "github.com/nginx/agent/v2/src/core/metrics/sources" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" + log "github.com/sirupsen/logrus" ) type NginxCollector struct { @@ -52,7 +52,7 @@ func buildSources(dimensions *metrics.CommonDim, binary core.NginxBinary, collec nginxSources = append(nginxSources, sources.NewNginxProcess(dimensions, sources.OSSNamespace, binary)) } - if conf.IsFeatureEnabled(agent_config.FeatureMetrics) { + if conf.IsFeatureEnabled(agent_config.FeatureMetrics) || conf.IsFeatureEnabled(agent_config.FeatureMetricsCollection) { nginxSources = append(nginxSources, sources.NewNginxWorker(dimensions, sources.OSSNamespace, binary, sources.NewNginxWorkerClient(env))) if collectorConf.StubStatus != "" { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go index 52c1b5e15..6f6308c7c 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go @@ -40,9 +40,6 @@ const ( AgentConfigChanged = "agent.config.changed" AgentCollectorsUpdate = "agent.collectors.update" MetricReport = "metrics.report" - LoggerPrefix = "logger." - LoggerLevel = LoggerPrefix + "level" - LoggerPath = LoggerPrefix + "path" DataplaneChanged = "dataplane.changed" DataplaneFilesChanged = "dataplane.fileschanged" Events = "events" diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go index db644b838..7a17686a3 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go @@ -8,12 +8,13 @@ package plugins import ( - "github.com/google/uuid" agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/client" sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" + + "github.com/google/uuid" log "github.com/sirupsen/logrus" ) @@ -54,6 +55,12 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) { agent_config.FeatureMetricsThrottle: func(data string) []core.Plugin { return f.enableMetricsThrottleFeature(data) }, + agent_config.FeatureMetricsSender: func(data string) []core.Plugin { + return f.enableMetricsSenderFeature(data) + }, + agent_config.FeatureMetricsCollection: func(data string) []core.Plugin { + return f.enableMetricsCollectionFeature(data) + }, agent_config.FeatureDataPlaneStatus: func(data string) []core.Plugin { return f.enableDataPlaneStatusFeature(data) }, @@ -97,12 +104,12 @@ func (f *Features) Process(msg *core.Message) { if initFeature, ok := f.featureMap[feature]; ok { featurePlugins := initFeature(feature) plugins = append(plugins, featurePlugins...) - } } + err := f.pipeline.Register(agent_config.DefaultPluginSize, plugins, nil) if err != nil { - log.Warnf("Unable to register feature, %v", err) + log.Warnf("Unable to register features: %v", err) } for _, plugin := range plugins { @@ -113,6 +120,26 @@ func (f *Features) Process(msg *core.Message) { func (f *Features) enableMetricsFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { + + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf + + metrics := NewMetrics(f.conf, f.env, f.binary) + metricsThrottle := NewMetricsThrottle(f.conf, f.env) + metricsSender := NewMetricsSender(f.commander) + + return []core.Plugin{metrics, metricsThrottle, metricsSender} + } + return []core.Plugin{} +} + +func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { + conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) @@ -126,47 +153,66 @@ func (f *Features) enableMetricsFeature(data string) []core.Plugin { return []core.Plugin{} } -func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { +func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { + conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - api := NewAgentAPI(f.conf, f.env, f.binary) + metricsThrottle := NewMetricsThrottle(f.conf, f.env) - return []core.Plugin{api} + return []core.Plugin{metricsThrottle} } return []core.Plugin{} } -func (f *Features) enableRegistrationFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) { +func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) { + conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version) + metricsSender := NewMetricsSender(f.commander) - return []core.Plugin{registration} + return []core.Plugin{metricsSender} } return []core.Plugin{} } -func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin { - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) { +func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) { conf, err := config.GetConfig(f.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) } f.conf = conf - metricsThrottle := NewMetricsThrottle(f.conf, f.env) + api := NewAgentAPI(f.conf, f.env, f.binary) - return []core.Plugin{metricsThrottle} + return []core.Plugin{api} + } + return []core.Plugin{} +} + +func (f *Features) enableRegistrationFeature(data string) []core.Plugin { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) { + conf, err := config.GetConfig(f.conf.ClientID) + if err != nil { + log.Warnf("Unable to get agent config, %v", err) + } + f.conf = conf + + registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version) + + return []core.Plugin{registration} } return []core.Plugin{} } @@ -242,17 +288,20 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { } f.conf = conf - if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) { + if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && + !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { + metrics := NewMetrics(f.conf, f.env, f.binary) + countingPlugins = append(countingPlugins, metrics) } - nginxCounting := NewNginxCounter(f.conf, f.binary, f.env) + nginxCounting := NewNginxCounter(f.conf, f.binary, f.env) countingPlugins = append(countingPlugins, nginxCounting) return countingPlugins } - return countingPlugins + return []core.Plugin{} } - return countingPlugins + return []core.Plugin{} } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 098bf9e88..77d5d7309 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -12,16 +12,15 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - "github.com/nginx/agent/sdk/v2" + agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" "github.com/nginx/agent/v2/src/core/metrics/collectors" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type Metrics struct { @@ -164,6 +163,7 @@ func (m *Metrics) metricsGoroutine() { defer m.wg.Done() log.Info("Metrics waiting for handshake to be completed") m.registerStatsSources() + for { select { case <-m.ctx.Done(): @@ -174,9 +174,28 @@ func (m *Metrics) metricsGoroutine() { return case <-m.ticker.C: stats := m.collectStats() - if bundle := metrics.GenerateMetricsReportBundle(stats); bundle != nil { - m.pipeline.Process(core.NewMessage(core.MetricReport, bundle)) + if bundlePayload := metrics.GenerateMetricsReportBundle(stats); bundlePayload != nil { + if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) || m.conf.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + m.pipeline.Process(core.NewMessage(core.MetricReport, bundlePayload)) + } else { + metricBuffer := make([]core.Payload, 0) + + switch bundle := bundlePayload.(type) { + case *metrics.MetricsReportBundle: + if len(bundle.Data) > 0 { + for _, report := range bundle.Data { + if len(report.Data) > 0 { + metricBuffer = append(metricBuffer, report) + } + } + } + m.pipeline.Process(core.NewMessage(core.CommMetrics, metricBuffer)) + default: + log.Errorf("Error converting metric report: %T", bundlePayload) + } + } } + if m.collectorsUpdate.Load() { m.ticker = time.NewTicker(m.conf.AgentMetrics.CollectionInterval) m.collectorsUpdate.Store(false) @@ -227,7 +246,7 @@ func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) { func (m *Metrics) registerStatsSources() { tempCollectors := make([]metrics.Collector, 0) - if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) { + if m.conf.IsFeatureEnabled(agent_config.FeatureMetrics) || m.conf.IsFeatureEnabled(agent_config.FeatureMetricsCollection) { tempCollectors = append(tempCollectors, collectors.NewSystemCollector(m.env, m.conf), ) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go index a5933f24c..38bb2af93 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go @@ -11,15 +11,15 @@ import ( "context" "strings" - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - "github.com/nginx/agent/sdk/v2" agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/client" "github.com/nginx/agent/sdk/v2/proto" models "github.com/nginx/agent/sdk/v2/proto/events" "github.com/nginx/agent/v2/src/core" + + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type MetricsSender struct { @@ -55,7 +55,7 @@ func (r *MetricsSender) Close() { } func (r *MetricsSender) Info() *core.Info { - return core.NewInfo(agent_config.FeatureMetricSender, "v0.0.1") + return core.NewInfo(agent_config.FeatureMetricsSender, "v0.0.1") } func (r *MetricsSender) Process(msg *core.Message) { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 9c6e90fd1..ee0cf6bd5 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -12,16 +12,15 @@ import ( "sync" "time" - "github.com/gogo/protobuf/types" - - log "github.com/sirupsen/logrus" - "go.uber.org/atomic" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/metrics" + + "github.com/gogo/protobuf/types" + log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) const ( @@ -88,6 +87,7 @@ func (r *MetricsThrottle) Process(msg *core.Message) { r.syncAgentConfigChange() r.collectorsUpdate.Store(true) return + case msg.Exact(core.MetricReport): if r.metricsAggregation { switch bundle := msg.Data().(type) { @@ -133,7 +133,7 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } func (r *MetricsThrottle) Subscriptions() []string { - return []string{core.MetricReport, core.AgentConfigChanged, core.LoggerLevel} + return []string{core.MetricReport, core.AgentConfigChanged} } func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.WaitGroup) { @@ -204,5 +204,5 @@ func (r *MetricsThrottle) getAggregatedReports() (reports []core.Payload) { } } - return + return reports } diff --git a/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go index 8257c81a8..4d4e39520 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/agent/config/config_helpers.go @@ -24,6 +24,8 @@ const ( FeatureNginxSSLConfig = "nginx-ssl-config" FeatureNginxCounting = "nginx-counting" FeatureMetrics = "metrics" + FeatureMetricsCollection = "metrics-collection" + FeatureMetricsSender = "metrics-sender" FeatureMetricsThrottle = "metrics-throttle" FeatureDataPlaneStatus = "dataplane-status" FeatureProcessWatcher = "process-watcher" @@ -31,7 +33,6 @@ const ( FeatureFileWatcherThrottle = "file-watch-throttle" FeatureActivityEvents = "activity-events" FeatureAgentAPI = "agent-api" - FeatureMetricSender = "metric-sender" CommanderPlugin = "commander" ConfigReaderPlugin = "config-reader-plugin" @@ -76,7 +77,6 @@ func GetDefaultFeatures() []string { FeatureNginxSSLConfig, FeatureNginxCounting, FeatureMetrics, - FeatureMetricsThrottle, FeatureDataPlaneStatus, FeatureProcessWatcher, FeatureFileWatcher, @@ -96,7 +96,6 @@ func DecodeConfig[T interface{}](input interface{}) (output T, err error) { } err = decoder.Decode(input) - if err != nil { return output, err } diff --git a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 1a68ea0b6..eccc5b749 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -298,8 +298,8 @@ func updateNginxConfigWithCert( directoryMap *DirectoryMap, allowedDirectories map[string]struct{}, ) error { - if strings.HasPrefix("$", file) { - // variable loading, not actual cert file + if strings.Contains(file, "$") { + // cannot process any filepath with variables return nil }