diff --git a/main.go b/main.go index d541742e1..eea2e7d0c 100644 --- a/main.go +++ b/main.go @@ -10,26 +10,19 @@ package main import ( "context" "os" - "os/signal" - "runtime" "strconv" - "strings" - "syscall" - "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" - "github.com/nginx/agent/sdk/v2/client" + "github.com/nginx/agent/sdk/v2/agent/events" + sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/logger" - "github.com/nginx/agent/v2/src/extensions" "github.com/nginx/agent/v2/src/plugins" - "github.com/google/uuid" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "google.golang.org/grpc" ) var ( @@ -39,19 +32,7 @@ var ( ) func init() { - config.SetVersion(version, commit) - config.SetDefaults() - config.RegisterFlags() - dynamicConfigPath := config.DynamicConfigFileAbsPath - if runtime.GOOS == "freebsd" { - dynamicConfigPath = config.DynamicConfigFileAbsFreeBsdPath - } - configPath, err := config.RegisterConfigFile(dynamicConfigPath, config.ConfigFileName, config.ConfigFilePaths()...) - if err != nil { - log.Fatalf("Failed to load configuration file: %v", err) - } - log.Debugf("Configuration file loaded %v", configPath) - config.Viper.Set(config.ConfigPathKey, configPath) + config.InitConfiguration(version, commit) } func main() { @@ -83,7 +64,7 @@ func main() { version, commit, os.Getpid(), loadedConfig.ClientID, loadedConfig.DisplayName, loadedConfig.Features) sdkGRPC.InitMeta(loadedConfig.ClientID, loadedConfig.CloudAccountID) - controller, commander, reporter := createGrpcClients(ctx, loadedConfig) + controller, commander, reporter := core.CreateGrpcClients(ctx, loadedConfig) if controller != nil { if err := controller.Connect(); err != nil { @@ -94,15 +75,21 @@ func main() { binary := core.NewNginxBinary(env, loadedConfig) - corePlugins, extensionPlugins := loadPlugins(commander, binary, env, reporter, loadedConfig) + corePlugins, extensionPlugins := plugins.LoadPlugins(commander, binary, env, reporter, loadedConfig) - pipe := initializeMessagePipe(ctx, corePlugins, extensionPlugins) + pipe := core.InitializePipe(ctx, corePlugins, extensionPlugins, agent_config.DefaultPluginSize) - pipe.Process(core.NewMessage(core.AgentStarted, - plugins.NewAgentEventMeta(version, strconv.Itoa(os.Getpid()))), - ) + event := events.NewAgentEventMeta( + config.MODULE, + version, + strconv.Itoa(os.Getpid()), + env.GetHostname(), + env.GetSystemUUID(), + loadedConfig.InstanceGroup, + loadedConfig.Tags) - handleSignals(ctx, commander, loadedConfig, env, pipe, cancel, controller) + pipe.Process(core.NewMessage(core.AgentStarted, event)) + core.HandleSignals(ctx, commander, loadedConfig, env, pipe, cancel, controller) pipe.Run() }) @@ -111,213 +98,3 @@ func main() { log.Fatal(err) } } - -// handleSignals handles signals to attempt graceful shutdown -// for now it also handles sending the agent stopped event because as of today we don't have a mechanism for synchronizing -// tasks between multiple plugins from outside a plugin -func handleSignals( - ctx context.Context, - cmder client.Commander, - loadedConfig *config.Config, - env core.Environment, - pipe core.MessagePipeInterface, - cancel context.CancelFunc, - controller client.Controller, -) { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - go func() { - select { - case <-sigChan: - stopCmd := plugins.GenerateAgentStopEventCommand( - plugins.NewAgentEventMeta(version, strconv.Itoa(os.Getpid())), loadedConfig, env, - ) - log.Debugf("Sending agent stopped event: %v", stopCmd) - - if cmder == nil { - log.Warn("Command channel not configured. Skipping sending AgentStopped event") - } else if err := cmder.Send(ctx, client.MessageFromCommand(stopCmd)); err != nil { - log.Errorf("Error sending AgentStopped event to command channel: %v", err) - } - - if controller != nil { - if err := controller.Close(); err != nil { - log.Warnf("Unable to close controller: %v", err) - } - } - - log.Warn("NGINX Agent exiting") - cancel() - - timeout := time.Second * 5 - time.Sleep(timeout) - log.Fatalf("Failed to gracefully shutdown within timeout of %v. Exiting", timeout) - case <-ctx.Done(): - } - }() -} - -func createGrpcClients(ctx context.Context, loadedConfig *config.Config) (client.Controller, client.Commander, client.MetricReporter) { - if !loadedConfig.IsGrpcServerConfigured() { - log.Info("GRPC clients not created due to missing server config") - return nil, nil, nil - } - - grpcDialOptions := setDialOptions(loadedConfig) - secureMetricsDialOpts, err := sdkGRPC.SecureDialOptions( - loadedConfig.TLS.Enable, - loadedConfig.TLS.Cert, - loadedConfig.TLS.Key, - loadedConfig.TLS.Ca, - loadedConfig.Server.Metrics, - loadedConfig.TLS.SkipVerify) - if err != nil { - log.Fatalf("Failed to load secure metric gRPC dial options: %v", err) - } - - secureCmdDialOpts, err := sdkGRPC.SecureDialOptions( - loadedConfig.TLS.Enable, - loadedConfig.TLS.Cert, - loadedConfig.TLS.Key, - loadedConfig.TLS.Ca, - loadedConfig.Server.Command, - loadedConfig.TLS.SkipVerify) - if err != nil { - log.Fatalf("Failed to load secure command gRPC dial options: %v", err) - } - - controller := client.NewClientController() - controller.WithContext(ctx) - commander := client.NewCommanderClient() - commander.WithBackoffSettings(loadedConfig.GetServerBackoffSettings()) - - commander.WithServer(loadedConfig.Server.Target) - commander.WithDialOptions(append(grpcDialOptions, secureCmdDialOpts)...) - - reporter := client.NewMetricReporterClient() - reporter.WithBackoffSettings(loadedConfig.GetServerBackoffSettings()) - reporter.WithServer(loadedConfig.Server.Target) - reporter.WithDialOptions(append(grpcDialOptions, secureMetricsDialOpts)...) - - controller.WithClient(commander) - controller.WithClient(reporter) - - return controller, commander, reporter -} - -func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env *core.EnvironmentType, reporter client.MetricReporter, loadedConfig *config.Config) ([]core.Plugin, []core.ExtensionPlugin) { - var corePlugins []core.Plugin - var extensionPlugins []core.ExtensionPlugin - - if commander != nil { - corePlugins = append(corePlugins, - plugins.NewCommander(commander, loadedConfig), - ) - - if loadedConfig.IsFeatureEnabled(agent_config.FeatureFileWatcher) { - corePlugins = append(corePlugins, - plugins.NewFileWatcher(loadedConfig, env), - plugins.NewFileWatchThrottle(), - ) - } - } - - if (loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender)) && reporter != nil { - corePlugins = append(corePlugins, - plugins.NewMetricsSender(reporter), - ) - } - - corePlugins = append(corePlugins, - plugins.NewConfigReader(loadedConfig), - plugins.NewNginx(commander, binary, env, loadedConfig), - plugins.NewExtensions(loadedConfig, env), - plugins.NewFeatures(commander, loadedConfig, env, binary, version), - ) - - if loadedConfig.IsFeatureEnabled(agent_config.FeatureRegistration) { - corePlugins = append(corePlugins, plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()), version)) - } - - if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsCollection) || - (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) { - corePlugins = append(corePlugins, plugins.NewMetrics(loadedConfig, env, binary)) - } - - if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { - corePlugins = append(corePlugins, plugins.NewMetricsThrottle(loadedConfig, env)) - } - - if loadedConfig.IsFeatureEnabled(agent_config.FeatureDataPlaneStatus) { - corePlugins = append(corePlugins, plugins.NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.NewString()), binary, env, version)) - } - - if loadedConfig.IsFeatureEnabled(agent_config.FeatureProcessWatcher) { - corePlugins = append(corePlugins, plugins.NewProcessWatcher(env, binary)) - } - - if loadedConfig.IsFeatureEnabled(agent_config.FeatureActivityEvents) { - corePlugins = append(corePlugins, plugins.NewEvents(loadedConfig, env, sdkGRPC.NewMessageMeta(uuid.NewString()), binary)) - } - - if loadedConfig.AgentAPI.Port != 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureAgentAPI) { - corePlugins = append(corePlugins, plugins.NewAgentAPI(loadedConfig, env, binary)) - } else { - log.Info("Agent API not configured") - } - - if len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting) { - corePlugins = append(corePlugins, plugins.NewNginxCounter(loadedConfig, binary, env)) - } - - if loadedConfig.Extensions != nil && len(loadedConfig.Extensions) > 0 { - for _, extension := range loadedConfig.Extensions { - switch { - case extension == agent_config.AdvancedMetricsExtensionPlugin: - advancedMetricsExtensionPlugin := extensions.NewAdvancedMetrics(env, loadedConfig, config.Viper.Get(agent_config.AdvancedMetricsExtensionPluginConfigKey)) - extensionPlugins = append(extensionPlugins, advancedMetricsExtensionPlugin) - case extension == agent_config.NginxAppProtectExtensionPlugin: - nginxAppProtectExtensionPlugin, err := extensions.NewNginxAppProtect(loadedConfig, env, config.Viper.Get(agent_config.NginxAppProtectExtensionPluginConfigKey)) - if err != nil { - log.Errorf("Unable to load the Nginx App Protect plugin due to the following error: %v", err) - } else { - extensionPlugins = append(extensionPlugins, nginxAppProtectExtensionPlugin) - } - case extension == agent_config.NginxAppProtectMonitoringExtensionPlugin: - nginxAppProtectMonitoringExtensionPlugin, err := extensions.NewNAPMonitoring(env, loadedConfig, config.Viper.Get(agent_config.NginxAppProtectMonitoringExtensionPluginConfigKey)) - if err != nil { - log.Errorf("Unable to load the Nginx App Protect Monitoring plugin due to the following error: %v", err) - } else { - extensionPlugins = append(extensionPlugins, nginxAppProtectMonitoringExtensionPlugin) - } - case extension == agent_config.PhpFpmMetricsExtensionPlugin: - phpFpmMetricstExtensionPlugin, err := extensions.NewPhpFpmMetrics(env, loadedConfig) - if err != nil { - log.Errorf("Unable to load the PhpFpm Metrics plugin due to the following error: %v", err) - } else { - extensionPlugins = append(extensionPlugins, phpFpmMetricstExtensionPlugin) - } - default: - log.Warnf("unknown extension configured: %s", extension) - } - } - } - - return corePlugins, extensionPlugins -} - -func initializeMessagePipe(ctx context.Context, corePlugins []core.Plugin, extensionPlugins []core.ExtensionPlugin) core.MessagePipeInterface { - pipe := core.NewMessagePipe(ctx) - err := pipe.Register(agent_config.DefaultPluginSize, corePlugins, extensionPlugins) - if err != nil { - log.Warnf("Failed to start agent successfully, error loading plugins %v", err) - } - return pipe -} - -func setDialOptions(loadedConfig *config.Config) []grpc.DialOption { - grpcDialOptions := []grpc.DialOption{grpc.WithUserAgent("nginx-agent/" + strings.TrimPrefix(version, "v"))} - grpcDialOptions = append(grpcDialOptions, sdkGRPC.DefaultClientDialOptions...) - grpcDialOptions = append(grpcDialOptions, sdkGRPC.DataplaneConnectionDialOptions(loadedConfig.Server.Token, sdkGRPC.NewMessageMeta(uuid.NewString()))...) - return grpcDialOptions -} diff --git a/sdk/agent/events/meta.go b/sdk/agent/events/meta.go new file mode 100644 index 000000000..a6943f3a4 --- /dev/null +++ b/sdk/agent/events/meta.go @@ -0,0 +1,203 @@ +/** + * Copyright (c) F5, Inc. + * + * This source code is licensed under the Apache License, Version 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +package events + +import ( + "fmt" + "strings" + + "github.com/gogo/protobuf/types" + "github.com/google/uuid" + sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + "github.com/nginx/agent/sdk/v2/proto" + commonProto "github.com/nginx/agent/sdk/v2/proto/common" + eventsProto "github.com/nginx/agent/sdk/v2/proto/events" +) + +type AgentEventMeta struct { + module string + version string + pid string + hostname string + systemUuid string + instanceGroup string + tags string + tagsRaw []string +} + +func NewAgentEventMeta( + module, version, pid, hostname, systemUuid, instanceGroup string, + tags []string, +) *AgentEventMeta { + return &AgentEventMeta{ + module: module, + version: version, + pid: pid, + hostname: hostname, + systemUuid: systemUuid, + instanceGroup: instanceGroup, + tagsRaw: tags, + tags: strings.Join(tags, ","), + } +} + +func (aem *AgentEventMeta) GetVersion() string { + return aem.version +} + +func (aem *AgentEventMeta) GetPid() string { + return aem.pid +} + +func (aem *AgentEventMeta) GenerateAgentStartEventCommand() *proto.Command { + activityEvent := &eventsProto.ActivityEvent{ + Message: fmt.Sprintf(AGENT_START_MESSAGE, aem.version, aem.hostname, aem.pid), + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + event := &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: uuid.NewString(), + Module: aem.module, + Timestamp: types.TimestampNow(), + EventLevel: WARN_EVENT_LEVEL, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } + + return &proto.Command{ + Meta: sdkGRPC.NewMessageMeta(uuid.NewString()), + Type: proto.Command_NORMAL, + Data: &proto.Command_EventReport{ + EventReport: &eventsProto.EventReport{ + Events: []*eventsProto.Event{event}, + }, + }, + } +} + +func (aem *AgentEventMeta) GenerateAgentStopEventCommand() *proto.Command { + activityEvent := &eventsProto.ActivityEvent{ + Message: fmt.Sprintf(AGENT_STOP_MESSAGE, aem.version, aem.pid, aem.hostname), + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + event := &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: uuid.NewString(), + Module: aem.module, + Timestamp: types.TimestampNow(), + EventLevel: WARN_EVENT_LEVEL, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } + + return &proto.Command{ + Meta: sdkGRPC.NewMessageMeta(uuid.NewString()), + Type: proto.Command_NORMAL, + Data: &proto.Command_EventReport{ + EventReport: &eventsProto.EventReport{ + Events: []*eventsProto.Event{event}, + }, + }, + } +} + +func (aem *AgentEventMeta) CreateAgentEvent(timestamp *types.Timestamp, level, message, correlationId, module string) *eventsProto.Event { + activityEvent := aem.CreateActivityEvent(message, "") // blank nginxId, this relates to agent not it's nginx instances + + return &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: correlationId, + Module: module, + Timestamp: timestamp, + EventLevel: level, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } +} + +func (aem *AgentEventMeta) CreateActivityEvent(message string, nginxId string) *eventsProto.ActivityEvent { + activityEvent := &eventsProto.ActivityEvent{ + Message: message, + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + if nginxId != "" { + nginxDim := []*commonProto.Dimension{{Name: "nginx_id", Value: nginxId}} + activityEvent.Dimensions = append(nginxDim, activityEvent.Dimensions...) + } + + return activityEvent +} diff --git a/sdk/agent/events/meta_test.go b/sdk/agent/events/meta_test.go new file mode 100644 index 000000000..40854f436 --- /dev/null +++ b/sdk/agent/events/meta_test.go @@ -0,0 +1,151 @@ +package events + +import ( + "fmt" + "strings" + "testing" + + "github.com/nginx/agent/sdk/v2/proto" + commonProto "github.com/nginx/agent/sdk/v2/proto/common" + eventsProto "github.com/nginx/agent/sdk/v2/proto/events" + "github.com/stretchr/testify/assert" +) + +func TestNewAgentEventMeta(t *testing.T) { + // Create an instance of AgentEventMeta using the constructor + module := "nginx-agent" + version := "v1.0" + pid := "12345" + hostname := "example-host" + systemUuid := "system-uuid" + instanceGroup := "group1" + tags := []string{"tag1", "tag2"} + + meta := NewAgentEventMeta(module, version, pid, hostname, systemUuid, instanceGroup, tags) + + assert.NotNil(t, meta) + + assert.Equal(t, version, meta.version) + assert.Equal(t, pid, meta.pid) + assert.Equal(t, hostname, meta.hostname) + assert.Equal(t, systemUuid, meta.systemUuid) + assert.Equal(t, instanceGroup, meta.instanceGroup) + assert.Equal(t, tags, meta.tagsRaw) + assert.Equal(t, strings.Join(tags, ","), meta.tags) +} + +func TestGenerateAgentStartEventCommand(t *testing.T) { + agentEvent := NewAgentEventMeta( + "agent-module", + "v2.0", + "54321", + "test-host", + "test-uuid", + "group2", + []string{"tag3", "tag4"}, + ) + + expectedActivityEvent := &eventsProto.ActivityEvent{ + Message: fmt.Sprintf("%s %s started on %s with pid %s", "nginx-agent", "v2.0", "test-host", "54321"), + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: "test-uuid", + }, + { + Name: "hostname", + Value: "test-host", + }, + { + Name: "instance_group", + Value: "group2", + }, + { + Name: "system.tags", + Value: strings.Join([]string{"tag3", "tag4"}, ","), + }, + }, + } + + expected := &eventsProto.EventReport{ + Events: []*eventsProto.Event{ + { + Metadata: &eventsProto.Metadata{ + Module: agentEvent.module, + Type: AGENT_EVENT_TYPE, + Category: CONFIG_CATEGORY, + EventLevel: ERROR_EVENT_LEVEL, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: expectedActivityEvent, + }, + }, + }, + } + + cmd := agentEvent.GenerateAgentStartEventCommand() + assert.NotNil(t, cmd) + assert.NotNil(t, cmd.Meta) + assert.Equal(t, proto.Command_NORMAL, cmd.Type) + assert.NotNil(t, cmd.GetData()) + + assert.Equal(t, expected.GetEvents()[0].GetData(), cmd.GetData().(*proto.Command_EventReport).EventReport.GetEvents()[0].GetData()) +} + +func TestGenerateAgentStopEventCommand(t *testing.T) { + agentEvent := NewAgentEventMeta( + "agent-module", + "v2.0", + "54321", + "test-host", + "test-uuid", + "group2", + []string{"tag3", "tag4"}, + ) + + expectedActivityEvent := &eventsProto.ActivityEvent{ + Message: fmt.Sprintf("%s %s (pid: %s) stopped on %s", "nginx-agent", "v2.0", "54321", "test-host"), + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: "test-uuid", + }, + { + Name: "hostname", + Value: "test-host", + }, + { + Name: "instance_group", + Value: "group2", + }, + { + Name: "system.tags", + Value: strings.Join([]string{"tag3", "tag4"}, ","), + }, + }, + } + + expected := &eventsProto.EventReport{ + Events: []*eventsProto.Event{ + { + Metadata: &eventsProto.Metadata{ + Module: agentEvent.module, + Type: AGENT_EVENT_TYPE, + Category: CONFIG_CATEGORY, + EventLevel: ERROR_EVENT_LEVEL, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: expectedActivityEvent, + }, + }, + }, + } + + cmd := agentEvent.GenerateAgentStopEventCommand() + assert.NotNil(t, cmd) + assert.NotNil(t, cmd.Meta) + assert.Equal(t, proto.Command_NORMAL, cmd.Type) + assert.NotNil(t, cmd.GetData()) + + assert.Equal(t, expected.GetEvents()[0].GetData(), cmd.GetData().(*proto.Command_EventReport).EventReport.GetEvents()[0].GetData()) +} diff --git a/sdk/agent/events/types.go b/sdk/agent/events/types.go new file mode 100644 index 000000000..8f55f5d34 --- /dev/null +++ b/sdk/agent/events/types.go @@ -0,0 +1,23 @@ +package events + +const ( + // Types + NGINX_EVENT_TYPE = "Nginx" + AGENT_EVENT_TYPE = "Agent" + + // Categories + STATUS_CATEGORY = "Status" + CONFIG_CATEGORY = "Config" + APP_PROTECT_CATEGORY = "AppProtect" + + // Event Levels + INFO_EVENT_LEVEL = "INFO" + DEBUG_EVENT_LEVEL = "DEBUG" + WARN_EVENT_LEVEL = "WARN" + ERROR_EVENT_LEVEL = "ERROR" + CRITICAL_EVENT_LEVEL = "CRITICAL" + + // Messages + AGENT_START_MESSAGE = "nginx-agent %s started on %s with pid %s" + AGENT_STOP_MESSAGE = "nginx-agent %s (pid: %s) stopped on %s" +) diff --git a/sdk/client/controller.go b/sdk/client/controller.go index b89efd01b..7117269d0 100644 --- a/sdk/client/controller.go +++ b/sdk/client/controller.go @@ -18,6 +18,7 @@ func NewClientController() Controller { type ctrl struct { ctx context.Context + cncl context.CancelFunc clients []Client } @@ -28,8 +29,7 @@ func (c *ctrl) WithClient(client Client) Controller { } func (c *ctrl) WithContext(ctx context.Context) Controller { - c.ctx = ctx - + c.ctx, c.cncl = context.WithCancel(ctx) return c } @@ -49,6 +49,7 @@ func (c *ctrl) Connect() error { } func (c *ctrl) Close() error { + defer c.cncl() var retErr error for _, client := range c.clients { if err := client.Close(); err != nil { diff --git a/sdk/client/controller_test.go b/sdk/client/controller_test.go index 2962331d9..4a445c044 100644 --- a/sdk/client/controller_test.go +++ b/sdk/client/controller_test.go @@ -22,7 +22,7 @@ func TestControllerContext(t *testing.T) { controller := NewClientController() controller.WithContext(ctx) - assert.Equal(t, ctx, controller.Context()) + assert.NotNil(t, controller.Context()) t.Cleanup(func() { controller.Close() diff --git a/src/core/config/config.go b/src/core/config/config.go index 638ebff64..84dd39582 100644 --- a/src/core/config/config.go +++ b/src/core/config/config.go @@ -55,6 +55,7 @@ var Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) func SetVersion(version, commit string) { ROOT_COMMAND.Version = version + "-" + commit + Viper.SetDefault(VersionKey, version) } func Execute() error { @@ -62,6 +63,22 @@ func Execute() error { return ROOT_COMMAND.Execute() } +func InitConfiguration(version, commit string) { + SetVersion(version, commit) + SetDefaults() + RegisterFlags() + dynamicConfigPath := DynamicConfigFileAbsPath + if runtime.GOOS == "freebsd" { + dynamicConfigPath = DynamicConfigFileAbsFreeBsdPath + } + configPath, err := RegisterConfigFile(dynamicConfigPath, ConfigFileName, ConfigFilePaths()...) + if err != nil { + log.Fatalf("Failed to load configuration file: %v", err) + } + log.Debugf("Configuration file loaded %v", configPath) + Viper.Set(ConfigPathKey, configPath) +} + func SetDefaults() { // CLOUDACCOUNTID DEFAULT Viper.SetDefault(CloudAccountIdKey, Defaults.CloudAccountID) @@ -165,6 +182,7 @@ func GetConfig(clientId string) (*Config, error) { } config := &Config{ + Version: Viper.GetString(VersionKey), Path: Viper.GetString(ConfigPathKey), DynamicConfigPath: Viper.GetString(DynamicConfigPathKey), ClientID: clientId, diff --git a/src/core/config/defaults.go b/src/core/config/defaults.go index 4c7adfccb..de88138c1 100644 --- a/src/core/config/defaults.go +++ b/src/core/config/defaults.go @@ -93,6 +93,8 @@ var ( ) const ( + MODULE = "NGINX-AGENT" + DynamicConfigFileName = "agent-dynamic.conf" DynamicConfigFileAbsPath = "/var/lib/nginx-agent/agent-dynamic.conf" DynamicConfigFileAbsFreeBsdPath = "/var/db/nginx-agent/agent-dynamic.conf" @@ -102,6 +104,7 @@ const ( ConfigPathKey = "path" DynamicConfigPathKey = "dynamic-config-path" + VersionKey = "version" CloudAccountIdKey = "cloudaccountid" LocationKey = "location" DisplayNameKey = "display_name" diff --git a/src/core/config/types.go b/src/core/config/types.go index 57497ebe6..a7ba6a7fd 100644 --- a/src/core/config/types.go +++ b/src/core/config/types.go @@ -14,6 +14,7 @@ import ( ) type Config struct { + Version string Path string `yaml:"-"` DynamicConfigPath string `yaml:"-"` ClientID string `mapstructure:"agent_id" yaml:"-"` diff --git a/src/core/grpc.go b/src/core/grpc.go new file mode 100644 index 000000000..0d22474f7 --- /dev/null +++ b/src/core/grpc.go @@ -0,0 +1,75 @@ +/** + * Copyright (c) F5, Inc. + * + * This source code is licensed under the Apache License, Version 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +package core + +import ( + "context" + "strings" + + "github.com/google/uuid" + "github.com/nginx/agent/sdk/v2/client" + sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + "github.com/nginx/agent/v2/src/core/config" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +func CreateGrpcClients(ctx context.Context, loadedConfig *config.Config) (client.Controller, client.Commander, client.MetricReporter) { + if !loadedConfig.IsGrpcServerConfigured() { + log.Info("GRPC clients not created due to missing server config") + return nil, nil, nil + } + + grpcDialOptions := setDialOptions(loadedConfig) + secureMetricsDialOpts, err := sdkGRPC.SecureDialOptions( + loadedConfig.TLS.Enable, + loadedConfig.TLS.Cert, + loadedConfig.TLS.Key, + loadedConfig.TLS.Ca, + loadedConfig.Server.Metrics, + loadedConfig.TLS.SkipVerify) + if err != nil { + log.Fatalf("Failed to load secure metric gRPC dial options: %v", err) + } + + secureCmdDialOpts, err := sdkGRPC.SecureDialOptions( + loadedConfig.TLS.Enable, + loadedConfig.TLS.Cert, + loadedConfig.TLS.Key, + loadedConfig.TLS.Ca, + loadedConfig.Server.Command, + loadedConfig.TLS.SkipVerify) + if err != nil { + log.Fatalf("Failed to load secure command gRPC dial options: %v", err) + } + + controller := client.NewClientController() + controller.WithContext(ctx) + commander := client.NewCommanderClient() + commander.WithBackoffSettings(loadedConfig.GetServerBackoffSettings()) + + commander.WithServer(loadedConfig.Server.Target) + commander.WithDialOptions(append(grpcDialOptions, secureCmdDialOpts)...) + + reporter := client.NewMetricReporterClient() + reporter.WithBackoffSettings(loadedConfig.GetServerBackoffSettings()) + reporter.WithServer(loadedConfig.Server.Target) + reporter.WithDialOptions(append(grpcDialOptions, secureMetricsDialOpts)...) + + controller.WithClient(commander) + controller.WithClient(reporter) + + return controller, commander, reporter +} + +func setDialOptions(loadedConfig *config.Config) []grpc.DialOption { + grpcDialOptions := []grpc.DialOption{grpc.WithUserAgent("nginx-agent/" + strings.TrimPrefix(version, "v"))} + grpcDialOptions = append(grpcDialOptions, sdkGRPC.DefaultClientDialOptions...) + grpcDialOptions = append(grpcDialOptions, sdkGRPC.DataplaneConnectionDialOptions(loadedConfig.Server.Token, sdkGRPC.NewMessageMeta(uuid.NewString()))...) + return grpcDialOptions +} diff --git a/src/core/grpc_test.go b/src/core/grpc_test.go new file mode 100644 index 000000000..b52ab51e5 --- /dev/null +++ b/src/core/grpc_test.go @@ -0,0 +1,48 @@ +package core + +import ( + "context" + "testing" + + "github.com/nginx/agent/v2/src/core/config" + "github.com/stretchr/testify/assert" +) + +func TestCreateGrpcClients(t *testing.T) { + loadedConfig := &config.Config{ + TLS: config.TLSConfig{ + Enable: true, + SkipVerify: false, + }, + Server: config.Server{ + GrpcPort: 6789, + Host: "192.0.2.4", + }, + } + + ctx := context.Background() + + controller, commander, reporter := CreateGrpcClients(ctx, loadedConfig) + + // Assert that the returned clients are not nil + assert.NotNil(t, controller) + assert.NotNil(t, commander) + assert.NotNil(t, reporter) +} + +func TestSetDialOptions(t *testing.T) { + loadedConfig := &config.Config{ + TLS: config.TLSConfig{ + Enable: true, + SkipVerify: false, + }, + Server: config.Server{ + GrpcPort: 67890, + Host: "192.0.2.5", + }, + } + + dialOptions := setDialOptions(loadedConfig) + + assert.NotEmpty(t, dialOptions) +} diff --git a/src/core/metrics/sources/nginx_worker_test.go b/src/core/metrics/sources/nginx_worker_test.go index 119377df2..fda857e0a 100644 --- a/src/core/metrics/sources/nginx_worker_test.go +++ b/src/core/metrics/sources/nginx_worker_test.go @@ -16,7 +16,7 @@ import ( "github.com/google/uuid" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core/metrics" - "github.com/nginx/agent/v2/test/utils" + tutils "github.com/nginx/agent/v2/test/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -27,13 +27,7 @@ var ( instanceGroup = "my_instances" nginxId = uuid.New().String() systemId = uuid.New().String() - procMap = map[string][]*proto.NginxDetails{ - "1": { - { - ProcessId: "1", - }, - }, - } + procMap = tutils.GetProcessMap() ) type MockWorkerClient struct { @@ -58,14 +52,14 @@ func TestNginxWorkerCollector(t *testing.T) { NginxId: nginxId, } - mockBinary := &utils.MockNginxBinary{} + mockBinary := &tutils.MockNginxBinary{} mockClient := &MockWorkerClient{} n := NewNginxWorker(dimensions, OSSNamespace, mockBinary, mockClient) mockBinary.On("GetChildProcesses").Return(procMap) - mockClient.On("GetWorkerStats", procMap["1"]).Return(&WorkerStats{ + mockClient.On("GetWorkerStats", procMap["12345"]).Return(&WorkerStats{ Workers: &Workers{ Count: 1.00, KbsR: 1.00, diff --git a/src/core/pipe.go b/src/core/pipe.go index 9b9bbdead..1ded89e02 100644 --- a/src/core/pipe.go +++ b/src/core/pipe.go @@ -54,6 +54,15 @@ func NewMessagePipe(ctx context.Context) *MessagePipe { } } +func InitializePipe(ctx context.Context, corePlugins []Plugin, extensionPlugins []ExtensionPlugin, size int) MessagePipeInterface { + pipe := NewMessagePipe(ctx) + err := pipe.Register(size, corePlugins, extensionPlugins) + if err != nil { + log.Warnf("Failed to start agent successfully, error loading plugins %v", err) + } + return pipe +} + func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []ExtensionPlugin) error { p.mu.Lock() diff --git a/src/core/signals.go b/src/core/signals.go new file mode 100644 index 000000000..c8a1ed8d8 --- /dev/null +++ b/src/core/signals.go @@ -0,0 +1,74 @@ +/** + * Copyright (c) F5, Inc. + * + * This source code is licensed under the Apache License, Version 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +package core + +import ( + "context" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + "github.com/nginx/agent/sdk/v2/agent/events" + "github.com/nginx/agent/sdk/v2/client" + "github.com/nginx/agent/v2/src/core/config" + log "github.com/sirupsen/logrus" +) + +// handleSignals handles signals to attempt graceful shutdown +// for now it also handles sending the agent stopped event because as of today we don't have a mechanism for synchronizing +// tasks between multiple plugins from outside a plugin +func HandleSignals( + ctx context.Context, + cmder client.Commander, + loadedConfig *config.Config, + env Environment, + pipe MessagePipeInterface, + cancel context.CancelFunc, + controller client.Controller, +) { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + select { + case <-sigChan: + event := events.NewAgentEventMeta( + config.MODULE, + version, + strconv.Itoa(os.Getpid()), + env.GetHostname(), + env.GetSystemUUID(), + loadedConfig.InstanceGroup, + loadedConfig.Tags) + + stopCmd := event.GenerateAgentStopEventCommand() + log.Debugf("Sending agent stopped event: %v", stopCmd) + + if cmder == nil { + log.Warn("Command channel not configured. Skipping sending AgentStopped event") + } else if err := cmder.Send(ctx, client.MessageFromCommand(stopCmd)); err != nil { + log.Errorf("Error sending AgentStopped event to command channel: %v", err) + } + + if controller != nil { + if err := controller.Close(); err != nil { + log.Warnf("Unable to close controller: %v", err) + } + } + + log.Warn("NGINX Agent exiting") + cancel() + + timeout := time.Second * 5 + time.Sleep(timeout) + log.Fatalf("Failed to gracefully shutdown within timeout of %v. Exiting", timeout) + case <-ctx.Done(): + } + }() +} diff --git a/src/plugins/commander.go b/src/plugins/commander.go index 994c952a1..8ce701833 100644 --- a/src/plugins/commander.go +++ b/src/plugins/commander.go @@ -100,7 +100,7 @@ func (c *Commander) agentBackoff(agentConfig *proto.AgentConfig) { } func (c *Commander) agentRegistered(cmd *proto.Command) { - switch commandData := cmd.Data.(type) { + switch commandData := cmd.GetData().(type) { case *proto.Command_AgentConnectResponse: log.Infof("config command %v", commandData) @@ -112,7 +112,7 @@ func (c *Commander) agentRegistered(cmd *proto.Command) { } default: - log.Debugf("unhandled command: %T", cmd.Data) + log.Debugf("unhandled command: %T", cmd.GetData()) } } @@ -152,24 +152,24 @@ func (c *Commander) dispatchLoop() { log.Debugf("Command msg from data plane: %v", cmd) var topic string - switch cmd.Data.(type) { + switch cmd.GetData().(type) { case *proto.Command_NginxConfig, *proto.Command_NginxConfigResponse: topic = core.CommNginxConfig case *proto.Command_AgentConnectRequest, *proto.Command_AgentConnectResponse: topic = core.AgentConnected case *proto.Command_AgentConfigRequest, *proto.Command_AgentConfig: - log.Debugf("agent config %T command data type received and ignored", cmd.Data) + log.Debugf("agent config %T command data type received and ignored", cmd.GetData()) topic = core.AgentConfig case *proto.Command_CmdStatus: - data := cmd.Data.(*proto.Command_CmdStatus) + data := cmd.GetData().(*proto.Command_CmdStatus) if data.CmdStatus.Status != proto.CommandStatusResponse_CMD_OK { - log.Debugf("command status %T :: %+v", cmd.Data, cmd.Data) + log.Debugf("command status %T :: %+v", cmd.GetData(), cmd.GetData()) } topic = core.UNKNOWN continue default: - if cmd.Data != nil { - log.Infof("unknown %T command data type received", cmd.Data) + if cmd.GetData() != nil { + log.Infof("unknown %T command data type received", cmd.GetData()) } topic = core.UNKNOWN continue diff --git a/src/plugins/common.go b/src/plugins/common.go new file mode 100644 index 000000000..045fa1709 --- /dev/null +++ b/src/plugins/common.go @@ -0,0 +1,116 @@ +package plugins + +import ( + "github.com/nginx/agent/sdk/v2/client" + "github.com/nginx/agent/v2/src/core" + "github.com/nginx/agent/v2/src/core/config" + "github.com/nginx/agent/v2/src/extensions" + log "github.com/sirupsen/logrus" + + agent_config "github.com/nginx/agent/sdk/v2/agent/config" + + sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + + "github.com/google/uuid" +) + +func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.Environment, reporter client.MetricReporter, loadedConfig *config.Config) ([]core.Plugin, []core.ExtensionPlugin) { + var corePlugins []core.Plugin + var extensionPlugins []core.ExtensionPlugin + + if commander != nil { + corePlugins = append(corePlugins, + NewCommander(commander, loadedConfig), + ) + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureFileWatcher) { + corePlugins = append(corePlugins, + NewFileWatcher(loadedConfig, env), + NewFileWatchThrottle(), + ) + } + } + + if (loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender)) && reporter != nil { + corePlugins = append(corePlugins, + NewMetricsSender(reporter), + ) + } + + corePlugins = append(corePlugins, + NewConfigReader(loadedConfig), + NewNginx(commander, binary, env, loadedConfig), + NewExtensions(loadedConfig, env), + NewFeatures(commander, loadedConfig, env, binary, loadedConfig.Version), + ) + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureRegistration) { + corePlugins = append(corePlugins, NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()))) + } + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsCollection) || + (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) { + corePlugins = append(corePlugins, NewMetrics(loadedConfig, env, binary)) + } + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + corePlugins = append(corePlugins, NewMetricsThrottle(loadedConfig, env)) + } + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureDataPlaneStatus) { + corePlugins = append(corePlugins, NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.NewString()), binary, env)) + } + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureProcessWatcher) { + corePlugins = append(corePlugins, NewProcessWatcher(env, binary)) + } + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureActivityEvents) { + corePlugins = append(corePlugins, NewEvents(loadedConfig, env, sdkGRPC.NewMessageMeta(uuid.NewString()), binary)) + } + + if loadedConfig.AgentAPI.Port != 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureAgentAPI) { + corePlugins = append(corePlugins, NewAgentAPI(loadedConfig, env, binary)) + } else { + log.Info("Agent API not configured") + } + + if len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting) { + corePlugins = append(corePlugins, NewNginxCounter(loadedConfig, binary, env)) + } + + if loadedConfig.Extensions != nil && len(loadedConfig.Extensions) > 0 { + for _, extension := range loadedConfig.Extensions { + switch { + case extension == agent_config.AdvancedMetricsExtensionPlugin: + advancedMetricsExtensionPlugin := extensions.NewAdvancedMetrics(env, loadedConfig, config.Viper.Get(agent_config.AdvancedMetricsExtensionPluginConfigKey)) + extensionPlugins = append(extensionPlugins, advancedMetricsExtensionPlugin) + case extension == agent_config.NginxAppProtectExtensionPlugin: + nginxAppProtectExtensionPlugin, err := extensions.NewNginxAppProtect(loadedConfig, env, config.Viper.Get(agent_config.NginxAppProtectExtensionPluginConfigKey)) + if err != nil { + log.Errorf("Unable to load the Nginx App Protect plugin due to the following error: %v", err) + } else { + extensionPlugins = append(extensionPlugins, nginxAppProtectExtensionPlugin) + } + case extension == agent_config.NginxAppProtectMonitoringExtensionPlugin: + nginxAppProtectMonitoringExtensionPlugin, err := extensions.NewNAPMonitoring(env, loadedConfig, config.Viper.Get(agent_config.NginxAppProtectMonitoringExtensionPluginConfigKey)) + if err != nil { + log.Errorf("Unable to load the Nginx App Protect Monitoring plugin due to the following error: %v", err) + } else { + extensionPlugins = append(extensionPlugins, nginxAppProtectMonitoringExtensionPlugin) + } + case extension == agent_config.PhpFpmMetricsExtensionPlugin: + phpFpmMetricstExtensionPlugin, err := extensions.NewPhpFpmMetrics(env, loadedConfig) + if err != nil { + log.Errorf("Unable to load the PhpFpm Metrics plugin due to the following error: %v", err) + } else { + extensionPlugins = append(extensionPlugins, phpFpmMetricstExtensionPlugin) + } + default: + log.Warnf("unknown extension configured: %s", extension) + } + } + } + + return corePlugins, extensionPlugins +} diff --git a/src/plugins/common_test.go b/src/plugins/common_test.go new file mode 100644 index 000000000..73bc9375e --- /dev/null +++ b/src/plugins/common_test.go @@ -0,0 +1,67 @@ +package plugins + +import ( + "testing" + + sdk "github.com/nginx/agent/sdk/v2/agent/config" + "github.com/nginx/agent/v2/src/core/config" + tutils "github.com/nginx/agent/v2/test/utils" + "github.com/stretchr/testify/assert" +) + +func TestLoadPlugins(t *testing.T) { + binary := tutils.GetMockNginxBinary() + env := tutils.GetMockEnvWithHostAndProcess() + cmdr := tutils.NewMockCommandClient() + reporter := tutils.NewMockMetricsReportClient() + + tests := []struct { + name string + loadedConfig *config.Config + expectedPluginSize int + expectedExtensionSize int + }{ + { + name: "default plugins", + loadedConfig: &config.Config{}, + expectedPluginSize: 5, + expectedExtensionSize: 0, + }, + { + name: "no plugins or extensions", + loadedConfig: &config.Config{ + Features: []string{}, + Extensions: []string{}, + }, + expectedPluginSize: 5, + expectedExtensionSize: 0, + }, + { + name: "all plugins and extensions", + loadedConfig: &config.Config{ + Features: sdk.GetDefaultFeatures(), + // temporarily to figure out what's going on with the monitoring extension + Extensions: sdk.GetKnownExtensions()[:len(sdk.GetKnownExtensions())-1], + AgentMetrics: config.AgentMetrics{ + BulkSize: 1, + ReportInterval: 1, + CollectionInterval: 1, + Mode: "aggregated", + }, + }, + expectedPluginSize: 14, + // temporarily to figure out what's going on with the monitoring extension + expectedExtensionSize: len(sdk.GetKnownExtensions()[:len(sdk.GetKnownExtensions())-1]), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + corePlugins, extensionPlugins := LoadPlugins(cmdr, binary, env, reporter, tt.loadedConfig) + + assert.NotNil(t, corePlugins) + assert.Len(t, corePlugins, tt.expectedPluginSize) + assert.Len(t, extensionPlugins, tt.expectedExtensionSize) + }) + } +} diff --git a/src/plugins/dataplane_status.go b/src/plugins/dataplane_status.go index 2160e50e7..d1aef6ae7 100644 --- a/src/plugins/dataplane_status.go +++ b/src/plugins/dataplane_status.go @@ -48,7 +48,7 @@ const ( defaultMinInterval = time.Second * 30 ) -func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core.NginxBinary, env core.Environment, version string) *DataPlaneStatus { +func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core.NginxBinary, env core.Environment) *DataPlaneStatus { log.Tracef("Dataplane status interval %s", config.Dataplane.Status.PollInterval) pollInt := config.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { @@ -62,7 +62,7 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core meta: meta, binary: binary, env: env, - version: version, + version: config.Version, tags: &config.Tags, configDirs: config.ConfigDirs, reportInterval: config.Dataplane.Status.ReportInterval, diff --git a/src/plugins/dataplane_status_test.go b/src/plugins/dataplane_status_test.go index b66e7bd55..5fce85134 100644 --- a/src/plugins/dataplane_status_test.go +++ b/src/plugins/dataplane_status_test.go @@ -176,7 +176,7 @@ func TestDataPlaneStatus(t *testing.T) { Tags: []string{}, } - dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env, "") + dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env) messagePipe := core.NewMockMessagePipe(context.Background()) err := messagePipe.Register(10, []core.Plugin{dataPlaneStatus}, []core.ExtensionPlugin{}) @@ -273,7 +273,7 @@ func TestDPSSyncAgentConfigChange(t *testing.T) { defer cleanupFunc() // Setup data plane status and mock pipeline - dataPlaneStatus := NewDataPlaneStatus(tc.config, grpc.NewMessageMeta(uuid.New().String()), binary, env, "") + dataPlaneStatus := NewDataPlaneStatus(tc.config, grpc.NewMessageMeta(uuid.New().String()), binary, env) messagePipe := core.NewMockMessagePipe(context.Background()) err = messagePipe.Register(10, []core.Plugin{dataPlaneStatus}, []core.ExtensionPlugin{}) @@ -350,7 +350,7 @@ func TestDPSSyncNAPDetails(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { // Setup DataPlaneStatus - dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env, "") + dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env) dataPlaneStatus.softwareDetails[agent_config.NginxAppProtectExtensionPlugin] = &proto.DataplaneSoftwareDetails{Data: tc.initialNAPDetails} defer dataPlaneStatus.Close() @@ -416,7 +416,7 @@ func TestDataPlaneSubscriptions(t *testing.T) { Tags: []string{}, } - dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env, "") + dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env) assert.Equal(t, expectedSubscriptions, dataPlaneStatus.Subscriptions()) } diff --git a/src/plugins/events.go b/src/plugins/events.go index 2050e5353..56404ce31 100644 --- a/src/plugins/events.go +++ b/src/plugins/events.go @@ -10,26 +10,20 @@ package plugins import ( "context" "fmt" - "strings" "github.com/gogo/protobuf/types" "github.com/google/uuid" log "github.com/sirupsen/logrus" agent_config "github.com/nginx/agent/sdk/v2/agent/config" - sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + "github.com/nginx/agent/sdk/v2/agent/events" "github.com/nginx/agent/sdk/v2/proto" - commonProto "github.com/nginx/agent/sdk/v2/proto/common" eventsProto "github.com/nginx/agent/sdk/v2/proto/events" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" ) const ( - MODULE = "NGINX-AGENT" - - AGENT_START_MESSAGE = "nginx-agent %s started on %s with pid %s" - AGENT_STOP_MESSAGE = "nginx-agent %s (pid: %s) stopped on %s" NGINX_FOUND_MESSAGE = "nginx-v%s master process was found with a pid %s" NGINX_STOP_MESSAGE = "nginx-v%s master process (pid: %s) stopped" NGINX_RELOAD_SUCCESS_MESSAGE = "nginx-v%s master process (pid: %s) reloaded successfully" @@ -40,31 +34,16 @@ const ( CONFIG_APPLY_FAILURE_MESSAGE = "failed to apply nginx config on %s" CONFIG_ROLLBACK_SUCCESS_MESSAGE = "nginx config was rolled back on %s" CONFIG_ROLLBACK_FAILURE_MESSAGE = "failed to rollback nginx config on %s" - - // Types - NGINX_EVENT_TYPE = "Nginx" - AGENT_EVENT_TYPE = "Agent" - - // Categories - STATUS_CATEGORY = "Status" - CONFIG_CATEGORY = "Config" - APP_PROTECT_CATEGORY = "AppProtect" - - // Event Levels - INFO_EVENT_LEVEL = "INFO" - DEBUG_EVENT_LEVEL = "DEBUG" - WARN_EVENT_LEVEL = "WARN" - ERROR_EVENT_LEVEL = "ERROR" - CRITICAL_EVENT_LEVEL = "CRITICAL" ) type Events struct { - pipeline core.MessagePipeInterface - ctx context.Context - conf *config.Config - env core.Environment - meta *proto.Metadata - nginxBinary core.NginxBinary + pipeline core.MessagePipeInterface + ctx context.Context + conf *config.Config + env core.Environment + meta *proto.Metadata + nginxBinary core.NginxBinary + agentEventsMeta *events.AgentEventMeta } func NewEvents(conf *config.Config, env core.Environment, meta *proto.Metadata, nginxBinary core.NginxBinary) *Events { @@ -130,29 +109,17 @@ func (a *Events) Subscriptions() []string { } func (a *Events) sendAgentStartedEvent(msg *core.Message) { - agentEventMeta, ok := msg.Data().(*AgentEventMeta) + agentEventMeta, ok := msg.Data().(*events.AgentEventMeta) if !ok { log.Warnf("Invalid message received, %T, for topic, %s", msg.Data(), msg.Topic()) return } - event := a.createAgentEvent( - types.TimestampNow(), - INFO_EVENT_LEVEL, - fmt.Sprintf(AGENT_START_MESSAGE, agentEventMeta.version, a.env.GetHostname(), agentEventMeta.pid), - uuid.NewString(), - ) + event := agentEventMeta.GenerateAgentStartEventCommand() log.Debugf("Created event: %v", event) - a.pipeline.Process(core.NewMessage(core.Events, &proto.Command{ - Meta: a.meta, - Type: proto.Command_NORMAL, - Data: &proto.Command_EventReport{ - EventReport: &eventsProto.EventReport{ - Events: []*eventsProto.Event{event}, - }, - }, - })) + a.pipeline.Process(core.NewMessage(core.Events, event)) + a.agentEventsMeta = agentEventMeta } func (a *Events) sendNingxFoundEvent(msg *core.Message) { @@ -162,19 +129,19 @@ func (a *Events) sendNingxFoundEvent(msg *core.Message) { return } - events := []*eventsProto.Event{} + protoEvents := []*eventsProto.Event{} for _, nginxDetail := range nginxDetailsMap { event := a.createNginxEvent( nginxDetail.GetNginxId(), &types.Timestamp{Seconds: nginxDetail.GetStartTime() / 1000, Nanos: int32(nginxDetail.GetStartTime() % 1000)}, - INFO_EVENT_LEVEL, + events.INFO_EVENT_LEVEL, fmt.Sprintf(NGINX_FOUND_MESSAGE, nginxDetail.GetVersion(), nginxDetail.GetProcessId()), uuid.NewString(), ) log.Debugf("Created event: %v", event) - events = append(events, event) + protoEvents = append(protoEvents, event) } a.pipeline.Process(core.NewMessage(core.Events, &proto.Command{ @@ -182,7 +149,7 @@ func (a *Events) sendNingxFoundEvent(msg *core.Message) { Type: proto.Command_NORMAL, Data: &proto.Command_EventReport{ EventReport: &eventsProto.EventReport{ - Events: events, + Events: protoEvents, }, }, })) @@ -200,7 +167,7 @@ func (a *Events) sendNginxReloadEvent(msg *core.Message) { event = a.createNginxEvent( nginxReload.nginxDetails.GetNginxId(), nginxReload.timestamp, - WARN_EVENT_LEVEL, + events.WARN_EVENT_LEVEL, fmt.Sprintf(NGINX_RELOAD_SUCCESS_MESSAGE, nginxReload.nginxDetails.GetVersion(), nginxReload.nginxDetails.GetProcessId()), nginxReload.correlationId, ) @@ -208,7 +175,7 @@ func (a *Events) sendNginxReloadEvent(msg *core.Message) { event = a.createNginxEvent( nginxReload.nginxDetails.GetNginxId(), nginxReload.timestamp, - ERROR_EVENT_LEVEL, + events.ERROR_EVENT_LEVEL, fmt.Sprintf(NGINX_RELOAD_FAILED_MESSAGE, nginxReload.nginxDetails.GetVersion(), nginxReload.nginxDetails.GetProcessId()), nginxReload.correlationId, ) @@ -249,7 +216,7 @@ func (a *Events) sendConfigApplyEvent(msg *core.Message) { event = a.createConfigApplyEvent( nginxConfigResponse.GetConfigData().GetNginxId(), command.GetMeta().GetTimestamp(), - INFO_EVENT_LEVEL, + events.INFO_EVENT_LEVEL, fmt.Sprintf(CONFIG_APPLY_SUCCESS_MESSAGE, a.env.GetHostname()), command.Meta.GetMessageId(), ) @@ -257,7 +224,7 @@ func (a *Events) sendConfigApplyEvent(msg *core.Message) { event = a.createConfigApplyEvent( nginxConfigResponse.GetConfigData().GetNginxId(), command.GetMeta().GetTimestamp(), - ERROR_EVENT_LEVEL, + events.ERROR_EVENT_LEVEL, fmt.Sprintf(CONFIG_APPLY_FAILURE_MESSAGE, a.env.GetHostname()), command.Meta.GetMessageId(), ) @@ -291,7 +258,7 @@ func (a *Events) sendConfigRollbackEvent(msg *core.Message) { event = a.createConfigApplyEvent( configRollbackResponse.nginxDetails.GetNginxId(), configRollbackResponse.timestamp, - WARN_EVENT_LEVEL, + events.WARN_EVENT_LEVEL, fmt.Sprintf(CONFIG_ROLLBACK_SUCCESS_MESSAGE, a.env.GetHostname()), configRollbackResponse.correlationId, ) @@ -299,7 +266,7 @@ func (a *Events) sendConfigRollbackEvent(msg *core.Message) { event = a.createConfigApplyEvent( configRollbackResponse.nginxDetails.GetNginxId(), configRollbackResponse.timestamp, - ERROR_EVENT_LEVEL, + events.ERROR_EVENT_LEVEL, fmt.Sprintf(CONFIG_ROLLBACK_FAILURE_MESSAGE, a.env.GetHostname()), configRollbackResponse.correlationId, ) @@ -328,7 +295,7 @@ func (a *Events) sendNginxStartEvent(msg *core.Message) { event := a.createNginxEvent( nginxDetails.GetNginxId(), &types.Timestamp{Seconds: nginxDetails.GetStartTime() / 1000, Nanos: int32(nginxDetails.GetStartTime() % 1000)}, - INFO_EVENT_LEVEL, + events.INFO_EVENT_LEVEL, fmt.Sprintf(NGINX_FOUND_MESSAGE, nginxDetails.GetVersion(), nginxDetails.GetProcessId()), uuid.NewString(), ) @@ -355,7 +322,7 @@ func (a *Events) sendNginxStopEvent(msg *core.Message) { event := a.createNginxEvent( nginxDetails.GetNginxId(), types.TimestampNow(), - WARN_EVENT_LEVEL, + events.WARN_EVENT_LEVEL, fmt.Sprintf(NGINX_STOP_MESSAGE, nginxDetails.GetVersion(), nginxDetails.GetProcessId()), uuid.NewString(), ) @@ -382,7 +349,7 @@ func (a *Events) sendNginxWorkerStartEvent(msg *core.Message) { event := a.createNginxEvent( nginxDetails.GetNginxId(), &types.Timestamp{Seconds: nginxDetails.GetStartTime() / 1000, Nanos: int32(nginxDetails.GetStartTime() % 1000)}, - INFO_EVENT_LEVEL, + events.INFO_EVENT_LEVEL, fmt.Sprintf(NGINX_WORKER_START_MESSAGE, nginxDetails.GetProcessId(), nginxDetails.GetVersion(), nginxDetails.GetProcessId()), uuid.NewString(), ) @@ -409,7 +376,7 @@ func (a *Events) sendNginxWorkerStopEvent(msg *core.Message) { event := a.createNginxEvent( nginxDetails.GetNginxId(), types.TimestampNow(), - INFO_EVENT_LEVEL, + events.INFO_EVENT_LEVEL, fmt.Sprintf(NGINX_WORKER_STOP_MESSAGE, nginxDetails.GetProcessId(), nginxDetails.GetVersion(), nginxDetails.GetProcessId()), uuid.NewString(), ) @@ -427,17 +394,17 @@ func (a *Events) sendNginxWorkerStopEvent(msg *core.Message) { } func (e *Events) createNginxEvent(nginxId string, timestamp *types.Timestamp, level string, message string, correlationId string) *eventsProto.Event { - activityEvent := e.createActivityEvent(message, nginxId) + activityEvent := e.agentEventsMeta.CreateActivityEvent(message, nginxId) return &eventsProto.Event{ Metadata: &eventsProto.Metadata{ UUID: uuid.NewString(), CorrelationID: correlationId, - Module: MODULE, + Module: config.MODULE, Timestamp: timestamp, EventLevel: level, - Type: NGINX_EVENT_TYPE, - Category: STATUS_CATEGORY, + Type: events.NGINX_EVENT_TYPE, + Category: events.STATUS_CATEGORY, }, Data: &eventsProto.Event_ActivityEvent{ ActivityEvent: activityEvent, @@ -446,131 +413,20 @@ func (e *Events) createNginxEvent(nginxId string, timestamp *types.Timestamp, le } func (e *Events) createConfigApplyEvent(nginxId string, timestamp *types.Timestamp, level string, message string, correlationId string) *eventsProto.Event { - activityEvent := e.createActivityEvent(message, nginxId) - - return &eventsProto.Event{ - Metadata: &eventsProto.Metadata{ - UUID: uuid.NewString(), - CorrelationID: correlationId, - Module: MODULE, - Timestamp: timestamp, - EventLevel: level, - Type: AGENT_EVENT_TYPE, - Category: CONFIG_CATEGORY, - }, - Data: &eventsProto.Event_ActivityEvent{ - ActivityEvent: activityEvent, - }, - } -} - -func (e *Events) createAgentEvent(timestamp *types.Timestamp, level string, message string, correlationId string) *eventsProto.Event { - activityEvent := e.createActivityEvent(message, "") // blank nginxId, this relates to agent not it's nginx instances + activityEvent := e.agentEventsMeta.CreateActivityEvent(message, nginxId) return &eventsProto.Event{ Metadata: &eventsProto.Metadata{ UUID: uuid.NewString(), CorrelationID: correlationId, - Module: MODULE, + Module: config.MODULE, Timestamp: timestamp, EventLevel: level, - Type: AGENT_EVENT_TYPE, - Category: STATUS_CATEGORY, - }, - Data: &eventsProto.Event_ActivityEvent{ - ActivityEvent: activityEvent, - }, - } -} - -func (e *Events) createActivityEvent(message string, nginxId string) *eventsProto.ActivityEvent { - activityEvent := &eventsProto.ActivityEvent{ - Message: message, - Dimensions: []*commonProto.Dimension{ - { - Name: "system_id", - Value: e.env.GetSystemUUID(), - }, - { - Name: "hostname", - Value: e.env.GetHostname(), - }, - { - Name: "instance_group", - Value: e.conf.InstanceGroup, - }, - { - Name: "system.tags", - Value: strings.Join(e.conf.Tags, ","), - }, - }, - } - - if nginxId != "" { - nginxDim := []*commonProto.Dimension{{Name: "nginx_id", Value: nginxId}} - activityEvent.Dimensions = append(nginxDim, activityEvent.Dimensions...) - } - - return activityEvent -} - -type AgentEventMeta struct { - version string - pid string -} - -func NewAgentEventMeta(version string, pid string) *AgentEventMeta { - return &AgentEventMeta{ - version: version, - pid: pid, - } -} - -func GenerateAgentStopEventCommand(agentEvent *AgentEventMeta, conf *config.Config, env core.Environment) *proto.Command { - activityEvent := &eventsProto.ActivityEvent{ - Message: fmt.Sprintf(AGENT_STOP_MESSAGE, agentEvent.version, agentEvent.pid, env.GetHostname()), - Dimensions: []*commonProto.Dimension{ - { - Name: "system_id", - Value: env.GetSystemUUID(), - }, - { - Name: "hostname", - Value: env.GetHostname(), - }, - { - Name: "instance_group", - Value: conf.InstanceGroup, - }, - { - Name: "system.tags", - Value: strings.Join(conf.Tags, ","), - }, - }, - } - - event := &eventsProto.Event{ - Metadata: &eventsProto.Metadata{ - UUID: uuid.NewString(), - CorrelationID: uuid.NewString(), - Module: MODULE, - Timestamp: types.TimestampNow(), - EventLevel: WARN_EVENT_LEVEL, - Type: AGENT_EVENT_TYPE, - Category: STATUS_CATEGORY, + Type: events.AGENT_EVENT_TYPE, + Category: events.CONFIG_CATEGORY, }, Data: &eventsProto.Event_ActivityEvent{ ActivityEvent: activityEvent, }, } - - return &proto.Command{ - Meta: sdkGRPC.NewMessageMeta(uuid.NewString()), - Type: proto.Command_NORMAL, - Data: &proto.Command_EventReport{ - EventReport: &eventsProto.EventReport{ - Events: []*eventsProto.Event{event}, - }, - }, - } } diff --git a/src/plugins/events_test.go b/src/plugins/events_test.go index 3ee4a1305..be36f134c 100644 --- a/src/plugins/events_test.go +++ b/src/plugins/events_test.go @@ -14,6 +14,7 @@ import ( "github.com/gogo/protobuf/types" "github.com/google/uuid" + "github.com/nginx/agent/sdk/v2/agent/events" "github.com/nginx/agent/sdk/v2/grpc" "github.com/nginx/agent/sdk/v2/proto" commonProto "github.com/nginx/agent/sdk/v2/proto/common" @@ -62,8 +63,10 @@ func TestActivityEvents_Process(t *testing.T) { name: "test NginxInstancesFound message", message: core.NewMessage(core.NginxInstancesFound, tutils.GetDetailsMap()), msgTopics: []string{ + core.AgentStarted, core.NginxInstancesFound, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -93,8 +96,10 @@ func TestActivityEvents_Process(t *testing.T) { correlationId: uuid.NewString(), }), msgTopics: []string{ + core.AgentStarted, core.NginxReloadComplete, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -123,8 +128,10 @@ func TestActivityEvents_Process(t *testing.T) { correlationId: uuid.NewString(), }), msgTopics: []string{ + core.AgentStarted, core.NginxReloadComplete, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -160,8 +167,10 @@ func TestActivityEvents_Process(t *testing.T) { }, }), msgTopics: []string{ + core.AgentStarted, core.CommResponse, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -197,8 +206,10 @@ func TestActivityEvents_Process(t *testing.T) { }, }), msgTopics: []string{ + core.AgentStarted, core.CommResponse, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -234,8 +245,10 @@ func TestActivityEvents_Process(t *testing.T) { }, }), msgTopics: []string{ + core.AgentStarted, core.CommResponse, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -269,7 +282,9 @@ func TestActivityEvents_Process(t *testing.T) { }, }), msgTopics: []string{ + core.AgentStarted, core.CommResponse, + core.Events, }, expectedEventReport: nil, }, @@ -281,8 +296,10 @@ func TestActivityEvents_Process(t *testing.T) { correlationId: uuid.NewString(), }), msgTopics: []string{ + core.AgentStarted, core.ConfigRollbackResponse, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -311,8 +328,10 @@ func TestActivityEvents_Process(t *testing.T) { correlationId: uuid.NewString(), }), msgTopics: []string{ + core.AgentStarted, core.ConfigRollbackResponse, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -334,8 +353,16 @@ func TestActivityEvents_Process(t *testing.T) { }, }, { - name: "test AgentStart message", - message: core.NewMessage(core.AgentStarted, &AgentEventMeta{version: "v0.0.1", pid: "75231"}), + name: "test AgentStart message", + message: core.NewMessage(core.AgentStarted, events.NewAgentEventMeta( + config.MODULE, + "v0.0.1", + "75231", + "test-host", + "12345678", + "group-a", + []string{"tag-a", "tag-b"}, + )), msgTopics: []string{ core.AgentStarted, core.Events, @@ -363,8 +390,10 @@ func TestActivityEvents_Process(t *testing.T) { name: "test NginxMasterProcCreated message", message: core.NewMessage(core.NginxMasterProcCreated, &proto.NginxDetails{Version: "1.0.1", ProcessId: "75231"}), msgTopics: []string{ + core.AgentStarted, core.NginxMasterProcCreated, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -389,8 +418,10 @@ func TestActivityEvents_Process(t *testing.T) { name: "test NginxMasterProcKilled message", message: core.NewMessage(core.NginxMasterProcKilled, &proto.NginxDetails{Version: "1.0.1", ProcessId: "75231"}), msgTopics: []string{ + core.AgentStarted, core.NginxMasterProcKilled, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -415,8 +446,10 @@ func TestActivityEvents_Process(t *testing.T) { name: "test NginxWorkerProcCreated message", message: core.NewMessage(core.NginxWorkerProcCreated, &proto.NginxDetails{Version: "1.0.1", ProcessId: "75231"}), msgTopics: []string{ + core.AgentStarted, core.NginxWorkerProcCreated, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -441,8 +474,10 @@ func TestActivityEvents_Process(t *testing.T) { name: "test NginxWorkerProcKilled message", message: core.NewMessage(core.NginxWorkerProcKilled, &proto.NginxDetails{Version: "1.0.1", ProcessId: "75231"}), msgTopics: []string{ + core.AgentStarted, core.NginxWorkerProcKilled, core.Events, + core.Events, }, expectedEventReport: &eventsProto.EventReport{ Events: []*eventsProto.Event{ @@ -467,7 +502,9 @@ func TestActivityEvents_Process(t *testing.T) { name: "test unknown message", message: core.NewMessage(core.UNKNOWN, "unknown message"), msgTopics: []string{ + core.AgentStarted, core.UNKNOWN, + core.Events, }, }, } @@ -497,9 +534,21 @@ func TestActivityEvents_Process(t *testing.T) { } pluginUnderTest := NewEvents(config, env, grpc.NewMessageMeta(uuid.New().String()), core.NewNginxBinary(env, config)) - messagePipe := core.SetupMockMessagePipe(t, ctx, []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{}) + if test.name != "test AgentStart message" { + agentMeta := events.NewAgentEventMeta( + "NGINX-AGENT", + "v0.0.1", + "75231", + "test-host", + "12345678", + "group-a", + []string{"tag-a", "tag-b"}) + + messagePipe.Process(core.NewMessage(core.AgentStarted, agentMeta)) + } + messagePipe.Process(test.message) messagePipe.Run() time.Sleep(250 * time.Millisecond) @@ -513,9 +562,22 @@ func TestActivityEvents_Process(t *testing.T) { tt.Errorf("unexpected message topic: %s :: should have been: %s", msg.Topic(), test.msgTopics[idx]) } if test.expectedEventReport != nil && msg.Exact(core.Events) { - expectedEvent := test.expectedEventReport.Events[0] + var expectedEvent *eventsProto.Event + if len(test.expectedEventReport.GetEvents()) == 1 { + expectedEvent = test.expectedEventReport.Events[0] + } else { + // get the latest event + events := test.expectedEventReport.Events[:len(test.expectedEventReport.GetEvents())-1] + expectedEvent = events[0] + } actualEvent := msg.Data().(*proto.Command).GetEventReport().Events[0] + if actualEvent.GetMetadata().GetType() != expectedEvent.GetMetadata().GetType() || + actualEvent.GetMetadata().GetCategory() != expectedEvent.GetMetadata().GetCategory() || + actualEvent.GetMetadata().GetEventLevel() != expectedEvent.GetMetadata().GetEventLevel() { + continue + } + // assert metadata assert.Equal(tt, expectedEvent.Metadata.Module, actualEvent.Metadata.Module) assert.Equal(tt, expectedEvent.Metadata.Category, actualEvent.Metadata.Category) @@ -597,9 +659,17 @@ func TestGenerateAgentStopEvent(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { env := tutils.NewMockEnvironment() - - agentStopCmd := GenerateAgentStopEventCommand(&AgentEventMeta{version: tt.agentVersion, pid: tt.pid}, tt.conf, env) - actualEvent := agentStopCmd.GetEventReport().Events[0] + meta := events.NewAgentEventMeta( + config.MODULE, + tt.agentVersion, + tt.pid, + env.GetHostname(), + env.GetSystemUUID(), + tt.conf.InstanceGroup, + tt.conf.Tags, + ) + agentStopCmd := meta.GenerateAgentStopEventCommand() + actualEvent := agentStopCmd.GetEventReport().GetEvents()[0] // assert metadata assert.Equal(t, tt.expectedEvent.Metadata.Module, actualEvent.Metadata.Module) @@ -608,8 +678,8 @@ func TestGenerateAgentStopEvent(t *testing.T) { assert.Equal(t, tt.expectedEvent.Metadata.EventLevel, actualEvent.Metadata.EventLevel) // assert activity event - assert.Equal(t, tt.expectedEvent.GetActivityEvent().Message, actualEvent.GetActivityEvent().Message) - assert.Equal(t, tt.expectedEvent.GetActivityEvent().Dimensions, actualEvent.GetActivityEvent().Dimensions) + assert.Equal(t, tt.expectedEvent.GetActivityEvent().GetMessage(), actualEvent.GetActivityEvent().GetMessage()) + assert.Equal(t, tt.expectedEvent.GetActivityEvent().GetDimensions(), actualEvent.GetActivityEvent().GetDimensions()) }) } } diff --git a/src/plugins/features.go b/src/plugins/features.go index 7a17686a3..51f1867a2 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -210,7 +210,7 @@ func (f *Features) enableRegistrationFeature(data string) []core.Plugin { } f.conf = conf - registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version) + registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString())) return []core.Plugin{registration} } @@ -225,7 +225,7 @@ func (f *Features) enableDataPlaneStatusFeature(data string) []core.Plugin { } f.conf = conf - dataPlaneStatus := NewDataPlaneStatus(f.conf, sdkGRPC.NewMessageMeta(uuid.NewString()), f.binary, f.env, f.version) + dataPlaneStatus := NewDataPlaneStatus(f.conf, sdkGRPC.NewMessageMeta(uuid.NewString()), f.binary, f.env) return []core.Plugin{dataPlaneStatus} } diff --git a/src/plugins/metrics_test.go b/src/plugins/metrics_test.go index 64c943933..a3932c372 100644 --- a/src/plugins/metrics_test.go +++ b/src/plugins/metrics_test.go @@ -155,13 +155,7 @@ func TestMetricsProcessNginxDetailProcUpdate(t *testing.T) { }, }).Once() env.On("IsContainer").Return(false) - env.On("GetChildProcesses").Return(map[string][]*proto.NginxDetails{ - "1": { - { - ProcessId: "1", - }, - }, - }) + env.On("GetChildProcesses").Return(tutils.GetProcessMap()) metricsPlugin := NewMetrics(config, env, binary) metricsPlugin.collectors = []metrics.Collector{ diff --git a/src/plugins/registration.go b/src/plugins/registration.go index 8d94b736d..c57209fc0 100644 --- a/src/plugins/registration.go +++ b/src/plugins/registration.go @@ -49,13 +49,12 @@ func NewOneTimeRegistration( binary core.NginxBinary, env core.Environment, meta *proto.Metadata, - version string, ) *OneTimeRegistration { // this might be slow so do on startup - host := env.NewHostInfo(version, &config.Tags, config.ConfigDirs, true) + host := env.NewHostInfo(config.Version, &config.Tags, config.ConfigDirs, true) return &OneTimeRegistration{ tags: &config.Tags, - agentVersion: version, + agentVersion: config.Version, meta: meta, config: config, env: env, diff --git a/src/plugins/registration_test.go b/src/plugins/registration_test.go index ab0360d27..84c05da74 100644 --- a/src/plugins/registration_test.go +++ b/src/plugins/registration_test.go @@ -45,7 +45,7 @@ func TestRegistration_Process(t *testing.T) { Extensions: []string{agent_config.NginxAppProtectExtensionPlugin}, } - pluginUnderTest := NewOneTimeRegistration(cfg, binary, env, &proto.Metadata{}, "0.0.0") + pluginUnderTest := NewOneTimeRegistration(cfg, binary, env, &proto.Metadata{}) pluginUnderTest.dataplaneSoftwareDetails[agent_config.NginxAppProtectExtensionPlugin] = &proto.DataplaneSoftwareDetails{ Data: testNAPDetailsActive, } @@ -77,7 +77,7 @@ func TestRegistration_areDataplaneSoftwareDetailsReady(t *testing.T) { conf := tutils.GetMockAgentConfig() conf.Extensions = []string{agent_config.NginxAppProtectExtensionPlugin} - pluginUnderTest := NewOneTimeRegistration(conf, nil, tutils.GetMockEnv(), nil, "") + pluginUnderTest := NewOneTimeRegistration(conf, nil, tutils.GetMockEnv(), nil) softwareDetails := make(map[string]*proto.DataplaneSoftwareDetails) softwareDetails[agent_config.NginxAppProtectExtensionPlugin] = &proto.DataplaneSoftwareDetails{} pluginUnderTest.dataplaneSoftwareDetails = softwareDetails @@ -86,13 +86,13 @@ func TestRegistration_areDataplaneSoftwareDetailsReady(t *testing.T) { } func TestRegistration_Subscriptions(t *testing.T) { - pluginUnderTest := NewOneTimeRegistration(tutils.GetMockAgentConfig(), nil, tutils.GetMockEnv(), nil, "") + pluginUnderTest := NewOneTimeRegistration(tutils.GetMockAgentConfig(), nil, tutils.GetMockEnv(), nil) assert.Equal(t, []string{core.RegistrationCompletedTopic, core.DataplaneSoftwareDetailsUpdated}, pluginUnderTest.Subscriptions()) } func TestRegistration_Info(t *testing.T) { - pluginUnderTest := NewOneTimeRegistration(tutils.GetMockAgentConfig(), nil, tutils.GetMockEnv(), nil, "") + pluginUnderTest := NewOneTimeRegistration(tutils.GetMockAgentConfig(), nil, tutils.GetMockEnv(), nil) assert.Equal(t, "registration", pluginUnderTest.Info().Name()) } diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/events/meta.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/events/meta.go new file mode 100644 index 000000000..a6943f3a4 --- /dev/null +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/events/meta.go @@ -0,0 +1,203 @@ +/** + * Copyright (c) F5, Inc. + * + * This source code is licensed under the Apache License, Version 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +package events + +import ( + "fmt" + "strings" + + "github.com/gogo/protobuf/types" + "github.com/google/uuid" + sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + "github.com/nginx/agent/sdk/v2/proto" + commonProto "github.com/nginx/agent/sdk/v2/proto/common" + eventsProto "github.com/nginx/agent/sdk/v2/proto/events" +) + +type AgentEventMeta struct { + module string + version string + pid string + hostname string + systemUuid string + instanceGroup string + tags string + tagsRaw []string +} + +func NewAgentEventMeta( + module, version, pid, hostname, systemUuid, instanceGroup string, + tags []string, +) *AgentEventMeta { + return &AgentEventMeta{ + module: module, + version: version, + pid: pid, + hostname: hostname, + systemUuid: systemUuid, + instanceGroup: instanceGroup, + tagsRaw: tags, + tags: strings.Join(tags, ","), + } +} + +func (aem *AgentEventMeta) GetVersion() string { + return aem.version +} + +func (aem *AgentEventMeta) GetPid() string { + return aem.pid +} + +func (aem *AgentEventMeta) GenerateAgentStartEventCommand() *proto.Command { + activityEvent := &eventsProto.ActivityEvent{ + Message: fmt.Sprintf(AGENT_START_MESSAGE, aem.version, aem.hostname, aem.pid), + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + event := &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: uuid.NewString(), + Module: aem.module, + Timestamp: types.TimestampNow(), + EventLevel: WARN_EVENT_LEVEL, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } + + return &proto.Command{ + Meta: sdkGRPC.NewMessageMeta(uuid.NewString()), + Type: proto.Command_NORMAL, + Data: &proto.Command_EventReport{ + EventReport: &eventsProto.EventReport{ + Events: []*eventsProto.Event{event}, + }, + }, + } +} + +func (aem *AgentEventMeta) GenerateAgentStopEventCommand() *proto.Command { + activityEvent := &eventsProto.ActivityEvent{ + Message: fmt.Sprintf(AGENT_STOP_MESSAGE, aem.version, aem.pid, aem.hostname), + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + event := &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: uuid.NewString(), + Module: aem.module, + Timestamp: types.TimestampNow(), + EventLevel: WARN_EVENT_LEVEL, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } + + return &proto.Command{ + Meta: sdkGRPC.NewMessageMeta(uuid.NewString()), + Type: proto.Command_NORMAL, + Data: &proto.Command_EventReport{ + EventReport: &eventsProto.EventReport{ + Events: []*eventsProto.Event{event}, + }, + }, + } +} + +func (aem *AgentEventMeta) CreateAgentEvent(timestamp *types.Timestamp, level, message, correlationId, module string) *eventsProto.Event { + activityEvent := aem.CreateActivityEvent(message, "") // blank nginxId, this relates to agent not it's nginx instances + + return &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: correlationId, + Module: module, + Timestamp: timestamp, + EventLevel: level, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } +} + +func (aem *AgentEventMeta) CreateActivityEvent(message string, nginxId string) *eventsProto.ActivityEvent { + activityEvent := &eventsProto.ActivityEvent{ + Message: message, + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + if nginxId != "" { + nginxDim := []*commonProto.Dimension{{Name: "nginx_id", Value: nginxId}} + activityEvent.Dimensions = append(nginxDim, activityEvent.Dimensions...) + } + + return activityEvent +} diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/events/types.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/events/types.go new file mode 100644 index 000000000..8f55f5d34 --- /dev/null +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/agent/events/types.go @@ -0,0 +1,23 @@ +package events + +const ( + // Types + NGINX_EVENT_TYPE = "Nginx" + AGENT_EVENT_TYPE = "Agent" + + // Categories + STATUS_CATEGORY = "Status" + CONFIG_CATEGORY = "Config" + APP_PROTECT_CATEGORY = "AppProtect" + + // Event Levels + INFO_EVENT_LEVEL = "INFO" + DEBUG_EVENT_LEVEL = "DEBUG" + WARN_EVENT_LEVEL = "WARN" + ERROR_EVENT_LEVEL = "ERROR" + CRITICAL_EVENT_LEVEL = "CRITICAL" + + // Messages + AGENT_START_MESSAGE = "nginx-agent %s started on %s with pid %s" + AGENT_STOP_MESSAGE = "nginx-agent %s (pid: %s) stopped on %s" +) diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/controller.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/controller.go index b89efd01b..7117269d0 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/controller.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/controller.go @@ -18,6 +18,7 @@ func NewClientController() Controller { type ctrl struct { ctx context.Context + cncl context.CancelFunc clients []Client } @@ -28,8 +29,7 @@ func (c *ctrl) WithClient(client Client) Controller { } func (c *ctrl) WithContext(ctx context.Context) Controller { - c.ctx = ctx - + c.ctx, c.cncl = context.WithCancel(ctx) return c } @@ -49,6 +49,7 @@ func (c *ctrl) Connect() error { } func (c *ctrl) Close() error { + defer c.cncl() var retErr error for _, client := range c.clients { if err := client.Close(); err != nil { diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go index 638ebff64..84dd39582 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -55,6 +55,7 @@ var Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) func SetVersion(version, commit string) { ROOT_COMMAND.Version = version + "-" + commit + Viper.SetDefault(VersionKey, version) } func Execute() error { @@ -62,6 +63,22 @@ func Execute() error { return ROOT_COMMAND.Execute() } +func InitConfiguration(version, commit string) { + SetVersion(version, commit) + SetDefaults() + RegisterFlags() + dynamicConfigPath := DynamicConfigFileAbsPath + if runtime.GOOS == "freebsd" { + dynamicConfigPath = DynamicConfigFileAbsFreeBsdPath + } + configPath, err := RegisterConfigFile(dynamicConfigPath, ConfigFileName, ConfigFilePaths()...) + if err != nil { + log.Fatalf("Failed to load configuration file: %v", err) + } + log.Debugf("Configuration file loaded %v", configPath) + Viper.Set(ConfigPathKey, configPath) +} + func SetDefaults() { // CLOUDACCOUNTID DEFAULT Viper.SetDefault(CloudAccountIdKey, Defaults.CloudAccountID) @@ -165,6 +182,7 @@ func GetConfig(clientId string) (*Config, error) { } config := &Config{ + Version: Viper.GetString(VersionKey), Path: Viper.GetString(ConfigPathKey), DynamicConfigPath: Viper.GetString(DynamicConfigPathKey), ClientID: clientId, diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go index 4c7adfccb..de88138c1 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go @@ -93,6 +93,8 @@ var ( ) const ( + MODULE = "NGINX-AGENT" + DynamicConfigFileName = "agent-dynamic.conf" DynamicConfigFileAbsPath = "/var/lib/nginx-agent/agent-dynamic.conf" DynamicConfigFileAbsFreeBsdPath = "/var/db/nginx-agent/agent-dynamic.conf" @@ -102,6 +104,7 @@ const ( ConfigPathKey = "path" DynamicConfigPathKey = "dynamic-config-path" + VersionKey = "version" CloudAccountIdKey = "cloudaccountid" LocationKey = "location" DisplayNameKey = "display_name" diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/types.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/types.go index 57497ebe6..a7ba6a7fd 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/types.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/types.go @@ -14,6 +14,7 @@ import ( ) type Config struct { + Version string Path string `yaml:"-"` DynamicConfigPath string `yaml:"-"` ClientID string `mapstructure:"agent_id" yaml:"-"` diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/grpc.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/grpc.go new file mode 100644 index 000000000..0d22474f7 --- /dev/null +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/grpc.go @@ -0,0 +1,75 @@ +/** + * Copyright (c) F5, Inc. + * + * This source code is licensed under the Apache License, Version 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +package core + +import ( + "context" + "strings" + + "github.com/google/uuid" + "github.com/nginx/agent/sdk/v2/client" + sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + "github.com/nginx/agent/v2/src/core/config" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +func CreateGrpcClients(ctx context.Context, loadedConfig *config.Config) (client.Controller, client.Commander, client.MetricReporter) { + if !loadedConfig.IsGrpcServerConfigured() { + log.Info("GRPC clients not created due to missing server config") + return nil, nil, nil + } + + grpcDialOptions := setDialOptions(loadedConfig) + secureMetricsDialOpts, err := sdkGRPC.SecureDialOptions( + loadedConfig.TLS.Enable, + loadedConfig.TLS.Cert, + loadedConfig.TLS.Key, + loadedConfig.TLS.Ca, + loadedConfig.Server.Metrics, + loadedConfig.TLS.SkipVerify) + if err != nil { + log.Fatalf("Failed to load secure metric gRPC dial options: %v", err) + } + + secureCmdDialOpts, err := sdkGRPC.SecureDialOptions( + loadedConfig.TLS.Enable, + loadedConfig.TLS.Cert, + loadedConfig.TLS.Key, + loadedConfig.TLS.Ca, + loadedConfig.Server.Command, + loadedConfig.TLS.SkipVerify) + if err != nil { + log.Fatalf("Failed to load secure command gRPC dial options: %v", err) + } + + controller := client.NewClientController() + controller.WithContext(ctx) + commander := client.NewCommanderClient() + commander.WithBackoffSettings(loadedConfig.GetServerBackoffSettings()) + + commander.WithServer(loadedConfig.Server.Target) + commander.WithDialOptions(append(grpcDialOptions, secureCmdDialOpts)...) + + reporter := client.NewMetricReporterClient() + reporter.WithBackoffSettings(loadedConfig.GetServerBackoffSettings()) + reporter.WithServer(loadedConfig.Server.Target) + reporter.WithDialOptions(append(grpcDialOptions, secureMetricsDialOpts)...) + + controller.WithClient(commander) + controller.WithClient(reporter) + + return controller, commander, reporter +} + +func setDialOptions(loadedConfig *config.Config) []grpc.DialOption { + grpcDialOptions := []grpc.DialOption{grpc.WithUserAgent("nginx-agent/" + strings.TrimPrefix(version, "v"))} + grpcDialOptions = append(grpcDialOptions, sdkGRPC.DefaultClientDialOptions...) + grpcDialOptions = append(grpcDialOptions, sdkGRPC.DataplaneConnectionDialOptions(loadedConfig.Server.Token, sdkGRPC.NewMessageMeta(uuid.NewString()))...) + return grpcDialOptions +} diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go index 9b9bbdead..1ded89e02 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -54,6 +54,15 @@ func NewMessagePipe(ctx context.Context) *MessagePipe { } } +func InitializePipe(ctx context.Context, corePlugins []Plugin, extensionPlugins []ExtensionPlugin, size int) MessagePipeInterface { + pipe := NewMessagePipe(ctx) + err := pipe.Register(size, corePlugins, extensionPlugins) + if err != nil { + log.Warnf("Failed to start agent successfully, error loading plugins %v", err) + } + return pipe +} + func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []ExtensionPlugin) error { p.mu.Lock() diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/signals.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/signals.go new file mode 100644 index 000000000..c8a1ed8d8 --- /dev/null +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/signals.go @@ -0,0 +1,74 @@ +/** + * Copyright (c) F5, Inc. + * + * This source code is licensed under the Apache License, Version 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +package core + +import ( + "context" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + "github.com/nginx/agent/sdk/v2/agent/events" + "github.com/nginx/agent/sdk/v2/client" + "github.com/nginx/agent/v2/src/core/config" + log "github.com/sirupsen/logrus" +) + +// handleSignals handles signals to attempt graceful shutdown +// for now it also handles sending the agent stopped event because as of today we don't have a mechanism for synchronizing +// tasks between multiple plugins from outside a plugin +func HandleSignals( + ctx context.Context, + cmder client.Commander, + loadedConfig *config.Config, + env Environment, + pipe MessagePipeInterface, + cancel context.CancelFunc, + controller client.Controller, +) { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + select { + case <-sigChan: + event := events.NewAgentEventMeta( + config.MODULE, + version, + strconv.Itoa(os.Getpid()), + env.GetHostname(), + env.GetSystemUUID(), + loadedConfig.InstanceGroup, + loadedConfig.Tags) + + stopCmd := event.GenerateAgentStopEventCommand() + log.Debugf("Sending agent stopped event: %v", stopCmd) + + if cmder == nil { + log.Warn("Command channel not configured. Skipping sending AgentStopped event") + } else if err := cmder.Send(ctx, client.MessageFromCommand(stopCmd)); err != nil { + log.Errorf("Error sending AgentStopped event to command channel: %v", err) + } + + if controller != nil { + if err := controller.Close(); err != nil { + log.Warnf("Unable to close controller: %v", err) + } + } + + log.Warn("NGINX Agent exiting") + cancel() + + timeout := time.Second * 5 + time.Sleep(timeout) + log.Fatalf("Failed to gracefully shutdown within timeout of %v. Exiting", timeout) + case <-ctx.Done(): + } + }() +} diff --git a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go index 58fa133dd..a9bae0bb6 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go @@ -46,6 +46,10 @@ func GetMockAgentConfig() *config.Config { CollectionInterval: 1, Mode: "aggregated", }, + Server: config.Server{ + Host: "127.0.0.1", + GrpcPort: 67890, + }, } } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go index 6980a98a7..53b2a9779 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go @@ -124,8 +124,8 @@ func (m *MockNginxBinary) UpdateNginxDetailsFromProcesses(nginxProcesses []*core } func (m *MockNginxBinary) GetNginxIDForProcess(nginxProcess *core.Process) string { - args := m.Called(nginxProcess) - return args.String(0) + m.Called(nginxProcess) + return nginxProcess.Name } func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) *proto.NginxDetails { diff --git a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/process.go b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/process.go index dfd405618..6c8c5f61f 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/process.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/process.go @@ -5,6 +5,8 @@ import ( "os" "os/exec" "time" + + "github.com/nginx/agent/sdk/v2/proto" ) // StartFakeProcesses creates a fake process for each of the string names and @@ -29,3 +31,13 @@ func StartFakeProcesses(names []string, fakeProcsDuration string) func() { } } } + +func GetProcessMap() map[string][]*proto.NginxDetails { + return map[string][]*proto.NginxDetails{ + "12345": { + { + ProcessId: "1", + }, + }, + } +} diff --git a/test/integration/vendor/modules.txt b/test/integration/vendor/modules.txt index d8d3101fc..fde2702a6 100644 --- a/test/integration/vendor/modules.txt +++ b/test/integration/vendor/modules.txt @@ -649,6 +649,7 @@ github.com/munnerz/goautoneg ## explicit; go 1.21 github.com/nginx/agent/sdk/v2 github.com/nginx/agent/sdk/v2/agent/config +github.com/nginx/agent/sdk/v2/agent/events github.com/nginx/agent/sdk/v2/backoff github.com/nginx/agent/sdk/v2/checksum github.com/nginx/agent/sdk/v2/client diff --git a/test/performance/metrics_test.go b/test/performance/metrics_test.go index 786f40a09..a3c1966b2 100644 --- a/test/performance/metrics_test.go +++ b/test/performance/metrics_test.go @@ -323,10 +323,10 @@ func startNginxAgent(b *testing.B) { plugins.NewNginx(commander, binary, env, &config.Config{}), plugins.NewCommander(commander, loadedConfig), plugins.NewMetricsSender(reporter), - plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.New().String()), "1.0.0"), + plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.New().String())), plugins.NewMetrics(loadedConfig, env, binary), plugins.NewMetricsThrottle(loadedConfig, env), - plugins.NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.New().String()), binary, env, "1.0.0"), + plugins.NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.New().String()), binary, env), } messagePipe := core.NewMessagePipe(ctx) diff --git a/test/performance/plugins_test.go b/test/performance/plugins_test.go index 408c43ab0..719928d39 100644 --- a/test/performance/plugins_test.go +++ b/test/performance/plugins_test.go @@ -5,11 +5,12 @@ import ( "testing" "time" + sdk "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/plugins" - "github.com/nginx/agent/v2/test/utils" + tutils "github.com/nginx/agent/v2/test/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -68,13 +69,129 @@ func BenchmarkPlugin(b *testing.B) { plugin.AssertExpectations(b) } +func BenchmarkFeaturesExtensionsAndPlugins(b *testing.B) { + detailsMap := tutils.GetDetailsMap() + procMap := tutils.GetProcessMap() + + binary := tutils.GetMockNginxBinary() + binary.On("GetNginxDetailsMapFromProcesses", mock.Anything).Return(detailsMap) + binary.On("GetNginxIDForProcess", mock.Anything).Return("12345") + binary.On("UpdateNginxDetailsFromProcesses", mock.Anything).Return() + binary.On("GetChildProcesses").Return(procMap) + binary.On("ReadConfig", mock.Anything, mock.Anything, mock.Anything).Return(&proto.NginxConfig{}, nil) + + env := tutils.GetMockEnvWithHostAndProcess() + env.Mock.On("IsContainer").Return(false) + env.Mock.On("DiskDevices").Return([]string{"disk1", "disk2"}, nil) + env.Mock.On("GetNetOverflow").Return(0.0, nil) + + tests := []struct { + name string + loadedConfig *config.Config + expectedPluginSize int + expectedExtensionSize int + }{ + { + name: "default plugins and no extensions", + loadedConfig: &config.Config{ + Server: tutils.GetMockAgentConfig().Server, + AgentMetrics: config.AgentMetrics{ + BulkSize: 1, + ReportInterval: 1, + CollectionInterval: 1, + Mode: "aggregated", + }, + AgentAPI: config.AgentAPI{ + Port: 23456, + Key: "", + Cert: "", + }, + Dataplane: config.Dataplane{ + Status: config.Status{PollInterval: 30 * time.Second}, + }, + }, + expectedPluginSize: 5, + expectedExtensionSize: 0, + }, + { + name: "default plugins and all extensions", + loadedConfig: &config.Config{ + Extensions: sdk.GetKnownExtensions()[:len(sdk.GetKnownExtensions())-1], + Server: tutils.GetMockAgentConfig().Server, + AgentMetrics: config.AgentMetrics{ + BulkSize: 1, + ReportInterval: 1, + CollectionInterval: 1, + Mode: "aggregated", + }, + Dataplane: config.Dataplane{ + Status: config.Status{PollInterval: 30 * time.Second}, + }, + }, + expectedPluginSize: 5, + expectedExtensionSize: 2, + }, + { + name: "all plugins and extensions", + loadedConfig: &config.Config{ + Version: "v9.99.999", + Server: tutils.GetMockAgentConfig().Server, + Features: sdk.GetDefaultFeatures(), + // temporarily to figure out what's going on with the monitoring extension + Extensions: sdk.GetKnownExtensions()[:len(sdk.GetKnownExtensions())-1], + AgentMetrics: config.AgentMetrics{ + BulkSize: 1, + ReportInterval: 1, + CollectionInterval: 1, + Mode: "aggregated", + }, + AgentAPI: config.AgentAPI{ + Port: 2345, + Key: "", + Cert: "", + }, + Dataplane: config.Dataplane{ + Status: config.Status{PollInterval: 30 * time.Second}, + }, + }, + expectedPluginSize: 15, + // temporarily to figure out what's going on with the monitoring extension + expectedExtensionSize: len(sdk.GetKnownExtensions()[:len(sdk.GetKnownExtensions())-1]), + }, + } + + for _, tt := range tests { + ctx, cancel := context.WithCancel(context.Background()) + var pipe core.MessagePipeInterface + var corePlugins []core.Plugin + var extensionPlugins []core.ExtensionPlugin + + b.Run(tt.name, func(t *testing.B) { + for i := 0; i < b.N; i++ { + b.ResetTimer() + controller, cmdr, reporter := core.CreateGrpcClients(ctx, tt.loadedConfig) + corePlugins, extensionPlugins = plugins.LoadPlugins(cmdr, binary, env, reporter, tt.loadedConfig) + pipe = core.InitializePipe(ctx, corePlugins, extensionPlugins, 20) + core.HandleSignals(ctx, cmdr, tt.loadedConfig, env, pipe, cancel, controller) + } + + assert.NotNil(t, corePlugins) + assert.Len(t, corePlugins, tt.expectedPluginSize) + assert.Len(t, extensionPlugins, tt.expectedExtensionSize) + }) + } +} + func BenchmarkPluginOneTimeRegistration(b *testing.B) { var pluginsUnderTest []core.Plugin ctx, cancel := context.WithCancel(context.Background()) pipelineDone := make(chan bool) - config := config.Config{Nginx: config.Nginx{Debug: true}} + config := config.Config{ + Nginx: config.Nginx{Debug: true}, + Version: "1234", + } processID := "12345" detailsMap := map[string]*proto.NginxDetails{ @@ -84,29 +201,18 @@ func BenchmarkPluginOneTimeRegistration(b *testing.B) { }, } - binary := utils.NewMockNginxBinary() + binary := tutils.NewMockNginxBinary() binary.On("GetNginxDetailsMapFromProcesses", mock.Anything).Return(detailsMap) binary.On("GetNginxIDForProcess", mock.Anything).Return(processID) binary.On("GetNginxDetailsFromProcess", mock.Anything).Return(detailsMap[processID]) binary.On("ReadConfig", mock.Anything, mock.Anything, mock.Anything).Return(&proto.NginxConfig{}, nil) - env := utils.NewMockEnvironment() - env.Mock.On("NewHostInfo", mock.Anything, mock.Anything, mock.Anything).Return(&proto.HostInfo{ - Hostname: "test-host", - }) - env.Mock.On("Processes", mock.Anything).Return([]*core.Process{ - { - Name: processID, - IsMaster: true, - }, - }) - + env := tutils.GetMockEnvWithHostAndProcess() meta := proto.Metadata{} - version := "1234" messagePipe := core.NewMessagePipe(ctx) for n := 0; n < b.N; n++ { - pluginsUnderTest = append(pluginsUnderTest, plugins.NewOneTimeRegistration(&config, binary, env, &meta, version)) + pluginsUnderTest = append(pluginsUnderTest, plugins.NewOneTimeRegistration(&config, binary, env, &meta)) } err := messagePipe.Register(b.N, pluginsUnderTest, []core.ExtensionPlugin{}) diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/events/meta.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/events/meta.go new file mode 100644 index 000000000..a6943f3a4 --- /dev/null +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/events/meta.go @@ -0,0 +1,203 @@ +/** + * Copyright (c) F5, Inc. + * + * This source code is licensed under the Apache License, Version 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +package events + +import ( + "fmt" + "strings" + + "github.com/gogo/protobuf/types" + "github.com/google/uuid" + sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + "github.com/nginx/agent/sdk/v2/proto" + commonProto "github.com/nginx/agent/sdk/v2/proto/common" + eventsProto "github.com/nginx/agent/sdk/v2/proto/events" +) + +type AgentEventMeta struct { + module string + version string + pid string + hostname string + systemUuid string + instanceGroup string + tags string + tagsRaw []string +} + +func NewAgentEventMeta( + module, version, pid, hostname, systemUuid, instanceGroup string, + tags []string, +) *AgentEventMeta { + return &AgentEventMeta{ + module: module, + version: version, + pid: pid, + hostname: hostname, + systemUuid: systemUuid, + instanceGroup: instanceGroup, + tagsRaw: tags, + tags: strings.Join(tags, ","), + } +} + +func (aem *AgentEventMeta) GetVersion() string { + return aem.version +} + +func (aem *AgentEventMeta) GetPid() string { + return aem.pid +} + +func (aem *AgentEventMeta) GenerateAgentStartEventCommand() *proto.Command { + activityEvent := &eventsProto.ActivityEvent{ + Message: fmt.Sprintf(AGENT_START_MESSAGE, aem.version, aem.hostname, aem.pid), + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + event := &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: uuid.NewString(), + Module: aem.module, + Timestamp: types.TimestampNow(), + EventLevel: WARN_EVENT_LEVEL, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } + + return &proto.Command{ + Meta: sdkGRPC.NewMessageMeta(uuid.NewString()), + Type: proto.Command_NORMAL, + Data: &proto.Command_EventReport{ + EventReport: &eventsProto.EventReport{ + Events: []*eventsProto.Event{event}, + }, + }, + } +} + +func (aem *AgentEventMeta) GenerateAgentStopEventCommand() *proto.Command { + activityEvent := &eventsProto.ActivityEvent{ + Message: fmt.Sprintf(AGENT_STOP_MESSAGE, aem.version, aem.pid, aem.hostname), + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + event := &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: uuid.NewString(), + Module: aem.module, + Timestamp: types.TimestampNow(), + EventLevel: WARN_EVENT_LEVEL, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } + + return &proto.Command{ + Meta: sdkGRPC.NewMessageMeta(uuid.NewString()), + Type: proto.Command_NORMAL, + Data: &proto.Command_EventReport{ + EventReport: &eventsProto.EventReport{ + Events: []*eventsProto.Event{event}, + }, + }, + } +} + +func (aem *AgentEventMeta) CreateAgentEvent(timestamp *types.Timestamp, level, message, correlationId, module string) *eventsProto.Event { + activityEvent := aem.CreateActivityEvent(message, "") // blank nginxId, this relates to agent not it's nginx instances + + return &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: correlationId, + Module: module, + Timestamp: timestamp, + EventLevel: level, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } +} + +func (aem *AgentEventMeta) CreateActivityEvent(message string, nginxId string) *eventsProto.ActivityEvent { + activityEvent := &eventsProto.ActivityEvent{ + Message: message, + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + if nginxId != "" { + nginxDim := []*commonProto.Dimension{{Name: "nginx_id", Value: nginxId}} + activityEvent.Dimensions = append(nginxDim, activityEvent.Dimensions...) + } + + return activityEvent +} diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/events/types.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/events/types.go new file mode 100644 index 000000000..8f55f5d34 --- /dev/null +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/agent/events/types.go @@ -0,0 +1,23 @@ +package events + +const ( + // Types + NGINX_EVENT_TYPE = "Nginx" + AGENT_EVENT_TYPE = "Agent" + + // Categories + STATUS_CATEGORY = "Status" + CONFIG_CATEGORY = "Config" + APP_PROTECT_CATEGORY = "AppProtect" + + // Event Levels + INFO_EVENT_LEVEL = "INFO" + DEBUG_EVENT_LEVEL = "DEBUG" + WARN_EVENT_LEVEL = "WARN" + ERROR_EVENT_LEVEL = "ERROR" + CRITICAL_EVENT_LEVEL = "CRITICAL" + + // Messages + AGENT_START_MESSAGE = "nginx-agent %s started on %s with pid %s" + AGENT_STOP_MESSAGE = "nginx-agent %s (pid: %s) stopped on %s" +) diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/controller.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/controller.go index b89efd01b..7117269d0 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/controller.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/controller.go @@ -18,6 +18,7 @@ func NewClientController() Controller { type ctrl struct { ctx context.Context + cncl context.CancelFunc clients []Client } @@ -28,8 +29,7 @@ func (c *ctrl) WithClient(client Client) Controller { } func (c *ctrl) WithContext(ctx context.Context) Controller { - c.ctx = ctx - + c.ctx, c.cncl = context.WithCancel(ctx) return c } @@ -49,6 +49,7 @@ func (c *ctrl) Connect() error { } func (c *ctrl) Close() error { + defer c.cncl() var retErr error for _, client := range c.clients { if err := client.Close(); err != nil { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go index 638ebff64..84dd39582 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -55,6 +55,7 @@ var Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) func SetVersion(version, commit string) { ROOT_COMMAND.Version = version + "-" + commit + Viper.SetDefault(VersionKey, version) } func Execute() error { @@ -62,6 +63,22 @@ func Execute() error { return ROOT_COMMAND.Execute() } +func InitConfiguration(version, commit string) { + SetVersion(version, commit) + SetDefaults() + RegisterFlags() + dynamicConfigPath := DynamicConfigFileAbsPath + if runtime.GOOS == "freebsd" { + dynamicConfigPath = DynamicConfigFileAbsFreeBsdPath + } + configPath, err := RegisterConfigFile(dynamicConfigPath, ConfigFileName, ConfigFilePaths()...) + if err != nil { + log.Fatalf("Failed to load configuration file: %v", err) + } + log.Debugf("Configuration file loaded %v", configPath) + Viper.Set(ConfigPathKey, configPath) +} + func SetDefaults() { // CLOUDACCOUNTID DEFAULT Viper.SetDefault(CloudAccountIdKey, Defaults.CloudAccountID) @@ -165,6 +182,7 @@ func GetConfig(clientId string) (*Config, error) { } config := &Config{ + Version: Viper.GetString(VersionKey), Path: Viper.GetString(ConfigPathKey), DynamicConfigPath: Viper.GetString(DynamicConfigPathKey), ClientID: clientId, diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go index 4c7adfccb..de88138c1 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go @@ -93,6 +93,8 @@ var ( ) const ( + MODULE = "NGINX-AGENT" + DynamicConfigFileName = "agent-dynamic.conf" DynamicConfigFileAbsPath = "/var/lib/nginx-agent/agent-dynamic.conf" DynamicConfigFileAbsFreeBsdPath = "/var/db/nginx-agent/agent-dynamic.conf" @@ -102,6 +104,7 @@ const ( ConfigPathKey = "path" DynamicConfigPathKey = "dynamic-config-path" + VersionKey = "version" CloudAccountIdKey = "cloudaccountid" LocationKey = "location" DisplayNameKey = "display_name" diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/types.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/types.go index 57497ebe6..a7ba6a7fd 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/types.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/types.go @@ -14,6 +14,7 @@ import ( ) type Config struct { + Version string Path string `yaml:"-"` DynamicConfigPath string `yaml:"-"` ClientID string `mapstructure:"agent_id" yaml:"-"` diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/grpc.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/grpc.go new file mode 100644 index 000000000..0d22474f7 --- /dev/null +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/grpc.go @@ -0,0 +1,75 @@ +/** + * Copyright (c) F5, Inc. + * + * This source code is licensed under the Apache License, Version 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +package core + +import ( + "context" + "strings" + + "github.com/google/uuid" + "github.com/nginx/agent/sdk/v2/client" + sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + "github.com/nginx/agent/v2/src/core/config" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +func CreateGrpcClients(ctx context.Context, loadedConfig *config.Config) (client.Controller, client.Commander, client.MetricReporter) { + if !loadedConfig.IsGrpcServerConfigured() { + log.Info("GRPC clients not created due to missing server config") + return nil, nil, nil + } + + grpcDialOptions := setDialOptions(loadedConfig) + secureMetricsDialOpts, err := sdkGRPC.SecureDialOptions( + loadedConfig.TLS.Enable, + loadedConfig.TLS.Cert, + loadedConfig.TLS.Key, + loadedConfig.TLS.Ca, + loadedConfig.Server.Metrics, + loadedConfig.TLS.SkipVerify) + if err != nil { + log.Fatalf("Failed to load secure metric gRPC dial options: %v", err) + } + + secureCmdDialOpts, err := sdkGRPC.SecureDialOptions( + loadedConfig.TLS.Enable, + loadedConfig.TLS.Cert, + loadedConfig.TLS.Key, + loadedConfig.TLS.Ca, + loadedConfig.Server.Command, + loadedConfig.TLS.SkipVerify) + if err != nil { + log.Fatalf("Failed to load secure command gRPC dial options: %v", err) + } + + controller := client.NewClientController() + controller.WithContext(ctx) + commander := client.NewCommanderClient() + commander.WithBackoffSettings(loadedConfig.GetServerBackoffSettings()) + + commander.WithServer(loadedConfig.Server.Target) + commander.WithDialOptions(append(grpcDialOptions, secureCmdDialOpts)...) + + reporter := client.NewMetricReporterClient() + reporter.WithBackoffSettings(loadedConfig.GetServerBackoffSettings()) + reporter.WithServer(loadedConfig.Server.Target) + reporter.WithDialOptions(append(grpcDialOptions, secureMetricsDialOpts)...) + + controller.WithClient(commander) + controller.WithClient(reporter) + + return controller, commander, reporter +} + +func setDialOptions(loadedConfig *config.Config) []grpc.DialOption { + grpcDialOptions := []grpc.DialOption{grpc.WithUserAgent("nginx-agent/" + strings.TrimPrefix(version, "v"))} + grpcDialOptions = append(grpcDialOptions, sdkGRPC.DefaultClientDialOptions...) + grpcDialOptions = append(grpcDialOptions, sdkGRPC.DataplaneConnectionDialOptions(loadedConfig.Server.Token, sdkGRPC.NewMessageMeta(uuid.NewString()))...) + return grpcDialOptions +} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go index 9b9bbdead..1ded89e02 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -54,6 +54,15 @@ func NewMessagePipe(ctx context.Context) *MessagePipe { } } +func InitializePipe(ctx context.Context, corePlugins []Plugin, extensionPlugins []ExtensionPlugin, size int) MessagePipeInterface { + pipe := NewMessagePipe(ctx) + err := pipe.Register(size, corePlugins, extensionPlugins) + if err != nil { + log.Warnf("Failed to start agent successfully, error loading plugins %v", err) + } + return pipe +} + func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []ExtensionPlugin) error { p.mu.Lock() diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/signals.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/signals.go new file mode 100644 index 000000000..c8a1ed8d8 --- /dev/null +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/signals.go @@ -0,0 +1,74 @@ +/** + * Copyright (c) F5, Inc. + * + * This source code is licensed under the Apache License, Version 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +package core + +import ( + "context" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + "github.com/nginx/agent/sdk/v2/agent/events" + "github.com/nginx/agent/sdk/v2/client" + "github.com/nginx/agent/v2/src/core/config" + log "github.com/sirupsen/logrus" +) + +// handleSignals handles signals to attempt graceful shutdown +// for now it also handles sending the agent stopped event because as of today we don't have a mechanism for synchronizing +// tasks between multiple plugins from outside a plugin +func HandleSignals( + ctx context.Context, + cmder client.Commander, + loadedConfig *config.Config, + env Environment, + pipe MessagePipeInterface, + cancel context.CancelFunc, + controller client.Controller, +) { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + select { + case <-sigChan: + event := events.NewAgentEventMeta( + config.MODULE, + version, + strconv.Itoa(os.Getpid()), + env.GetHostname(), + env.GetSystemUUID(), + loadedConfig.InstanceGroup, + loadedConfig.Tags) + + stopCmd := event.GenerateAgentStopEventCommand() + log.Debugf("Sending agent stopped event: %v", stopCmd) + + if cmder == nil { + log.Warn("Command channel not configured. Skipping sending AgentStopped event") + } else if err := cmder.Send(ctx, client.MessageFromCommand(stopCmd)); err != nil { + log.Errorf("Error sending AgentStopped event to command channel: %v", err) + } + + if controller != nil { + if err := controller.Close(); err != nil { + log.Warnf("Unable to close controller: %v", err) + } + } + + log.Warn("NGINX Agent exiting") + cancel() + + timeout := time.Second * 5 + time.Sleep(timeout) + log.Fatalf("Failed to gracefully shutdown within timeout of %v. Exiting", timeout) + case <-ctx.Done(): + } + }() +} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go index 994c952a1..8ce701833 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go @@ -100,7 +100,7 @@ func (c *Commander) agentBackoff(agentConfig *proto.AgentConfig) { } func (c *Commander) agentRegistered(cmd *proto.Command) { - switch commandData := cmd.Data.(type) { + switch commandData := cmd.GetData().(type) { case *proto.Command_AgentConnectResponse: log.Infof("config command %v", commandData) @@ -112,7 +112,7 @@ func (c *Commander) agentRegistered(cmd *proto.Command) { } default: - log.Debugf("unhandled command: %T", cmd.Data) + log.Debugf("unhandled command: %T", cmd.GetData()) } } @@ -152,24 +152,24 @@ func (c *Commander) dispatchLoop() { log.Debugf("Command msg from data plane: %v", cmd) var topic string - switch cmd.Data.(type) { + switch cmd.GetData().(type) { case *proto.Command_NginxConfig, *proto.Command_NginxConfigResponse: topic = core.CommNginxConfig case *proto.Command_AgentConnectRequest, *proto.Command_AgentConnectResponse: topic = core.AgentConnected case *proto.Command_AgentConfigRequest, *proto.Command_AgentConfig: - log.Debugf("agent config %T command data type received and ignored", cmd.Data) + log.Debugf("agent config %T command data type received and ignored", cmd.GetData()) topic = core.AgentConfig case *proto.Command_CmdStatus: - data := cmd.Data.(*proto.Command_CmdStatus) + data := cmd.GetData().(*proto.Command_CmdStatus) if data.CmdStatus.Status != proto.CommandStatusResponse_CMD_OK { - log.Debugf("command status %T :: %+v", cmd.Data, cmd.Data) + log.Debugf("command status %T :: %+v", cmd.GetData(), cmd.GetData()) } topic = core.UNKNOWN continue default: - if cmd.Data != nil { - log.Infof("unknown %T command data type received", cmd.Data) + if cmd.GetData() != nil { + log.Infof("unknown %T command data type received", cmd.GetData()) } topic = core.UNKNOWN continue diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/common.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/common.go new file mode 100644 index 000000000..045fa1709 --- /dev/null +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/common.go @@ -0,0 +1,116 @@ +package plugins + +import ( + "github.com/nginx/agent/sdk/v2/client" + "github.com/nginx/agent/v2/src/core" + "github.com/nginx/agent/v2/src/core/config" + "github.com/nginx/agent/v2/src/extensions" + log "github.com/sirupsen/logrus" + + agent_config "github.com/nginx/agent/sdk/v2/agent/config" + + sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + + "github.com/google/uuid" +) + +func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.Environment, reporter client.MetricReporter, loadedConfig *config.Config) ([]core.Plugin, []core.ExtensionPlugin) { + var corePlugins []core.Plugin + var extensionPlugins []core.ExtensionPlugin + + if commander != nil { + corePlugins = append(corePlugins, + NewCommander(commander, loadedConfig), + ) + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureFileWatcher) { + corePlugins = append(corePlugins, + NewFileWatcher(loadedConfig, env), + NewFileWatchThrottle(), + ) + } + } + + if (loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender)) && reporter != nil { + corePlugins = append(corePlugins, + NewMetricsSender(reporter), + ) + } + + corePlugins = append(corePlugins, + NewConfigReader(loadedConfig), + NewNginx(commander, binary, env, loadedConfig), + NewExtensions(loadedConfig, env), + NewFeatures(commander, loadedConfig, env, binary, loadedConfig.Version), + ) + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureRegistration) { + corePlugins = append(corePlugins, NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()))) + } + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsCollection) || + (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) { + corePlugins = append(corePlugins, NewMetrics(loadedConfig, env, binary)) + } + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { + corePlugins = append(corePlugins, NewMetricsThrottle(loadedConfig, env)) + } + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureDataPlaneStatus) { + corePlugins = append(corePlugins, NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.NewString()), binary, env)) + } + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureProcessWatcher) { + corePlugins = append(corePlugins, NewProcessWatcher(env, binary)) + } + + if loadedConfig.IsFeatureEnabled(agent_config.FeatureActivityEvents) { + corePlugins = append(corePlugins, NewEvents(loadedConfig, env, sdkGRPC.NewMessageMeta(uuid.NewString()), binary)) + } + + if loadedConfig.AgentAPI.Port != 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureAgentAPI) { + corePlugins = append(corePlugins, NewAgentAPI(loadedConfig, env, binary)) + } else { + log.Info("Agent API not configured") + } + + if len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting) { + corePlugins = append(corePlugins, NewNginxCounter(loadedConfig, binary, env)) + } + + if loadedConfig.Extensions != nil && len(loadedConfig.Extensions) > 0 { + for _, extension := range loadedConfig.Extensions { + switch { + case extension == agent_config.AdvancedMetricsExtensionPlugin: + advancedMetricsExtensionPlugin := extensions.NewAdvancedMetrics(env, loadedConfig, config.Viper.Get(agent_config.AdvancedMetricsExtensionPluginConfigKey)) + extensionPlugins = append(extensionPlugins, advancedMetricsExtensionPlugin) + case extension == agent_config.NginxAppProtectExtensionPlugin: + nginxAppProtectExtensionPlugin, err := extensions.NewNginxAppProtect(loadedConfig, env, config.Viper.Get(agent_config.NginxAppProtectExtensionPluginConfigKey)) + if err != nil { + log.Errorf("Unable to load the Nginx App Protect plugin due to the following error: %v", err) + } else { + extensionPlugins = append(extensionPlugins, nginxAppProtectExtensionPlugin) + } + case extension == agent_config.NginxAppProtectMonitoringExtensionPlugin: + nginxAppProtectMonitoringExtensionPlugin, err := extensions.NewNAPMonitoring(env, loadedConfig, config.Viper.Get(agent_config.NginxAppProtectMonitoringExtensionPluginConfigKey)) + if err != nil { + log.Errorf("Unable to load the Nginx App Protect Monitoring plugin due to the following error: %v", err) + } else { + extensionPlugins = append(extensionPlugins, nginxAppProtectMonitoringExtensionPlugin) + } + case extension == agent_config.PhpFpmMetricsExtensionPlugin: + phpFpmMetricstExtensionPlugin, err := extensions.NewPhpFpmMetrics(env, loadedConfig) + if err != nil { + log.Errorf("Unable to load the PhpFpm Metrics plugin due to the following error: %v", err) + } else { + extensionPlugins = append(extensionPlugins, phpFpmMetricstExtensionPlugin) + } + default: + log.Warnf("unknown extension configured: %s", extension) + } + } + } + + return corePlugins, extensionPlugins +} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go index 2160e50e7..d1aef6ae7 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go @@ -48,7 +48,7 @@ const ( defaultMinInterval = time.Second * 30 ) -func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core.NginxBinary, env core.Environment, version string) *DataPlaneStatus { +func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core.NginxBinary, env core.Environment) *DataPlaneStatus { log.Tracef("Dataplane status interval %s", config.Dataplane.Status.PollInterval) pollInt := config.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { @@ -62,7 +62,7 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core meta: meta, binary: binary, env: env, - version: version, + version: config.Version, tags: &config.Tags, configDirs: config.ConfigDirs, reportInterval: config.Dataplane.Status.ReportInterval, diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/events.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/events.go index 2050e5353..56404ce31 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/events.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/events.go @@ -10,26 +10,20 @@ package plugins import ( "context" "fmt" - "strings" "github.com/gogo/protobuf/types" "github.com/google/uuid" log "github.com/sirupsen/logrus" agent_config "github.com/nginx/agent/sdk/v2/agent/config" - sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + "github.com/nginx/agent/sdk/v2/agent/events" "github.com/nginx/agent/sdk/v2/proto" - commonProto "github.com/nginx/agent/sdk/v2/proto/common" eventsProto "github.com/nginx/agent/sdk/v2/proto/events" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" ) const ( - MODULE = "NGINX-AGENT" - - AGENT_START_MESSAGE = "nginx-agent %s started on %s with pid %s" - AGENT_STOP_MESSAGE = "nginx-agent %s (pid: %s) stopped on %s" NGINX_FOUND_MESSAGE = "nginx-v%s master process was found with a pid %s" NGINX_STOP_MESSAGE = "nginx-v%s master process (pid: %s) stopped" NGINX_RELOAD_SUCCESS_MESSAGE = "nginx-v%s master process (pid: %s) reloaded successfully" @@ -40,31 +34,16 @@ const ( CONFIG_APPLY_FAILURE_MESSAGE = "failed to apply nginx config on %s" CONFIG_ROLLBACK_SUCCESS_MESSAGE = "nginx config was rolled back on %s" CONFIG_ROLLBACK_FAILURE_MESSAGE = "failed to rollback nginx config on %s" - - // Types - NGINX_EVENT_TYPE = "Nginx" - AGENT_EVENT_TYPE = "Agent" - - // Categories - STATUS_CATEGORY = "Status" - CONFIG_CATEGORY = "Config" - APP_PROTECT_CATEGORY = "AppProtect" - - // Event Levels - INFO_EVENT_LEVEL = "INFO" - DEBUG_EVENT_LEVEL = "DEBUG" - WARN_EVENT_LEVEL = "WARN" - ERROR_EVENT_LEVEL = "ERROR" - CRITICAL_EVENT_LEVEL = "CRITICAL" ) type Events struct { - pipeline core.MessagePipeInterface - ctx context.Context - conf *config.Config - env core.Environment - meta *proto.Metadata - nginxBinary core.NginxBinary + pipeline core.MessagePipeInterface + ctx context.Context + conf *config.Config + env core.Environment + meta *proto.Metadata + nginxBinary core.NginxBinary + agentEventsMeta *events.AgentEventMeta } func NewEvents(conf *config.Config, env core.Environment, meta *proto.Metadata, nginxBinary core.NginxBinary) *Events { @@ -130,29 +109,17 @@ func (a *Events) Subscriptions() []string { } func (a *Events) sendAgentStartedEvent(msg *core.Message) { - agentEventMeta, ok := msg.Data().(*AgentEventMeta) + agentEventMeta, ok := msg.Data().(*events.AgentEventMeta) if !ok { log.Warnf("Invalid message received, %T, for topic, %s", msg.Data(), msg.Topic()) return } - event := a.createAgentEvent( - types.TimestampNow(), - INFO_EVENT_LEVEL, - fmt.Sprintf(AGENT_START_MESSAGE, agentEventMeta.version, a.env.GetHostname(), agentEventMeta.pid), - uuid.NewString(), - ) + event := agentEventMeta.GenerateAgentStartEventCommand() log.Debugf("Created event: %v", event) - a.pipeline.Process(core.NewMessage(core.Events, &proto.Command{ - Meta: a.meta, - Type: proto.Command_NORMAL, - Data: &proto.Command_EventReport{ - EventReport: &eventsProto.EventReport{ - Events: []*eventsProto.Event{event}, - }, - }, - })) + a.pipeline.Process(core.NewMessage(core.Events, event)) + a.agentEventsMeta = agentEventMeta } func (a *Events) sendNingxFoundEvent(msg *core.Message) { @@ -162,19 +129,19 @@ func (a *Events) sendNingxFoundEvent(msg *core.Message) { return } - events := []*eventsProto.Event{} + protoEvents := []*eventsProto.Event{} for _, nginxDetail := range nginxDetailsMap { event := a.createNginxEvent( nginxDetail.GetNginxId(), &types.Timestamp{Seconds: nginxDetail.GetStartTime() / 1000, Nanos: int32(nginxDetail.GetStartTime() % 1000)}, - INFO_EVENT_LEVEL, + events.INFO_EVENT_LEVEL, fmt.Sprintf(NGINX_FOUND_MESSAGE, nginxDetail.GetVersion(), nginxDetail.GetProcessId()), uuid.NewString(), ) log.Debugf("Created event: %v", event) - events = append(events, event) + protoEvents = append(protoEvents, event) } a.pipeline.Process(core.NewMessage(core.Events, &proto.Command{ @@ -182,7 +149,7 @@ func (a *Events) sendNingxFoundEvent(msg *core.Message) { Type: proto.Command_NORMAL, Data: &proto.Command_EventReport{ EventReport: &eventsProto.EventReport{ - Events: events, + Events: protoEvents, }, }, })) @@ -200,7 +167,7 @@ func (a *Events) sendNginxReloadEvent(msg *core.Message) { event = a.createNginxEvent( nginxReload.nginxDetails.GetNginxId(), nginxReload.timestamp, - WARN_EVENT_LEVEL, + events.WARN_EVENT_LEVEL, fmt.Sprintf(NGINX_RELOAD_SUCCESS_MESSAGE, nginxReload.nginxDetails.GetVersion(), nginxReload.nginxDetails.GetProcessId()), nginxReload.correlationId, ) @@ -208,7 +175,7 @@ func (a *Events) sendNginxReloadEvent(msg *core.Message) { event = a.createNginxEvent( nginxReload.nginxDetails.GetNginxId(), nginxReload.timestamp, - ERROR_EVENT_LEVEL, + events.ERROR_EVENT_LEVEL, fmt.Sprintf(NGINX_RELOAD_FAILED_MESSAGE, nginxReload.nginxDetails.GetVersion(), nginxReload.nginxDetails.GetProcessId()), nginxReload.correlationId, ) @@ -249,7 +216,7 @@ func (a *Events) sendConfigApplyEvent(msg *core.Message) { event = a.createConfigApplyEvent( nginxConfigResponse.GetConfigData().GetNginxId(), command.GetMeta().GetTimestamp(), - INFO_EVENT_LEVEL, + events.INFO_EVENT_LEVEL, fmt.Sprintf(CONFIG_APPLY_SUCCESS_MESSAGE, a.env.GetHostname()), command.Meta.GetMessageId(), ) @@ -257,7 +224,7 @@ func (a *Events) sendConfigApplyEvent(msg *core.Message) { event = a.createConfigApplyEvent( nginxConfigResponse.GetConfigData().GetNginxId(), command.GetMeta().GetTimestamp(), - ERROR_EVENT_LEVEL, + events.ERROR_EVENT_LEVEL, fmt.Sprintf(CONFIG_APPLY_FAILURE_MESSAGE, a.env.GetHostname()), command.Meta.GetMessageId(), ) @@ -291,7 +258,7 @@ func (a *Events) sendConfigRollbackEvent(msg *core.Message) { event = a.createConfigApplyEvent( configRollbackResponse.nginxDetails.GetNginxId(), configRollbackResponse.timestamp, - WARN_EVENT_LEVEL, + events.WARN_EVENT_LEVEL, fmt.Sprintf(CONFIG_ROLLBACK_SUCCESS_MESSAGE, a.env.GetHostname()), configRollbackResponse.correlationId, ) @@ -299,7 +266,7 @@ func (a *Events) sendConfigRollbackEvent(msg *core.Message) { event = a.createConfigApplyEvent( configRollbackResponse.nginxDetails.GetNginxId(), configRollbackResponse.timestamp, - ERROR_EVENT_LEVEL, + events.ERROR_EVENT_LEVEL, fmt.Sprintf(CONFIG_ROLLBACK_FAILURE_MESSAGE, a.env.GetHostname()), configRollbackResponse.correlationId, ) @@ -328,7 +295,7 @@ func (a *Events) sendNginxStartEvent(msg *core.Message) { event := a.createNginxEvent( nginxDetails.GetNginxId(), &types.Timestamp{Seconds: nginxDetails.GetStartTime() / 1000, Nanos: int32(nginxDetails.GetStartTime() % 1000)}, - INFO_EVENT_LEVEL, + events.INFO_EVENT_LEVEL, fmt.Sprintf(NGINX_FOUND_MESSAGE, nginxDetails.GetVersion(), nginxDetails.GetProcessId()), uuid.NewString(), ) @@ -355,7 +322,7 @@ func (a *Events) sendNginxStopEvent(msg *core.Message) { event := a.createNginxEvent( nginxDetails.GetNginxId(), types.TimestampNow(), - WARN_EVENT_LEVEL, + events.WARN_EVENT_LEVEL, fmt.Sprintf(NGINX_STOP_MESSAGE, nginxDetails.GetVersion(), nginxDetails.GetProcessId()), uuid.NewString(), ) @@ -382,7 +349,7 @@ func (a *Events) sendNginxWorkerStartEvent(msg *core.Message) { event := a.createNginxEvent( nginxDetails.GetNginxId(), &types.Timestamp{Seconds: nginxDetails.GetStartTime() / 1000, Nanos: int32(nginxDetails.GetStartTime() % 1000)}, - INFO_EVENT_LEVEL, + events.INFO_EVENT_LEVEL, fmt.Sprintf(NGINX_WORKER_START_MESSAGE, nginxDetails.GetProcessId(), nginxDetails.GetVersion(), nginxDetails.GetProcessId()), uuid.NewString(), ) @@ -409,7 +376,7 @@ func (a *Events) sendNginxWorkerStopEvent(msg *core.Message) { event := a.createNginxEvent( nginxDetails.GetNginxId(), types.TimestampNow(), - INFO_EVENT_LEVEL, + events.INFO_EVENT_LEVEL, fmt.Sprintf(NGINX_WORKER_STOP_MESSAGE, nginxDetails.GetProcessId(), nginxDetails.GetVersion(), nginxDetails.GetProcessId()), uuid.NewString(), ) @@ -427,17 +394,17 @@ func (a *Events) sendNginxWorkerStopEvent(msg *core.Message) { } func (e *Events) createNginxEvent(nginxId string, timestamp *types.Timestamp, level string, message string, correlationId string) *eventsProto.Event { - activityEvent := e.createActivityEvent(message, nginxId) + activityEvent := e.agentEventsMeta.CreateActivityEvent(message, nginxId) return &eventsProto.Event{ Metadata: &eventsProto.Metadata{ UUID: uuid.NewString(), CorrelationID: correlationId, - Module: MODULE, + Module: config.MODULE, Timestamp: timestamp, EventLevel: level, - Type: NGINX_EVENT_TYPE, - Category: STATUS_CATEGORY, + Type: events.NGINX_EVENT_TYPE, + Category: events.STATUS_CATEGORY, }, Data: &eventsProto.Event_ActivityEvent{ ActivityEvent: activityEvent, @@ -446,131 +413,20 @@ func (e *Events) createNginxEvent(nginxId string, timestamp *types.Timestamp, le } func (e *Events) createConfigApplyEvent(nginxId string, timestamp *types.Timestamp, level string, message string, correlationId string) *eventsProto.Event { - activityEvent := e.createActivityEvent(message, nginxId) - - return &eventsProto.Event{ - Metadata: &eventsProto.Metadata{ - UUID: uuid.NewString(), - CorrelationID: correlationId, - Module: MODULE, - Timestamp: timestamp, - EventLevel: level, - Type: AGENT_EVENT_TYPE, - Category: CONFIG_CATEGORY, - }, - Data: &eventsProto.Event_ActivityEvent{ - ActivityEvent: activityEvent, - }, - } -} - -func (e *Events) createAgentEvent(timestamp *types.Timestamp, level string, message string, correlationId string) *eventsProto.Event { - activityEvent := e.createActivityEvent(message, "") // blank nginxId, this relates to agent not it's nginx instances + activityEvent := e.agentEventsMeta.CreateActivityEvent(message, nginxId) return &eventsProto.Event{ Metadata: &eventsProto.Metadata{ UUID: uuid.NewString(), CorrelationID: correlationId, - Module: MODULE, + Module: config.MODULE, Timestamp: timestamp, EventLevel: level, - Type: AGENT_EVENT_TYPE, - Category: STATUS_CATEGORY, - }, - Data: &eventsProto.Event_ActivityEvent{ - ActivityEvent: activityEvent, - }, - } -} - -func (e *Events) createActivityEvent(message string, nginxId string) *eventsProto.ActivityEvent { - activityEvent := &eventsProto.ActivityEvent{ - Message: message, - Dimensions: []*commonProto.Dimension{ - { - Name: "system_id", - Value: e.env.GetSystemUUID(), - }, - { - Name: "hostname", - Value: e.env.GetHostname(), - }, - { - Name: "instance_group", - Value: e.conf.InstanceGroup, - }, - { - Name: "system.tags", - Value: strings.Join(e.conf.Tags, ","), - }, - }, - } - - if nginxId != "" { - nginxDim := []*commonProto.Dimension{{Name: "nginx_id", Value: nginxId}} - activityEvent.Dimensions = append(nginxDim, activityEvent.Dimensions...) - } - - return activityEvent -} - -type AgentEventMeta struct { - version string - pid string -} - -func NewAgentEventMeta(version string, pid string) *AgentEventMeta { - return &AgentEventMeta{ - version: version, - pid: pid, - } -} - -func GenerateAgentStopEventCommand(agentEvent *AgentEventMeta, conf *config.Config, env core.Environment) *proto.Command { - activityEvent := &eventsProto.ActivityEvent{ - Message: fmt.Sprintf(AGENT_STOP_MESSAGE, agentEvent.version, agentEvent.pid, env.GetHostname()), - Dimensions: []*commonProto.Dimension{ - { - Name: "system_id", - Value: env.GetSystemUUID(), - }, - { - Name: "hostname", - Value: env.GetHostname(), - }, - { - Name: "instance_group", - Value: conf.InstanceGroup, - }, - { - Name: "system.tags", - Value: strings.Join(conf.Tags, ","), - }, - }, - } - - event := &eventsProto.Event{ - Metadata: &eventsProto.Metadata{ - UUID: uuid.NewString(), - CorrelationID: uuid.NewString(), - Module: MODULE, - Timestamp: types.TimestampNow(), - EventLevel: WARN_EVENT_LEVEL, - Type: AGENT_EVENT_TYPE, - Category: STATUS_CATEGORY, + Type: events.AGENT_EVENT_TYPE, + Category: events.CONFIG_CATEGORY, }, Data: &eventsProto.Event_ActivityEvent{ ActivityEvent: activityEvent, }, } - - return &proto.Command{ - Meta: sdkGRPC.NewMessageMeta(uuid.NewString()), - Type: proto.Command_NORMAL, - Data: &proto.Command_EventReport{ - EventReport: &eventsProto.EventReport{ - Events: []*eventsProto.Event{event}, - }, - }, - } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go index 7a17686a3..51f1867a2 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go @@ -210,7 +210,7 @@ func (f *Features) enableRegistrationFeature(data string) []core.Plugin { } f.conf = conf - registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.version) + registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString())) return []core.Plugin{registration} } @@ -225,7 +225,7 @@ func (f *Features) enableDataPlaneStatusFeature(data string) []core.Plugin { } f.conf = conf - dataPlaneStatus := NewDataPlaneStatus(f.conf, sdkGRPC.NewMessageMeta(uuid.NewString()), f.binary, f.env, f.version) + dataPlaneStatus := NewDataPlaneStatus(f.conf, sdkGRPC.NewMessageMeta(uuid.NewString()), f.binary, f.env) return []core.Plugin{dataPlaneStatus} } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go index 8d94b736d..c57209fc0 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go @@ -49,13 +49,12 @@ func NewOneTimeRegistration( binary core.NginxBinary, env core.Environment, meta *proto.Metadata, - version string, ) *OneTimeRegistration { // this might be slow so do on startup - host := env.NewHostInfo(version, &config.Tags, config.ConfigDirs, true) + host := env.NewHostInfo(config.Version, &config.Tags, config.ConfigDirs, true) return &OneTimeRegistration{ tags: &config.Tags, - agentVersion: version, + agentVersion: config.Version, meta: meta, config: config, env: env, diff --git a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go index 58fa133dd..a9bae0bb6 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go @@ -46,6 +46,10 @@ func GetMockAgentConfig() *config.Config { CollectionInterval: 1, Mode: "aggregated", }, + Server: config.Server{ + Host: "127.0.0.1", + GrpcPort: 67890, + }, } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go index 6980a98a7..53b2a9779 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go @@ -124,8 +124,8 @@ func (m *MockNginxBinary) UpdateNginxDetailsFromProcesses(nginxProcesses []*core } func (m *MockNginxBinary) GetNginxIDForProcess(nginxProcess *core.Process) string { - args := m.Called(nginxProcess) - return args.String(0) + m.Called(nginxProcess) + return nginxProcess.Name } func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) *proto.NginxDetails { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/process.go b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/process.go index dfd405618..6c8c5f61f 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/process.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/process.go @@ -5,6 +5,8 @@ import ( "os" "os/exec" "time" + + "github.com/nginx/agent/sdk/v2/proto" ) // StartFakeProcesses creates a fake process for each of the string names and @@ -29,3 +31,13 @@ func StartFakeProcesses(names []string, fakeProcsDuration string) func() { } } } + +func GetProcessMap() map[string][]*proto.NginxDetails { + return map[string][]*proto.NginxDetails{ + "12345": { + { + ProcessId: "1", + }, + }, + } +} diff --git a/test/performance/vendor/modules.txt b/test/performance/vendor/modules.txt index a97ea9fa2..0948ac12c 100644 --- a/test/performance/vendor/modules.txt +++ b/test/performance/vendor/modules.txt @@ -128,6 +128,7 @@ github.com/nats-io/nuid ## explicit; go 1.21 github.com/nginx/agent/sdk/v2 github.com/nginx/agent/sdk/v2/agent/config +github.com/nginx/agent/sdk/v2/agent/events github.com/nginx/agent/sdk/v2/backoff github.com/nginx/agent/sdk/v2/checksum github.com/nginx/agent/sdk/v2/client diff --git a/test/utils/agent_config.go b/test/utils/agent_config.go index 58fa133dd..a9bae0bb6 100644 --- a/test/utils/agent_config.go +++ b/test/utils/agent_config.go @@ -46,6 +46,10 @@ func GetMockAgentConfig() *config.Config { CollectionInterval: 1, Mode: "aggregated", }, + Server: config.Server{ + Host: "127.0.0.1", + GrpcPort: 67890, + }, } } diff --git a/test/utils/nginx.go b/test/utils/nginx.go index 6980a98a7..53b2a9779 100644 --- a/test/utils/nginx.go +++ b/test/utils/nginx.go @@ -124,8 +124,8 @@ func (m *MockNginxBinary) UpdateNginxDetailsFromProcesses(nginxProcesses []*core } func (m *MockNginxBinary) GetNginxIDForProcess(nginxProcess *core.Process) string { - args := m.Called(nginxProcess) - return args.String(0) + m.Called(nginxProcess) + return nginxProcess.Name } func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) *proto.NginxDetails { diff --git a/test/utils/process.go b/test/utils/process.go index dfd405618..6c8c5f61f 100644 --- a/test/utils/process.go +++ b/test/utils/process.go @@ -5,6 +5,8 @@ import ( "os" "os/exec" "time" + + "github.com/nginx/agent/sdk/v2/proto" ) // StartFakeProcesses creates a fake process for each of the string names and @@ -29,3 +31,13 @@ func StartFakeProcesses(names []string, fakeProcsDuration string) func() { } } } + +func GetProcessMap() map[string][]*proto.NginxDetails { + return map[string][]*proto.NginxDetails{ + "12345": { + { + ProcessId: "1", + }, + }, + } +} diff --git a/vendor/github.com/nginx/agent/sdk/v2/agent/events/meta.go b/vendor/github.com/nginx/agent/sdk/v2/agent/events/meta.go new file mode 100644 index 000000000..a6943f3a4 --- /dev/null +++ b/vendor/github.com/nginx/agent/sdk/v2/agent/events/meta.go @@ -0,0 +1,203 @@ +/** + * Copyright (c) F5, Inc. + * + * This source code is licensed under the Apache License, Version 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +package events + +import ( + "fmt" + "strings" + + "github.com/gogo/protobuf/types" + "github.com/google/uuid" + sdkGRPC "github.com/nginx/agent/sdk/v2/grpc" + "github.com/nginx/agent/sdk/v2/proto" + commonProto "github.com/nginx/agent/sdk/v2/proto/common" + eventsProto "github.com/nginx/agent/sdk/v2/proto/events" +) + +type AgentEventMeta struct { + module string + version string + pid string + hostname string + systemUuid string + instanceGroup string + tags string + tagsRaw []string +} + +func NewAgentEventMeta( + module, version, pid, hostname, systemUuid, instanceGroup string, + tags []string, +) *AgentEventMeta { + return &AgentEventMeta{ + module: module, + version: version, + pid: pid, + hostname: hostname, + systemUuid: systemUuid, + instanceGroup: instanceGroup, + tagsRaw: tags, + tags: strings.Join(tags, ","), + } +} + +func (aem *AgentEventMeta) GetVersion() string { + return aem.version +} + +func (aem *AgentEventMeta) GetPid() string { + return aem.pid +} + +func (aem *AgentEventMeta) GenerateAgentStartEventCommand() *proto.Command { + activityEvent := &eventsProto.ActivityEvent{ + Message: fmt.Sprintf(AGENT_START_MESSAGE, aem.version, aem.hostname, aem.pid), + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + event := &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: uuid.NewString(), + Module: aem.module, + Timestamp: types.TimestampNow(), + EventLevel: WARN_EVENT_LEVEL, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } + + return &proto.Command{ + Meta: sdkGRPC.NewMessageMeta(uuid.NewString()), + Type: proto.Command_NORMAL, + Data: &proto.Command_EventReport{ + EventReport: &eventsProto.EventReport{ + Events: []*eventsProto.Event{event}, + }, + }, + } +} + +func (aem *AgentEventMeta) GenerateAgentStopEventCommand() *proto.Command { + activityEvent := &eventsProto.ActivityEvent{ + Message: fmt.Sprintf(AGENT_STOP_MESSAGE, aem.version, aem.pid, aem.hostname), + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + event := &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: uuid.NewString(), + Module: aem.module, + Timestamp: types.TimestampNow(), + EventLevel: WARN_EVENT_LEVEL, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } + + return &proto.Command{ + Meta: sdkGRPC.NewMessageMeta(uuid.NewString()), + Type: proto.Command_NORMAL, + Data: &proto.Command_EventReport{ + EventReport: &eventsProto.EventReport{ + Events: []*eventsProto.Event{event}, + }, + }, + } +} + +func (aem *AgentEventMeta) CreateAgentEvent(timestamp *types.Timestamp, level, message, correlationId, module string) *eventsProto.Event { + activityEvent := aem.CreateActivityEvent(message, "") // blank nginxId, this relates to agent not it's nginx instances + + return &eventsProto.Event{ + Metadata: &eventsProto.Metadata{ + UUID: uuid.NewString(), + CorrelationID: correlationId, + Module: module, + Timestamp: timestamp, + EventLevel: level, + Type: AGENT_EVENT_TYPE, + Category: STATUS_CATEGORY, + }, + Data: &eventsProto.Event_ActivityEvent{ + ActivityEvent: activityEvent, + }, + } +} + +func (aem *AgentEventMeta) CreateActivityEvent(message string, nginxId string) *eventsProto.ActivityEvent { + activityEvent := &eventsProto.ActivityEvent{ + Message: message, + Dimensions: []*commonProto.Dimension{ + { + Name: "system_id", + Value: aem.systemUuid, + }, + { + Name: "hostname", + Value: aem.hostname, + }, + { + Name: "instance_group", + Value: aem.instanceGroup, + }, + { + Name: "system.tags", + Value: aem.tags, + }, + }, + } + + if nginxId != "" { + nginxDim := []*commonProto.Dimension{{Name: "nginx_id", Value: nginxId}} + activityEvent.Dimensions = append(nginxDim, activityEvent.Dimensions...) + } + + return activityEvent +} diff --git a/vendor/github.com/nginx/agent/sdk/v2/agent/events/types.go b/vendor/github.com/nginx/agent/sdk/v2/agent/events/types.go new file mode 100644 index 000000000..8f55f5d34 --- /dev/null +++ b/vendor/github.com/nginx/agent/sdk/v2/agent/events/types.go @@ -0,0 +1,23 @@ +package events + +const ( + // Types + NGINX_EVENT_TYPE = "Nginx" + AGENT_EVENT_TYPE = "Agent" + + // Categories + STATUS_CATEGORY = "Status" + CONFIG_CATEGORY = "Config" + APP_PROTECT_CATEGORY = "AppProtect" + + // Event Levels + INFO_EVENT_LEVEL = "INFO" + DEBUG_EVENT_LEVEL = "DEBUG" + WARN_EVENT_LEVEL = "WARN" + ERROR_EVENT_LEVEL = "ERROR" + CRITICAL_EVENT_LEVEL = "CRITICAL" + + // Messages + AGENT_START_MESSAGE = "nginx-agent %s started on %s with pid %s" + AGENT_STOP_MESSAGE = "nginx-agent %s (pid: %s) stopped on %s" +) diff --git a/vendor/github.com/nginx/agent/sdk/v2/client/controller.go b/vendor/github.com/nginx/agent/sdk/v2/client/controller.go index b89efd01b..7117269d0 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/client/controller.go +++ b/vendor/github.com/nginx/agent/sdk/v2/client/controller.go @@ -18,6 +18,7 @@ func NewClientController() Controller { type ctrl struct { ctx context.Context + cncl context.CancelFunc clients []Client } @@ -28,8 +29,7 @@ func (c *ctrl) WithClient(client Client) Controller { } func (c *ctrl) WithContext(ctx context.Context) Controller { - c.ctx = ctx - + c.ctx, c.cncl = context.WithCancel(ctx) return c } @@ -49,6 +49,7 @@ func (c *ctrl) Connect() error { } func (c *ctrl) Close() error { + defer c.cncl() var retErr error for _, client := range c.clients { if err := client.Close(); err != nil { diff --git a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 95a30461e..8ad40b98c 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -25,14 +25,13 @@ import ( "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/nginx/agent/sdk/v2/backoff" filesSDK "github.com/nginx/agent/sdk/v2/files" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/sdk/v2/zip" crossplane "github.com/nginxinc/nginx-go-crossplane" + log "github.com/sirupsen/logrus" ) const ( diff --git a/vendor/modules.txt b/vendor/modules.txt index 7fe7e4223..2e164272a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1122,6 +1122,7 @@ github.com/nbutton23/zxcvbn-go/utils/math ## explicit; go 1.21 github.com/nginx/agent/sdk/v2 github.com/nginx/agent/sdk/v2/agent/config +github.com/nginx/agent/sdk/v2/agent/events github.com/nginx/agent/sdk/v2/backoff github.com/nginx/agent/sdk/v2/checksum github.com/nginx/agent/sdk/v2/client