Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Race conditions fixes #810

Merged
merged 23 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ jobs:
name: official-oss-integration-test-logs-${{ matrix.container.image }}-${{ matrix.container.version }}
path: /tmp/integration-test-logs/
retention-days: 3

official-plus-image-integration-tests:
name: Integration Tests - Official Plus Images
needs: build-unsigned-snapshot
Expand Down
2 changes: 1 addition & 1 deletion nginx-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ config_dirs: "/etc/nginx:/usr/local/etc/nginx:/usr/share/nginx/modules:/etc/nms"
# host: 127.0.0.1
#
# Set this value to a secure port number to prevent information leaks.
# port: 8038
# port: 8038
2 changes: 2 additions & 0 deletions sdk/client/metric_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error {
}

// Close calls closeConnection while holding r.mu and returns the error
// that closeConnection reports.
func (r *metricReporter) Close() (err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	err = r.closeConnection()
	return err
}

Expand Down
5 changes: 5 additions & 0 deletions sdk/config_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/nginx/agent/sdk/v2/backoff"
Expand All @@ -43,6 +44,8 @@ const (
httpClientTimeout = 1 * time.Second
)

// readLock is the package-level mutex acquired at the start of
// GetNginxConfigWithIgnoreDirectives. The zero value of sync.Mutex is an
// unlocked mutex, so no explicit initializer is required.
var readLock sync.Mutex

// DirectoryMap wraps a map of *proto.Directory values keyed by string.
// NOTE(review): the keys are presumably directory paths — confirm against callers.
type DirectoryMap struct {
// paths holds the *proto.Directory entries, looked up by string key.
paths map[string]*proto.Directory
}
Expand Down Expand Up @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives(
allowedDirectories map[string]struct{},
ignoreDirectives []string,
) (*proto.NginxConfig, error) {
readLock.Lock()
payload, err := crossplane.Parse(confFile,
&crossplane.ParseOptions{
IgnoreDirectives: ignoreDirectives,
Expand Down Expand Up @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives(
if err != nil {
return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err)
}
readLock.Unlock()

return nginxConfig, nil
}
Expand Down
6 changes: 5 additions & 1 deletion src/core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
"time"

agent_config "github.com/nginx/agent/sdk/v2/agent/config"
Expand Down Expand Up @@ -50,6 +51,7 @@ const (
// Package-level configuration state.
var (
	// Viper is the viper instance for this package, created with
	// agent_config.KeyDelimiter as its key delimiter.
	Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter))

	// MigratedEnv starts out false (the bool zero value).
	MigratedEnv bool

	// cfgMu is held by UpdateAgentConfig for the duration of the call.
	cfgMu sync.Mutex
)

func SetVersion(version, commit string) {
Expand Down Expand Up @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) {
}

func GetConfig(clientId string) (*Config, error) {
extensions := []string{}
var extensions []string

for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) {
if agent_config.IsKnownExtension(extension) {
Expand Down Expand Up @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) {
// overwritten or not.
func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) {
// Get current config on disk
cfgMu.Lock()
defer cfgMu.Unlock()
config, err := GetConfig(systemId)
if err != nil {
log.Errorf("Failed to register config: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions src/core/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestGetConfig(t *testing.T) {

assert.Equal(t, []string{}, config.Tags)
assert.Equal(t, Defaults.Features, config.Features)
assert.Equal(t, []string{}, config.Extensions)
assert.Equal(t, []string(nil), config.Extensions)
})

t.Run("test override defaults with flags", func(t *testing.T) {
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestGetConfig(t *testing.T) {
assert.Equal(t, Defaults.AgentMetrics.Mode, config.AgentMetrics.Mode)
assert.Equal(t, 10*time.Minute, config.AgentMetrics.Backoff.MaxInterval)
assert.Equal(t, Defaults.Features, config.Features)
assert.Equal(t, []string{}, config.Extensions)
assert.Equal(t, []string(nil), config.Extensions)
})

t.Run("test override config values with ENV variables", func(t *testing.T) {
Expand Down
8 changes: 6 additions & 2 deletions src/core/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"regexp"
"runtime"
"strings"
"sync"
"syscall"

"github.com/google/uuid"
Expand All @@ -43,7 +44,6 @@ import (
//go:generate mv fake_environment_fixed.go fake_environment_test.go
type Environment interface {
NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo
// NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo
GetHostname() (hostname string)
GetSystemUUID() (hostId string)
ReadDirectory(dir string, ext string) ([]string, error)
Expand All @@ -70,6 +70,7 @@ type EnvironmentType struct {
host *proto.HostInfo
virtualizationFunc func(ctx context.Context) (string, string, error)
isContainerFunc func() bool
hostMu sync.Mutex
}

type Process struct {
Expand Down Expand Up @@ -106,6 +107,7 @@ const (
IsContainerKey = "isContainer"
GetContainerIDKey = "GetContainerID"
GetSystemUUIDKey = "GetSystemUUIDKey"
ReleaseInfoFile = "/etc/os-release"
)

var (
Expand All @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con

func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo {
defer ctx.Done()
env.hostMu.Lock()
defer env.hostMu.Unlock()
// temp cache measure
if env.host == nil || clearCache {
hostInformation, err := host.InfoWithContext(ctx)
Expand Down Expand Up @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer
Partitons: disks,
Network: env.networks(),
Processor: env.processors(hostInformation.KernelArch),
Release: releaseInfo("/etc/os-release"),
Release: releaseInfo(ReleaseInfoFile),
Tags: *tags,
AgentAccessibleDirs: configDirs,
}
Expand Down
3 changes: 0 additions & 3 deletions src/core/metrics/sources/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/metrics"
log "github.com/sirupsen/logrus"
)

// MOUNT_POINT is the string key "mount_point".
// NOTE(review): presumably the dimension name attached to per-mount disk metrics — confirm.
const MOUNT_POINT = "mount_point"
Expand Down Expand Up @@ -49,8 +48,6 @@ func (c *Disk) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrapper
"in_use": float64(usage.UsedPercentage),
})

log.Debugf("disk metrics collected: %v", len(simpleMetrics))

select {
case <-ctx.Done():
return
Expand Down
5 changes: 3 additions & 2 deletions src/core/metrics/sources/net_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"fmt"
"sync"

log "github.com/sirupsen/logrus"

"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/metrics"
"github.com/shirou/gopsutil/v3/net"
log "github.com/sirupsen/logrus"
)

// NETWORK_INTERFACE is the string key "network_interface".
// NOTE(review): presumably the dimension name attached to per-interface network metrics — confirm.
const NETWORK_INTERFACE = "network_interface"
Expand Down Expand Up @@ -82,7 +83,6 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap
}

simpleMetrics := nio.convertSamplesToSimpleMetrics(v)
log.Debugf("net IO stats count: %d", len(simpleMetrics))

select {
case <-ctx.Done():
Expand All @@ -109,6 +109,7 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap
simpleMetrics := nio.convertSamplesToSimpleMetrics(totalStats)
m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{}, simpleMetrics, proto.MetricsReport_SYSTEM)

log.Debugf("net IO stats: %v", currentNetIOStats)
nio.netIOStats = currentNetIOStats
}

Expand Down
5 changes: 2 additions & 3 deletions src/core/metrics/sources/nginx_access_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ func (c *NginxAccessLog) Stop() {
fn()
delete(c.logs, f)
}
log.Debugf("Stopping NginxAccessLog source for nginx id: %v", c.baseDimensions.NginxId)
}

func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) {
Expand Down Expand Up @@ -329,11 +328,11 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
mu.Unlock()

case <-tick.C:
mu.Lock()

c.baseDimensions.NginxType = c.nginxType
c.baseDimensions.PublishedAPI = logFile

mu.Lock()

if len(requestLengths) > 0 {
httpCounters["request.length"] = getAverageMetricValue(requestLengths)
}
Expand Down
6 changes: 3 additions & 3 deletions src/core/metrics/sources/nginx_error_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func (c *NginxErrorLog) Stop() {

func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) {
c.mu.Lock()
defer c.mu.Unlock()

c.baseDimensions = dimensions

Expand All @@ -129,6 +128,7 @@ func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *met
// add, remove or update existing log trailers
c.syncLogs()
}
c.mu.Unlock()
}

func (c *NginxErrorLog) recreateLogs() {
Expand Down Expand Up @@ -178,7 +178,7 @@ func (c *NginxErrorLog) stopTailer(logFile string, cancelFunction context.Cancel
delete(c.logFormats, logFile)
}

func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) {
func (c *NginxErrorLog) collectLogStats(_ context.Context, m chan<- *metrics.StatsEntityWrapper) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -227,10 +227,10 @@ func (c *NginxErrorLog) logStats(ctx context.Context, logFile string) {
mu.Unlock()

case <-tick.C:
mu.Lock()
c.baseDimensions.NginxType = c.nginxType
c.baseDimensions.PublishedAPI = logFile

mu.Lock()
simpleMetrics := c.convertSamplesToSimpleMetrics(counters)
log.Tracef("Error log metrics collected: %v", simpleMetrics)

Expand Down
Loading
Loading