Skip to content

Commit

Permalink
Add metric sender feature (#453)
Browse files Browse the repository at this point in the history
* Add metric-sender feature so it is now possible to enable/disable as desired
* Add metric-collection feature
* Add metric feature which includes metrics-collection, metrics-throttle and metrics-sender features
* Improve metric plugin can so it can now stream metric reports directly to metric-sender
* Remove unused logger topics
* Linting
* Lint imports
  • Loading branch information
Dean-Coakley authored Aug 28, 2023
1 parent 3e8efee commit 5329b27
Show file tree
Hide file tree
Showing 27 changed files with 264 additions and 138 deletions.
7 changes: 4 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env *
}
}

if reporter != nil {
if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender) && reporter != nil {
corePlugins = append(corePlugins,
plugins.NewMetricsSender(reporter),
)
Expand All @@ -239,11 +239,12 @@ func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env *
corePlugins = append(corePlugins, plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()), version))
}

if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) {
if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsCollection) ||
(len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) {
corePlugins = append(corePlugins, plugins.NewMetrics(loadedConfig, env, binary))
}

if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) {
if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) {
corePlugins = append(corePlugins, plugins.NewMetricsThrottle(loadedConfig, env))
}

Expand Down
5 changes: 2 additions & 3 deletions sdk/agent/config/config_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ const (
FeatureNginxSSLConfig = "nginx-ssl-config"
FeatureNginxCounting = "nginx-counting"
FeatureMetrics = "metrics"
FeatureMetricsCollection = "metrics-collection"
FeatureMetricsSender = "metrics-sender"
FeatureMetricsThrottle = "metrics-throttle"
FeatureDataPlaneStatus = "dataplane-status"
FeatureProcessWatcher = "process-watcher"
FeatureFileWatcher = "file-watcher"
FeatureFileWatcherThrottle = "file-watch-throttle"
FeatureActivityEvents = "activity-events"
FeatureAgentAPI = "agent-api"
FeatureMetricSender = "metric-sender"

CommanderPlugin = "commander"
ConfigReaderPlugin = "config-reader-plugin"
Expand Down Expand Up @@ -76,7 +77,6 @@ func GetDefaultFeatures() []string {
FeatureNginxSSLConfig,
FeatureNginxCounting,
FeatureMetrics,
FeatureMetricsThrottle,
FeatureDataPlaneStatus,
FeatureProcessWatcher,
FeatureFileWatcher,
Expand All @@ -96,7 +96,6 @@ func DecodeConfig[T interface{}](input interface{}) (output T, err error) {
}

err = decoder.Decode(input)

if err != nil {
return output, err
}
Expand Down
3 changes: 1 addition & 2 deletions sdk/config_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ import (
"strings"
"time"

log "github.com/sirupsen/logrus"

"github.com/nginx/agent/sdk/v2/backoff"
filesSDK "github.com/nginx/agent/sdk/v2/files"
"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/sdk/v2/zip"

crossplane "github.com/nginxinc/nginx-go-crossplane"
log "github.com/sirupsen/logrus"
)

const (
Expand Down
6 changes: 3 additions & 3 deletions src/core/metrics/collectors/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"context"
"sync"

agent_config "github.com/nginx/agent/sdk/v2/agent/config"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"
"github.com/nginx/agent/v2/src/core/metrics"
"github.com/nginx/agent/v2/src/core/metrics/sources"
log "github.com/sirupsen/logrus"

agent_config "github.com/nginx/agent/sdk/v2/agent/config"
log "github.com/sirupsen/logrus"
)

type NginxCollector struct {
Expand Down Expand Up @@ -52,7 +52,7 @@ func buildSources(dimensions *metrics.CommonDim, binary core.NginxBinary, collec
nginxSources = append(nginxSources, sources.NewNginxProcess(dimensions, sources.OSSNamespace, binary))
}

if conf.IsFeatureEnabled(agent_config.FeatureMetrics) {
if conf.IsFeatureEnabled(agent_config.FeatureMetrics) || conf.IsFeatureEnabled(agent_config.FeatureMetricsCollection) {
nginxSources = append(nginxSources, sources.NewNginxWorker(dimensions, sources.OSSNamespace, binary, sources.NewNginxWorkerClient(env)))

if collectorConf.StubStatus != "" {
Expand Down
3 changes: 0 additions & 3 deletions src/core/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ const (
AgentConfigChanged = "agent.config.changed"
AgentCollectorsUpdate = "agent.collectors.update"
MetricReport = "metrics.report"
LoggerPrefix = "logger."
LoggerLevel = LoggerPrefix + "level"
LoggerPath = LoggerPrefix + "path"
DataplaneChanged = "dataplane.changed"
DataplaneFilesChanged = "dataplane.fileschanged"
Events = "events"
Expand Down
87 changes: 68 additions & 19 deletions src/plugins/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
package plugins

import (
"github.com/google/uuid"
agent_config "github.com/nginx/agent/sdk/v2/agent/config"
"github.com/nginx/agent/sdk/v2/client"
sdkGRPC "github.com/nginx/agent/sdk/v2/grpc"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"

"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -54,6 +55,12 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) {
agent_config.FeatureMetricsThrottle: func(data string) []core.Plugin {
return f.enableMetricsThrottleFeature(data)
},
agent_config.FeatureMetricsSender: func(data string) []core.Plugin {
return f.enableMetricsSenderFeature(data)
},
agent_config.FeatureMetricsCollection: func(data string) []core.Plugin {
return f.enableMetricsCollectionFeature(data)
},
agent_config.FeatureDataPlaneStatus: func(data string) []core.Plugin {
return f.enableDataPlaneStatusFeature(data)
},
Expand Down Expand Up @@ -97,12 +104,12 @@ func (f *Features) Process(msg *core.Message) {
if initFeature, ok := f.featureMap[feature]; ok {
featurePlugins := initFeature(feature)
plugins = append(plugins, featurePlugins...)

}
}

err := f.pipeline.Register(agent_config.DefaultPluginSize, plugins, nil)
if err != nil {
log.Warnf("Unable to register feature, %v", err)
log.Warnf("Unable to register features: %v", err)
}

for _, plugin := range plugins {
Expand All @@ -113,6 +120,26 @@ func (f *Features) Process(msg *core.Message) {

func (f *Features) enableMetricsFeature(data string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) {

conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
}
f.conf = conf

metrics := NewMetrics(f.conf, f.env, f.binary)
metricsThrottle := NewMetricsThrottle(f.conf, f.env)
metricsSender := NewMetricsSender(f.commander)

return []core.Plugin{metrics, metricsThrottle, metricsSender}
}
return []core.Plugin{}
}

func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) {

conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
Expand All @@ -126,47 +153,66 @@ func (f *Features) enableMetricsFeature(data string) []core.Plugin {
return []core.Plugin{}
}

func (f *Features) enableAgentAPIFeature(data string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) {
func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) {

conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
}
f.conf = conf

api := NewAgentAPI(f.conf, f.env, f.binary)
metricsThrottle := NewMetricsThrottle(f.conf, f.env)

return []core.Plugin{api}
return []core.Plugin{metricsThrottle}
}
return []core.Plugin{}
}

func (f *Features) enableRegistrationFeature(data string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) {
func (f *Features) enableMetricsSenderFeature(data string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) {

conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
}
f.conf = conf

registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version)
metricsSender := NewMetricsSender(f.commander)

return []core.Plugin{registration}
return []core.Plugin{metricsSender}
}
return []core.Plugin{}
}

func (f *Features) enableMetricsThrottleFeature(data string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) {
func (f *Features) enableAgentAPIFeature(data string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) {
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
}
f.conf = conf

metricsThrottle := NewMetricsThrottle(f.conf, f.env)
api := NewAgentAPI(f.conf, f.env, f.binary)

return []core.Plugin{metricsThrottle}
return []core.Plugin{api}
}
return []core.Plugin{}
}

func (f *Features) enableRegistrationFeature(data string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) {
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
}
f.conf = conf

registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version)

return []core.Plugin{registration}
}
return []core.Plugin{}
}
Expand Down Expand Up @@ -242,17 +288,20 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin {
}
f.conf = conf

if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) {

metrics := NewMetrics(f.conf, f.env, f.binary)

countingPlugins = append(countingPlugins, metrics)
}
nginxCounting := NewNginxCounter(f.conf, f.binary, f.env)

nginxCounting := NewNginxCounter(f.conf, f.binary, f.env)
countingPlugins = append(countingPlugins, nginxCounting)

return countingPlugins
}
return countingPlugins
return []core.Plugin{}
}
return countingPlugins
return []core.Plugin{}
}
27 changes: 17 additions & 10 deletions src/plugins/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,12 @@ import (
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"
tutils "github.com/nginx/agent/v2/test/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestFeatures_Process(t *testing.T) {
processID := "12345"

processes := []*core.Process{
{
Name: processID,
IsMaster: true,
},
}

testCases := []struct {
testName string
featureKey string
Expand All @@ -47,6 +39,12 @@ func TestFeatures_Process(t *testing.T) {
testName: "Metrics",
featureKey: agent_config.FeatureMetrics,
pluginName: agent_config.FeatureMetrics,
numPlugins: 4,
},
{
testName: "Metrics collection",
featureKey: agent_config.FeatureMetricsCollection,
pluginName: agent_config.FeatureMetrics,
numPlugins: 2,
},
{
Expand All @@ -57,6 +55,15 @@ func TestFeatures_Process(t *testing.T) {
},
}

processID := "12345"

processes := []*core.Process{
{
Name: processID,
IsMaster: true,
},
}

detailsMap := map[string]*proto.NginxDetails{
processID: {
ProcessPath: "/path/to/nginx",
Expand Down Expand Up @@ -94,7 +101,7 @@ func TestFeatures_Process(t *testing.T) {
for _, tc := range testCases {
messagePipe := core.SetupMockMessagePipe(t, ctx, []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{})

assert.Equal(t, 1, len(messagePipe.GetPlugins()))
assert.Len(t, messagePipe.GetPlugins(), 1)
assert.Equal(t, agent_config.FeaturesPlugin, messagePipe.GetPlugins()[0].Info().Name())

messagePipe.Process(core.NewMessage(core.EnableFeature, []string{tc.featureKey}))
Expand Down
Loading

0 comments on commit 5329b27

Please sign in to comment.