Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allocate inventory queue #70

Merged
merged 3 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 40 additions & 36 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type registerableSender interface {
}

type Agent struct {
inv inventoryState
plugins []Plugin // Slice of registered plugins
oldPlugins []ids.PluginID // Deprecated plugins whose cached data must be removed, if existing
agentDir string // Base data directory for the agent
Expand All @@ -87,6 +88,11 @@ type Agent struct {
notificationHandler *ctl.NotificationHandlerWithCancellation // Handle ipc messaging.
}

type inventoryState struct {
readyToReap bool
sendErrorCount uint32
}

// inventory holds the reaper and sender for the inventories of a given entity (local or remote), as well as their status
type inventory struct {
reaper *patchReaper
Expand Down Expand Up @@ -442,7 +448,7 @@ func New(
}

// Create input channel for plugins to feed data back to the agent
a.Context.ch = make(chan PluginOutput)
a.Context.ch = make(chan PluginOutput, a.Context.cfg.InventoryQueueLen)
a.Context.activeEntities = make(chan string, activeEntitiesBufferLength)

if cfg.RegisterEnabled {
Expand Down Expand Up @@ -767,10 +773,6 @@ func (a *Agent) Run() (err error) {
}
}

// State variables
var readyToReap bool // Do we need to execute a reap phase?
var sendErrorCount uint32 = 0 // Send error counter

// Timers
reapTimer := time.NewTicker(cfg.FirstReapInterval)
sendTimer := time.NewTimer(cfg.SendInterval) // Send any deltas every X seconds
Expand Down Expand Up @@ -876,10 +878,10 @@ func (a *Agent) Run() (err error) {
case <-reapTimer.C:
{
for _, inventory := range a.inventories {
if !readyToReap {
if !a.inv.readyToReap {
if len(distinctPlugins) <= len(idsReporting) {
alog.Debug("Signalling initial reap.")
readyToReap = true
a.inv.readyToReap = true
inventory.needsCleanup = true
} else {
pluginIds := make([]ids.PluginID, 0)
Expand All @@ -891,7 +893,7 @@ func (a *Agent) Run() (err error) {
alog.WithField("pluginIds", pluginIds).Debug("Still waiting on plugins.")
}
}
if readyToReap && inventory.needsReaping {
if a.inv.readyToReap && inventory.needsReaping {
reapTimer.Stop()
reapTimer = time.NewTicker(cfg.ReapInterval)
inventory.reaper.Reap()
Expand All @@ -905,40 +907,15 @@ func (a *Agent) Run() (err error) {
}
case <-initialReapTimeout.C:
// If we've waited too long and still not received data from all plugins, we can just send what we have.
if !readyToReap {
if !a.inv.readyToReap {
alog.Debug("Maximum initial reap delay exceeded - marking inventory as ready to report.")
readyToReap = true
a.inv.readyToReap = true
for _, inventory := range a.inventories {
inventory.needsCleanup = true
}
}
case <-sendTimer.C:
{
backoffMax := config.MAX_BACKOFF
for _, inventory := range a.inventories {
err := inventory.sender.Process()
if err != nil {
if ingestError, ok := err.(*inventoryapi.IngestError); ok {
if ingestError.StatusCode == http.StatusTooManyRequests {
alog.Warn("server is rate limiting inventory for this Infrastructure Agent")
backoffMax = config.RATE_LIMITED_BACKOFF
sendErrorCount = helpers.MaxBackoffErrorCount
}
} else {
sendErrorCount++
}
alog.WithError(err).WithField("errorCount", sendErrorCount).
Debug("Inventory sender can't process after retrying.")
break // Assuming break will try to send later the data from the missing inventory senders
} else {
sendErrorCount = 0
}
}
sendTimerVal := helpers.ExpBackoff(cfg.SendInterval,
time.Duration(backoffMax)*time.Second,
sendErrorCount)
sendTimer.Reset(sendTimerVal)
}
a.sendInventory(sendTimer)
case <-debugTimer:
{
debugInfo, err := a.debugProvide()
Expand All @@ -957,6 +934,33 @@ func (a *Agent) Run() (err error) {
}
}

func (a *Agent) sendInventory(sendTimer *time.Timer) {
backoffMax := config.MAX_BACKOFF
for _, i := range a.inventories {
err := i.sender.Process()
if err != nil {
if ingestError, ok := err.(*inventoryapi.IngestError); ok &&
ingestError.StatusCode == http.StatusTooManyRequests {
alog.Warn("server is rate limiting inventory submission")
backoffMax = config.RATE_LIMITED_BACKOFF
a.inv.sendErrorCount = helpers.MaxBackoffErrorCount
} else {
a.inv.sendErrorCount++
}
alog.WithError(err).WithField("errorCount", a.inv.sendErrorCount).
Debug("Inventory sender can't process after retrying.")
// Assuming break will try to send later the data from the missing inventory senders
break
} else {
a.inv.sendErrorCount = 0
}
}
sendTimerVal := helpers.ExpBackoff(a.Context.cfg.SendInterval,
time.Duration(backoffMax)*time.Second,
a.inv.sendErrorCount)
sendTimer.Reset(sendTimerVal)
}

func (a *Agent) removeOutdatedEntities(reportedEntities map[string]bool) {
alog.Debug("Triggered periodic removal of outdated entities.")
// The entities to remove are those entities that haven't reported activity in the last period and
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,11 @@ type Config struct {
// Public: No
BatchQueueDepth int `yaml:"batch_queue_depth" envconfig:"batch_queue_depth" public:"false"` // See event_sender.go

// InventoryQueue sets the inventory processing queue size.
// Default: 10
// Public: Yes
InventoryQueueLen int

// EnableWinUpdatePlugin enables the windows updates plugin which retrieves the lists of hotfix that are installed
// on the host.
// Default: False
Expand Down Expand Up @@ -1245,6 +1250,7 @@ func NewConfig() *Config {
SmartVerboseModeEntryLimit: DefaultSmartVerboseModeEntryLimit,
DefaultIntegrationsTempDir: defaultIntegrationsTempDir,
IncludeMetricsMatchers: defaultMetricsMatcherConfig,
InventoryQueueLen: DefaultInventoryQueue,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
DefaultStripCommandLine = true
DefaultSmartVerboseModeEntryLimit = 1000
DefaultIntegrationsDir = "newrelic-integrations"
DefaultInventoryQueue = 10

// private
defaultAppDataDir = ""
Expand Down