Skip to content

Commit

Permalink
Fix empty access item when log format is changed (#270)
Browse files Browse the repository at this point in the history
* fix empty access item
---------

Co-authored-by: oliveromahony <[email protected]>
  • Loading branch information
aphralG and oliveromahony authored May 4, 2023
1 parent 721514b commit c978574
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 160 deletions.
10 changes: 6 additions & 4 deletions src/core/metrics/collectors/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import (
log "github.com/sirupsen/logrus"
)

var (
_ metrics.Collector = (*NginxCollector)(nil)
)

type NginxCollector struct {
sources []metrics.NginxSource
buf chan *metrics.StatsEntityWrapper
Expand Down Expand Up @@ -130,3 +126,9 @@ func (c *NginxCollector) Stop() {
func (c *NginxCollector) GetNginxId() string {
return c.dimensions.NginxId
}

func (c *NginxCollector) UpdateSources() {
for _, nginxSource := range c.sources {
nginxSource.Update(c.dimensions, c.collectorConf)
}
}
2 changes: 2 additions & 0 deletions src/core/metrics/metrics_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ type Collector interface {
Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *StatsEntityWrapper)
UpdateConfig(config *config.Config)
}

type Source interface {
Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *StatsEntityWrapper)
}

type NginxSource interface {
Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *StatsEntityWrapper)
Update(dimensions *CommonDim, collectorConf *NginxCollectorConfig)
Expand Down
91 changes: 51 additions & 40 deletions src/core/metrics/sources/nginx_access_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func NewNginxAccessLog(
nginxAccessLog.logFormats[logFile] = logFormat
go nginxAccessLog.logStats(logCTX, logFile, logFormat)
}

return nginxAccessLog
}

Expand All @@ -118,26 +117,61 @@ func (c *NginxAccessLog) Update(dimensions *metrics.CommonDim, collectorConf *me

if c.collectionInterval != collectorConf.CollectionInterval {
c.collectionInterval = collectorConf.CollectionInterval
// remove old access logs
// add new access logs
c.recreateLogs()
} else {
// add, remove or update existing log trailers
c.syncLogs()
}

for f, fn := range c.logs {
log.Infof("Removing access log tailer: %s", f)
fn()
delete(c.logs, f)
delete(c.logFormats, f)
}
}

logs := c.binary.GetAccessLogs()
func (c *NginxAccessLog) recreateLogs() {
for f, fn := range c.logs {
c.stopTailer(f, fn)
}

for logFile, logFormat := range logs {
if _, ok := c.logs[logFile]; !ok {
log.Infof("Adding access log tailer: %s", logFile)
logCTX, fn := context.WithCancel(context.Background())
c.logs[logFile] = fn
c.logFormats[logFile] = logFormat
go c.logStats(logCTX, logFile, logFormat)
}
logs := c.binary.GetAccessLogs()

for logFile, logFormat := range logs {
c.startTailer(logFile, logFormat)
}
}

func (c *NginxAccessLog) syncLogs() {
logs := c.binary.GetAccessLogs()

for f, fn := range c.logs {
if _, ok := logs[f]; !ok {
c.stopTailer(f, fn)
}
}

for logFile, logFormat := range logs {
if _, ok := c.logs[logFile]; !ok {
c.startTailer(logFile, logFormat)
} else if c.logFormats[logFile] != logFormat {
// cancel tailer with old log format
c.logs[logFile]()
c.startTailer(logFile, logFormat)
}
}
}

func (c *NginxAccessLog) startTailer(logFile string, logFormat string) {
log.Infof("Adding access log tailer: %s", logFile)
logCTX, fn := context.WithCancel(context.Background())
c.logs[logFile] = fn
c.logFormats[logFile] = logFormat
go c.logStats(logCTX, logFile, logFormat)
}

func (c *NginxAccessLog) stopTailer(logFile string, cancelFunction context.CancelFunc) {
log.Infof("Removing access log tailer: %s", logFile)
cancelFunction()
delete(c.logs, logFile)
delete(c.logFormats, logFile)
}

func (c *NginxAccessLog) Stop() {
Expand All @@ -152,28 +186,6 @@ func (c *NginxAccessLog) Stop() {
func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) {
c.mu.Lock()
defer c.mu.Unlock()
logs := c.binary.GetAccessLogs()

if c.binary.UpdateLogs(c.logFormats, logs) {
log.Info("Access logs updated")
// cancel any removed access logs
for f, fn := range c.logs {
if _, ok := logs[f]; !ok {
log.Infof("Removing access log tailer: %s", f)
fn()
delete(c.logs, f)
}
}
// add any new ones
for logFile, logFormat := range logs {
if _, ok := c.logs[logFile]; !ok {
log.Infof("Adding access log tailer: %s", logFile)
logCTX, fn := context.WithCancel(context.Background())
c.logs[logFile] = fn
go c.logStats(logCTX, logFile, logFormat)
}
}
}

for _, stat := range c.buf {
m <- stat
Expand Down Expand Up @@ -206,7 +218,6 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string

tick := time.NewTicker(c.collectionInterval)
defer tick.Stop()

for {
select {
case d := <-data:
Expand Down Expand Up @@ -344,7 +355,7 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
case <-ctx.Done():
err := ctx.Err()
if err != nil {
log.Errorf("NginxAccessLog: error in done context logStats %v", err)
log.Tracef("NginxAccessLog: error in done context logStats %v", err)
}
log.Info("NginxAccessLog: logStats are done")
return
Expand Down
85 changes: 49 additions & 36 deletions src/core/metrics/sources/nginx_error_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,55 +121,68 @@ func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *met
defer c.mu.Unlock()

c.baseDimensions = dimensions

if c.collectionInterval != collectorConf.CollectionInterval {
c.collectionInterval = collectorConf.CollectionInterval
// remove old error logs
// add new error logs
c.recreateLogs()
} else {
// add, remove or update existing log trailers
c.syncLogs()
}
}

for f, fn := range c.logs {
log.Infof("Removing error log tailer: %s", f)
fn()
delete(c.logs, f)
}
func (c *NginxErrorLog) recreateLogs() {
for f, fn := range c.logs {
c.stopTailer(f, fn)
}

logs := c.binary.GetErrorLogs()
logs := c.binary.GetErrorLogs()

// add any new ones
for logFile, logFormat := range logs {
if _, ok := c.logs[logFile]; !ok {
log.Infof("Adding error log tailer: %s", logFile)
logCTX, fn := context.WithCancel(context.Background())
c.logs[logFile] = fn
c.logFormats[logFile] = logFormat
go c.logStats(logCTX, logFile)
}
}
for logFile, logFormat := range logs {
c.startTailer(logFile, logFormat)
}
}

func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) {
c.mu.Lock()
defer c.mu.Unlock()
func (c *NginxErrorLog) syncLogs() {
logs := c.binary.GetErrorLogs()

if c.binary.UpdateLogs(c.logFormats, logs) {
log.Info("Error logs updated")
// cancel any removed error logs
for f, fn := range c.logs {
if _, ok := logs[f]; !ok {
log.Infof("Removing error log tailer: %s", f)
fn()
delete(c.logs, f)
}
for f, fn := range c.logs {
if _, ok := logs[f]; !ok {
c.stopTailer(f, fn)
}
// add any new ones
for logFile := range logs {
if _, ok := c.logs[logFile]; !ok {
log.Infof("Adding error log tailer: %s", logFile)
logCTX, fn := context.WithCancel(context.Background())
c.logs[logFile] = fn
go c.logStats(logCTX, logFile)
}
}

for logFile, logFormat := range logs {
if _, ok := c.logs[logFile]; !ok {
c.startTailer(logFile, logFormat)
} else if c.logFormats[logFile] != logFormat {
// cancel tailer with old log format
c.logs[logFile]()
c.startTailer(logFile, logFormat)
}
}
}

func (c *NginxErrorLog) startTailer(logFile string, logFormat string) {
log.Infof("Adding error log tailer: %s", logFile)
logCTX, fn := context.WithCancel(context.Background())
c.logs[logFile] = fn
c.logFormats[logFile] = logFormat
go c.logStats(logCTX, logFile)
}

func (c *NginxErrorLog) stopTailer(logFile string, cancelFunction context.CancelFunc) {
log.Infof("Removing error log tailer: %s", logFile)
cancelFunction()
delete(c.logs, logFile)
delete(c.logFormats, logFile)
}

func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) {
c.mu.Lock()
defer c.mu.Unlock()

for _, stat := range c.buf {
m <- stat
Expand Down
14 changes: 14 additions & 0 deletions src/plugins/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (m *Metrics) Process(msg *core.Message) {
m.registerStatsSources()
return

case msg.Exact(core.NginxConfigApplySucceeded):
m.updateCollectorsSources()
return

case msg.Exact(core.NginxDetailProcUpdate):
collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary)
for key, collectorConfig := range collectorConfigsMap {
Expand Down Expand Up @@ -161,6 +165,7 @@ func (m *Metrics) Subscriptions() []string {
core.NginxStatusAPIUpdate,
core.NginxPluginConfigured,
core.NginxDetailProcUpdate,
core.NginxConfigApplySucceeded,
}
}

Expand Down Expand Up @@ -342,3 +347,12 @@ func (m *Metrics) updateCollectorsConfig() {
collector.UpdateConfig(m.conf)
}
}

func (m *Metrics) updateCollectorsSources() {
log.Trace("Updating nginx collector sources")
for _, collector := range m.collectors {
if nginxCollector, ok := collector.(*collectors.NginxCollector); ok {
nginxCollector.UpdateSources()
}
}
}
1 change: 1 addition & 0 deletions src/plugins/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ func TestMetrics_Subscriptions(t *testing.T) {
core.NginxStatusAPIUpdate,
core.NginxPluginConfigured,
core.NginxDetailProcUpdate,
core.NginxConfigApplySucceeded,
}
pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary())
assert.Equal(t, subs, pluginUnderTest.Subscriptions())
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c978574

Please sign in to comment.