From a7b17c60b4ba75291ab9cf6a71d62a3c929f0584 Mon Sep 17 00:00:00 2001 From: jhvaras Date: Fri, 14 Aug 2020 18:13:22 +0200 Subject: [PATCH] Allocate inventory queue (#70) * fix: inventory error count increasing on ingest error not 429 * refactor: extract sendInventory * allocate configurable inventory queue for 10 payloads --- internal/agent/agent.go | 76 ++++++++++++++++++++++------------------- pkg/config/config.go | 6 ++++ pkg/config/defaults.go | 1 + 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index fd4368964..f75b0ac9e 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -66,6 +66,7 @@ type registerableSender interface { } type Agent struct { + inv inventoryState plugins []Plugin // Slice of registered plugins oldPlugins []ids.PluginID // Deprecated plugins whose cached data must be removed, if existing agentDir string // Base data directory for the agent @@ -87,6 +88,11 @@ type Agent struct { notificationHandler *ctl.NotificationHandlerWithCancellation // Handle ipc messaging. } +type inventoryState struct { + readyToReap bool + sendErrorCount uint32 +} + // inventory holds the reaper and sender for the inventories of a given entity (local or remote), as well as their status type inventory struct { reaper *patchReaper @@ -442,7 +448,7 @@ func New( } // Create input channel for plugins to feed data back to the agent - a.Context.ch = make(chan PluginOutput) + a.Context.ch = make(chan PluginOutput, a.Context.cfg.InventoryQueueLen) a.Context.activeEntities = make(chan string, activeEntitiesBufferLength) if cfg.RegisterEnabled { @@ -767,10 +773,6 @@ func (a *Agent) Run() (err error) { } } - // State variables - var readyToReap bool // Do we need to execute a reap phase? - var sendErrorCount uint32 = 0 // Send error counter - // Timers reapTimer := time.NewTicker(cfg.FirstReapInterval) sendTimer := time.NewTimer(cfg.SendInterval) // Send any deltas every X seconds @@ -876,10 +878,10 @@ func (a *Agent) Run() (err error) { case <-reapTimer.C: { for _, inventory := range a.inventories { - if !readyToReap { + if !a.inv.readyToReap { if len(distinctPlugins) <= len(idsReporting) { alog.Debug("Signalling initial reap.") - readyToReap = true + a.inv.readyToReap = true inventory.needsCleanup = true } else { pluginIds := make([]ids.PluginID, 0) @@ -891,7 +893,7 @@ func (a *Agent) Run() (err error) { alog.WithField("pluginIds", pluginIds).Debug("Still waiting on plugins.") } } - if readyToReap && inventory.needsReaping { + if a.inv.readyToReap && inventory.needsReaping { reapTimer.Stop() reapTimer = time.NewTicker(cfg.ReapInterval) inventory.reaper.Reap() @@ -905,40 +907,15 @@ func (a *Agent) Run() (err error) { } case <-initialReapTimeout.C: // If we've waited too long and still not received data from all plugins, we can just send what we have. - if !readyToReap { + if !a.inv.readyToReap { alog.Debug("Maximum initial reap delay exceeded - marking inventory as ready to report.") - readyToReap = true + a.inv.readyToReap = true for _, inventory := range a.inventories { inventory.needsCleanup = true } } case <-sendTimer.C: - { - backoffMax := config.MAX_BACKOFF - for _, inventory := range a.inventories { - err := inventory.sender.Process() - if err != nil { - if ingestError, ok := err.(*inventoryapi.IngestError); ok { - if ingestError.StatusCode == http.StatusTooManyRequests { - alog.Warn("server is rate limiting inventory for this Infrastructure Agent") - backoffMax = config.RATE_LIMITED_BACKOFF - sendErrorCount = helpers.MaxBackoffErrorCount - } - } else { - sendErrorCount++ - } - alog.WithError(err).WithField("errorCount", sendErrorCount). - Debug("Inventory sender can't process after retrying.") - break // Assuming break will try to send later the data from the missing inventory senders - } else { - sendErrorCount = 0 - } - } - sendTimerVal := helpers.ExpBackoff(cfg.SendInterval, - time.Duration(backoffMax)*time.Second, - sendErrorCount) - sendTimer.Reset(sendTimerVal) - } + a.sendInventory(sendTimer) case <-debugTimer: { debugInfo, err := a.debugProvide() @@ -957,6 +934,33 @@ func (a *Agent) Run() (err error) { } } +func (a *Agent) sendInventory(sendTimer *time.Timer) { + backoffMax := config.MAX_BACKOFF + for _, i := range a.inventories { + err := i.sender.Process() + if err != nil { + if ingestError, ok := err.(*inventoryapi.IngestError); ok && + ingestError.StatusCode == http.StatusTooManyRequests { + alog.Warn("server is rate limiting inventory submission") + backoffMax = config.RATE_LIMITED_BACKOFF + a.inv.sendErrorCount = helpers.MaxBackoffErrorCount + } else { + a.inv.sendErrorCount++ + } + alog.WithError(err).WithField("errorCount", a.inv.sendErrorCount). + Debug("Inventory sender can't process after retrying.") + // Assuming break will try to send later the data from the missing inventory senders + break + } else { + a.inv.sendErrorCount = 0 + } + } + sendTimerVal := helpers.ExpBackoff(a.Context.cfg.SendInterval, + time.Duration(backoffMax)*time.Second, + a.inv.sendErrorCount) + sendTimer.Reset(sendTimerVal) +} + func (a *Agent) removeOutdatedEntities(reportedEntities map[string]bool) { alog.Debug("Triggered periodic removal of outdated entities.") // The entities to remove are those entities that haven't reported activity in the last period and diff --git a/pkg/config/config.go b/pkg/config/config.go index 7e86ab833..e6b363427 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -592,6 +592,11 @@ type Config struct { // Public: No BatchQueueDepth int `yaml:"batch_queue_depth" envconfig:"batch_queue_depth" public:"false"` // See event_sender.go + // InventoryQueue sets the inventory processing queue size. + // Default: 10 + // Public: Yes + InventoryQueueLen int + // EnableWinUpdatePlugin enables the windows updates plugin which retrieves the lists of hotfix that are installed // on the host. // Default: False @@ -1245,6 +1250,7 @@ func NewConfig() *Config { SmartVerboseModeEntryLimit: DefaultSmartVerboseModeEntryLimit, DefaultIntegrationsTempDir: defaultIntegrationsTempDir, IncludeMetricsMatchers: defaultMetricsMatcherConfig, + InventoryQueueLen: DefaultInventoryQueue, } } diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 926f4a667..95072b280 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -43,6 +43,7 @@ var ( DefaultStripCommandLine = true DefaultSmartVerboseModeEntryLimit = 1000 DefaultIntegrationsDir = "newrelic-integrations" + DefaultInventoryQueue = 10 // private defaultAppDataDir = ""