Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix empty access item when log format is changed #270

Merged
merged 22 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/core/metrics/collectors/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import (
log "github.com/sirupsen/logrus"
)

var (
_ metrics.Collector = (*NginxCollector)(nil)
)

type NginxCollector struct {
sources []metrics.NginxSource
buf chan *metrics.StatsEntityWrapper
Expand Down Expand Up @@ -130,3 +126,9 @@ func (c *NginxCollector) Stop() {
// GetNginxId returns the ID of the NGINX instance this collector gathers
// metrics for, taken from the collector's common dimensions.
func (c *NginxCollector) GetNginxId() string {
return c.dimensions.NginxId
}

// UpdateSources propagates the collector's current dimensions and collector
// configuration to every NGINX metrics source it owns.
func (c *NginxCollector) UpdateSources() {
	for _, source := range c.sources {
		source.Update(c.dimensions, c.collectorConf)
	}
}
2 changes: 2 additions & 0 deletions src/core/metrics/metrics_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ type Collector interface {
Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *StatsEntityWrapper)
UpdateConfig(config *config.Config)
}

// Source is implemented by metric producers that gather stats and write them
// to a channel of StatsEntityWrapper.
type Source interface {
// Collect gathers metrics, sends them on m, and signals wg when finished.
Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *StatsEntityWrapper)
}

type NginxSource interface {
Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *StatsEntityWrapper)
Update(dimensions *CommonDim, collectorConf *NginxCollectorConfig)
Expand Down
91 changes: 51 additions & 40 deletions src/core/metrics/sources/nginx_access_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func NewNginxAccessLog(
nginxAccessLog.logFormats[logFile] = logFormat
go nginxAccessLog.logStats(logCTX, logFile, logFormat)
}

return nginxAccessLog
}

Expand All @@ -118,26 +117,61 @@ func (c *NginxAccessLog) Update(dimensions *metrics.CommonDim, collectorConf *me

if c.collectionInterval != collectorConf.CollectionInterval {
c.collectionInterval = collectorConf.CollectionInterval
// remove old access logs
// add new access logs
c.recreateLogs()
} else {
// add, remove or update existing log tailers
c.syncLogs()
}

for f, fn := range c.logs {
log.Infof("Removing access log tailer: %s", f)
fn()
delete(c.logs, f)
delete(c.logFormats, f)
}
}

logs := c.binary.GetAccessLogs()
func (c *NginxAccessLog) recreateLogs() {
for f, fn := range c.logs {
c.stopTailer(f, fn)
}

for logFile, logFormat := range logs {
if _, ok := c.logs[logFile]; !ok {
log.Infof("Adding access log tailer: %s", logFile)
logCTX, fn := context.WithCancel(context.Background())
c.logs[logFile] = fn
c.logFormats[logFile] = logFormat
go c.logStats(logCTX, logFile, logFormat)
}
logs := c.binary.GetAccessLogs()

for logFile, logFormat := range logs {
c.startTailer(logFile, logFormat)
}
}

// syncLogs reconciles the running access log tailers with the access logs
// currently configured in nginx: tailers for removed logs are stopped, new
// logs get a tailer, and logs whose format changed are restarted so entries
// are parsed with the new format.
func (c *NginxAccessLog) syncLogs() {
	current := c.binary.GetAccessLogs()

	// Stop tailers whose access log is no longer configured.
	for file, cancel := range c.logs {
		if _, stillConfigured := current[file]; !stillConfigured {
			c.stopTailer(file, cancel)
		}
	}

	for file, format := range current {
		cancel, tracked := c.logs[file]
		if !tracked {
			c.startTailer(file, format)
			continue
		}
		if c.logFormats[file] != format {
			// cancel tailer with old log format, then restart with the new one
			cancel()
			c.startTailer(file, format)
		}
	}
}

// startTailer records logFile and its format in the collector's bookkeeping
// maps and launches a goroutine that tails the file, parsing entries with
// logFormat. The stored cancel func is used later to stop the goroutine.
func (c *NginxAccessLog) startTailer(logFile string, logFormat string) {
	log.Infof("Adding access log tailer: %s", logFile)
	tailerCtx, cancel := context.WithCancel(context.Background())
	c.logs[logFile] = cancel
	c.logFormats[logFile] = logFormat
	go c.logStats(tailerCtx, logFile, logFormat)
}

// stopTailer cancels the tailer goroutine for logFile and drops the file
// from the collector's bookkeeping maps.
func (c *NginxAccessLog) stopTailer(logFile string, cancelFunction context.CancelFunc) {
	log.Infof("Removing access log tailer: %s", logFile)
	cancelFunction()
	delete(c.logFormats, logFile)
	delete(c.logs, logFile)
}

func (c *NginxAccessLog) Stop() {
Expand All @@ -152,28 +186,6 @@ func (c *NginxAccessLog) Stop() {
func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) {
c.mu.Lock()
defer c.mu.Unlock()
logs := c.binary.GetAccessLogs()

if c.binary.UpdateLogs(c.logFormats, logs) {
log.Info("Access logs updated")
// cancel any removed access logs
for f, fn := range c.logs {
if _, ok := logs[f]; !ok {
log.Infof("Removing access log tailer: %s", f)
fn()
delete(c.logs, f)
}
}
// add any new ones
for logFile, logFormat := range logs {
if _, ok := c.logs[logFile]; !ok {
log.Infof("Adding access log tailer: %s", logFile)
logCTX, fn := context.WithCancel(context.Background())
c.logs[logFile] = fn
go c.logStats(logCTX, logFile, logFormat)
}
}
}

for _, stat := range c.buf {
m <- stat
Expand Down Expand Up @@ -206,7 +218,6 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string

tick := time.NewTicker(c.collectionInterval)
defer tick.Stop()

for {
select {
case d := <-data:
Expand Down Expand Up @@ -344,7 +355,7 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
case <-ctx.Done():
err := ctx.Err()
if err != nil {
log.Errorf("NginxAccessLog: error in done context logStats %v", err)
log.Tracef("NginxAccessLog: error in done context logStats %v", err)
}
log.Info("NginxAccessLog: logStats are done")
return
Expand Down
85 changes: 49 additions & 36 deletions src/core/metrics/sources/nginx_error_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,55 +121,68 @@ func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *met
defer c.mu.Unlock()

c.baseDimensions = dimensions

if c.collectionInterval != collectorConf.CollectionInterval {
c.collectionInterval = collectorConf.CollectionInterval
// remove old error logs
// add new error logs
c.recreateLogs()
} else {
// add, remove or update existing log tailers
c.syncLogs()
}
}

for f, fn := range c.logs {
log.Infof("Removing error log tailer: %s", f)
fn()
delete(c.logs, f)
}
func (c *NginxErrorLog) recreateLogs() {
for f, fn := range c.logs {
c.stopTailer(f, fn)
}

logs := c.binary.GetErrorLogs()
logs := c.binary.GetErrorLogs()

// add any new ones
for logFile, logFormat := range logs {
if _, ok := c.logs[logFile]; !ok {
log.Infof("Adding error log tailer: %s", logFile)
logCTX, fn := context.WithCancel(context.Background())
c.logs[logFile] = fn
c.logFormats[logFile] = logFormat
go c.logStats(logCTX, logFile)
}
}
for logFile, logFormat := range logs {
c.startTailer(logFile, logFormat)
}
}

func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) {
c.mu.Lock()
defer c.mu.Unlock()
func (c *NginxErrorLog) syncLogs() {
logs := c.binary.GetErrorLogs()

if c.binary.UpdateLogs(c.logFormats, logs) {
log.Info("Error logs updated")
// cancel any removed error logs
for f, fn := range c.logs {
if _, ok := logs[f]; !ok {
log.Infof("Removing error log tailer: %s", f)
fn()
delete(c.logs, f)
}
for f, fn := range c.logs {
if _, ok := logs[f]; !ok {
c.stopTailer(f, fn)
}
// add any new ones
for logFile := range logs {
if _, ok := c.logs[logFile]; !ok {
log.Infof("Adding error log tailer: %s", logFile)
logCTX, fn := context.WithCancel(context.Background())
c.logs[logFile] = fn
go c.logStats(logCTX, logFile)
}
}

for logFile, logFormat := range logs {
if _, ok := c.logs[logFile]; !ok {
c.startTailer(logFile, logFormat)
} else if c.logFormats[logFile] != logFormat {
// cancel tailer with old log format
c.logs[logFile]()
c.startTailer(logFile, logFormat)
}
}
}

// startTailer records logFile in the collector's bookkeeping maps and
// launches a goroutine that tails it. The format is stored so later syncs
// can detect format changes; logStats itself is not passed the format.
func (c *NginxErrorLog) startTailer(logFile string, logFormat string) {
	log.Infof("Adding error log tailer: %s", logFile)
	tailerCtx, cancel := context.WithCancel(context.Background())
	c.logs[logFile] = cancel
	c.logFormats[logFile] = logFormat
	go c.logStats(tailerCtx, logFile)
}

// stopTailer cancels the tailer goroutine for logFile and drops the file
// from the collector's bookkeeping maps.
func (c *NginxErrorLog) stopTailer(logFile string, cancelFunction context.CancelFunc) {
	log.Infof("Removing error log tailer: %s", logFile)
	cancelFunction()
	delete(c.logFormats, logFile)
	delete(c.logs, logFile)
}

func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) {
c.mu.Lock()
defer c.mu.Unlock()

for _, stat := range c.buf {
m <- stat
Expand Down
14 changes: 14 additions & 0 deletions src/plugins/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (m *Metrics) Process(msg *core.Message) {
m.registerStatsSources()
return

case msg.Exact(core.NginxConfigApplySucceeded):
m.updateCollectorsSources()
return

case msg.Exact(core.NginxDetailProcUpdate):
collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary)
for key, collectorConfig := range collectorConfigsMap {
Expand Down Expand Up @@ -161,6 +165,7 @@ func (m *Metrics) Subscriptions() []string {
core.NginxStatusAPIUpdate,
core.NginxPluginConfigured,
core.NginxDetailProcUpdate,
core.NginxConfigApplySucceeded,
}
}

Expand Down Expand Up @@ -342,3 +347,12 @@ func (m *Metrics) updateCollectorsConfig() {
collector.UpdateConfig(m.conf)
}
}

// updateCollectorsSources asks every NGINX collector to push its current
// dimensions and configuration down to its sources. Collectors of any other
// concrete type are skipped.
func (m *Metrics) updateCollectorsSources() {
	log.Trace("Updating nginx collector sources")
	for _, collector := range m.collectors {
		nginxCollector, isNginxCollector := collector.(*collectors.NginxCollector)
		if !isNginxCollector {
			continue
		}
		nginxCollector.UpdateSources()
	}
}
1 change: 1 addition & 0 deletions src/plugins/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ func TestMetrics_Subscriptions(t *testing.T) {
core.NginxStatusAPIUpdate,
core.NginxPluginConfigured,
core.NginxDetailProcUpdate,
core.NginxConfigApplySucceeded,
}
pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary())
assert.Equal(t, subs, pluginUnderTest.Subscriptions())
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading