diff --git a/Makefile.packaging b/Makefile.packaging index 6b2f216bd..b3621c1bf 100644 --- a/Makefile.packaging +++ b/Makefile.packaging @@ -47,7 +47,7 @@ package: gpg-key $(PACKAGES_DIR) $(GITHUB_PACKAGES_DIR) $(AZURE_PACKAGES_DIR) ## # Create deb packages @for arch in $(DEB_ARCHS); do \ - GOWORK=off CGO_ENABLED=0 GOARCH=$${arch} GOOS=linux go build -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ + GOWORK=off CGO_ENABLED=0 GOARCH=$${arch} GOOS=linux go build -pgo=auto -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ for distro in $(DEB_DISTROS); do \ deb_codename=`echo $$distro | cut -d- -f 2`; \ VERSION=$(shell echo ${VERSION} | tr -d 'v')~$${deb_codename} ARCH=$${arch} nfpm pkg --config .nfpm.yaml --packager deb --target ${PACKAGES_DIR}/deb/${PACKAGE_PREFIX}_$(shell echo ${VERSION} | tr -d 'v')~$${deb_codename}_$${arch}.deb; \ @@ -59,7 +59,7 @@ package: gpg-key $(PACKAGES_DIR) $(GITHUB_PACKAGES_DIR) $(AZURE_PACKAGES_DIR) ## # Create rpm packages - @GOWORK=off CGO_ENABLED=0 GOARCH=amd64 GOOS=linux go build -ldflags=${LDFLAGS} -o ./build/nginx-agent + @GOWORK=off CGO_ENABLED=0 GOARCH=amd64 GOOS=linux go build -pgo=auto -ldflags=${LDFLAGS} -o ./build/nginx-agent @for distro in $(RPM_DISTROS); do \ rpm_distro=`echo $$distro | cut -d- -f 1`; \ rpm_major=`echo $$distro | cut -d- -f 2`; \ @@ -82,7 +82,7 @@ package: gpg-key $(PACKAGES_DIR) $(GITHUB_PACKAGES_DIR) $(AZURE_PACKAGES_DIR) ## @for arch in $(REDHAT_ARCHS); do \ goarch=amd64; \ if [ "$$arch" = "aarch64" ]; then goarch="arm64"; fi; \ - GOWORK=off CGO_ENABLED=0 GOARCH=$${goarch} GOOS=linux go build -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ + GOWORK=off CGO_ENABLED=0 GOARCH=$${goarch} GOOS=linux go build -pgo=auto -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ for distro in $(REDHAT_VERSIONS); do \ rpm_distro=`echo $$distro | cut -d- -f 1`; \ rpm_major=`echo $$distro | cut -d- -f 2`; \ @@ -99,7 +99,7 @@ package: gpg-key $(PACKAGES_DIR) $(GITHUB_PACKAGES_DIR) $(AZURE_PACKAGES_DIR) ## @for arch in $(ALMA_ARCHS); do \ goarch=amd64; \ if [ "$$arch" = "aarch64" ]; then goarch="arm64"; fi; \ - GOWORK=off CGO_ENABLED=0 GOARCH=$${goarch} GOOS=linux go build -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ + GOWORK=off CGO_ENABLED=0 GOARCH=$${goarch} GOOS=linux go build -pgo=auto -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ for distro in $(ALMA_VERSIONS); do \ rpm_distro=`echo $$distro | cut -d- -f 1`; \ rpm_major=`echo $$distro | cut -d- -f 2`; \ @@ -116,7 +116,7 @@ package: gpg-key $(PACKAGES_DIR) $(GITHUB_PACKAGES_DIR) $(AZURE_PACKAGES_DIR) ## @for arch in $(ROCKY_ARCHS); do \ goarch=amd64; \ if [ "$$arch" = "aarch64" ]; then goarch="arm64"; fi; \ - GOWORK=off CGO_ENABLED=0 GOARCH=$${goarch} GOOS=linux go build -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ + GOWORK=off CGO_ENABLED=0 GOARCH=$${goarch} GOOS=linux go build -pgo=auto -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ for distro in $(ROCKY_VERSIONS); do \ rpm_distro=`echo $$distro | cut -d- -f 1`; \ rpm_major=`echo $$distro | cut -d- -f 2`; \ @@ -134,7 +134,7 @@ package: gpg-key $(PACKAGES_DIR) $(GITHUB_PACKAGES_DIR) $(AZURE_PACKAGES_DIR) ## @for arch in $(AMAZON_ARCHS); do \ goarch=amd64; \ if [ "$$arch" = "aarch64" ]; then goarch="arm64"; fi; \ - GOWORK=off CGO_ENABLED=0 GOARCH=$${goarch} GOOS=linux go build -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ + GOWORK=off CGO_ENABLED=0 GOARCH=$${goarch} GOOS=linux go build -pgo=auto -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ for version in $(AMAZON_VERSIONS); do \ rpm_major=`echo $$version | cut -d- -f 2`; \ rpm_codename="amzn$$rpm_major";\ @@ -150,7 +150,7 @@ package: gpg-key $(PACKAGES_DIR) $(GITHUB_PACKAGES_DIR) $(AZURE_PACKAGES_DIR) ## @for arch in $(APK_ARCHS); do \ goarch=amd64; \ if [ "$$arch" = "aarch64" ]; then goarch="arm64"; fi; \ - GOWORK=off CGO_ENABLED=0 GOARCH=$${goarch} GOOS=linux go build -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ + GOWORK=off CGO_ENABLED=0 GOARCH=$${goarch} GOOS=linux go build -pgo=auto -ldflags=${LDFLAGS} -o ./build/nginx-agent; \ for version in $(APK_VERSIONS); do \ if [ ! -d "$(PACKAGES_DIR)/apk/v$${version}/$${arch}" ]; then mkdir -p $(PACKAGES_DIR)/apk/v$${version}/$${arch}; fi; \ VERSION=$(shell echo ${VERSION} | tr -d 'v') ARCH=$${arch} nfpm pkg --config .nfpm.yaml --packager apk --target $(PACKAGES_DIR)/apk/v$${version}/$${arch}/${PACKAGE_PREFIX}-$(shell echo ${VERSION} | tr -d 'v').apk; \ @@ -163,7 +163,7 @@ package: gpg-key $(PACKAGES_DIR) $(GITHUB_PACKAGES_DIR) $(AZURE_PACKAGES_DIR) ## # Create txz packages rm -rf ./build/nginx-agent - @GOWORK=off CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -ldflags=${LDFLAGS} -o ./build/nginx-agent + @GOWORK=off CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -pgo=auto -ldflags=${LDFLAGS} -o ./build/nginx-agent docker run -v ${PWD}:/nginx-agent/ build-signed-packager:1.0.0 diff --git a/nginx-agent.conf b/nginx-agent.conf index 51da248d4..da4660cb8 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -45,4 +45,4 @@ config_dirs: "/etc/nginx:/usr/local/etc/nginx:/usr/share/nginx/modules:/etc/nms" # host: 127.0.0.1 # # Set this value to a secure port number to prevent information leaks. - # port: 8038 \ No newline at end of file + # port: 8038 diff --git a/profile.cgo b/profile.cgo index 64dd48bd5..c6c34068b 100644 Binary files a/profile.cgo and b/profile.cgo differ diff --git a/src/core/environment.go b/src/core/environment.go index 8cfab0119..b80a0112d 100644 --- a/src/core/environment.go +++ b/src/core/environment.go @@ -30,6 +30,7 @@ import ( "github.com/shirou/gopsutil/v3/host" "github.com/shirou/gopsutil/v3/process" log "github.com/sirupsen/logrus" + "golang.org/x/sync/singleflight" "golang.org/x/sys/unix" "github.com/nginx/agent/sdk/v2/files" @@ -52,7 +53,9 @@ type Environment interface { DeleteFile(backup ConfigApplyMarker, fileName string) error Processes() (result []*Process) FileStat(path string) (os.FileInfo, error) + Disks() ([]*proto.DiskPartition, error) DiskDevices() ([]string, error) + DiskUsage(mountPoint string) (*DiskUsage, error) GetContainerID() (string, error) GetNetOverflow() (float64, error) IsContainer() bool @@ -80,6 +83,13 @@ type Process struct { Command string } +type DiskUsage struct { + Total float64 + Used float64 + Free float64 + UsedPercentage float64 +} + const ( lengthOfContainerId = 64 versionId = "VERSION_ID" @@ -91,16 +101,19 @@ const ( KernProc = 14 // struct: process entries KernProcPathname = 12 // path to executable SYS_SYSCTL = 202 + IsContainerKey = "isContainer" + GetContainerIDKey = "GetContainerID" + GetSystemUUIDKey = "GetSystemUUIDKey" ) var ( - virtualizationFunc = host.VirtualizationWithContext - _ Environment = &EnvironmentType{} - basePattern = regexp.MustCompile("/([a-f0-9]{64})$") - colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") - scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) - containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") - containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") + virtualizationFunc = host.VirtualizationWithContext + singleflightGroup = &singleflight.Group{} + basePattern = regexp.MustCompile("/([a-f0-9]{64})$") + colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") + scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) + containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") + containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") ) func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { @@ -123,6 +136,12 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer tags = &[]string{} } + disks, err := env.Disks() + if err != nil { + log.Warnf("Unable to get disks information from the host: %v", err) + disks = nil + } + hostInfoFacacde := &proto.HostInfo{ Agent: agentVersion, Boot: hostInformation.BootTime, @@ -131,7 +150,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer OsType: hostInformation.OS, Uuid: env.GetSystemUUID(), Uname: getUnixName(), - Partitons: diskPartitions(), + Partitons: disks, Network: env.networks(), Processor: processors(hostInformation.KernelArch), Release: releaseInfo("/etc/os-release"), @@ -194,24 +213,32 @@ func (env *EnvironmentType) GetHostname() string { } func (env *EnvironmentType) GetSystemUUID() string { - ctx := context.Background() - defer ctx.Done() + res, err, _ := singleflightGroup.Do(GetSystemUUIDKey, func() (interface{}, error) { + var err error + ctx := context.Background() + defer ctx.Done() - if env.IsContainer() { - containerID, err := env.GetContainerID() - if err != nil { - log.Errorf("Unable to read docker container ID: %v", err) - return "" + if env.IsContainer() { + containerID, err := env.GetContainerID() + if err != nil { + return "", fmt.Errorf("unable to read docker container ID: %v", err) + } + return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String(), err } - return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String() - } - hostInfo, err := host.InfoWithContext(ctx) + hostID, err := host.HostIDWithContext(ctx) + if err != nil { + log.Warnf("Unable to read host id from dataplane, defaulting value. Error: %v", err) + return "", err + } + return uuid.NewMD5(uuid.Nil, []byte(hostID)).String(), err + }) if err != nil { - log.Infof("Unable to read host id from dataplane, defaulting value. Error: %v", err) + log.Warnf("Unable to set hostname due to %v", err) return "" } - return uuid.NewMD5(uuid.Nil, []byte(hostInfo.HostID)).String() + + return res.(string) } func (env *EnvironmentType) ReadDirectory(dir string, ext string) ([]string, error) { @@ -307,18 +334,25 @@ func (env *EnvironmentType) IsContainer() bool { k8sServiceAcct = "/var/run/secrets/kubernetes.io/serviceaccount" ) - for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { - if _, err := os.Stat(filename); err == nil { - log.Debugf("is a container because (%s) exists", filename) - return true + res, err, _ := singleflightGroup.Do(IsContainerKey, func() (interface{}, error) { + for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { + if _, err := os.Stat(filename); err == nil { + log.Debugf("Is a container because (%s) exists", filename) + return true, nil + } } - } - // v1 check - if result, err := cGroupV1Check(selfCgroup); err == nil && result { - return result + // v1 check + if result, err := cGroupV1Check(selfCgroup); err == nil && result { + return result, err + } + return false, nil + }) + + if err != nil { + log.Warnf("Unable to retrieve values from cache (%v)", err) } - return false + return res.(bool) } // cGroupV1Check returns if running cgroup v1 @@ -345,20 +379,24 @@ func cGroupV1Check(cgroupFile string) (bool, error) { // GetContainerID returns the ID of the current environment if running in a container func (env *EnvironmentType) GetContainerID() (string, error) { - const mountInfo = "/proc/self/mountinfo" + res, err, _ := singleflightGroup.Do(GetContainerIDKey, func() (interface{}, error) { + const mountInfo = "/proc/self/mountinfo" - if !env.IsContainer() { - return "", errors.New("not in docker") - } + if !env.IsContainer() { + return "", errors.New("not in docker") + } - containerID, err := getContainerID(mountInfo) - if err != nil { - return "", fmt.Errorf("could not get container ID: %v", err) - } + containerID, err := getContainerID(mountInfo) + if err != nil { + return "", fmt.Errorf("could not get container ID: %v", err) + } + + log.Debugf("Container ID: %s", containerID) - log.Debugf("Container ID: %s", containerID) + return containerID, err + }) - return containerID, err + return res.(string), err } // getContainerID returns the container ID of the current running environment. @@ -428,6 +466,42 @@ func (env *EnvironmentType) DiskDevices() ([]string, error) { } } +func (env *EnvironmentType) Disks() (partitions []*proto.DiskPartition, err error) { + ctx := context.Background() + defer ctx.Done() + parts, err := disk.PartitionsWithContext(ctx, false) + if err != nil { + // return an array of 0 + log.Errorf("Could not read disk partitions for host: %v", err) + return []*proto.DiskPartition{}, err + } + for _, part := range parts { + pm := proto.DiskPartition{ + MountPoint: part.Mountpoint, + Device: part.Device, + FsType: part.Fstype, + } + partitions = append(partitions, &pm) + } + return partitions, nil +} + +func (env *EnvironmentType) DiskUsage(mountPoint string) (*DiskUsage, error) { + ctx := context.Background() + defer ctx.Done() + usage, err := disk.UsageWithContext(ctx, mountPoint) + if err != nil { + return nil, errors.New("unable to obtain disk usage stats") + } + + return &DiskUsage{ + Total: float64(usage.Total), + Used: float64(usage.Used), + Free: float64(usage.Free), + UsedPercentage: float64(usage.UsedPercent), + }, nil +} + func (env *EnvironmentType) GetNetOverflow() (float64, error) { return network.GetNetOverflow() } @@ -795,26 +869,6 @@ func virtualization() (string, string) { return virtualizationSystem, virtualizationRole } -func diskPartitions() (partitions []*proto.DiskPartition) { - ctx := context.Background() - defer ctx.Done() - parts, err := disk.PartitionsWithContext(ctx, false) - if err != nil { - // return an array of 0 - log.Errorf("Could not read disk partitions for host: %v", err) - return []*proto.DiskPartition{} - } - for _, part := range parts { - pm := proto.DiskPartition{ - MountPoint: part.Mountpoint, - Device: part.Device, - FsType: part.Fstype, - } - partitions = append(partitions, &pm) - } - return partitions -} - func releaseInfo(osReleaseFile string) (release *proto.ReleaseInfo) { hostReleaseInfo := getHostReleaseInfo() osRelease, err := getOsRelease(osReleaseFile) diff --git a/src/core/fake_environment_test.go b/src/core/fake_environment_test.go index 02b4da669..4de09b7a2 100644 --- a/src/core/fake_environment_test.go +++ b/src/core/fake_environment_test.go @@ -33,6 +33,31 @@ type FakeEnvironment struct { result1 []string result2 error } + DiskUsageStub func(string) (*DiskUsage, error) + diskUsageMutex sync.RWMutex + diskUsageArgsForCall []struct { + arg1 string + } + diskUsageReturns struct { + result1 *DiskUsage + result2 error + } + diskUsageReturnsOnCall map[int]struct { + result1 *DiskUsage + result2 error + } + DisksStub func() ([]*proto.DiskPartition, error) + disksMutex sync.RWMutex + disksArgsForCall []struct { + } + disksReturns struct { + result1 []*proto.DiskPartition + result2 error + } + disksReturnsOnCall map[int]struct { + result1 []*proto.DiskPartition + result2 error + } FileStatStub func(string) (fs.FileInfo, error) fileStatMutex sync.RWMutex fileStatArgsForCall []struct { @@ -287,6 +312,126 @@ func (fake *FakeEnvironment) DiskDevicesReturnsOnCall(i int, result1 []string, r }{result1, result2} } +func (fake *FakeEnvironment) DiskUsage(arg1 string) (*DiskUsage, error) { + fake.diskUsageMutex.Lock() + ret, specificReturn := fake.diskUsageReturnsOnCall[len(fake.diskUsageArgsForCall)] + fake.diskUsageArgsForCall = append(fake.diskUsageArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.DiskUsageStub + fakeReturns := fake.diskUsageReturns + fake.recordInvocation("DiskUsage", []interface{}{arg1}) + fake.diskUsageMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeEnvironment) DiskUsageCallCount() int { + fake.diskUsageMutex.RLock() + defer fake.diskUsageMutex.RUnlock() + return len(fake.diskUsageArgsForCall) +} + +func (fake *FakeEnvironment) DiskUsageCalls(stub func(string) (*DiskUsage, error)) { + fake.diskUsageMutex.Lock() + defer fake.diskUsageMutex.Unlock() + fake.DiskUsageStub = stub +} + +func (fake *FakeEnvironment) DiskUsageArgsForCall(i int) string { + fake.diskUsageMutex.RLock() + defer fake.diskUsageMutex.RUnlock() + argsForCall := fake.diskUsageArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeEnvironment) DiskUsageReturns(result1 *DiskUsage, result2 error) { + fake.diskUsageMutex.Lock() + defer fake.diskUsageMutex.Unlock() + fake.DiskUsageStub = nil + fake.diskUsageReturns = struct { + result1 *DiskUsage + result2 error + }{result1, result2} +} + +func (fake *FakeEnvironment) DiskUsageReturnsOnCall(i int, result1 *DiskUsage, result2 error) { + fake.diskUsageMutex.Lock() + defer fake.diskUsageMutex.Unlock() + fake.DiskUsageStub = nil + if fake.diskUsageReturnsOnCall == nil { + fake.diskUsageReturnsOnCall = make(map[int]struct { + result1 *DiskUsage + result2 error + }) + } + fake.diskUsageReturnsOnCall[i] = struct { + result1 *DiskUsage + result2 error + }{result1, result2} +} + +func (fake *FakeEnvironment) Disks() ([]*proto.DiskPartition, error) { + fake.disksMutex.Lock() + ret, specificReturn := fake.disksReturnsOnCall[len(fake.disksArgsForCall)] + fake.disksArgsForCall = append(fake.disksArgsForCall, struct { + }{}) + stub := fake.DisksStub + fakeReturns := fake.disksReturns + fake.recordInvocation("Disks", []interface{}{}) + fake.disksMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeEnvironment) DisksCallCount() int { + fake.disksMutex.RLock() + defer fake.disksMutex.RUnlock() + return len(fake.disksArgsForCall) +} + +func (fake *FakeEnvironment) DisksCalls(stub func() ([]*proto.DiskPartition, error)) { + fake.disksMutex.Lock() + defer fake.disksMutex.Unlock() + fake.DisksStub = stub +} + +func (fake *FakeEnvironment) DisksReturns(result1 []*proto.DiskPartition, result2 error) { + fake.disksMutex.Lock() + defer fake.disksMutex.Unlock() + fake.DisksStub = nil + fake.disksReturns = struct { + result1 []*proto.DiskPartition + result2 error + }{result1, result2} +} + +func (fake *FakeEnvironment) DisksReturnsOnCall(i int, result1 []*proto.DiskPartition, result2 error) { + fake.disksMutex.Lock() + defer fake.disksMutex.Unlock() + fake.DisksStub = nil + if fake.disksReturnsOnCall == nil { + fake.disksReturnsOnCall = make(map[int]struct { + result1 []*proto.DiskPartition + result2 error + }) + } + fake.disksReturnsOnCall[i] = struct { + result1 []*proto.DiskPartition + result2 error + }{result1, result2} +} + func (fake *FakeEnvironment) FileStat(arg1 string) (fs.FileInfo, error) { fake.fileStatMutex.Lock() ret, specificReturn := fake.fileStatReturnsOnCall[len(fake.fileStatArgsForCall)] @@ -943,6 +1088,10 @@ func (fake *FakeEnvironment) Invocations() map[string][][]interface{} { defer fake.deleteFileMutex.RUnlock() fake.diskDevicesMutex.RLock() defer fake.diskDevicesMutex.RUnlock() + fake.diskUsageMutex.RLock() + defer fake.diskUsageMutex.RUnlock() + fake.disksMutex.RLock() + defer fake.disksMutex.RUnlock() fake.fileStatMutex.RLock() defer fake.fileStatMutex.RUnlock() fake.getContainerIDMutex.RLock() diff --git a/src/core/metrics/collectors/system.go b/src/core/metrics/collectors/system.go index 0aa1060b3..a92c4a7b8 100644 --- a/src/core/metrics/collectors/system.go +++ b/src/core/metrics/collectors/system.go @@ -40,7 +40,7 @@ func NewSystemCollector(env core.Environment, conf *config.Config) *SystemCollec systemSources = []metrics.Source{ sources.NewVirtualMemorySource(sources.SystemNamespace, env), sources.NewCPUTimesSource(sources.SystemNamespace, env), - sources.NewDiskSource(sources.SystemNamespace), + sources.NewDiskSource(sources.SystemNamespace, env), sources.NewDiskIOSource(sources.SystemNamespace, env), sources.NewNetIOSource(sources.SystemNamespace, env), sources.NewLoadSource(sources.SystemNamespace), diff --git a/src/core/metrics/sources/disk.go b/src/core/metrics/sources/disk.go index 29b788953..472133097 100644 --- a/src/core/metrics/sources/disk.go +++ b/src/core/metrics/sources/disk.go @@ -13,8 +13,8 @@ import ( "sync" "github.com/nginx/agent/sdk/v2/proto" + "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" - "github.com/shirou/gopsutil/v3/disk" ) const MOUNT_POINT = "mount_point" @@ -22,25 +22,24 @@ const MOUNT_POINT = "mount_point" type Disk struct { logger *MetricSourceLogger *namedMetric - disks []disk.PartitionStat + disks []*proto.DiskPartition + env core.Environment } -func NewDiskSource(namespace string) *Disk { - ctx := context.Background() - defer ctx.Done() - disks, _ := disk.PartitionsWithContext(ctx, false) - return &Disk{NewMetricSourceLogger(), &namedMetric{namespace, "disk"}, disks} +func NewDiskSource(namespace string, env core.Environment) *Disk { + disks, _ := env.Disks() + return &Disk{NewMetricSourceLogger(), &namedMetric{namespace, "disk"}, disks, env} } func (c *Disk) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) { defer wg.Done() for _, part := range c.disks { - if part.Device == "" || part.Fstype == "" { + if part.Device == "" || part.FsType == "" { continue } - usage, err := disk.UsageWithContext(ctx, part.Mountpoint) + usage, err := c.env.DiskUsage(part.MountPoint) if err != nil { - c.logger.Log(fmt.Sprintf("Failed to get disk metrics for mount point %s, %v", part.Mountpoint, err)) + c.logger.Log(fmt.Sprintf("Failed to get disk metrics for mount point %s, %v", part.MountPoint, err)) continue } @@ -48,14 +47,14 @@ func (c *Disk) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metric "total": float64(usage.Total), "used": float64(usage.Used), "free": float64(usage.Free), - "in_use": float64(usage.UsedPercent), + "in_use": float64(usage.UsedPercentage), }) select { case <-ctx.Done(): return // mount point is not a common dim - case m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{{Name: MOUNT_POINT, Value: part.Mountpoint}}, simpleMetrics, proto.MetricsReport_SYSTEM): + case m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{{Name: MOUNT_POINT, Value: part.MountPoint}}, simpleMetrics, proto.MetricsReport_SYSTEM): } } } diff --git a/src/core/metrics/sources/disk_test.go b/src/core/metrics/sources/disk_test.go index 44a54bfc6..9e0047c03 100644 --- a/src/core/metrics/sources/disk_test.go +++ b/src/core/metrics/sources/disk_test.go @@ -14,22 +14,25 @@ import ( "testing" "github.com/nginx/agent/v2/src/core/metrics" + tutils "github.com/nginx/agent/v2/test/utils" "github.com/stretchr/testify/assert" ) func TestNewDiskSource(t *testing.T) { namespace := "test" - actual := NewDiskSource(namespace) + env := tutils.GetMockEnv() + actual := NewDiskSource(namespace, env) assert.Equal(t, "disk", actual.group) assert.Equal(t, namespace, actual.namespace) - assert.Greater(t, len(actual.disks), 1) + assert.Equal(t, len(actual.disks), 2) } func TestDiskCollect(t *testing.T) { namespace := "test" - disk := NewDiskSource(namespace) + env := tutils.GetMockEnv() + disk := NewDiskSource(namespace, env) ctx := context.TODO() wg := &sync.WaitGroup{} diff --git a/src/core/nginx.go b/src/core/nginx.go index 04dcc944e..f29f410ee 100644 --- a/src/core/nginx.go +++ b/src/core/nginx.go @@ -131,7 +131,6 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []*Proc n.nginxWorkersMap[nginxDetails.GetNginxId()] = append(n.nginxWorkersMap[nginxDetails.GetNginxId()], nginxDetails) } } - } func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails { diff --git a/src/plugins/agent_api.go b/src/plugins/agent_api.go index 512f2d578..2fe5e86ef 100644 --- a/src/plugins/agent_api.go +++ b/src/plugins/agent_api.go @@ -18,6 +18,7 @@ import ( "net/http" "os" "regexp" + "sync" "time" "github.com/nginx/agent/v2/src/core/metrics" @@ -60,6 +61,7 @@ type AgentAPI struct { nginxBinary core.NginxBinary nginxHandler *NginxHandler exporter *prometheus_metrics.Exporter + processes []*core.Process } type NginxHandler struct { @@ -69,6 +71,8 @@ type NginxHandler struct { nginxBinary core.NginxBinary responseChannel chan *proto.Command_NginxConfigResponse configResponseStatuses map[string]*proto.NginxConfigStatus + processesMutex sync.RWMutex + processes []*core.Process } // swagger:parameters apply-nginx-config @@ -133,12 +137,13 @@ const ( jsonMimeType = "application/json" ) -func NewAgentAPI(config *config.Config, env core.Environment, nginxBinary core.NginxBinary) *AgentAPI { +func NewAgentAPI(config *config.Config, env core.Environment, nginxBinary core.NginxBinary, processes []*core.Process) *AgentAPI { return &AgentAPI{ config: config, env: env, nginxBinary: nginxBinary, exporter: prometheus_metrics.NewExporter(&proto.MetricsReport{}), + processes: processes, } } @@ -181,6 +186,11 @@ func (a *AgentAPI) Process(message *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, response) } + case core.NginxDetailProcUpdate: + a.processes = message.Data().([]*core.Process) + if a.nginxHandler != nil { + a.nginxHandler.syncProcessInfo(a.processes) + } } } @@ -195,6 +205,7 @@ func (a *AgentAPI) Subscriptions() []string { core.NginxConfigValidationPending, core.NginxConfigApplyFailed, core.NginxConfigApplySucceeded, + core.NginxDetailProcUpdate, } } @@ -206,6 +217,7 @@ func (a *AgentAPI) createHttpServer() { nginxBinary: a.nginxBinary, responseChannel: make(chan *proto.Command_NginxConfigResponse), configResponseStatuses: make(map[string]*proto.NginxConfigStatus), + processes: a.processes, } mux := http.NewServeMux() @@ -462,7 +474,7 @@ func readFileFromRequest(r *http.Request) (*bytes.Buffer, error) { func (h *NginxHandler) getNginxDetails() []*proto.NginxDetails { var nginxDetails []*proto.NginxDetails - for _, proc := range h.env.Processes() { + for _, proc := range h.getNginxProccessInfo() { if proc.IsMaster { nginxDetails = append(nginxDetails, h.nginxBinary.GetNginxDetailsFromProcess(proc)) } @@ -581,6 +593,18 @@ func (h *NginxHandler) getConfigStatus(w http.ResponseWriter, r *http.Request) e return writeObjectToResponseBody(w, agentAPIConfigApplyStatusResponse) } +func (h *NginxHandler) getNginxProccessInfo() []*core.Process { + h.processesMutex.RLock() + defer h.processesMutex.RUnlock() + return h.processes +} + +func (h *NginxHandler) syncProcessInfo(processInfo []*core.Process) { + h.processesMutex.Lock() + defer h.processesMutex.Unlock() + h.processes = processInfo +} + func writeObjectToResponseBody(w http.ResponseWriter, response any) error { respBody := new(bytes.Buffer) err := json.NewEncoder(respBody).Encode(response) diff --git a/src/plugins/agent_api_test.go b/src/plugins/agent_api_test.go index ee6fefb5d..f2a8ac295 100644 --- a/src/plugins/agent_api_test.go +++ b/src/plugins/agent_api_test.go @@ -55,6 +55,7 @@ func TestAgentAPI_Subscriptions(t *testing.T) { core.NginxConfigValidationPending, core.NginxConfigApplyFailed, core.NginxConfigApplySucceeded, + core.NginxDetailProcUpdate, } agentAPI := AgentAPI{} @@ -122,9 +123,7 @@ func TestNginxHandler_sendInstanceDetailsPayload(t *testing.T) { processes = append(processes, &core.Process{Pid: 1, Name: "12345", IsMaster: true}) } - env.On("Processes").Return(processes) - - nginxHandler := NginxHandler{env: env, nginxBinary: mockNginxBinary} + nginxHandler := NginxHandler{env: env, nginxBinary: mockNginxBinary, processes: processes} err := nginxHandler.sendInstanceDetailsPayload(respRec, req) assert.NoError(t, err) @@ -266,12 +265,12 @@ func TestNginxHandler_updateConfig(t *testing.T) { ConfigureArgs: []string{}, } - var env *tutils.MockEnvironment + env := tutils.GetMockEnv() + var processes []*core.Process if tt.nginxInstancesPresent { - env = tutils.GetMockEnvWithProcess() + processes = tutils.GetProcesses() } else { - env = tutils.GetMockEnv() - env.On("Processes", mock.Anything).Return([]*core.Process{}) + processes = []*core.Process{} } env.On("WriteFiles", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -287,6 +286,7 @@ func TestNginxHandler_updateConfig(t *testing.T) { pipeline: pipeline, nginxBinary: mockNginxBinary, responseChannel: make(chan *proto.Command_NginxConfigResponse), + processes: processes, } if tt.response != nil { @@ -412,7 +412,7 @@ func TestProcess_metricReport(t *testing.T) { metricReport := &proto.MetricsReport{Meta: &proto.Metadata{MessageId: "123"}} metricReportBundle := &metrics.MetricsReportBundle{Data: []*proto.MetricsReport{metricReport}} - agentAPI := NewAgentAPI(conf, mockEnvironment, mockNginxBinary) + agentAPI := NewAgentAPI(conf, mockEnvironment, mockNginxBinary, []*core.Process{}) // Check that latest metric report isn't set assert.NotEqual(t, metricReport, agentAPI.exporter.GetLatestMetricReports()[0]) @@ -493,7 +493,7 @@ func TestMtls_forApi(t *testing.T) { client.SetTransport(transport) } - pluginUnderTest := NewAgentAPI(tt.conf, tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary()) + pluginUnderTest := NewAgentAPI(tt.conf, tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary(), tutils.GetProcesses()) pluginUnderTest.Init(core.NewMockMessagePipe(ctx)) client.SetRetryCount(3).SetRetryWaitTime(50 * time.Millisecond).SetRetryMaxWaitTime(200 * time.Millisecond) diff --git a/src/plugins/common.go b/src/plugins/common.go index 397db17bf..1c13793f5 100644 --- a/src/plugins/common.go +++ b/src/plugins/common.go @@ -19,6 +19,8 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E var corePlugins []core.Plugin var extensionPlugins []core.ExtensionPlugin + processes := env.Processes() + if commander != nil { corePlugins = append(corePlugins, NewCommander(commander, loadedConfig), @@ -40,18 +42,18 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E corePlugins = append(corePlugins, NewConfigReader(loadedConfig), - NewNginx(commander, binary, env, loadedConfig), + NewNginx(commander, binary, env, loadedConfig, processes), NewExtensions(loadedConfig, env), - NewFeatures(commander, loadedConfig, env, binary, loadedConfig.Version, agentEventsMeta), + NewFeatures(commander, loadedConfig, env, binary, loadedConfig.Version, processes, agentEventsMeta), ) if loadedConfig.IsFeatureEnabled(agent_config.FeatureRegistration) { - corePlugins = append(corePlugins, NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()))) + corePlugins = append(corePlugins, NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()), processes)) } if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsCollection) || (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) { - corePlugins = append(corePlugins, NewMetrics(loadedConfig, env, binary)) + corePlugins = append(corePlugins, NewMetrics(loadedConfig, env, binary, processes)) } if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { @@ -59,11 +61,11 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E } if loadedConfig.IsFeatureEnabled(agent_config.FeatureDataPlaneStatus) { - corePlugins = append(corePlugins, NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.NewString()), binary, env)) + corePlugins = append(corePlugins, NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.NewString()), binary, env, processes)) } if loadedConfig.IsFeatureEnabled(agent_config.FeatureProcessWatcher) { - corePlugins = append(corePlugins, NewProcessWatcher(env, binary)) + corePlugins = append(corePlugins, NewProcessWatcher(env, binary, processes)) } if loadedConfig.IsFeatureEnabled(agent_config.FeatureActivityEvents) { @@ -71,7 +73,7 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E } if loadedConfig.AgentAPI.Port != 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureAgentAPI) { - corePlugins = append(corePlugins, NewAgentAPI(loadedConfig, env, binary)) + corePlugins = append(corePlugins, NewAgentAPI(loadedConfig, env, binary, processes)) } else { log.Info("Agent API not configured") } diff --git a/src/plugins/dataplane_status.go b/src/plugins/dataplane_status.go index d1aef6ae7..6b6014a5f 100644 --- a/src/plugins/dataplane_status.go +++ b/src/plugins/dataplane_status.go @@ -42,13 +42,14 @@ type DataPlaneStatus struct { softwareDetails map[string]*proto.DataplaneSoftwareDetails nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus softwareDetailsMutex sync.RWMutex + processes []*core.Process } const ( defaultMinInterval = time.Second * 30 ) -func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core.NginxBinary, env core.Environment) *DataPlaneStatus { +func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core.NginxBinary, env core.Environment, processes []*core.Process) *DataPlaneStatus { log.Tracef("Dataplane status interval %s", config.Dataplane.Status.PollInterval) pollInt := config.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { @@ -69,6 +70,7 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core softwareDetailsMutex: sync.RWMutex{}, nginxConfigActivityStatuses: make(map[string]*proto.AgentActivityStatus), softwareDetails: make(map[string]*proto.DataplaneSoftwareDetails), + processes: processes, } } @@ -129,6 +131,8 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } + case msg.Exact(core.NginxDetailProcUpdate): + dps.processes = msg.Data().([]*core.Process) } } @@ -139,6 +143,7 @@ func (dps *DataPlaneStatus) Subscriptions() []string { core.NginxConfigValidationPending, core.NginxConfigApplyFailed, core.NginxConfigApplySucceeded, + core.NginxDetailProcUpdate, } } @@ -182,7 +187,6 @@ func (dps *DataPlaneStatus) healthGoRoutine(pipeline core.MessagePipeInterface) } func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneStatus { - processes := dps.env.Processes() forceDetails = forceDetails || time.Now().UTC().Add(-dps.reportInterval).After(dps.lastSendDetails) agentActivityStatuses := []*proto.AgentActivityStatus{} @@ -200,8 +204,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS dataplaneStatus := &proto.DataplaneStatus{ Host: dps.hostInfo(forceDetails), - Details: dps.detailsForProcess(processes, forceDetails), - Healths: dps.healthForProcess(processes), + Details: dps.detailsForProcess(dps.processes, forceDetails), + Healths: dps.healthForProcess(dps.processes), DataplaneSoftwareDetails: dataplaneSoftwareDetails, AgentActivityStatus: agentActivityStatuses, } @@ -210,7 +214,7 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send - hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, true) + hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil } diff --git a/src/plugins/dataplane_status_test.go b/src/plugins/dataplane_status_test.go index 5fce85134..d09facb75 100644 --- a/src/plugins/dataplane_status_test.go +++ b/src/plugins/dataplane_status_test.go @@ -159,7 +159,6 @@ func TestDataPlaneStatus(t *testing.T) { binary.On("GetNginxDetailsFromProcess", mock.Anything).Return(detailsMap[processID]) env := tutils.NewMockEnvironment() - env.On("Processes", mock.Anything).Return([]*core.Process{}) env.On("NewHostInfo", mock.Anything, mock.Anything, mock.Anything).Return(&proto.HostInfo{ Hostname: "test-host", }) @@ -176,7 +175,7 @@ func TestDataPlaneStatus(t *testing.T) { Tags: []string{}, } - dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env) + dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env, []*core.Process{}) messagePipe := core.NewMockMessagePipe(context.Background()) err := messagePipe.Register(10, []core.Plugin{dataPlaneStatus}, []core.ExtensionPlugin{}) @@ -257,7 +256,6 @@ func TestDPSSyncAgentConfigChange(t *testing.T) { binary.On("GetNginxDetailsFromProcess", mock.Anything).Return(detailsMap[processID]) env := tutils.NewMockEnvironment() - env.On("Processes", mock.Anything).Return([]*core.Process{}) env.On("NewHostInfo", mock.Anything, mock.Anything, mock.Anything).Return(&proto.HostInfo{ Hostname: "test-host", }) @@ -273,7 +271,7 @@ func TestDPSSyncAgentConfigChange(t *testing.T) { defer cleanupFunc() // Setup data plane status and mock pipeline - dataPlaneStatus := NewDataPlaneStatus(tc.config, grpc.NewMessageMeta(uuid.New().String()), binary, env) + dataPlaneStatus := NewDataPlaneStatus(tc.config, grpc.NewMessageMeta(uuid.New().String()), binary, env, []*core.Process{}) messagePipe := core.NewMockMessagePipe(context.Background()) err = messagePipe.Register(10, []core.Plugin{dataPlaneStatus}, []core.ExtensionPlugin{}) @@ -330,7 +328,6 @@ func TestDPSSyncNAPDetails(t *testing.T) { binary.On("GetNginxDetailsFromProcess", mock.Anything).Return(detailsMap[processID]) env := tutils.NewMockEnvironment() - env.On("Processes", mock.Anything).Return([]*core.Process{}) env.On("NewHostInfo", mock.Anything, mock.Anything, mock.Anything).Return(&proto.HostInfo{ Hostname: "test-host", }) @@ -350,7 +347,7 @@ func TestDPSSyncNAPDetails(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { // Setup DataPlaneStatus - dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env) + dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env, []*core.Process{}) dataPlaneStatus.softwareDetails[agent_config.NginxAppProtectExtensionPlugin] = &proto.DataplaneSoftwareDetails{Data: tc.initialNAPDetails} defer dataPlaneStatus.Close() @@ -389,6 +386,7 @@ func TestDataPlaneSubscriptions(t *testing.T) { core.NginxConfigValidationPending, core.NginxConfigApplyFailed, core.NginxConfigApplySucceeded, + core.NginxDetailProcUpdate, } processID := "12345" @@ -399,7 +397,6 @@ func TestDataPlaneSubscriptions(t *testing.T) { binary.On("GetNginxDetailsFromProcess", mock.Anything).Return(detailsMap[processID]) env := tutils.NewMockEnvironment() - env.On("Processes", mock.Anything).Return([]*core.Process{}) env.On("NewHostInfo", mock.Anything, mock.Anything, mock.Anything).Return(&proto.HostInfo{ Hostname: "test-host", }) @@ -416,7 +413,7 @@ func TestDataPlaneSubscriptions(t *testing.T) { Tags: []string{}, } - dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env) + dataPlaneStatus := NewDataPlaneStatus(config, grpc.NewMessageMeta(uuid.New().String()), binary, env, []*core.Process{}) assert.Equal(t, expectedSubscriptions, dataPlaneStatus.Subscriptions()) } diff --git a/src/plugins/features.go b/src/plugins/features.go index 44aabefed..2c21b300e 100644 --- a/src/plugins/features.go +++ b/src/plugins/features.go @@ -27,16 +27,26 @@ type Features struct { binary core.NginxBinary version string featureMap map[string]func(data string) []core.Plugin + processes []*core.Process agentEventsMeta *events.AgentEventMeta } -func NewFeatures(commander client.Commander, conf *config.Config, env core.Environment, binary core.NginxBinary, version string, agentEventsMeta *events.AgentEventMeta) *Features { +func NewFeatures( + commander client.Commander, + conf *config.Config, + env core.Environment, + binary core.NginxBinary, + version string, + processes []*core.Process, + agentEventsMeta *events.AgentEventMeta, +) *Features { return &Features{ commander: commander, conf: conf, env: env, binary: binary, version: version, + processes: processes, agentEventsMeta: agentEventsMeta, } } @@ -118,6 +128,8 @@ func (f *Features) Process(msg *core.Message) { for _, plugin := range plugins { plugin.Init(f.pipeline) } + } else if msg.Topic() == core.NginxDetailProcUpdate { + f.processes = msg.Data().([]*core.Process) } } @@ -130,7 +142,7 @@ func (f *Features) enableMetricsFeature(data string) []core.Plugin { } f.conf = conf - metrics := NewMetrics(f.conf, f.env, f.binary) + metrics := NewMetrics(f.conf, f.env, f.binary, f.processes) metricsThrottle := NewMetricsThrottle(f.conf, f.env) metricsSender := NewMetricsSender(f.commander) @@ -149,7 +161,7 @@ func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { } f.conf = conf - metrics := NewMetrics(f.conf, f.env, f.binary) + metrics := NewMetrics(f.conf, f.env, f.binary, f.processes) return []core.Plugin{metrics} } @@ -198,7 +210,7 @@ func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { } f.conf = conf - api := NewAgentAPI(f.conf, f.env, f.binary) + api := NewAgentAPI(f.conf, f.env, f.binary, f.processes) return []core.Plugin{api} } @@ -213,7 +225,7 @@ func (f *Features) enableRegistrationFeature(data string) []core.Plugin { } f.conf = conf - registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString())) + registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.processes) return []core.Plugin{registration} } @@ -228,7 +240,7 @@ func (f *Features) enableDataPlaneStatusFeature(data string) []core.Plugin { } f.conf = conf - dataPlaneStatus := NewDataPlaneStatus(f.conf, sdkGRPC.NewMessageMeta(uuid.NewString()), f.binary, f.env) + dataPlaneStatus := NewDataPlaneStatus(f.conf, sdkGRPC.NewMessageMeta(uuid.NewString()), f.binary, f.env, f.processes) return []core.Plugin{dataPlaneStatus} } @@ -243,7 +255,7 @@ func (f *Features) enableProcessWatcherFeature(data string) []core.Plugin { } f.conf = conf - processWatcher := NewProcessWatcher(f.env, f.binary) + processWatcher := NewProcessWatcher(f.env, f.binary, f.processes) return []core.Plugin{processWatcher} } @@ -294,7 +306,7 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { - metrics := NewMetrics(f.conf, f.env, f.binary) + metrics := NewMetrics(f.conf, f.env, f.binary, f.processes) countingPlugins = append(countingPlugins, metrics) } diff --git a/src/plugins/features_test.go b/src/plugins/features_test.go index e13351559..0bd19918e 100644 --- a/src/plugins/features_test.go +++ b/src/plugins/features_test.go @@ -87,7 +87,6 @@ func TestFeatures_Process(t *testing.T) { env.Mock.On("IsContainer").Return(true) env.On("NewHostInfo", "agentVersion", &[]string{"locally-tagged", "tagged-locally"}).Return(&proto.HostInfo{}) - env.Mock.On("Processes", mock.Anything).Return(processes) binary.On("GetNginxDetailsFromProcess", &core.Process{Name: "12345", IsMaster: true}).Return(detailsMap[processID]) binary.On("GetNginxDetailsMapFromProcesses", mock.Anything).Return(detailsMap) @@ -97,15 +96,15 @@ func TestFeatures_Process(t *testing.T) { configuration, _ := config.GetConfig("1234") - pluginUnderTest := NewFeatures(cmdr, configuration, env, binary, "agentVersion", events.NewAgentEventMeta( + pluginUnderTest := NewFeatures(cmdr, configuration, env, binary, "agentVersion", processes, events.NewAgentEventMeta( config.MODULE, "v0.0.1", "75231", "test-host", "12345678", "group-a", - []string{"tag-a", "tag-b"}, - )) + []string{"tag-a", "tag-b"}), + ) for _, tc := range testCases { messagePipe := core.SetupMockMessagePipe(t, ctx, []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{}) diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 77d5d7309..eaf35d07d 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -39,10 +39,12 @@ type Metrics struct { env core.Environment conf *config.Config binary core.NginxBinary + processesMutex sync.RWMutex + processes []*core.Process } -func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary) *Metrics { - collectorConfigsMap := createCollectorConfigsMap(config, env, binary) +func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary, processes []*core.Process) *Metrics { + collectorConfigsMap := createCollectorConfigsMap(config, env, binary, processes) return &Metrics{ collectorsUpdate: atomic.NewBool(false), ticker: time.NewTicker(config.AgentMetrics.CollectionInterval), @@ -56,6 +58,7 @@ func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBi env: env, conf: config, binary: binary, + processes: processes, } } @@ -77,7 +80,7 @@ func (m *Metrics) Process(msg *core.Message) { case msg.Exact(core.AgentConfigChanged), msg.Exact(core.NginxConfigApplySucceeded): // If the agent config on disk changed or the NGINX statusAPI was updated // Then update Metrics with relevant config info - collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary) + collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary, m.getNginxProccessInfo()) m.collectorConfigsMapMutex.Lock() m.collectorConfigsMap = collectorConfigsMap m.collectorConfigsMapMutex.Unlock() @@ -97,7 +100,8 @@ func (m *Metrics) Process(msg *core.Message) { return case msg.Exact(core.NginxDetailProcUpdate): - collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary) + m.syncProcessInfo(msg.Data().([]*core.Process)) + collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary, m.getNginxProccessInfo()) for key, collectorConfig := range collectorConfigsMap { if _, ok := m.collectorConfigsMap[key]; !ok { log.Debugf("Adding new nginx collector for nginx id: %s", collectorConfig.NginxId) @@ -299,10 +303,9 @@ func (m *Metrics) syncAgentConfigChange() { m.conf = conf } -func createCollectorConfigsMap(config *config.Config, env core.Environment, binary core.NginxBinary) map[string]*metrics.NginxCollectorConfig { +func createCollectorConfigsMap(config *config.Config, env core.Environment, binary core.NginxBinary, processes []*core.Process) map[string]*metrics.NginxCollectorConfig { collectorConfigsMap := make(map[string]*metrics.NginxCollectorConfig) - processes := env.Processes() for _, p := range processes { if !p.IsMaster { continue @@ -357,3 +360,15 @@ func (m *Metrics) updateCollectorsSources() { } } } + +func (m *Metrics) getNginxProccessInfo() []*core.Process { + m.processesMutex.RLock() + defer m.processesMutex.RUnlock() + return m.processes +} + +func (m *Metrics) syncProcessInfo(processInfo []*core.Process) { + m.processesMutex.Lock() + defer m.processesMutex.Unlock() + m.processes = processInfo +} diff --git a/src/plugins/metrics_test.go b/src/plugins/metrics_test.go index a3932c372..f59dfa130 100644 --- a/src/plugins/metrics_test.go +++ b/src/plugins/metrics_test.go @@ -88,13 +88,13 @@ func TestMetricsProcessNginxDetailProcUpdate(t *testing.T) { }{ { testName: "NginxRestart", - message: core.NewMessage(core.NginxDetailProcUpdate, []*core.Process{}), - processes: []*core.Process{ + message: core.NewMessage(core.NginxDetailProcUpdate, []*core.Process{ { Name: firstNginxPid, IsMaster: true, }, }, + ), expectedNumberOfCollectors: 1, expectedCollectorConfigMap: map[string]*metrics.NginxCollectorConfig{ firstNginxId: firstCollectorConfig, @@ -102,8 +102,7 @@ func TestMetricsProcessNginxDetailProcUpdate(t *testing.T) { }, { testName: "NewNginxInstanceAdded", - message: core.NewMessage(core.NginxDetailProcUpdate, []*core.Process{}), - processes: []*core.Process{ + message: core.NewMessage(core.NginxDetailProcUpdate, []*core.Process{ { Name: firstNginxPid, IsMaster: true, @@ -113,6 +112,7 @@ func TestMetricsProcessNginxDetailProcUpdate(t *testing.T) { IsMaster: true, }, }, + ), expectedNumberOfCollectors: 2, expectedCollectorConfigMap: map[string]*metrics.NginxCollectorConfig{ firstNginxId: firstCollectorConfig, @@ -121,13 +121,13 @@ func TestMetricsProcessNginxDetailProcUpdate(t *testing.T) { }, { testName: "NginxInstanceRemovedAndNewOneAdded", - message: core.NewMessage(core.NginxDetailProcUpdate, []*core.Process{}), - processes: []*core.Process{ + message: core.NewMessage(core.NginxDetailProcUpdate, []*core.Process{ { Name: secondNginxPid, IsMaster: true, }, }, + ), expectedNumberOfCollectors: 1, expectedCollectorConfigMap: map[string]*metrics.NginxCollectorConfig{ secondNginxId: secondCollectorConfig, @@ -144,29 +144,26 @@ func TestMetricsProcessNginxDetailProcUpdate(t *testing.T) { defer cleanupFunc() env := tutils.NewMockEnvironment() - env.Mock.On("NewHostInfo", mock.Anything, mock.Anything, mock.Anything).Return(&proto.HostInfo{ Hostname: "test-host", }) - env.On("Processes", mock.Anything).Return([]*core.Process{ + env.On("IsContainer").Return(false) + env.On("GetChildProcesses").Return(tutils.GetProcessMap()) + + processes := []*core.Process{ { Name: firstNginxPid, IsMaster: true, }, - }).Once() - env.On("IsContainer").Return(false) - env.On("GetChildProcesses").Return(tutils.GetProcessMap()) + } - metricsPlugin := NewMetrics(config, env, binary) + metricsPlugin := NewMetrics(config, env, binary, processes) metricsPlugin.collectors = []metrics.Collector{ collectors.NewNginxCollector(config, env, metricsPlugin.collectorConfigsMap[firstNginxId], binary), } messagePipe := core.SetupMockMessagePipe(t, context.TODO(), []core.Plugin{metricsPlugin}, []core.ExtensionPlugin{}) messagePipe.RunWithoutInit() - // Update the nginx processes seen - env.Mock.On("Processes", mock.Anything).Return(tc.processes).Once() - metricsPlugin.Process(tc.message) assert.Equal(t, tc.expectedNumberOfCollectors, len(metricsPlugin.collectors)) @@ -257,7 +254,7 @@ func TestMetrics_Process_AgentConfigChanged(t *testing.T) { }) // Setup metrics and mock pipeline - metricsPlugin := NewMetrics(tc.config, env, binary) + metricsPlugin := NewMetrics(tc.config, env, binary, tutils.GetProcesses()) messagePipe := core.SetupMockMessagePipe(t, context.TODO(), []core.Plugin{metricsPlugin}, []core.ExtensionPlugin{}) messagePipe.RunWithoutInit() @@ -288,7 +285,7 @@ func TestMetrics_Process_AgentCollectorsUpdate(t *testing.T) { env := tutils.GetMockEnvWithProcess() env.On("IsContainer").Return(false) - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, tutils.GetMockNginxBinary()) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, tutils.GetMockNginxBinary(), tutils.GetProcesses()) pluginUnderTest.Process(core.NewMessage(core.AgentCollectorsUpdate, nil)) assert.True(t, pluginUnderTest.collectorsUpdate.Load()) @@ -301,7 +298,7 @@ func TestMetrics_Process_NginxConfigApplySucceeded_AgentConfigChanged(t *testing env := tutils.GetMockEnvWithHostAndProcess() env.On("IsContainer").Return(false) - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, binary) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), env, binary, tutils.GetProcesses()) pluginUnderTest.Process(core.NewMessage(core.NginxPluginConfigured, nil)) conf := pluginUnderTest.collectorConfigsMap[secondNginxId] @@ -323,7 +320,7 @@ func TestMetrics_Process_NginxConfigApplySucceeded_AgentConfigChanged(t *testing } func TestMetrics_Info(t *testing.T) { - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary()) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary(), tutils.GetProcesses()) assert.Equal(t, "metrics", pluginUnderTest.Info().Name()) } @@ -336,6 +333,6 @@ func TestMetrics_Subscriptions(t *testing.T) { core.NginxDetailProcUpdate, core.NginxConfigApplySucceeded, } - pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary()) + pluginUnderTest := NewMetrics(tutils.GetMockAgentConfig(), tutils.GetMockEnvWithProcess(), tutils.GetMockNginxBinary(), tutils.GetProcesses()) assert.Equal(t, subs, pluginUnderTest.Subscriptions()) } diff --git a/src/plugins/nginx.go b/src/plugins/nginx.go index 09b5848ab..1937ca8dc 100644 --- a/src/plugins/nginx.go +++ b/src/plugins/nginx.go @@ -89,14 +89,14 @@ type NginxConfigValidationResponse struct { elapsedTime time.Duration } -func NewNginx(cmdr client.Commander, nginxBinary core.NginxBinary, env core.Environment, loadedConfig *config.Config) *Nginx { +func NewNginx(cmdr client.Commander, nginxBinary core.NginxBinary, env core.Environment, loadedConfig *config.Config, processes []*core.Process) *Nginx { isFeatureNginxConfigEnabled := loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxConfig) || loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxConfigAsync) isNginxAppProtectEnabled := loadedConfig.IsExtensionEnabled(agent_config.NginxAppProtectExtensionPlugin) return &Nginx{ nginxBinary: nginxBinary, - processes: env.Processes(), + processes: processes, env: env, cmdr: cmdr, config: loadedConfig, @@ -530,9 +530,6 @@ func (n *Nginx) completeConfigApply(response *NginxConfigValidationResponse) (st }, } - n.syncProcessInfo(n.env.Processes()) - n.nginxBinary.UpdateNginxDetailsFromProcesses(n.processes) - n.messagePipeline.Process(core.NewMessage(core.NginxConfigApplySucceeded, agentActivityStatus)) status = &proto.Command_NginxConfigResponse{ @@ -693,7 +690,6 @@ func (n *Nginx) handleErrorStatus(status *proto.Command_NginxConfigResponse, mes func (n *Nginx) uploadConfigs() { systemId := n.env.GetSystemUUID() - n.syncProcessInfo(n.env.Processes()) for nginxID := range n.nginxBinary.GetNginxDetailsMapFromProcesses(n.getNginxProccessInfo()) { err := n.uploadConfig( &proto.ConfigDescriptor{ diff --git a/src/plugins/nginx_test.go b/src/plugins/nginx_test.go index 4afda2431..7cc9fb7da 100644 --- a/src/plugins/nginx_test.go +++ b/src/plugins/nginx_test.go @@ -552,7 +552,7 @@ func TestNginxConfigApply(t *testing.T) { Extensions: []string{agent_config.NginxAppProtectExtensionPlugin}, } - pluginUnderTest := NewNginx(commandClient, binary, env, conf) + pluginUnderTest := NewNginx(commandClient, binary, env, conf, tutils.GetProcesses()) if (test.config.GetZaux() != &proto.ZippedFile{} && len(test.config.GetZaux().GetContents()) > 0) { pluginUnderTest.nginxAppProtectSoftwareDetails = &proto.AppProtectWAFDetails{ PrecompiledPublication: true, @@ -620,7 +620,7 @@ func TestUploadConfigs(t *testing.T) { core.DataplaneChanged, } - env := tutils.GetMockEnvWithProcess() + env := tutils.GetMockEnv() binary := tutils.NewMockNginxBinary() binary.On("GetNginxDetailsByID", "12345").Return(tutils.GetDetailsMap()["12345"]) @@ -633,7 +633,7 @@ func TestUploadConfigs(t *testing.T) { conf := &loadedConfig.Config{Server: loadedConfig.Server{Host: "127.0.0.1", GrpcPort: 9092}, Features: []string{agent_config.FeatureNginxConfigAsync}} - pluginUnderTest := NewNginx(cmdr, binary, env, conf) + pluginUnderTest := NewNginx(cmdr, binary, env, conf, tutils.GetProcesses()) messagePipe := core.SetupMockMessagePipe(t, context.TODO(), []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{}) pluginUnderTest.Init(messagePipe) @@ -648,7 +648,6 @@ func TestUploadConfigs(t *testing.T) { binary.AssertExpectations(t) cmdr.AssertExpectations(t) - env.AssertExpectations(t) core.ValidateMessages(t, messagePipe, msgTopics) } @@ -661,7 +660,7 @@ func TestDisableUploadConfigs(t *testing.T) { core.DataplaneChanged, } - env := tutils.GetMockEnvWithProcess() + env := tutils.GetMockEnv() binary := tutils.NewMockNginxBinary() binary.On("UpdateNginxDetailsFromProcesses", mock.Anything) @@ -669,7 +668,7 @@ func TestDisableUploadConfigs(t *testing.T) { cmdr := tutils.NewMockCommandClient() - pluginUnderTest := NewNginx(cmdr, binary, env, &loadedConfig.Config{}) + pluginUnderTest := NewNginx(cmdr, binary, env, &loadedConfig.Config{}, tutils.GetProcesses()) messagePipe := core.SetupMockMessagePipe(t, context.TODO(), []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{}) pluginUnderTest.Init(messagePipe) @@ -682,14 +681,13 @@ func TestDisableUploadConfigs(t *testing.T) { messagePipe.Run() binary.AssertExpectations(t) - env.AssertExpectations(t) core.ValidateMessages(t, messagePipe, msgTopics) } func TestNginxDetailProcUpdate(t *testing.T) { foundMessage := false - env := tutils.GetMockEnvWithProcess() + env := tutils.GetMockEnv() binary := tutils.NewMockNginxBinary() binary.On("GetNginxDetailsMapFromProcesses", mock.Anything).Return(tutils.GetDetailsMap()) @@ -697,7 +695,7 @@ func TestNginxDetailProcUpdate(t *testing.T) { cmdr := tutils.NewMockCommandClient() - pluginUnderTest := NewNginx(cmdr, binary, env, &loadedConfig.Config{}) + pluginUnderTest := NewNginx(cmdr, binary, env, &loadedConfig.Config{}, tutils.GetProcesses()) messagePipe := core.SetupMockMessagePipe(t, context.TODO(), []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{}) pluginUnderTest.Init(messagePipe) @@ -708,7 +706,6 @@ func TestNginxDetailProcUpdate(t *testing.T) { binary.AssertExpectations(t) cmdr.AssertExpectations(t) - env.AssertExpectations(t) processedMessages := messagePipe.GetProcessedMessages() @@ -747,15 +744,14 @@ func TestNginx_Process_NginxConfigUpload(t *testing.T) { binary.On("GetNginxDetailsByID", "12345").Return(tutils.GetDetailsMap()["12345"]) binary.On("ReadConfig", "/var/conf", "12345", "12345678").Return(config, nil) - env := tutils.GetMockEnvWithProcess() + env := tutils.GetMockEnv() conf := &loadedConfig.Config{Server: loadedConfig.Server{Host: "127.0.0.1", GrpcPort: 9092}, Features: []string{agent_config.FeatureNginxConfigAsync}} - pluginUnderTest := NewNginx(cmdr, binary, env, conf) + pluginUnderTest := NewNginx(cmdr, binary, env, conf, tutils.GetProcesses()) pluginUnderTest.Process(core.NewMessage(core.NginxConfigUpload, configDesc)) binary.AssertExpectations(t) cmdr.AssertExpectations(t) - env.AssertExpectations(t) pluginUnderTest.Close() } @@ -775,13 +771,13 @@ func TestNginx_Subscriptions(t *testing.T) { core.NginxConfigValidationFailed, core.AgentStarted, } - pluginUnderTest := NewNginx(nil, nil, tutils.GetMockEnvWithProcess(), &loadedConfig.Config{}) + pluginUnderTest := NewNginx(nil, nil, tutils.GetMockEnvWithProcess(), &loadedConfig.Config{}, tutils.GetProcesses()) assert.Equal(t, subs, pluginUnderTest.Subscriptions()) } func TestNginx_Info(t *testing.T) { - pluginUnderTest := NewNginx(nil, nil, tutils.GetMockEnvWithProcess(), &loadedConfig.Config{}) + pluginUnderTest := NewNginx(nil, nil, tutils.GetMockEnvWithProcess(), &loadedConfig.Config{}, tutils.GetProcesses()) assert.Equal(t, agent_config.NginxBinaryPlugin, pluginUnderTest.Info().Name()) } @@ -815,7 +811,7 @@ func TestNginx_validateConfig(t *testing.T) { binary.On("UpdateNginxDetailsFromProcesses", env.Processes()) conf := &loadedConfig.Config{Server: loadedConfig.Server{Host: "127.0.0.1", GrpcPort: 9092}, Features: []string{agent_config.FeatureNginxConfigAsync}} - pluginUnderTest := NewNginx(&tutils.MockCommandClient{}, binary, env, conf) + pluginUnderTest := NewNginx(&tutils.MockCommandClient{}, binary, env, conf, tutils.GetProcesses()) messagePipe := core.SetupMockMessagePipe(t, context.TODO(), []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{}) messagePipe.Run() @@ -902,7 +898,7 @@ func TestNginx_completeConfigApply(t *testing.T) { }, } - pluginUnderTest := NewNginx(commandClient, binary, env, conf) + pluginUnderTest := NewNginx(commandClient, binary, env, conf, tutils.GetProcesses()) dir := t.TempDir() tempConf, err := os.CreateTemp(dir, "nginx.conf") @@ -997,7 +993,7 @@ func TestNginx_rollbackConfigApply(t *testing.T) { conf := &loadedConfig.Config{Server: loadedConfig.Server{Host: "127.0.0.1", GrpcPort: 9092}, Features: []string{agent_config.FeatureNginxConfigAsync}} - pluginUnderTest := NewNginx(commandClient, binary, env, conf) + pluginUnderTest := NewNginx(commandClient, binary, env, conf, tutils.GetProcesses()) dir := t.TempDir() tempConf, err := os.CreateTemp(dir, "nginx.conf") @@ -1055,7 +1051,7 @@ func TestBlock_ConfigApply(t *testing.T) { binary.On("Reload", mock.Anything, mock.Anything).Return(nil) config := tutils.GetMockAgentConfig() - pluginUnderTest := NewNginx(commandClient, binary, env, config) + pluginUnderTest := NewNginx(commandClient, binary, env, config, tutils.GetProcesses()) messagePipe := core.SetupMockMessagePipe(t, context.TODO(), []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{}) messagePipe.Process( @@ -1096,7 +1092,7 @@ func TestNginx_monitor(t *testing.T) { config := tutils.GetMockAgentConfig() config.Nginx.ConfigReloadMonitoringPeriod = 10 * time.Second - pluginUnderTest := NewNginx(commandClient, binary, env, config) + pluginUnderTest := NewNginx(commandClient, binary, env, config, tutils.GetProcesses()) // Validate that errors in the logs returned go func() { @@ -1198,7 +1194,7 @@ func TestNginx_monitorLog(t *testing.T) { config := tutils.GetMockAgentConfig() config.Nginx.ConfigReloadMonitoringPeriod = 1 * time.Second config.Nginx.TreatWarningsAsErrors = test.treatWarningsAsErrors - pluginUnderTest := NewNginx(commandClient, binary, env, config) + pluginUnderTest := NewNginx(commandClient, binary, env, config, tutils.GetProcesses()) errorsChannel := make(chan string, 1) pluginUnderTest.monitorLogs(errorLogs, errorsChannel) diff --git a/src/plugins/process_watcher.go b/src/plugins/process_watcher.go index 9df477aa2..37c123bb9 100644 --- a/src/plugins/process_watcher.go +++ b/src/plugins/process_watcher.go @@ -29,27 +29,27 @@ type ProcessWatcher struct { wg sync.WaitGroup env core.Environment binary core.NginxBinary + processes []*core.Process } -func NewProcessWatcher(env core.Environment, nginxBinary core.NginxBinary) *ProcessWatcher { +func NewProcessWatcher(env core.Environment, nginxBinary core.NginxBinary, processes []*core.Process) *ProcessWatcher { return &ProcessWatcher{ - ticker: time.NewTicker(time.Millisecond * 500), + ticker: time.NewTicker(5 * time.Second), seenMasterProcs: make(map[int32]*core.Process), seenWorkerProcs: make(map[int32]*core.Process), nginxDetails: make(map[int32]*proto.NginxDetails), wg: sync.WaitGroup{}, env: env, binary: nginxBinary, + processes: processes, } } func (pw *ProcessWatcher) Init(pipeline core.MessagePipeInterface) { log.Info("ProcessWatcher initializing") - pw.messagePipeline = pipeline - nginxProcesses := pw.env.Processes() - for _, proc := range nginxProcesses { + for _, proc := range pw.processes { if proc.IsMaster { pw.seenMasterProcs[proc.Pid] = proc } else { @@ -61,7 +61,7 @@ func (pw *ProcessWatcher) Init(pipeline core.MessagePipeInterface) { pw.wg.Add(1) go pw.watchProcLoop(pipeline.Context()) - pw.messagePipeline.Process(core.NewMessage(core.NginxDetailProcUpdate, nginxProcesses)) + pw.messagePipeline.Process(core.NewMessage(core.NginxDetailProcUpdate, pw.processes)) } func (pw *ProcessWatcher) Info() *core.Info { diff --git a/src/plugins/process_watcher_test.go b/src/plugins/process_watcher_test.go index d1ea62fc9..d1fb09334 100644 --- a/src/plugins/process_watcher_test.go +++ b/src/plugins/process_watcher_test.go @@ -135,7 +135,7 @@ func TestProcessWatcher_getProcUpdates(t *testing.T) { env := tutils.NewMockEnvironment() binary := core.NewNginxBinary(env, &config.Config{}) - pw := NewProcessWatcher(env, binary) + pw := NewProcessWatcher(env, binary, tt.nginxProcs) pw.seenMasterProcs = tt.seenMasterProcs pw.seenWorkerProcs = tt.seenWorkerProcs pw.nginxDetails = tt.seenNginxDetails @@ -161,8 +161,8 @@ func TestProcessWatcher_getProcUpdates(t *testing.T) { } func TestProcessWatcher_Process(t *testing.T) { - env := tutils.GetMockEnvWithProcess() - pluginUnderTest := NewProcessWatcher(env, tutils.GetMockNginxBinary()) + env := tutils.GetMockEnv() + pluginUnderTest := NewProcessWatcher(env, tutils.GetMockNginxBinary(), tutils.GetProcesses()) ctx, cancel := context.WithCancel(context.TODO()) messagePipe := core.SetupMockMessagePipe(t, ctx, []core.Plugin{pluginUnderTest}, []core.ExtensionPlugin{}) @@ -170,8 +170,6 @@ func TestProcessWatcher_Process(t *testing.T) { pluginUnderTest.Init(messagePipe) messagePipe.Run() - env.AssertExpectations(t) - msgTopics := []string{core.NginxDetailProcUpdate} messages := messagePipe.GetMessages() @@ -187,13 +185,13 @@ func TestProcessWatcher_Process(t *testing.T) { } func TestProcessWatcher_Subscription(t *testing.T) { - pluginUnderTest := NewProcessWatcher(nil, nil) + pluginUnderTest := NewProcessWatcher(nil, nil, nil) assert.Equal(t, []string{}, pluginUnderTest.Subscriptions()) } func TestProcessWatcher_Info(t *testing.T) { - pluginUnderTest := NewProcessWatcher(nil, nil) + pluginUnderTest := NewProcessWatcher(nil, nil, nil) assert.Equal(t, "process-watcher", pluginUnderTest.Info().Name()) } diff --git a/src/plugins/registration.go b/src/plugins/registration.go index c57209fc0..535e01fee 100644 --- a/src/plugins/registration.go +++ b/src/plugins/registration.go @@ -42,6 +42,7 @@ type OneTimeRegistration struct { dataplaneSoftwareDetails map[string]*proto.DataplaneSoftwareDetails pipeline core.MessagePipeInterface dataplaneSoftwareDetailsMutex sync.Mutex + processes []*core.Process } func NewOneTimeRegistration( @@ -49,6 +50,7 @@ func NewOneTimeRegistration( binary core.NginxBinary, env core.Environment, meta *proto.Metadata, + processes []*core.Process, ) *OneTimeRegistration { // this might be slow so do on startup host := env.NewHostInfo(config.Version, &config.Tags, config.ConfigDirs, true) @@ -62,6 +64,7 @@ func NewOneTimeRegistration( binary: binary, dataplaneSoftwareDetails: make(map[string]*proto.DataplaneSoftwareDetails), dataplaneSoftwareDetailsMutex: sync.Mutex{}, + processes: processes, } } @@ -150,7 +153,7 @@ func (r *OneTimeRegistration) areDataplaneSoftwareDetailsReady() error { func (r *OneTimeRegistration) registerAgent() { var details []*proto.NginxDetails - for _, proc := range r.env.Processes() { + for _, proc := range r.processes { // only need master process for registration if proc.IsMaster { nginxDetails := r.binary.GetNginxDetailsFromProcess(proc) diff --git a/src/plugins/registration_test.go b/src/plugins/registration_test.go index 84c05da74..18ad5d6cd 100644 --- a/src/plugins/registration_test.go +++ b/src/plugins/registration_test.go @@ -45,7 +45,7 @@ func TestRegistration_Process(t *testing.T) { Extensions: []string{agent_config.NginxAppProtectExtensionPlugin}, } - pluginUnderTest := NewOneTimeRegistration(cfg, binary, env, &proto.Metadata{}) + pluginUnderTest := NewOneTimeRegistration(cfg, binary, env, &proto.Metadata{}, tutils.GetProcesses()) pluginUnderTest.dataplaneSoftwareDetails[agent_config.NginxAppProtectExtensionPlugin] = &proto.DataplaneSoftwareDetails{ Data: testNAPDetailsActive, } @@ -77,7 +77,7 @@ func TestRegistration_areDataplaneSoftwareDetailsReady(t *testing.T) { conf := tutils.GetMockAgentConfig() conf.Extensions = []string{agent_config.NginxAppProtectExtensionPlugin} - pluginUnderTest := NewOneTimeRegistration(conf, nil, tutils.GetMockEnv(), nil) + pluginUnderTest := NewOneTimeRegistration(conf, nil, tutils.GetMockEnv(), nil, tutils.GetProcesses()) softwareDetails := make(map[string]*proto.DataplaneSoftwareDetails) softwareDetails[agent_config.NginxAppProtectExtensionPlugin] = &proto.DataplaneSoftwareDetails{} pluginUnderTest.dataplaneSoftwareDetails = softwareDetails @@ -86,13 +86,13 @@ func TestRegistration_areDataplaneSoftwareDetailsReady(t *testing.T) { } func TestRegistration_Subscriptions(t *testing.T) { - pluginUnderTest := NewOneTimeRegistration(tutils.GetMockAgentConfig(), nil, tutils.GetMockEnv(), nil) + pluginUnderTest := NewOneTimeRegistration(tutils.GetMockAgentConfig(), nil, tutils.GetMockEnv(), nil, tutils.GetProcesses()) assert.Equal(t, []string{core.RegistrationCompletedTopic, core.DataplaneSoftwareDetailsUpdated}, pluginUnderTest.Subscriptions()) } func TestRegistration_Info(t *testing.T) { - pluginUnderTest := NewOneTimeRegistration(tutils.GetMockAgentConfig(), nil, tutils.GetMockEnv(), nil) + pluginUnderTest := NewOneTimeRegistration(tutils.GetMockAgentConfig(), nil, tutils.GetMockEnv(), nil, tutils.GetProcesses()) assert.Equal(t, "registration", pluginUnderTest.Info().Name()) } diff --git a/test/component/agent_api_test.go b/test/component/agent_api_test.go index db9e6c462..9ef0b1bbb 100644 --- a/test/component/agent_api_test.go +++ b/test/component/agent_api_test.go @@ -62,17 +62,20 @@ func TestGetNginxInstances(t *testing.T) { }, } + var processes []*core.Process + mockEnvironment := tutils.NewMockEnvironment() + if tt.nginxDetails == nil { - mockEnvironment.On("Processes").Return([]*core.Process{{Pid: pid, IsMaster: false}}) + processes = []*core.Process{{Pid: pid, IsMaster: false}} } else { - mockEnvironment.On("Processes").Return([]*core.Process{{Pid: pid, IsMaster: true}}) + processes = []*core.Process{{Pid: pid, IsMaster: true}} } mockNginxBinary := tutils.NewMockNginxBinary() mockNginxBinary.On("GetNginxDetailsFromProcess", mock.Anything).Return(tt.nginxDetails) - agentAPI := plugins.NewAgentAPI(conf, mockEnvironment, mockNginxBinary) + agentAPI := plugins.NewAgentAPI(conf, mockEnvironment, mockNginxBinary, processes) agentAPI.Init(core.NewMockMessagePipe(context.TODO())) client := resty.New() @@ -114,7 +117,7 @@ func TestInvalidPath(t *testing.T) { mockEnvironment := tutils.NewMockEnvironment() mockNginxBinary := tutils.NewMockNginxBinary() - agentAPI := plugins.NewAgentAPI(conf, mockEnvironment, mockNginxBinary) + agentAPI := plugins.NewAgentAPI(conf, mockEnvironment, mockNginxBinary, []*core.Process{}) agentAPI.Init(core.NewMockMessagePipe(context.TODO())) client := resty.New() @@ -143,7 +146,7 @@ func TestMetrics(t *testing.T) { mockEnvironment := tutils.NewMockEnvironment() mockNginxBinary := tutils.NewMockNginxBinary() - agentAPI := plugins.NewAgentAPI(conf, mockEnvironment, mockNginxBinary) + agentAPI := plugins.NewAgentAPI(conf, mockEnvironment, mockNginxBinary, []*core.Process{}) agentAPI.Init(core.NewMockMessagePipe(context.TODO())) agentAPI.Process(core.NewMessage(core.MetricReport, &metrics.MetricsReportBundle{Data: []*proto.MetricsReport{ { @@ -244,7 +247,7 @@ func TestMetricsDisabled(t *testing.T) { mockEnvironment := tutils.NewMockEnvironment() mockNginxBinary := tutils.NewMockNginxBinary() - agentAPI := plugins.NewAgentAPI(conf, mockEnvironment, mockNginxBinary) + agentAPI := plugins.NewAgentAPI(conf, mockEnvironment, mockNginxBinary, []*core.Process{}) agentAPI.Init(core.NewMockMessagePipe(context.TODO())) client := resty.New() @@ -354,15 +357,16 @@ func TestConfigApply(t *testing.T) { Plus: &proto.NginxPlusMetaData{Enabled: true}, } + processes := []*core.Process{{Pid: 12345, IsMaster: true}} + mockEnvironment := tutils.NewMockEnvironment() - mockEnvironment.On("Processes").Return([]*core.Process{{Pid: 12345, IsMaster: true}}) mockEnvironment.On("WriteFiles", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) mockNginxBinary := tutils.NewMockNginxBinary() mockNginxBinary.On("GetNginxDetailsFromProcess", mock.Anything).Return(nginxDetails) mockNginxBinary.On("ReadConfig", mock.Anything, mock.Anything, mock.Anything).Return(conf, nil) - agentAPI := plugins.NewAgentAPI(tt.agentConf, mockEnvironment, mockNginxBinary) + agentAPI := plugins.NewAgentAPI(tt.agentConf, mockEnvironment, mockNginxBinary, processes) pipeline := core.NewMockMessagePipe(context.TODO()) agentAPI.Init(pipeline) diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go index 8cfab0119..b80a0112d 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go @@ -30,6 +30,7 @@ import ( "github.com/shirou/gopsutil/v3/host" "github.com/shirou/gopsutil/v3/process" log "github.com/sirupsen/logrus" + "golang.org/x/sync/singleflight" "golang.org/x/sys/unix" "github.com/nginx/agent/sdk/v2/files" @@ -52,7 +53,9 @@ type Environment interface { DeleteFile(backup ConfigApplyMarker, fileName string) error Processes() (result []*Process) FileStat(path string) (os.FileInfo, error) + Disks() ([]*proto.DiskPartition, error) DiskDevices() ([]string, error) + DiskUsage(mountPoint string) (*DiskUsage, error) GetContainerID() (string, error) GetNetOverflow() (float64, error) IsContainer() bool @@ -80,6 +83,13 @@ type Process struct { Command string } +type DiskUsage struct { + Total float64 + Used float64 + Free float64 + UsedPercentage float64 +} + const ( lengthOfContainerId = 64 versionId = "VERSION_ID" @@ -91,16 +101,19 @@ const ( KernProc = 14 // struct: process entries KernProcPathname = 12 // path to executable SYS_SYSCTL = 202 + IsContainerKey = "isContainer" + GetContainerIDKey = "GetContainerID" + GetSystemUUIDKey = "GetSystemUUIDKey" ) var ( - virtualizationFunc = host.VirtualizationWithContext - _ Environment = &EnvironmentType{} - basePattern = regexp.MustCompile("/([a-f0-9]{64})$") - colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") - scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) - containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") - containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") + virtualizationFunc = host.VirtualizationWithContext + singleflightGroup = &singleflight.Group{} + basePattern = regexp.MustCompile("/([a-f0-9]{64})$") + colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") + scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) + containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") + containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") ) func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { @@ -123,6 +136,12 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer tags = &[]string{} } + disks, err := env.Disks() + if err != nil { + log.Warnf("Unable to get disks information from the host: %v", err) + disks = nil + } + hostInfoFacacde := &proto.HostInfo{ Agent: agentVersion, Boot: hostInformation.BootTime, @@ -131,7 +150,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer OsType: hostInformation.OS, Uuid: env.GetSystemUUID(), Uname: getUnixName(), - Partitons: diskPartitions(), + Partitons: disks, Network: env.networks(), Processor: processors(hostInformation.KernelArch), Release: releaseInfo("/etc/os-release"), @@ -194,24 +213,32 @@ func (env *EnvironmentType) GetHostname() string { } func (env *EnvironmentType) GetSystemUUID() string { - ctx := context.Background() - defer ctx.Done() + res, err, _ := singleflightGroup.Do(GetSystemUUIDKey, func() (interface{}, error) { + var err error + ctx := context.Background() + defer ctx.Done() - if env.IsContainer() { - containerID, err := env.GetContainerID() - if err != nil { - log.Errorf("Unable to read docker container ID: %v", err) - return "" + if env.IsContainer() { + containerID, err := env.GetContainerID() + if err != nil { + return "", fmt.Errorf("unable to read docker container ID: %v", err) + } + return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String(), err } - return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String() - } - hostInfo, err := host.InfoWithContext(ctx) + hostID, err := host.HostIDWithContext(ctx) + if err != nil { + log.Warnf("Unable to read host id from dataplane, defaulting value. Error: %v", err) + return "", err + } + return uuid.NewMD5(uuid.Nil, []byte(hostID)).String(), err + }) if err != nil { - log.Infof("Unable to read host id from dataplane, defaulting value. Error: %v", err) + log.Warnf("Unable to set hostname due to %v", err) return "" } - return uuid.NewMD5(uuid.Nil, []byte(hostInfo.HostID)).String() + + return res.(string) } func (env *EnvironmentType) ReadDirectory(dir string, ext string) ([]string, error) { @@ -307,18 +334,25 @@ func (env *EnvironmentType) IsContainer() bool { k8sServiceAcct = "/var/run/secrets/kubernetes.io/serviceaccount" ) - for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { - if _, err := os.Stat(filename); err == nil { - log.Debugf("is a container because (%s) exists", filename) - return true + res, err, _ := singleflightGroup.Do(IsContainerKey, func() (interface{}, error) { + for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { + if _, err := os.Stat(filename); err == nil { + log.Debugf("Is a container because (%s) exists", filename) + return true, nil + } } - } - // v1 check - if result, err := cGroupV1Check(selfCgroup); err == nil && result { - return result + // v1 check + if result, err := cGroupV1Check(selfCgroup); err == nil && result { + return result, err + } + return false, nil + }) + + if err != nil { + log.Warnf("Unable to retrieve values from cache (%v)", err) } - return false + return res.(bool) } // cGroupV1Check returns if running cgroup v1 @@ -345,20 +379,24 @@ func cGroupV1Check(cgroupFile string) (bool, error) { // GetContainerID returns the ID of the current environment if running in a container func (env *EnvironmentType) GetContainerID() (string, error) { - const mountInfo = "/proc/self/mountinfo" + res, err, _ := singleflightGroup.Do(GetContainerIDKey, func() (interface{}, error) { + const mountInfo = "/proc/self/mountinfo" - if !env.IsContainer() { - return "", errors.New("not in docker") - } + if !env.IsContainer() { + return "", errors.New("not in docker") + } - containerID, err := getContainerID(mountInfo) - if err != nil { - return "", fmt.Errorf("could not get container ID: %v", err) - } + containerID, err := getContainerID(mountInfo) + if err != nil { + return "", fmt.Errorf("could not get container ID: %v", err) + } + + log.Debugf("Container ID: %s", containerID) - log.Debugf("Container ID: %s", containerID) + return containerID, err + }) - return containerID, err + return res.(string), err } // getContainerID returns the container ID of the current running environment. @@ -428,6 +466,42 @@ func (env *EnvironmentType) DiskDevices() ([]string, error) { } } +func (env *EnvironmentType) Disks() (partitions []*proto.DiskPartition, err error) { + ctx := context.Background() + defer ctx.Done() + parts, err := disk.PartitionsWithContext(ctx, false) + if err != nil { + // return an array of 0 + log.Errorf("Could not read disk partitions for host: %v", err) + return []*proto.DiskPartition{}, err + } + for _, part := range parts { + pm := proto.DiskPartition{ + MountPoint: part.Mountpoint, + Device: part.Device, + FsType: part.Fstype, + } + partitions = append(partitions, &pm) + } + return partitions, nil +} + +func (env *EnvironmentType) DiskUsage(mountPoint string) (*DiskUsage, error) { + ctx := context.Background() + defer ctx.Done() + usage, err := disk.UsageWithContext(ctx, mountPoint) + if err != nil { + return nil, errors.New("unable to obtain disk usage stats") + } + + return &DiskUsage{ + Total: float64(usage.Total), + Used: float64(usage.Used), + Free: float64(usage.Free), + UsedPercentage: float64(usage.UsedPercent), + }, nil +} + func (env *EnvironmentType) GetNetOverflow() (float64, error) { return network.GetNetOverflow() } @@ -795,26 +869,6 @@ func virtualization() (string, string) { return virtualizationSystem, virtualizationRole } -func diskPartitions() (partitions []*proto.DiskPartition) { - ctx := context.Background() - defer ctx.Done() - parts, err := disk.PartitionsWithContext(ctx, false) - if err != nil { - // return an array of 0 - log.Errorf("Could not read disk partitions for host: %v", err) - return []*proto.DiskPartition{} - } - for _, part := range parts { - pm := proto.DiskPartition{ - MountPoint: part.Mountpoint, - Device: part.Device, - FsType: part.Fstype, - } - partitions = append(partitions, &pm) - } - return partitions -} - func releaseInfo(osReleaseFile string) (release *proto.ReleaseInfo) { hostReleaseInfo := getHostReleaseInfo() osRelease, err := getOsRelease(osReleaseFile) diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 04dcc944e..f29f410ee 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -131,7 +131,6 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []*Proc n.nginxWorkersMap[nginxDetails.GetNginxId()] = append(n.nginxWorkersMap[nginxDetails.GetNginxId()], nginxDetails) } } - } func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails { diff --git a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/environment.go b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/environment.go index da6787a24..2039a2ce2 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/environment.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/environment.go @@ -12,6 +12,31 @@ type MockEnvironment struct { mock.Mock } +// Disks implements core.Environment. +func (*MockEnvironment) Disks() ([]*proto.DiskPartition, error) { + return []*proto.DiskPartition{ + { + Device: "sd01", + MountPoint: "/sd01", + FsType: "ext4", + }, + { + Device: "sd02", + MountPoint: "/sd02", + FsType: "ext4", + }, + }, nil +} + +func (*MockEnvironment) DiskUsage(mountpoint string) (*core.DiskUsage, error) { + return &core.DiskUsage{ + Total: 20, + Used: 10, + Free: 10, + UsedPercentage: 100, + }, nil +} + func GetProcesses() []*core.Process { return []*core.Process{ {Pid: 1, Name: "12345", IsMaster: true}, diff --git a/test/integration/vendor/golang.org/x/sync/singleflight/singleflight.go b/test/integration/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 000000000..8473fb792 --- /dev/null +++ b/test/integration/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,205 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +// +// The returned channel will not be closed. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + g.mu.Lock() + defer g.mu.Unlock() + c.wg.Done() + if g.m[key] == c { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true + } +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() +} diff --git a/test/integration/vendor/modules.txt b/test/integration/vendor/modules.txt index 90eb66298..4d683e6a4 100644 --- a/test/integration/vendor/modules.txt +++ b/test/integration/vendor/modules.txt @@ -978,6 +978,7 @@ golang.org/x/oauth2/internal ## explicit; go 1.17 golang.org/x/sync/errgroup golang.org/x/sync/semaphore +golang.org/x/sync/singleflight # golang.org/x/sys v0.11.0 ## explicit; go 1.17 golang.org/x/sys/cpu diff --git a/test/performance/metrics_test.go b/test/performance/metrics_test.go index a3c1966b2..a6740e636 100644 --- a/test/performance/metrics_test.go +++ b/test/performance/metrics_test.go @@ -318,15 +318,17 @@ func startNginxAgent(b *testing.B) { return } + processes := env.Processes() + corePlugins := []core.Plugin{ plugins.NewConfigReader(loadedConfig), - plugins.NewNginx(commander, binary, env, &config.Config{}), + plugins.NewNginx(commander, binary, env, &config.Config{}, processes), plugins.NewCommander(commander, loadedConfig), plugins.NewMetricsSender(reporter), - plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.New().String())), - plugins.NewMetrics(loadedConfig, env, binary), + plugins.NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.New().String()), processes), + plugins.NewMetrics(loadedConfig, env, binary, processes), plugins.NewMetricsThrottle(loadedConfig, env), - plugins.NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.New().String()), binary, env), + plugins.NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.New().String()), binary, env, processes), } messagePipe := core.NewMessagePipe(ctx) diff --git a/test/performance/plugins_test.go b/test/performance/plugins_test.go index 2e7cb0a41..f2bbcee9b 100644 --- a/test/performance/plugins_test.go +++ b/test/performance/plugins_test.go @@ -223,7 +223,7 @@ func BenchmarkPluginOneTimeRegistration(b *testing.B) { messagePipe := core.NewMessagePipe(ctx) for n := 0; n < b.N; n++ { - pluginsUnderTest = append(pluginsUnderTest, plugins.NewOneTimeRegistration(&config, binary, env, &meta)) + pluginsUnderTest = append(pluginsUnderTest, plugins.NewOneTimeRegistration(&config, binary, env, &meta, tutils.GetProcesses())) } err := messagePipe.Register(b.N, pluginsUnderTest, []core.ExtensionPlugin{}) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go index 8cfab0119..b80a0112d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go @@ -30,6 +30,7 @@ import ( "github.com/shirou/gopsutil/v3/host" "github.com/shirou/gopsutil/v3/process" log "github.com/sirupsen/logrus" + "golang.org/x/sync/singleflight" "golang.org/x/sys/unix" "github.com/nginx/agent/sdk/v2/files" @@ -52,7 +53,9 @@ type Environment interface { DeleteFile(backup ConfigApplyMarker, fileName string) error Processes() (result []*Process) FileStat(path string) (os.FileInfo, error) + Disks() ([]*proto.DiskPartition, error) DiskDevices() ([]string, error) + DiskUsage(mountPoint string) (*DiskUsage, error) GetContainerID() (string, error) GetNetOverflow() (float64, error) IsContainer() bool @@ -80,6 +83,13 @@ type Process struct { Command string } +type DiskUsage struct { + Total float64 + Used float64 + Free float64 + UsedPercentage float64 +} + const ( lengthOfContainerId = 64 versionId = "VERSION_ID" @@ -91,16 +101,19 @@ const ( KernProc = 14 // struct: process entries KernProcPathname = 12 // path to executable SYS_SYSCTL = 202 + IsContainerKey = "isContainer" + GetContainerIDKey = "GetContainerID" + GetSystemUUIDKey = "GetSystemUUIDKey" ) var ( - virtualizationFunc = host.VirtualizationWithContext - _ Environment = &EnvironmentType{} - basePattern = regexp.MustCompile("/([a-f0-9]{64})$") - colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") - scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) - containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") - containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") + virtualizationFunc = host.VirtualizationWithContext + singleflightGroup = &singleflight.Group{} + basePattern = regexp.MustCompile("/([a-f0-9]{64})$") + colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") + scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) + containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") + containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") ) func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { @@ -123,6 +136,12 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer tags = &[]string{} } + disks, err := env.Disks() + if err != nil { + log.Warnf("Unable to get disks information from the host: %v", err) + disks = nil + } + hostInfoFacacde := &proto.HostInfo{ Agent: agentVersion, Boot: hostInformation.BootTime, @@ -131,7 +150,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer OsType: hostInformation.OS, Uuid: env.GetSystemUUID(), Uname: getUnixName(), - Partitons: diskPartitions(), + Partitons: disks, Network: env.networks(), Processor: processors(hostInformation.KernelArch), Release: releaseInfo("/etc/os-release"), @@ -194,24 +213,32 @@ func (env *EnvironmentType) GetHostname() string { } func (env *EnvironmentType) GetSystemUUID() string { - ctx := context.Background() - defer ctx.Done() + res, err, _ := singleflightGroup.Do(GetSystemUUIDKey, func() (interface{}, error) { + var err error + ctx := context.Background() + defer ctx.Done() - if env.IsContainer() { - containerID, err := env.GetContainerID() - if err != nil { - log.Errorf("Unable to read docker container ID: %v", err) - return "" + if env.IsContainer() { + containerID, err := env.GetContainerID() + if err != nil { + return "", fmt.Errorf("unable to read docker container ID: %v", err) + } + return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String(), err } - return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String() - } - hostInfo, err := host.InfoWithContext(ctx) + hostID, err := host.HostIDWithContext(ctx) + if err != nil { + log.Warnf("Unable to read host id from dataplane, defaulting value. Error: %v", err) + return "", err + } + return uuid.NewMD5(uuid.Nil, []byte(hostID)).String(), err + }) if err != nil { - log.Infof("Unable to read host id from dataplane, defaulting value. Error: %v", err) + log.Warnf("Unable to set hostname due to %v", err) return "" } - return uuid.NewMD5(uuid.Nil, []byte(hostInfo.HostID)).String() + + return res.(string) } func (env *EnvironmentType) ReadDirectory(dir string, ext string) ([]string, error) { @@ -307,18 +334,25 @@ func (env *EnvironmentType) IsContainer() bool { k8sServiceAcct = "/var/run/secrets/kubernetes.io/serviceaccount" ) - for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { - if _, err := os.Stat(filename); err == nil { - log.Debugf("is a container because (%s) exists", filename) - return true + res, err, _ := singleflightGroup.Do(IsContainerKey, func() (interface{}, error) { + for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { + if _, err := os.Stat(filename); err == nil { + log.Debugf("Is a container because (%s) exists", filename) + return true, nil + } } - } - // v1 check - if result, err := cGroupV1Check(selfCgroup); err == nil && result { - return result + // v1 check + if result, err := cGroupV1Check(selfCgroup); err == nil && result { + return result, err + } + return false, nil + }) + + if err != nil { + log.Warnf("Unable to retrieve values from cache (%v)", err) } - return false + return res.(bool) } // cGroupV1Check returns if running cgroup v1 @@ -345,20 +379,24 @@ func cGroupV1Check(cgroupFile string) (bool, error) { // GetContainerID returns the ID of the current environment if running in a container func (env *EnvironmentType) GetContainerID() (string, error) { - const mountInfo = "/proc/self/mountinfo" + res, err, _ := singleflightGroup.Do(GetContainerIDKey, func() (interface{}, error) { + const mountInfo = "/proc/self/mountinfo" - if !env.IsContainer() { - return "", errors.New("not in docker") - } + if !env.IsContainer() { + return "", errors.New("not in docker") + } - containerID, err := getContainerID(mountInfo) - if err != nil { - return "", fmt.Errorf("could not get container ID: %v", err) - } + containerID, err := getContainerID(mountInfo) + if err != nil { + return "", fmt.Errorf("could not get container ID: %v", err) + } + + log.Debugf("Container ID: %s", containerID) - log.Debugf("Container ID: %s", containerID) + return containerID, err + }) - return containerID, err + return res.(string), err } // getContainerID returns the container ID of the current running environment. @@ -428,6 +466,42 @@ func (env *EnvironmentType) DiskDevices() ([]string, error) { } } +func (env *EnvironmentType) Disks() (partitions []*proto.DiskPartition, err error) { + ctx := context.Background() + defer ctx.Done() + parts, err := disk.PartitionsWithContext(ctx, false) + if err != nil { + // return an array of 0 + log.Errorf("Could not read disk partitions for host: %v", err) + return []*proto.DiskPartition{}, err + } + for _, part := range parts { + pm := proto.DiskPartition{ + MountPoint: part.Mountpoint, + Device: part.Device, + FsType: part.Fstype, + } + partitions = append(partitions, &pm) + } + return partitions, nil +} + +func (env *EnvironmentType) DiskUsage(mountPoint string) (*DiskUsage, error) { + ctx := context.Background() + defer ctx.Done() + usage, err := disk.UsageWithContext(ctx, mountPoint) + if err != nil { + return nil, errors.New("unable to obtain disk usage stats") + } + + return &DiskUsage{ + Total: float64(usage.Total), + Used: float64(usage.Used), + Free: float64(usage.Free), + UsedPercentage: float64(usage.UsedPercent), + }, nil +} + func (env *EnvironmentType) GetNetOverflow() (float64, error) { return network.GetNetOverflow() } @@ -795,26 +869,6 @@ func virtualization() (string, string) { return virtualizationSystem, virtualizationRole } -func diskPartitions() (partitions []*proto.DiskPartition) { - ctx := context.Background() - defer ctx.Done() - parts, err := disk.PartitionsWithContext(ctx, false) - if err != nil { - // return an array of 0 - log.Errorf("Could not read disk partitions for host: %v", err) - return []*proto.DiskPartition{} - } - for _, part := range parts { - pm := proto.DiskPartition{ - MountPoint: part.Mountpoint, - Device: part.Device, - FsType: part.Fstype, - } - partitions = append(partitions, &pm) - } - return partitions -} - func releaseInfo(osReleaseFile string) (release *proto.ReleaseInfo) { hostReleaseInfo := getHostReleaseInfo() osRelease, err := getOsRelease(osReleaseFile) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/system.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/system.go index 0aa1060b3..a92c4a7b8 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/system.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/system.go @@ -40,7 +40,7 @@ func NewSystemCollector(env core.Environment, conf *config.Config) *SystemCollec systemSources = []metrics.Source{ sources.NewVirtualMemorySource(sources.SystemNamespace, env), sources.NewCPUTimesSource(sources.SystemNamespace, env), - sources.NewDiskSource(sources.SystemNamespace), + sources.NewDiskSource(sources.SystemNamespace, env), sources.NewDiskIOSource(sources.SystemNamespace, env), sources.NewNetIOSource(sources.SystemNamespace, env), sources.NewLoadSource(sources.SystemNamespace), diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go index 29b788953..472133097 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go @@ -13,8 +13,8 @@ import ( "sync" "github.com/nginx/agent/sdk/v2/proto" + "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" - "github.com/shirou/gopsutil/v3/disk" ) const MOUNT_POINT = "mount_point" @@ -22,25 +22,24 @@ const MOUNT_POINT = "mount_point" type Disk struct { logger *MetricSourceLogger *namedMetric - disks []disk.PartitionStat + disks []*proto.DiskPartition + env core.Environment } -func NewDiskSource(namespace string) *Disk { - ctx := context.Background() - defer ctx.Done() - disks, _ := disk.PartitionsWithContext(ctx, false) - return &Disk{NewMetricSourceLogger(), &namedMetric{namespace, "disk"}, disks} +func NewDiskSource(namespace string, env core.Environment) *Disk { + disks, _ := env.Disks() + return &Disk{NewMetricSourceLogger(), &namedMetric{namespace, "disk"}, disks, env} } func (c *Disk) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) { defer wg.Done() for _, part := range c.disks { - if part.Device == "" || part.Fstype == "" { + if part.Device == "" || part.FsType == "" { continue } - usage, err := disk.UsageWithContext(ctx, part.Mountpoint) + usage, err := c.env.DiskUsage(part.MountPoint) if err != nil { - c.logger.Log(fmt.Sprintf("Failed to get disk metrics for mount point %s, %v", part.Mountpoint, err)) + c.logger.Log(fmt.Sprintf("Failed to get disk metrics for mount point %s, %v", part.MountPoint, err)) continue } @@ -48,14 +47,14 @@ func (c *Disk) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metric "total": float64(usage.Total), "used": float64(usage.Used), "free": float64(usage.Free), - "in_use": float64(usage.UsedPercent), + "in_use": float64(usage.UsedPercentage), }) select { case <-ctx.Done(): return // mount point is not a common dim - case m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{{Name: MOUNT_POINT, Value: part.Mountpoint}}, simpleMetrics, proto.MetricsReport_SYSTEM): + case m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{{Name: MOUNT_POINT, Value: part.MountPoint}}, simpleMetrics, proto.MetricsReport_SYSTEM): } } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 04dcc944e..f29f410ee 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -131,7 +131,6 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []*Proc n.nginxWorkersMap[nginxDetails.GetNginxId()] = append(n.nginxWorkersMap[nginxDetails.GetNginxId()], nginxDetails) } } - } func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/agent_api.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/agent_api.go index 512f2d578..2fe5e86ef 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/agent_api.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/agent_api.go @@ -18,6 +18,7 @@ import ( "net/http" "os" "regexp" + "sync" "time" "github.com/nginx/agent/v2/src/core/metrics" @@ -60,6 +61,7 @@ type AgentAPI struct { nginxBinary core.NginxBinary nginxHandler *NginxHandler exporter *prometheus_metrics.Exporter + processes []*core.Process } type NginxHandler struct { @@ -69,6 +71,8 @@ type NginxHandler struct { nginxBinary core.NginxBinary responseChannel chan *proto.Command_NginxConfigResponse configResponseStatuses map[string]*proto.NginxConfigStatus + processesMutex sync.RWMutex + processes []*core.Process } // swagger:parameters apply-nginx-config @@ -133,12 +137,13 @@ const ( jsonMimeType = "application/json" ) -func NewAgentAPI(config *config.Config, env core.Environment, nginxBinary core.NginxBinary) *AgentAPI { +func NewAgentAPI(config *config.Config, env core.Environment, nginxBinary core.NginxBinary, processes []*core.Process) *AgentAPI { return &AgentAPI{ config: config, env: env, nginxBinary: nginxBinary, exporter: prometheus_metrics.NewExporter(&proto.MetricsReport{}), + processes: processes, } } @@ -181,6 +186,11 @@ func (a *AgentAPI) Process(message *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, response) } + case core.NginxDetailProcUpdate: + a.processes = message.Data().([]*core.Process) + if a.nginxHandler != nil { + a.nginxHandler.syncProcessInfo(a.processes) + } } } @@ -195,6 +205,7 @@ func (a *AgentAPI) Subscriptions() []string { core.NginxConfigValidationPending, core.NginxConfigApplyFailed, core.NginxConfigApplySucceeded, + core.NginxDetailProcUpdate, } } @@ -206,6 +217,7 @@ func (a *AgentAPI) createHttpServer() { nginxBinary: a.nginxBinary, responseChannel: make(chan *proto.Command_NginxConfigResponse), configResponseStatuses: make(map[string]*proto.NginxConfigStatus), + processes: a.processes, } mux := http.NewServeMux() @@ -462,7 +474,7 @@ func readFileFromRequest(r *http.Request) (*bytes.Buffer, error) { func (h *NginxHandler) getNginxDetails() []*proto.NginxDetails { var nginxDetails []*proto.NginxDetails - for _, proc := range h.env.Processes() { + for _, proc := range h.getNginxProccessInfo() { if proc.IsMaster { nginxDetails = append(nginxDetails, h.nginxBinary.GetNginxDetailsFromProcess(proc)) } @@ -581,6 +593,18 @@ func (h *NginxHandler) getConfigStatus(w http.ResponseWriter, r *http.Request) e return writeObjectToResponseBody(w, agentAPIConfigApplyStatusResponse) } +func (h *NginxHandler) getNginxProccessInfo() []*core.Process { + h.processesMutex.RLock() + defer h.processesMutex.RUnlock() + return h.processes +} + +func (h *NginxHandler) syncProcessInfo(processInfo []*core.Process) { + h.processesMutex.Lock() + defer h.processesMutex.Unlock() + h.processes = processInfo +} + func writeObjectToResponseBody(w http.ResponseWriter, response any) error { respBody := new(bytes.Buffer) err := json.NewEncoder(respBody).Encode(response) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/common.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/common.go index 397db17bf..1c13793f5 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/common.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/common.go @@ -19,6 +19,8 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E var corePlugins []core.Plugin var extensionPlugins []core.ExtensionPlugin + processes := env.Processes() + if commander != nil { corePlugins = append(corePlugins, NewCommander(commander, loadedConfig), @@ -40,18 +42,18 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E corePlugins = append(corePlugins, NewConfigReader(loadedConfig), - NewNginx(commander, binary, env, loadedConfig), + NewNginx(commander, binary, env, loadedConfig, processes), NewExtensions(loadedConfig, env), - NewFeatures(commander, loadedConfig, env, binary, loadedConfig.Version, agentEventsMeta), + NewFeatures(commander, loadedConfig, env, binary, loadedConfig.Version, processes, agentEventsMeta), ) if loadedConfig.IsFeatureEnabled(agent_config.FeatureRegistration) { - corePlugins = append(corePlugins, NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()))) + corePlugins = append(corePlugins, NewOneTimeRegistration(loadedConfig, binary, env, sdkGRPC.NewMessageMeta(uuid.NewString()), processes)) } if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsCollection) || (len(loadedConfig.Nginx.NginxCountingSocket) > 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxCounting)) { - corePlugins = append(corePlugins, NewMetrics(loadedConfig, env, binary)) + corePlugins = append(corePlugins, NewMetrics(loadedConfig, env, binary, processes)) } if loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsThrottle) { @@ -59,11 +61,11 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E } if loadedConfig.IsFeatureEnabled(agent_config.FeatureDataPlaneStatus) { - corePlugins = append(corePlugins, NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.NewString()), binary, env)) + corePlugins = append(corePlugins, NewDataPlaneStatus(loadedConfig, sdkGRPC.NewMessageMeta(uuid.NewString()), binary, env, processes)) } if loadedConfig.IsFeatureEnabled(agent_config.FeatureProcessWatcher) { - corePlugins = append(corePlugins, NewProcessWatcher(env, binary)) + corePlugins = append(corePlugins, NewProcessWatcher(env, binary, processes)) } if loadedConfig.IsFeatureEnabled(agent_config.FeatureActivityEvents) { @@ -71,7 +73,7 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E } if loadedConfig.AgentAPI.Port != 0 && loadedConfig.IsFeatureEnabled(agent_config.FeatureAgentAPI) { - corePlugins = append(corePlugins, NewAgentAPI(loadedConfig, env, binary)) + corePlugins = append(corePlugins, NewAgentAPI(loadedConfig, env, binary, processes)) } else { log.Info("Agent API not configured") } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go index d1aef6ae7..6b6014a5f 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go @@ -42,13 +42,14 @@ type DataPlaneStatus struct { softwareDetails map[string]*proto.DataplaneSoftwareDetails nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus softwareDetailsMutex sync.RWMutex + processes []*core.Process } const ( defaultMinInterval = time.Second * 30 ) -func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core.NginxBinary, env core.Environment) *DataPlaneStatus { +func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core.NginxBinary, env core.Environment, processes []*core.Process) *DataPlaneStatus { log.Tracef("Dataplane status interval %s", config.Dataplane.Status.PollInterval) pollInt := config.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { @@ -69,6 +70,7 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core softwareDetailsMutex: sync.RWMutex{}, nginxConfigActivityStatuses: make(map[string]*proto.AgentActivityStatus), softwareDetails: make(map[string]*proto.DataplaneSoftwareDetails), + processes: processes, } } @@ -129,6 +131,8 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } + case msg.Exact(core.NginxDetailProcUpdate): + dps.processes = msg.Data().([]*core.Process) } } @@ -139,6 +143,7 @@ func (dps *DataPlaneStatus) Subscriptions() []string { core.NginxConfigValidationPending, core.NginxConfigApplyFailed, core.NginxConfigApplySucceeded, + core.NginxDetailProcUpdate, } } @@ -182,7 +187,6 @@ func (dps *DataPlaneStatus) healthGoRoutine(pipeline core.MessagePipeInterface) } func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneStatus { - processes := dps.env.Processes() forceDetails = forceDetails || time.Now().UTC().Add(-dps.reportInterval).After(dps.lastSendDetails) agentActivityStatuses := []*proto.AgentActivityStatus{} @@ -200,8 +204,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS dataplaneStatus := &proto.DataplaneStatus{ Host: dps.hostInfo(forceDetails), - Details: dps.detailsForProcess(processes, forceDetails), - Healths: dps.healthForProcess(processes), + Details: dps.detailsForProcess(dps.processes, forceDetails), + Healths: dps.healthForProcess(dps.processes), DataplaneSoftwareDetails: dataplaneSoftwareDetails, AgentActivityStatus: agentActivityStatuses, } @@ -210,7 +214,7 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send - hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, true) + hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go index 44aabefed..2c21b300e 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/features.go @@ -27,16 +27,26 @@ type Features struct { binary core.NginxBinary version string featureMap map[string]func(data string) []core.Plugin + processes []*core.Process agentEventsMeta *events.AgentEventMeta } -func NewFeatures(commander client.Commander, conf *config.Config, env core.Environment, binary core.NginxBinary, version string, agentEventsMeta *events.AgentEventMeta) *Features { +func NewFeatures( + commander client.Commander, + conf *config.Config, + env core.Environment, + binary core.NginxBinary, + version string, + processes []*core.Process, + agentEventsMeta *events.AgentEventMeta, +) *Features { return &Features{ commander: commander, conf: conf, env: env, binary: binary, version: version, + processes: processes, agentEventsMeta: agentEventsMeta, } } @@ -118,6 +128,8 @@ func (f *Features) Process(msg *core.Message) { for _, plugin := range plugins { plugin.Init(f.pipeline) } + } else if msg.Topic() == core.NginxDetailProcUpdate { + f.processes = msg.Data().([]*core.Process) } } @@ -130,7 +142,7 @@ func (f *Features) enableMetricsFeature(data string) []core.Plugin { } f.conf = conf - metrics := NewMetrics(f.conf, f.env, f.binary) + metrics := NewMetrics(f.conf, f.env, f.binary, f.processes) metricsThrottle := NewMetricsThrottle(f.conf, f.env) metricsSender := NewMetricsSender(f.commander) @@ -149,7 +161,7 @@ func (f *Features) enableMetricsCollectionFeature(data string) []core.Plugin { } f.conf = conf - metrics := NewMetrics(f.conf, f.env, f.binary) + metrics := NewMetrics(f.conf, f.env, f.binary, f.processes) return []core.Plugin{metrics} } @@ -198,7 +210,7 @@ func (f *Features) enableAgentAPIFeature(data string) []core.Plugin { } f.conf = conf - api := NewAgentAPI(f.conf, f.env, f.binary) + api := NewAgentAPI(f.conf, f.env, f.binary, f.processes) return []core.Plugin{api} } @@ -213,7 +225,7 @@ func (f *Features) enableRegistrationFeature(data string) []core.Plugin { } f.conf = conf - registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString())) + registration := NewOneTimeRegistration(f.conf, f.binary, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.processes) return []core.Plugin{registration} } @@ -228,7 +240,7 @@ func (f *Features) enableDataPlaneStatusFeature(data string) []core.Plugin { } f.conf = conf - dataPlaneStatus := NewDataPlaneStatus(f.conf, sdkGRPC.NewMessageMeta(uuid.NewString()), f.binary, f.env) + dataPlaneStatus := NewDataPlaneStatus(f.conf, sdkGRPC.NewMessageMeta(uuid.NewString()), f.binary, f.env, f.processes) return []core.Plugin{dataPlaneStatus} } @@ -243,7 +255,7 @@ func (f *Features) enableProcessWatcherFeature(data string) []core.Plugin { } f.conf = conf - processWatcher := NewProcessWatcher(f.env, f.binary) + processWatcher := NewProcessWatcher(f.env, f.binary, f.processes) return []core.Plugin{processWatcher} } @@ -294,7 +306,7 @@ func (f *Features) enableNginxCountingFeature(data string) []core.Plugin { if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) && !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) { - metrics := NewMetrics(f.conf, f.env, f.binary) + metrics := NewMetrics(f.conf, f.env, f.binary, f.processes) countingPlugins = append(countingPlugins, metrics) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 77d5d7309..eaf35d07d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -39,10 +39,12 @@ type Metrics struct { env core.Environment conf *config.Config binary core.NginxBinary + processesMutex sync.RWMutex + processes []*core.Process } -func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary) *Metrics { - collectorConfigsMap := createCollectorConfigsMap(config, env, binary) +func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary, processes []*core.Process) *Metrics { + collectorConfigsMap := createCollectorConfigsMap(config, env, binary, processes) return &Metrics{ collectorsUpdate: atomic.NewBool(false), ticker: time.NewTicker(config.AgentMetrics.CollectionInterval), @@ -56,6 +58,7 @@ func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBi env: env, conf: config, binary: binary, + processes: processes, } } @@ -77,7 +80,7 @@ func (m *Metrics) Process(msg *core.Message) { case msg.Exact(core.AgentConfigChanged), msg.Exact(core.NginxConfigApplySucceeded): // If the agent config on disk changed or the NGINX statusAPI was updated // Then update Metrics with relevant config info - collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary) + collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary, m.getNginxProccessInfo()) m.collectorConfigsMapMutex.Lock() m.collectorConfigsMap = collectorConfigsMap m.collectorConfigsMapMutex.Unlock() @@ -97,7 +100,8 @@ func (m *Metrics) Process(msg *core.Message) { return case msg.Exact(core.NginxDetailProcUpdate): - collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary) + m.syncProcessInfo(msg.Data().([]*core.Process)) + collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary, m.getNginxProccessInfo()) for key, collectorConfig := range collectorConfigsMap { if _, ok := m.collectorConfigsMap[key]; !ok { log.Debugf("Adding new nginx collector for nginx id: %s", collectorConfig.NginxId) @@ -299,10 +303,9 @@ func (m *Metrics) syncAgentConfigChange() { m.conf = conf } -func createCollectorConfigsMap(config *config.Config, env core.Environment, binary core.NginxBinary) map[string]*metrics.NginxCollectorConfig { +func createCollectorConfigsMap(config *config.Config, env core.Environment, binary core.NginxBinary, processes []*core.Process) map[string]*metrics.NginxCollectorConfig { collectorConfigsMap := make(map[string]*metrics.NginxCollectorConfig) - processes := env.Processes() for _, p := range processes { if !p.IsMaster { continue @@ -357,3 +360,15 @@ func (m *Metrics) updateCollectorsSources() { } } } + +func (m *Metrics) getNginxProccessInfo() []*core.Process { + m.processesMutex.RLock() + defer m.processesMutex.RUnlock() + return m.processes +} + +func (m *Metrics) syncProcessInfo(processInfo []*core.Process) { + m.processesMutex.Lock() + defer m.processesMutex.Unlock() + m.processes = processInfo +} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go index 09b5848ab..1937ca8dc 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go @@ -89,14 +89,14 @@ type NginxConfigValidationResponse struct { elapsedTime time.Duration } -func NewNginx(cmdr client.Commander, nginxBinary core.NginxBinary, env core.Environment, loadedConfig *config.Config) *Nginx { +func NewNginx(cmdr client.Commander, nginxBinary core.NginxBinary, env core.Environment, loadedConfig *config.Config, processes []*core.Process) *Nginx { isFeatureNginxConfigEnabled := loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxConfig) || loadedConfig.IsFeatureEnabled(agent_config.FeatureNginxConfigAsync) isNginxAppProtectEnabled := loadedConfig.IsExtensionEnabled(agent_config.NginxAppProtectExtensionPlugin) return &Nginx{ nginxBinary: nginxBinary, - processes: env.Processes(), + processes: processes, env: env, cmdr: cmdr, config: loadedConfig, @@ -530,9 +530,6 @@ func (n *Nginx) completeConfigApply(response *NginxConfigValidationResponse) (st }, } - n.syncProcessInfo(n.env.Processes()) - n.nginxBinary.UpdateNginxDetailsFromProcesses(n.processes) - n.messagePipeline.Process(core.NewMessage(core.NginxConfigApplySucceeded, agentActivityStatus)) status = &proto.Command_NginxConfigResponse{ @@ -693,7 +690,6 @@ func (n *Nginx) handleErrorStatus(status *proto.Command_NginxConfigResponse, mes func (n *Nginx) uploadConfigs() { systemId := n.env.GetSystemUUID() - n.syncProcessInfo(n.env.Processes()) for nginxID := range n.nginxBinary.GetNginxDetailsMapFromProcesses(n.getNginxProccessInfo()) { err := n.uploadConfig( &proto.ConfigDescriptor{ diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/process_watcher.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/process_watcher.go index 9df477aa2..37c123bb9 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/process_watcher.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/process_watcher.go @@ -29,27 +29,27 @@ type ProcessWatcher struct { wg sync.WaitGroup env core.Environment binary core.NginxBinary + processes []*core.Process } -func NewProcessWatcher(env core.Environment, nginxBinary core.NginxBinary) *ProcessWatcher { +func NewProcessWatcher(env core.Environment, nginxBinary core.NginxBinary, processes []*core.Process) *ProcessWatcher { return &ProcessWatcher{ - ticker: time.NewTicker(time.Millisecond * 500), + ticker: time.NewTicker(5 * time.Second), seenMasterProcs: make(map[int32]*core.Process), seenWorkerProcs: make(map[int32]*core.Process), nginxDetails: make(map[int32]*proto.NginxDetails), wg: sync.WaitGroup{}, env: env, binary: nginxBinary, + processes: processes, } } func (pw *ProcessWatcher) Init(pipeline core.MessagePipeInterface) { log.Info("ProcessWatcher initializing") - pw.messagePipeline = pipeline - nginxProcesses := pw.env.Processes() - for _, proc := range nginxProcesses { + for _, proc := range pw.processes { if proc.IsMaster { pw.seenMasterProcs[proc.Pid] = proc } else { @@ -61,7 +61,7 @@ func (pw *ProcessWatcher) Init(pipeline core.MessagePipeInterface) { pw.wg.Add(1) go pw.watchProcLoop(pipeline.Context()) - pw.messagePipeline.Process(core.NewMessage(core.NginxDetailProcUpdate, nginxProcesses)) + pw.messagePipeline.Process(core.NewMessage(core.NginxDetailProcUpdate, pw.processes)) } func (pw *ProcessWatcher) Info() *core.Info { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go index c57209fc0..535e01fee 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go @@ -42,6 +42,7 @@ type OneTimeRegistration struct { dataplaneSoftwareDetails map[string]*proto.DataplaneSoftwareDetails pipeline core.MessagePipeInterface dataplaneSoftwareDetailsMutex sync.Mutex + processes []*core.Process } func NewOneTimeRegistration( @@ -49,6 +50,7 @@ func NewOneTimeRegistration( binary core.NginxBinary, env core.Environment, meta *proto.Metadata, + processes []*core.Process, ) *OneTimeRegistration { // this might be slow so do on startup host := env.NewHostInfo(config.Version, &config.Tags, config.ConfigDirs, true) @@ -62,6 +64,7 @@ func NewOneTimeRegistration( binary: binary, dataplaneSoftwareDetails: make(map[string]*proto.DataplaneSoftwareDetails), dataplaneSoftwareDetailsMutex: sync.Mutex{}, + processes: processes, } } @@ -150,7 +153,7 @@ func (r *OneTimeRegistration) areDataplaneSoftwareDetailsReady() error { func (r *OneTimeRegistration) registerAgent() { var details []*proto.NginxDetails - for _, proc := range r.env.Processes() { + for _, proc := range r.processes { // only need master process for registration if proc.IsMaster { nginxDetails := r.binary.GetNginxDetailsFromProcess(proc) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/environment.go b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/environment.go index da6787a24..2039a2ce2 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/environment.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/environment.go @@ -12,6 +12,31 @@ type MockEnvironment struct { mock.Mock } +// Disks implements core.Environment. +func (*MockEnvironment) Disks() ([]*proto.DiskPartition, error) { + return []*proto.DiskPartition{ + { + Device: "sd01", + MountPoint: "/sd01", + FsType: "ext4", + }, + { + Device: "sd02", + MountPoint: "/sd02", + FsType: "ext4", + }, + }, nil +} + +func (*MockEnvironment) DiskUsage(mountpoint string) (*core.DiskUsage, error) { + return &core.DiskUsage{ + Total: 20, + Used: 10, + Free: 10, + UsedPercentage: 100, + }, nil +} + func GetProcesses() []*core.Process { return []*core.Process{ {Pid: 1, Name: "12345", IsMaster: true}, diff --git a/test/performance/vendor/golang.org/x/sync/singleflight/singleflight.go b/test/performance/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 000000000..8473fb792 --- /dev/null +++ b/test/performance/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,205 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +// +// The returned channel will not be closed. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + g.mu.Lock() + defer g.mu.Unlock() + c.wg.Done() + if g.m[key] == c { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true + } +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() +} diff --git a/test/performance/vendor/modules.txt b/test/performance/vendor/modules.txt index db5d0da4a..f13622221 100644 --- a/test/performance/vendor/modules.txt +++ b/test/performance/vendor/modules.txt @@ -338,6 +338,7 @@ golang.org/x/net/trace # golang.org/x/sync v0.3.0 ## explicit; go 1.17 golang.org/x/sync/errgroup +golang.org/x/sync/singleflight # golang.org/x/sys v0.11.0 ## explicit; go 1.17 golang.org/x/sys/cpu diff --git a/test/utils/environment.go b/test/utils/environment.go index da6787a24..2039a2ce2 100644 --- a/test/utils/environment.go +++ b/test/utils/environment.go @@ -12,6 +12,31 @@ type MockEnvironment struct { mock.Mock } +// Disks implements core.Environment. +func (*MockEnvironment) Disks() ([]*proto.DiskPartition, error) { + return []*proto.DiskPartition{ + { + Device: "sd01", + MountPoint: "/sd01", + FsType: "ext4", + }, + { + Device: "sd02", + MountPoint: "/sd02", + FsType: "ext4", + }, + }, nil +} + +func (*MockEnvironment) DiskUsage(mountpoint string) (*core.DiskUsage, error) { + return &core.DiskUsage{ + Total: 20, + Used: 10, + Free: 10, + UsedPercentage: 100, + }, nil +} + func GetProcesses() []*core.Process { return []*core.Process{ {Pid: 1, Name: "12345", IsMaster: true}, diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 000000000..8473fb792 --- /dev/null +++ b/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,205 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +// +// The returned channel will not be closed. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + g.mu.Lock() + defer g.mu.Unlock() + c.wg.Done() + if g.m[key] == c { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true + } +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index f73f4696f..02d4349e8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1635,6 +1635,7 @@ golang.org/x/net/trace ## explicit; go 1.17 golang.org/x/sync/errgroup golang.org/x/sync/semaphore +golang.org/x/sync/singleflight # golang.org/x/sys v0.11.0 ## explicit; go 1.17 golang.org/x/sys/cpu