Skip to content

Commit

Permalink
Performance tests for loading of plugins and different feature combin…
Browse files Browse the repository at this point in the history
…ations (#463)

* Trying to move code around so it's easier to test the main.go functionality in terms of performance. Added performance tests around plugin combinations.
  • Loading branch information
oliveromahony authored Sep 11, 2023
1 parent a668cb1 commit 26e43d6
Show file tree
Hide file tree
Showing 67 changed files with 2,341 additions and 694 deletions.
255 changes: 16 additions & 239 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,19 @@ package main
import (
"context"
"os"
"os/signal"
"runtime"
"strconv"
"strings"
"syscall"
"time"

agent_config "github.com/nginx/agent/sdk/v2/agent/config"
"github.com/nginx/agent/sdk/v2/client"
"github.com/nginx/agent/sdk/v2/agent/events"

sdkGRPC "github.com/nginx/agent/sdk/v2/grpc"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"
"github.com/nginx/agent/v2/src/core/logger"
"github.com/nginx/agent/v2/src/extensions"
"github.com/nginx/agent/v2/src/plugins"

"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)

var (
Expand All @@ -39,19 +32,7 @@ var (
)

func init() {
config.SetVersion(version, commit)
config.SetDefaults()
config.RegisterFlags()
dynamicConfigPath := config.DynamicConfigFileAbsPath
if runtime.GOOS == "freebsd" {
dynamicConfigPath = config.DynamicConfigFileAbsFreeBsdPath
}
configPath, err := config.RegisterConfigFile(dynamicConfigPath, config.ConfigFileName, config.ConfigFilePaths()...)
if err != nil {
log.Fatalf("Failed to load configuration file: %v", err)
}
log.Debugf("Configuration file loaded %v", configPath)
config.Viper.Set(config.ConfigPathKey, configPath)
config.InitConfiguration(version, commit)
}

func main() {
Expand Down Expand Up @@ -83,7 +64,7 @@ func main() {
version, commit, os.Getpid(), loadedConfig.ClientID, loadedConfig.DisplayName, loadedConfig.Features)
sdkGRPC.InitMeta(loadedConfig.ClientID, loadedConfig.CloudAccountID)

controller, commander, reporter := createGrpcClients(ctx, loadedConfig)
controller, commander, reporter := core.CreateGrpcClients(ctx, loadedConfig)

if controller != nil {
if err := controller.Connect(); err != nil {
Expand All @@ -94,15 +75,21 @@ func main() {

binary := core.NewNginxBinary(env, loadedConfig)

corePlugins, extensionPlugins := loadPlugins(commander, binary, env, reporter, loadedConfig)
corePlugins, extensionPlugins := plugins.LoadPlugins(commander, binary, env, reporter, loadedConfig)

pipe := initializeMessagePipe(ctx, corePlugins, extensionPlugins)
pipe := core.InitializePipe(ctx, corePlugins, extensionPlugins, agent_config.DefaultPluginSize)

pipe.Process(core.NewMessage(core.AgentStarted,
plugins.NewAgentEventMeta(version, strconv.Itoa(os.Getpid()))),
)
event := events.NewAgentEventMeta(
config.MODULE,
version,
strconv.Itoa(os.Getpid()),
env.GetHostname(),
env.GetSystemUUID(),
loadedConfig.InstanceGroup,
loadedConfig.Tags)

handleSignals(ctx, commander, loadedConfig, env, pipe, cancel, controller)
pipe.Process(core.NewMessage(core.AgentStarted, event))
core.HandleSignals(ctx, commander, loadedConfig, env, pipe, cancel, controller)

pipe.Run()
})
Expand All @@ -111,213 +98,3 @@ func main() {
log.Fatal(err)
}
}

// handleSignals handles signals to attempt graceful shutdown
// for now it also handles sending the agent stopped event because as of today we don't have a mechanism for synchronizing
// tasks between multiple plugins from outside a plugin
func handleSignals(
ctx context.Context,
cmder client.Commander,
loadedConfig *config.Config,
env core.Environment,
pipe core.MessagePipeInterface,
cancel context.CancelFunc,
controller client.Controller,
) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
select {
case <-sigChan:
stopCmd := plugins.GenerateAgentStopEventCommand(
plugins.NewAgentEventMeta(version, strconv.Itoa(os.Getpid())), loadedConfig, env,
)
log.Debugf("Sending agent stopped event: %v", stopCmd)

if cmder == nil {
log.Warn("Command channel not configured. Skipping sending AgentStopped event")
} else if err := cmder.Send(ctx, client.MessageFromCommand(stopCmd)); err != nil {
log.Errorf("Error sending AgentStopped event to command channel: %v", err)
}

if controller != nil {
if err := controller.Close(); err != nil {
log.Warnf("Unable to close controller: %v", err)
}
}

log.Warn("NGINX Agent exiting")
cancel()

timeout := time.Second * 5
time.Sleep(timeout)
log.Fatalf("Failed to gracefully shutdown within timeout of %v. Exiting", timeout)
case <-ctx.Done():
}
}()
}

func createGrpcClients(ctx context.Context, loadedConfig *config.Config) (client.Controller, client.Commander, client.MetricReporter) {
if !loadedConfig.IsGrpcServerConfigured() {
log.Info("GRPC clients not created due to missing server config")
return nil, nil, nil
}

grpcDialOptions := setDialOptions(loadedConfig)
secureMetricsDialOpts, err := sdkGRPC.SecureDialOptions(
loadedConfig.TLS.Enable,
loadedConfig.TLS.Cert,
loadedConfig.TLS.Key,
loadedConfig.TLS.Ca,
loadedConfig.Server.Metrics,
loadedConfig.TLS.SkipVerify)
if err != nil {
log.Fatalf("Failed to load secure metric gRPC dial options: %v", err)
}

secureCmdDialOpts, err := sdkGRPC.SecureDialOptions(
loadedConfig.TLS.Enable,
loadedConfig.TLS.Cert,
loadedConfig.TLS.Key,
loadedConfig.TLS.Ca,
loadedConfig.Server.Command,
loadedConfig.TLS.SkipVerify)
if err != nil {
log.Fatalf("Failed to load secure command gRPC dial options: %v", err)
}

controller := client.NewClientController()
controller.WithContext(ctx)
commander := client.NewCommanderClient()
commander.WithBackoffSettings(loadedConfig.GetServerBackoffSettings())

commander.WithServer(loadedConfig.Server.Target)
commander.WithDialOptions(append(grpcDialOptions, secureCmdDialOpts)...)

reporter := client.NewMetricReporterClient()
reporter.WithBackoffSettings(loadedConfig.GetServerBackoffSettings())
reporter.WithServer(loadedConfig.Server.Target)
reporter.WithDialOptions(append(grpcDialOptions, secureMetricsDialOpts)...)

controller.WithClient(commander)
controller.WithClient(reporter)

return controller, commander, reporter
}

func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env *core.EnvironmentType, reporter client.MetricReporter, loadedConfig *config.Config) ([]core.Plugin, []core.ExtensionPlugin) {
var corePlugins []core.Plugin
var extensionPlugins []core.ExtensionPlugin

if commander != nil {
corePlugins = append(corePlugins,
plugins.NewCommander(commander, loadedConfig),
)

if loadedConfig.IsFeatureEnabled(agent_config.FeatureFileWatcher) {
corePlugins = append(corePlugins,
plugins.NewFileWatcher(loadedConfig, env),
plugins.NewFileWatchThrottle(),
)
}
}

if (loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender)) && reporter != nil {
corePlugins = append(corePlugins,
plugins.NewMetricsSender(reporter),
)
}

corePlugins = append(corePlugins,
plugins.NewConfigReader(loadedConfig),
plugins.NewNginx(commander, binary, env, loadedConfig),
plugins.NewExtensions(loadedConfig, env),
plugins.NewFeatures(commander, loadedConfig, env, binary, version),
)

if loadedConfig.IsFeatureEnabled(agent_config.FeatureRegistration) {
corePlugins = append(corePlugins, plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()), version))
}

if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsCollection) ||
(len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) {
corePlugins = append(corePlugins, plugins.NewMetrics(loadedConfig, env, binary))
}

if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) {
corePlugins = append(corePlugins, plugins.NewMetricsThrottle(loadedConfig, env))
}

if loadedConfig.IsFeatureEnabled(agent_config.FeatureDataPlaneStatus) {
corePlugins = append(corePlugins, plugins.NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.NewString()), binary, env, version))
}

if loadedConfig.IsFeatureEnabled(agent_config.FeatureProcessWatcher) {
corePlugins = append(corePlugins, plugins.NewProcessWatcher(env, binary))
}

if loadedConfig.IsFeatureEnabled(agent_config.FeatureActivityEvents) {
corePlugins = append(corePlugins, plugins.NewEvents(loadedConfig, env, sdkGRPC.NewMessageMeta(uuid.NewString()), binary))
}

if loadedConfig.AgentAPI.Port != 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureAgentAPI) {
corePlugins = append(corePlugins, plugins.NewAgentAPI(loadedConfig, env, binary))
} else {
log.Info("Agent API not configured")
}

if len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting) {
corePlugins = append(corePlugins, plugins.NewNginxCounter(loadedConfig, binary, env))
}

if loadedConfig.Extensions != nil && len(loadedConfig.Extensions) > 0 {
for _, extension := range loadedConfig.Extensions {
switch {
case extension == agent_config.AdvancedMetricsExtensionPlugin:
advancedMetricsExtensionPlugin := extensions.NewAdvancedMetrics(env, loadedConfig, config.Viper.Get(agent_config.AdvancedMetricsExtensionPluginConfigKey))
extensionPlugins = append(extensionPlugins, advancedMetricsExtensionPlugin)
case extension == agent_config.NginxAppProtectExtensionPlugin:
nginxAppProtectExtensionPlugin, err := extensions.NewNginxAppProtect(loadedConfig, env, config.Viper.Get(agent_config.NginxAppProtectExtensionPluginConfigKey))
if err != nil {
log.Errorf("Unable to load the Nginx App Protect plugin due to the following error: %v", err)
} else {
extensionPlugins = append(extensionPlugins, nginxAppProtectExtensionPlugin)
}
case extension == agent_config.NginxAppProtectMonitoringExtensionPlugin:
nginxAppProtectMonitoringExtensionPlugin, err := extensions.NewNAPMonitoring(env, loadedConfig, config.Viper.Get(agent_config.NginxAppProtectMonitoringExtensionPluginConfigKey))
if err != nil {
log.Errorf("Unable to load the Nginx App Protect Monitoring plugin due to the following error: %v", err)
} else {
extensionPlugins = append(extensionPlugins, nginxAppProtectMonitoringExtensionPlugin)
}
case extension == agent_config.PhpFpmMetricsExtensionPlugin:
phpFpmMetricstExtensionPlugin, err := extensions.NewPhpFpmMetrics(env, loadedConfig)
if err != nil {
log.Errorf("Unable to load the PhpFpm Metrics plugin due to the following error: %v", err)
} else {
extensionPlugins = append(extensionPlugins, phpFpmMetricstExtensionPlugin)
}
default:
log.Warnf("unknown extension configured: %s", extension)
}
}
}

return corePlugins, extensionPlugins
}

func initializeMessagePipe(ctx context.Context, corePlugins []core.Plugin, extensionPlugins []core.ExtensionPlugin) core.MessagePipeInterface {
pipe := core.NewMessagePipe(ctx)
err := pipe.Register(agent_config.DefaultPluginSize, corePlugins, extensionPlugins)
if err != nil {
log.Warnf("Failed to start agent successfully, error loading plugins %v", err)
}
return pipe
}

func setDialOptions(loadedConfig *config.Config) []grpc.DialOption {
grpcDialOptions := []grpc.DialOption{grpc.WithUserAgent("nginx-agent/" + strings.TrimPrefix(version, "v"))}
grpcDialOptions = append(grpcDialOptions, sdkGRPC.DefaultClientDialOptions...)
grpcDialOptions = append(grpcDialOptions, sdkGRPC.DataplaneConnectionDialOptions(loadedConfig.Server.Token, sdkGRPC.NewMessageMeta(uuid.NewString()))...)
return grpcDialOptions
}
Loading

0 comments on commit 26e43d6

Please sign in to comment.