diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9455dfe519..6252ac4a25 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -238,7 +238,7 @@ jobs: name: official-oss-integration-test-logs-${{ matrix.container.image }}-${{ matrix.container.version }} path: /tmp/integration-test-logs/ retention-days: 3 - + official-plus-image-integration-tests: name: Integration Tests - Official Plus Images needs: build-unsigned-snapshot diff --git a/.github/workflows/f5-cla.yml b/.github/workflows/f5-cla.yml new file mode 100644 index 0000000000..de0dbc8a55 --- /dev/null +++ b/.github/workflows/f5-cla.yml @@ -0,0 +1,51 @@ +name: F5 CLA + +on: + issue_comment: + types: + - created + pull_request_target: + types: + - opened + - synchronize + - reopened + +concurrency: + group: ${{ github.ref_name }}-cla + +permissions: + contents: read + +jobs: + f5-cla: + name: F5 CLA + runs-on: ubuntu-22.04 + permissions: + actions: write + contents: read + pull-requests: write + statuses: write + steps: + - name: Run F5 Contributor License Agreement (CLA) assistant + if: (github.event.comment.body == 'recheck' || github.event.comment.body == 'I have hereby read the F5 CLA and agree to its terms') || github.event_name == 'pull_request_target' + uses: contributor-assistant/github-action@f41946747f85d28e9a738f4f38dbcc74b69c7e0e # v2.5.1 + with: + # Any pull request targeting the following branch will trigger a CLA check. + branch: "main" + # Path to the CLA document. + path-to-document: "https://github.com/f5/.github/blob/main/CLA/cla-markdown.md" + # Custom CLA messages. + custom-notsigned-prcomment: "🎉 Thank you for your contribution! It appears you have not yet signed the F5 Contributor License Agreement (CLA), which is required for your changes to be incorporated into an F5 Open Source Software (OSS) project. Please kindly read the [F5 CLA](https://github.com/f5/.github/blob/main/CLA/cla-markdown.md) and reply on a new comment with the following text to agree:" + custom-pr-sign-comment: "I have hereby read the F5 CLA and agree to its terms" + custom-allsigned-prcomment: "✅ All required contributors have signed the F5 CLA for this PR. Thank you!" + # Remote repository storing CLA signatures. + remote-organization-name: "f5" + remote-repository-name: "f5-cla-data" + path-to-signatures: "signatures/beta/signatures.json" + # Comma separated list of usernames for maintainers or any other individuals who should not be prompted for a CLA. + allowlist: bot* + # Do not lock PRs after a merge. + lock-pullrequest-aftermerge: false + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + PERSONAL_ACCESS_TOKEN: ${{ secrets.F5_CLA_TOKEN }} diff --git a/.github/workflows/release-branch.yml b/.github/workflows/release-branch.yml index 78fcb34ba6..d1b6e9aae1 100644 --- a/.github/workflows/release-branch.yml +++ b/.github/workflows/release-branch.yml @@ -10,11 +10,6 @@ on: required: true type: boolean default: false - createPullRequest: - description: 'Create pull request back into main' - required: true - type: boolean - default: false uploadJWT: description: 'Temporary JWT to publish packages to up-ap.nginx.com' required: true @@ -125,12 +120,27 @@ jobs: run: | sudo apt-get update sudo apt-get install -y gpgv1 monkeysphere - go install github.com/goreleaser/nfpm/v2/cmd/nfpm@${{ env.NFPM_VERSION }} - - name: Tag release + go install github.com/goreleaser/nfpm/v2/cmd/nfpm@v2.35.3 + - name: Generate Changelog + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: | + git clone https://github.com/nginx/agent-changelog.git + cd ./agent-changelog/source + pip install -r requirements.txt + python agent.py + - name: Push Changelog + run: | + mv agent-changelog/source/changelog.md ./site/content/ + git config --global user.name 'github-actions' git config --global user.email '41898282+github-actions[bot]@users.noreply.github.com' + git add ./site/content/changelog.md + git commit -m "Add generated changelog" + git push origin HEAD:${{ github.ref_name }} + - name: Tag release + run: | git tag -a "v${{env.VERSION}}" -m "CI Autogenerated" git tag -a "sdk/v${{env.VERSION}}" -m "CI Autogenerated" - name: Push Tags @@ -175,8 +185,6 @@ jobs: az logout if: always() - name: Upload Release Assets - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # clobber overwrites existing assets of the same name run: | gh release upload --clobber v${{env.VERSION}} \ @@ -202,7 +210,7 @@ jobs: })) console.log(`Release published: ${release.data.html_url}`) - name: Create Pull Request - if: ${{ inputs.publishPackages == true && inputs.createPullRequest == true }} + if: ${{ inputs.publishPackages == true }} uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1 with: script: | diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d8ec3cb01e..bc6bf43094 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -35,6 +35,15 @@ To suggest a feature or enhancement, please create an issue on GitHub with the l Note: if you'd like to implement a new feature, please consider creating a feature request issue first to start a discussion about the feature. +### F5 Contributor License Agreement (CLA) + +F5 requires all external contributors to agree to the terms of the F5 CLA (available [here](https://github.com/f5/.github/blob/main/CLA/cla-markdown.md)) +before any of their changes can be incorporated into an F5 Open Source repository. + +If you have not yet agreed to the F5 CLA terms and submit a PR to this repository, a bot will prompt you to view and +agree to the F5 CLA. You will have to agree to the F5 CLA terms through a comment in the PR before any of your changes +can be merged. Your agreement signature will be safely stored by F5 and no longer be required in future PRs. + ## Code Guidelines diff --git a/Makefile.packaging b/Makefile.packaging index 84976dd54d..ab5b7b7f46 100644 --- a/Makefile.packaging +++ b/Makefile.packaging @@ -182,11 +182,18 @@ package: gpg-key $(PACKAGES_DIR) $(GITHUB_PACKAGES_DIR) $(AZURE_PACKAGES_DIR) ## gpg-key: ## Generate GPG public key $$(gpg --import $(NFPM_SIGNING_KEY_FILE)); \ keyid=$$(gpg --list-keys NGINX | egrep -A1 "^pub" | egrep -v "^pub" | tr -d '[:space:]'); \ + if [ -z "$$keyid" ]; then echo "Error: GPG key not found."; exit 1; fi; \ + # Check if the key is expired \ + # Look for the 'e' (expired) flag in the 'pub' or 'uid' lines \ + if gpg --list-keys --with-colons "$$keyid" | grep -E '^pub:e:|^uid:e:'; then \ + echo "Error: GPG key has expired."; \ + exit 1; \ + fi; \ expiry=1y; \ $$(gpg --quick-set-expire $$keyid $$expiry '*'); \ - # we need to convert the private gpg key to rsa pem format for pkg signing \ + # Convert the private GPG key to RSA PEM format for pkg signing \ $$(gpg --export-secret-key $$keyid | openpgp2ssh $$keyid > .key.rsa); \ - $$(gpg --output $(GPG_PUBLIC_KEY) --armor --export) + $$(gpg --output $(GPG_PUBLIC_KEY) --armor --export $$keyid) release: ## Publish tarball to the UPLOAD_URL echo "Publishing nginx-agent packages to ${UPLOAD_URL}"; \ diff --git a/nginx-agent.conf b/nginx-agent.conf index 5a714bb5c7..792665e3c8 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -46,4 +46,4 @@ config_dirs: "/etc/nginx:/usr/local/etc/nginx:/usr/share/nginx/modules:/etc/nms" # host: 127.0.0.1 # # Set this value to a secure port number to prevent information leaks. - # port: 8038 \ No newline at end of file + # port: 8038 diff --git a/scripts/docker/nginx-plus/debian/Dockerfile b/scripts/docker/nginx-plus/debian/Dockerfile index 5e745e7200..ece3e314aa 100644 --- a/scripts/docker/nginx-plus/debian/Dockerfile +++ b/scripts/docker/nginx-plus/debian/Dockerfile @@ -20,6 +20,7 @@ RUN --mount=type=secret,id=nginx-crt,dst=/nginx-repo.crt \ ca-certificates \ gnupg \ lsb-release \ + procps \ && \ NGINX_GPGKEY=573BFD6B3D8FBC641079A6ABABF5BD827BD9BF62; \ found=''; \ diff --git a/sdk/client/metric_reporter.go b/sdk/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/sdk/client/metric_reporter.go +++ b/sdk/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/sdk/config_helpers.go b/sdk/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/sdk/config_helpers.go +++ b/sdk/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil } diff --git a/site/content/installation-upgrade/container-environments/docker-images.md b/site/content/installation-upgrade/container-environments/docker-images.md index df6f4e46b8..bd457f471d 100644 --- a/site/content/installation-upgrade/container-environments/docker-images.md +++ b/site/content/installation-upgrade/container-environments/docker-images.md @@ -17,6 +17,11 @@ If you want to use NGINX Agent with NGINX Plus, you need to purchase an NGINX Pl See the requirements and supported operating systems in the [NGINX Agent Technical Specifications]({{< relref "technical-specifications.md" >}}) topic. +## Deploy Offical NGINX and NGINX Plus Containers + +Docker images are available in the [Deploying NGINX and NGINX Plus on Docker](https://docs.nginx.com/nginx/admin-guide/installing-nginx/installing-nginx-docker/) NGINX documentation. + +This guide provides instructions on how to build images with NGINX Agent and NGINX packaged together. It includes steps for downloading the necessary Docker images, configuring your Docker environment, and deploying NGINX and NGINX Plus containers. ## Set up your environment @@ -72,16 +77,6 @@ git clone git@github.com:nginx/agent.git {{% /tabs %}} -### Download the agent binary {#agent-binary} - -Before you can build a container image with NGINX, you must build or download the **agent** binary. - -The **agent** binary packages are available from the [NGINX Agent Releases](https://github.com/nginx/agent/releases) page on GitHub. - -Download the binary package for the operating system that you will use in the container image. - -Note the location and name of the downloaded package. You will need to use this when running the **make** command to build the image (referred to as `[PATH-TO-PACKAGE]` in the example commands below). - ### Download the NGINX Plus certificate and key {#myf5-download} {{< fa "circle-info" "text-muted" >}} **This step is required if you are using NGINX Plus. If you are using NGINX open source, you can skip this section.** @@ -105,61 +100,17 @@ In order to build a container image with NGINX Plus, you must provide the SSL ce - Be sure to replace the example certificate and key filenames shown in the example command with your actual file names. - The file names in the *build/certs* directory must match those shown in the example. -## Build the official NGINX Agent image with Docker - -{{}} - -{{%tab name="NGINX Open Source"%}} +## Run the NGINX Agent container -Change to the directory where the Dockerfile is located: +To run NGINX Agent container using Docker use the following command: ```shell -$ cd scripts/docker/official/nginx-oss-with-nginx-agent/alpine/ +docker pull docker-registry.nginx.com/nginx/agent:mainline ``` - -- To build an image that contains the latest NGINX Agent and the latest mainline version of NGINX run the following command: - - ```shell - $ docker build -t nginx-agent . --no-cache -f ./Dockerfile.mainline - ``` - -- To build an image that contains the latest NGINX Agent and the latest stable version of NGINX run the following command: - - ```shell - $ docker build -t nginx-agent . --no-cache -f ./Dockerfile.stable - ``` - -{{% /tab %}} - -{{%tab name="NGINX Plus"%}} - -1. Log in to [MyF5 Customer Portal](https://account.f5.com/myf5) and download your "nginx-repo.crt" and "nginx-repo.key" files. These files are also provided with the NGINX Plus trial package. - -1. Copy the files to the directory where the Dockerfile is located **scripts/docker/official/nginx-plus-with-nginx-agent/alpine/**. - -1. To build an image that contains the latest NGINX Agent and the latest version of NGINX Plus change to the directory where the Dockerfile is located: - ```shell -$ cd scripts/docker/official/nginx-plus-with-nginx-agent/alpine/ +docker tag docker-registry.nginx.com/nginx/agent:mainline nginx-agent ``` - -1. Run the following command to build the image: - ```shell -$ docker build -t nginx-agent . \ - --no-cache -f ./Dockerfile \ - --secret id=nginx-crt,src=nginx-repo.crt \ - --secret id=nginx-key,src=nginx-repo.key -``` -{{% /tab %}} -{{% /tabs %}} - - -## Run the NGINX Agent container - -Here is an example of how to run the NGINX Agent container using Docker: - -```console docker run --name nginx-agent -d nginx-agent ``` @@ -199,6 +150,20 @@ docker run --name nginx-agent -d \ nginx-agent ``` +To ensure that the REST Interface is correctly configured, you can use the `curl` command targeting the following endpoint from your terminal: + +```shell +curl 0.0.0.0:8038/nginx/ +``` + +If the REST Interface is configured correctly, then you should see a JSON object ouputted to the terminal containing metadata such as NGINX version, path to the NGINX conf, and runtime modules. + +**Sample Output:** + +```code +[{"nginx_id":"b636d4376dea15405589692d3c5d3869ff3a9b26b0e7bb4bb1aa7e658ace1437","version":"1.27.1","conf_path":"/etc/nginx/nginx.conf","process_id":"7","process_path":"/usr/sbin/nginx","start_time":1725878806000,"built_from_source":false,"loadable_modules":null,"runtime_modules":["http_addition_module","http_auth_request_module","http_dav_module","http_flv_module","http_gunzip_module","http_gzip_static_module","http_mp4_module","http_random_index_module","http_realip_module","http_secure_link_module","http_slice_module","http_ssl_module","http_stub_status_module","http_sub_module","http_v2_module","http_v3_module","mail_ssl_module","stream_realip_module","stream_ssl_module","stream_ssl_preread_module"],"plus":{"enabled":false,"release":""},"ssl":{"ssl_type":0,"details":["OpenSSL","3.3.0","9 Apr 2024 (running with OpenSSL 3.3.1 4 Jun 2024)"]},"status_url":"","configure_args":["","prefix=/etc/nginx","sbin-path=/usr/sbin/nginx","modules-path=/usr/lib/nginx/modules","conf-path=/etc/nginx/nginx.conf","error-log-path=/var/log/nginx/error.log","http-log-path=/var/log/nginx/access.log","pid-path=/var/run/nginx.pid","lock-path=/var/run/nginx.lock","http-client-body-temp-path=/var/cache/nginx/client_temp","http-proxy-temp-path=/var/cache/nginx/proxy_temp","http-fastcgi-temp-path=/var/cache/nginx/fastcgi_temp","http-uwsgi-temp-path=/var/cache/nginx/uwsgi_temp","http-scgi-temp-path=/var/cache/nginx/scgi_temp","with-perl_modules_path=/usr/lib/perl5/vendor_perl","user=nginx","group=nginx","with-compat","with-file-aio","with-threads","with-http_addition_module","with-http_auth_request_module","with-http_dav_module","with-http_flv_module","with-http_gunzip_module","with-http_gzip_static_module","with-http_mp4_module","with-http_random_index_module","with-http_realip_module","with-http_secure_link_module","with-http_slice_module","with-http_ssl_module","with-http_stub_status_module","with-http_sub_module","with-http_v2_module","with-http_v3_module","with-mail","with-mail_ssl_module","with-stream","with-stream_realip_module","with-stream_ssl_module","with-stream_ssl_preread_module","with-cc-opt='-Os -fstack-clash-protection -Wformat -Werror=format-security -g'","with-ld-opt=-Wl,--as-needed,-O1,--sort-common"],"error_log_paths":null}] +``` +
## Build the NGINX Agent images for specific OS targets @@ -207,17 +172,18 @@ docker run --name nginx-agent -d \ The NGINX Agent GitHub repo has a set of Make commands that you can use to build a container image for an specific operating system and version: -- `make official-oss-image` builds an image containing NGINX Agent and NGINX open source. -- `make official-plus-image` builds an image containing NGINX Agent and NGINX Plus. +- `make oss-image` builds an image containing NGINX Agent and NGINX open source. +- `make image` builds an image containing NGINX Agent and NGINX Plus. You can pass the following arguments when running the **make** command to build an NGINX Agent container image. {{}} | Argument | Definition | | ---------------- | -------------------------| -| PACKAGE_NAME | **Required.** The full path to the downloaded [agent binary package](#agent-binary).
Must precede the **make** command. | | OS_RELEASE | The Linux distribution to use as the base image.
Can also be set in the repo Makefile.| | OS_VERSION | The version of the Linux distribution to use as the base image.
Can also be set in the repo Makefile.| +| AGENT_VERSION | The versions of NGINX agent that you want installed on the image.| + {{
}} Refer to the [Supported distributions]({{< relref "/technical-specifications.md#supported-distributions" >}}) table to find out which base images you can use. @@ -230,75 +196,18 @@ Keep the following information in mind when using the NGINX Agent [Dockerfiles]( ### Build NGINX open source images -Run the following **make** command to build the default image, which uses Alpine 3.19 as the base image. - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] make official-oss-image -``` - -### Example build commands by distribution - -{{}} - -{{%tab name="alma linux"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=almalinux make oss-image -``` - -{{% /tab %}} - -{{%tab name="alpine linux"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=alpine make oss-image -``` - -{{% /tab %}} - -{{%tab name="amazon linux"%}} +Run the following `make` command to build the default image, which uses Alpine as the base image: ```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=amazonlinux make oss-image +IMAGE_BUILD_TARGET=install-agent-repo make oss-image ``` -{{% /tab %}} - -{{%tab name="debian"%}} +To build an image with Debian and an older version of NGINX Agent you can run the following command: ```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=debian make oss-image +IMAGE_BUILD_TARGET=install-agent-repo NGINX_AGENT_VERSION=2.37.0~bullseye OS_RELEASE=debian OS_VERSION=bullseye-slim make oss-image ``` -{{% /tab %}} - -{{%tab name="oracle linux"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=oraclelinux make oss-image -``` - -{{% /tab %}} - -{{%tab name="rocky linux"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=rockylinux make oss-image -``` - -{{% /tab %}} - -{{%tab name="ubuntu"%}} - -The command below creates a base image using the most recent LTS version of Ubuntu as the base image: - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] make oss-image OS_RELEASE=ubuntu -``` - -{{% /tab %}} - -{{% /tabs %}} ### Build NGINX Plus images @@ -307,75 +216,13 @@ PACKAGE_NAME=[PATH-TO-PACKAGE] make oss-image OS_RELEASE=ubuntu Run the following `make` command to build the default image, which uses Ubuntu 24.04 (Noble) as the base image. ```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] make official-plus-image -``` - -### Example NGINX Plus build commands by distribution - -{{}} - -{{%tab name="alpine linux"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=alpine make official-plus-image -``` - -{{% /tab %}} - -{{%tab name="amazon linux"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=amazonlinux make official-plus-image -``` - -{{% /tab %}} - -{{%tab name="centos"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=centos OS_VERSION=7 make official-plus-image -``` - -{{% /tab %}} - -{{%tab name="debian"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=debian OS_VERSION=bullseye-slim make official-plus-image -``` - -{{% /tab %}} - -{{%tab name="oracle linux"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=oraclelinux make official-plus-image +IMAGE_BUILD_TARGET=install-agent-repo make image ``` -{{% /tab %}} - -{{%tab name="rhel"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=redhatenterprise make official-plus-image -``` - -{{% /tab %}} - -{{%tab name="suse"%}} +To build an image with Debian and an older version of NGINX Agent you can run the following command: ```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=suse OS_VERSION=sle15 make official-plus-image +IMAGE_BUILD_TARGET=install-agent-repo NGINX_AGENT_VERSION=2.37.0~bullseye OS_RELEASE=debian OS_VERSION=bullseye-slim make image ``` -{{% /tab %}} - -{{%tab name="ubuntu"%}} - -```shell -PACKAGE_NAME=[PATH-TO-PACKAGE] OS_RELEASE=ubuntu make official-plus-image -``` - -{{% /tab %}} -{{% /tabs %}} diff --git a/src/core/config/config.go b/src/core/config/config.go index e4aca09014..b8ed3aa10a 100644 --- a/src/core/config/config.go +++ b/src/core/config/config.go @@ -19,6 +19,7 @@ import ( "reflect" "sort" "strings" + "sync" "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" @@ -50,6 +51,7 @@ const ( var ( Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) MigratedEnv = false + cfgMu = sync.Mutex{} ) func SetVersion(version, commit string) { @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) { } func GetConfig(clientId string) (*Config, error) { - extensions := []string{} + var extensions []string for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) { if agent_config.IsKnownExtension(extension) { @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) { // overwritten or not. func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) { // Get current config on disk + cfgMu.Lock() + defer cfgMu.Unlock() config, err := GetConfig(systemId) if err != nil { log.Errorf("Failed to register config: %v", err) diff --git a/src/core/config/config_test.go b/src/core/config/config_test.go index 9d5767cfe8..7ffcea761e 100644 --- a/src/core/config/config_test.go +++ b/src/core/config/config_test.go @@ -163,7 +163,7 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, []string{}, config.Tags) assert.Equal(t, Defaults.Features, config.Features) - assert.Equal(t, []string{}, config.Extensions) + assert.Equal(t, []string(nil), config.Extensions) }) t.Run("test override defaults with flags", func(t *testing.T) { @@ -274,7 +274,7 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, Defaults.AgentMetrics.Mode, config.AgentMetrics.Mode) assert.Equal(t, 10*time.Minute, config.AgentMetrics.Backoff.MaxInterval) assert.Equal(t, Defaults.Features, config.Features) - assert.Equal(t, []string{}, config.Extensions) + assert.Equal(t, []string(nil), config.Extensions) }) t.Run("test override config values with ENV variables", func(t *testing.T) { diff --git a/src/core/environment.go b/src/core/environment.go index 5bb0355aa3..3e1b719760 100644 --- a/src/core/environment.go +++ b/src/core/environment.go @@ -20,6 +20,7 @@ import ( "regexp" "runtime" "strings" + "sync" "syscall" "github.com/google/uuid" @@ -43,7 +44,6 @@ import ( //go:generate mv fake_environment_fixed.go fake_environment_test.go type Environment interface { NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo - // NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo GetHostname() (hostname string) GetSystemUUID() (hostId string) ReadDirectory(dir string, ext string) ([]string, error) @@ -70,6 +70,7 @@ type EnvironmentType struct { host *proto.HostInfo virtualizationFunc func(ctx context.Context) (string, string, error) isContainerFunc func() bool + hostMu sync.Mutex } type Process struct { @@ -106,6 +107,7 @@ const ( IsContainerKey = "isContainer" GetContainerIDKey = "GetContainerID" GetSystemUUIDKey = "GetSystemUUIDKey" + ReleaseInfoFile = "/etc/os-release" ) var ( @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { defer ctx.Done() + env.hostMu.Lock() + defer env.hostMu.Unlock() // temp cache measure if env.host == nil || clearCache { hostInformation, err := host.InfoWithContext(ctx) @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer Partitons: disks, Network: env.networks(), Processor: env.processors(hostInformation.KernelArch), - Release: releaseInfo("/etc/os-release"), + Release: releaseInfo(ReleaseInfoFile), Tags: *tags, AgentAccessibleDirs: configDirs, } diff --git a/src/core/metrics/sources/disk.go b/src/core/metrics/sources/disk.go index 98a3847a2d..3bf7eafb5c 100644 --- a/src/core/metrics/sources/disk.go +++ b/src/core/metrics/sources/disk.go @@ -14,7 +14,6 @@ import ( "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" - log "github.com/sirupsen/logrus" ) const MOUNT_POINT = "mount_point" @@ -49,8 +48,6 @@ func (c *Disk) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrapper "in_use": float64(usage.UsedPercentage), }) - log.Debugf("disk metrics collected: %v", len(simpleMetrics)) - select { case <-ctx.Done(): return diff --git a/src/core/metrics/sources/net_io.go b/src/core/metrics/sources/net_io.go index e3ef91cd83..13fcc2f24b 100644 --- a/src/core/metrics/sources/net_io.go +++ b/src/core/metrics/sources/net_io.go @@ -12,11 +12,12 @@ import ( "fmt" "sync" + log "github.com/sirupsen/logrus" + "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" "github.com/shirou/gopsutil/v3/net" - log "github.com/sirupsen/logrus" ) const NETWORK_INTERFACE = "network_interface" @@ -82,7 +83,6 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap } simpleMetrics := nio.convertSamplesToSimpleMetrics(v) - log.Debugf("net IO stats count: %d", len(simpleMetrics)) select { case <-ctx.Done(): @@ -109,6 +109,7 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap simpleMetrics := nio.convertSamplesToSimpleMetrics(totalStats) m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{}, simpleMetrics, proto.MetricsReport_SYSTEM) + log.Debugf("net IO stats: %v", currentNetIOStats) nio.netIOStats = currentNetIOStats } diff --git a/src/core/metrics/sources/nginx_access_log.go b/src/core/metrics/sources/nginx_access_log.go index 46c50b5830..b52a758428 100644 --- a/src/core/metrics/sources/nginx_access_log.go +++ b/src/core/metrics/sources/nginx_access_log.go @@ -182,7 +182,6 @@ func (c *NginxAccessLog) Stop() { fn() delete(c.logs, f) } - log.Debugf("Stopping NginxAccessLog source for nginx id: %v", c.baseDimensions.NginxId) } func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { @@ -329,11 +328,11 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string mu.Unlock() case <-tick.C: + mu.Lock() + c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() - if len(requestLengths) > 0 { httpCounters["request.length"] = getAverageMetricValue(requestLengths) } diff --git a/src/core/metrics/sources/nginx_error_log.go b/src/core/metrics/sources/nginx_error_log.go index 06368eb93a..52eb63ca4d 100644 --- a/src/core/metrics/sources/nginx_error_log.go +++ b/src/core/metrics/sources/nginx_error_log.go @@ -116,7 +116,6 @@ func (c *NginxErrorLog) Stop() { func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.mu.Lock() - defer c.mu.Unlock() c.baseDimensions = dimensions @@ -129,6 +128,7 @@ func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *met // add, remove or update existing log trailers c.syncLogs() } + c.mu.Unlock() } func (c *NginxErrorLog) recreateLogs() { @@ -178,7 +178,7 @@ func (c *NginxErrorLog) stopTailer(logFile string, cancelFunction context.Cancel delete(c.logFormats, logFile) } -func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { +func (c *NginxErrorLog) collectLogStats(_ context.Context, m chan<- *metrics.StatsEntityWrapper) { c.mu.Lock() defer c.mu.Unlock() @@ -227,10 +227,10 @@ func (c *NginxErrorLog) logStats(ctx context.Context, logFile string) { mu.Unlock() case <-tick.C: + mu.Lock() c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() simpleMetrics := c.convertSamplesToSimpleMetrics(counters) log.Tracef("Error log metrics collected: %v", simpleMetrics) diff --git a/src/core/nginx.go b/src/core/nginx.go index 577abc6c05..334df03610 100644 --- a/src/core/nginx.go +++ b/src/core/nginx.go @@ -37,10 +37,8 @@ const ( ) var ( - logMutex sync.Mutex - unpackMutex sync.Mutex - re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) - plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) + re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) + plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) ) type NginxBinary interface { @@ -55,26 +53,28 @@ type NginxBinary interface { UpdateNginxDetailsFromProcesses(nginxProcesses []*Process) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApply, error) ReadConfig(path, nginxId, systemId string) (*proto.NginxConfig, error) - UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool + UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string GetAccessLogs() map[string]string GetErrorLogs() map[string]string GetChildProcesses() map[string][]*proto.NginxDetails } type NginxBinaryType struct { - detailsMapMutex sync.Mutex - workersMapMutex sync.Mutex - statusUrlMutex sync.RWMutex - env Environment - config *config.Config - nginxDetailsMap map[string]*proto.NginxDetails - nginxWorkersMap map[string][]*proto.NginxDetails - nginxInfoMap map[string]*nginxInfo - accessLogs map[string]string - errorLogs map[string]string - statusUrls map[string]string - accessLogsUpdated bool - errorLogsUpdated bool + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex + logMutex sync.Mutex + logWriteMutex sync.Mutex + unpackMutex sync.Mutex + mapMutex sync.Mutex + statusUrlMutex sync.RWMutex + env Environment + config *config.Config + nginxDetailsMap map[string]*proto.NginxDetails + nginxWorkersMap map[string][]*proto.NginxDetails + nginxInfoMap map[string]*nginxInfo + accessLogs map[string]string + errorLogs map[string]string + statusUrls map[string]string } type nginxInfo struct { @@ -408,8 +408,8 @@ func (n *NginxBinaryType) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApp return nil, fmt.Errorf("config directory %s not allowed", filepath.Dir(details.ConfPath)) } - unpackMutex.Lock() - defer unpackMutex.Unlock() + n.unpackMutex.Lock() + defer n.unpackMutex.Unlock() log.Info("Updating NGINX config") var configApply *sdk.ConfigApply @@ -661,29 +661,22 @@ func (n *NginxBinaryType) ReadConfig(confFile, nginxId, systemId string) (*proto return nil, err } - // get access logs list for analysis - accessLogs := AccessLogs(configPayload) - // get error logs list for analysis - errorLogs := ErrorLogs(configPayload) + n.logWriteMutex.Lock() + defer n.logWriteMutex.Unlock() + n.accessLogs = n.UpdateLogs(n.GetAccessLogs(), AccessLogs(configPayload)) - logMutex.Lock() - defer logMutex.Unlock() - - n.accessLogsUpdated = n.UpdateLogs(n.accessLogs, accessLogs) - n.errorLogsUpdated = n.UpdateLogs(n.errorLogs, errorLogs) + n.errorLogs = n.UpdateLogs(n.GetErrorLogs(), ErrorLogs(configPayload)) return configPayload, nil } func (n *NginxBinaryType) GetAccessLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() return n.accessLogs } func (n *NginxBinaryType) GetErrorLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() + n.logMutex.Lock() + defer n.logMutex.Unlock() return n.errorLogs } @@ -802,6 +795,9 @@ func parseConfigureArguments(line string) (result map[string]interface{}, flags } func (n *NginxBinaryType) getNginxInfoFrom(ngxExe string) *nginxInfo { + n.mapMutex.Lock() + defer n.mapMutex.Unlock() + if ngxExe == "" { return &nginxInfo{} } @@ -905,27 +901,29 @@ func (n *NginxBinaryType) parseModulePath(dir string) ([]string, error) { return result, nil } -func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool { - logUpdated := false +func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string { + log.Debug("UpdateLogs") + + copiedLogs := make(map[string]string, len(existingLogs)) + // Copy each key-value pair from the original map to the new map + for logFile, logFormat := range existingLogs { + copiedLogs[logFile] = logFormat + } for logFile, logFormat := range newLogs { if !(strings.HasPrefix(logFile, "syslog:") || n.SkipLog(logFile)) { - if _, found := existingLogs[logFile]; !found || existingLogs[logFile] != logFormat { - logUpdated = true - } - existingLogs[logFile] = logFormat + copiedLogs[logFile] = logFormat } } // delete old logs - for logFile := range existingLogs { + for logFile := range copiedLogs { if _, found := newLogs[logFile]; !found { - delete(existingLogs, logFile) - logUpdated = true + delete(copiedLogs, logFile) } } - return logUpdated + return copiedLogs } func parseNginxVersion(line string) (version, plusVersion string) { diff --git a/src/core/pipe.go b/src/core/pipe.go index 47311e79dc..481da7f36e 100644 --- a/src/core/pipe.go +++ b/src/core/pipe.go @@ -37,6 +37,7 @@ type MessagePipe struct { extensionPlugins []ExtensionPlugin ctx context.Context mu sync.RWMutex + regMu sync.Mutex bus message_bus.MessageBus } @@ -72,7 +73,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } @@ -82,7 +85,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.extensionPlugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } diff --git a/src/plugins/commander.go b/src/plugins/commander.go index be11792834..582796de8d 100644 --- a/src/plugins/commander.go +++ b/src/plugins/commander.go @@ -102,8 +102,6 @@ func (c *Commander) agentBackoff(agentConfig *proto.AgentConfig) { func (c *Commander) agentRegistered(cmd *proto.Command) { switch commandData := cmd.GetData().(type) { case *proto.Command_AgentConnectResponse: - log.Infof("config command %v", commandData) - if agtCfg := commandData.AgentConnectResponse.AgentConfig; agtCfg != nil && agtCfg.Configs != nil && len(agtCfg.Configs.Configs) > 0 { for _, config := range agtCfg.Configs.Configs { diff --git a/src/plugins/config_reader.go b/src/plugins/config_reader.go index eb9606eac9..978f01b7f6 100644 --- a/src/plugins/config_reader.go +++ b/src/plugins/config_reader.go @@ -27,6 +27,7 @@ type ConfigReader struct { messagePipeline core.MessagePipeInterface config *config.Config mu sync.RWMutex + detailsMu sync.RWMutex } func NewConfigReader(config *config.Config) *ConfigReader { @@ -85,7 +86,10 @@ func (r *ConfigReader) Subscriptions() []string { } func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) { - if payloadAgentConfig != nil && payloadAgentConfig.Details != nil { + r.mu.Lock() + defer r.mu.Unlock() + + if payloadAgentConfig.Details != nil { onDiskAgentConfig, err := config.GetConfig(r.config.ClientID) if err != nil { log.Errorf("Failed to update Agent config - %v", err) @@ -96,21 +100,29 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags := false if payloadAgentConfig.Details.Features != nil { - + r.detailsMu.Lock() for index, feature := range payloadAgentConfig.Details.Features { payloadAgentConfig.Details.Features[index] = strings.Replace(feature, "features_", "", 1) } sort.Strings(onDiskAgentConfig.Features) sort.Strings(payloadAgentConfig.Details.Features) + r.detailsMu.Unlock() + + r.detailsMu.RLock() synchronizeFeatures = !reflect.DeepEqual(payloadAgentConfig.Details.Features, onDiskAgentConfig.Features) + r.detailsMu.RUnlock() } else { + r.detailsMu.Lock() payloadAgentConfig.Details.Features = onDiskAgentConfig.Features + r.detailsMu.Unlock() } if payloadAgentConfig.Details.Tags == nil { + r.detailsMu.Lock() payloadAgentConfig.Details.Tags = []string{} + r.detailsMu.Unlock() } sort.Strings(onDiskAgentConfig.Tags) @@ -118,7 +130,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags = !reflect.DeepEqual(payloadAgentConfig.Details.Tags, onDiskAgentConfig.Tags) if synchronizeFeatures || synchronizeTags { - configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, payloadAgentConfig.Details.Tags, payloadAgentConfig.Details.Features) + tags := payloadAgentConfig.Details.Tags + features := payloadAgentConfig.Details.Features + configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, tags, features) if err != nil { log.Errorf("Failed updating Agent config - %v", err) } @@ -142,25 +156,22 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) } r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig)) - } } func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) { if r.config != nil { + r.detailsMu.RLock() for _, feature := range r.config.Features { if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync { - r.mu.Lock() r.deRegisterPlugin(feature) - r.mu.Unlock() } } + r.detailsMu.RUnlock() } if agtCfg.Details != nil { - r.mu.Lock() r.messagePipeline.Process(core.NewMessage(core.EnableFeature, agtCfg.Details.Features)) - r.mu.Unlock() } } diff --git a/src/plugins/dataplane_status.go b/src/plugins/dataplane_status.go index 6b6014a5ff..99f1ffac72 100644 --- a/src/plugins/dataplane_status.go +++ b/src/plugins/dataplane_status.go @@ -4,7 +4,6 @@ * This source code is licensed under the Apache License, Version 2.0 license found in the * LICENSE file in the root directory of this source tree. */ - package plugins import ( @@ -15,13 +14,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/payloads" + log "github.com/sirupsen/logrus" ) type DataPlaneStatus struct { @@ -42,6 +40,7 @@ type DataPlaneStatus struct { softwareDetails map[string]*proto.DataplaneSoftwareDetails nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus softwareDetailsMutex sync.RWMutex + structMu sync.RWMutex processes []*core.Process } @@ -67,7 +66,6 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core tags: &config.Tags, configDirs: config.ConfigDirs, reportInterval: config.Dataplane.Status.ReportInterval, - softwareDetailsMutex: sync.RWMutex{}, nginxConfigActivityStatuses: make(map[string]*proto.AgentActivityStatus), softwareDetails: make(map[string]*proto.DataplaneSoftwareDetails), processes: processes, @@ -84,11 +82,9 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) { func (dps *DataPlaneStatus) Close() { log.Info("DataPlaneStatus is wrapping up") dps.nginxConfigActivityStatuses = nil - dps.softwareDetailsMutex.Lock() dps.softwareDetails = nil dps.softwareDetailsMutex.Unlock() - dps.healthTicker.Stop() dps.sendStatus <- true } @@ -103,7 +99,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) // If the agent config on disk changed update DataPlaneStatus with relevant config info dps.syncAgentConfigChange() - case msg.Exact(core.DataplaneSoftwareDetailsUpdated): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -112,7 +107,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { dps.softwareDetails[data.GetPluginName()] = data.GetDataplaneSoftwareDetails() dps.softwareDetailsMutex.Unlock() } - case msg.Exact(core.NginxConfigValidationPending): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -121,7 +115,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } - case msg.Exact(core.NginxConfigApplyFailed) || msg.Exact(core.NginxConfigApplySucceeded): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -132,7 +125,9 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } case msg.Exact(core.NginxDetailProcUpdate): + dps.structMu.Lock() dps.processes = msg.Data().([]*core.Process) + dps.structMu.Unlock() } } @@ -193,15 +188,12 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses { agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus) } - dps.softwareDetailsMutex.Lock() defer dps.softwareDetailsMutex.Unlock() - dataplaneSoftwareDetails := []*proto.DataplaneSoftwareDetails{} for _, softwareDetail := range dps.softwareDetails { dataplaneSoftwareDetails = append(dataplaneSoftwareDetails, softwareDetail) } - dataplaneStatus := &proto.DataplaneStatus{ Host: dps.hostInfo(forceDetails), Details: dps.detailsForProcess(dps.processes, forceDetails), @@ -214,6 +206,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send + dps.structMu.Lock() + defer dps.structMu.Unlock() hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil @@ -227,7 +221,6 @@ func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bool) (details []*proto.NginxDetails) { log.Tracef("detailsForProcess processes: %v", processes) - nowUTC := time.Now().UTC() // this sets send if we are forcing details, or it has been 24 hours since the last send for _, p := range processes { @@ -246,7 +239,9 @@ func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bo return nil } + dps.structMu.Lock() dps.lastSendDetails = nowUTC + dps.structMu.Unlock() return details } @@ -255,7 +250,6 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails := make(map[string]*proto.NginxHealth) instanceProcessCount := make(map[string]int) log.Tracef("healthForProcess processes: %v", processes) - for _, p := range processes { instanceID := dps.binary.GetNginxIDForProcess(p) log.Tracef("Process: %v instanceID %s", p, instanceID) @@ -277,10 +271,8 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails[instanceID].NginxStatus = proto.NginxHealth_DEGRADED } } - for instanceID, health := range heathDetails { log.Tracef("instanceID: %s health: %s", instanceID, health) - if instanceProcessCount[instanceID] <= 1 { reason := "does not have enough children" if heathDetails[instanceID].NginxStatus == proto.NginxHealth_DEGRADED { @@ -301,20 +293,22 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() { return } log.Debugf("DataPlaneStatus is updating to a new config - %v", conf) - pollInt := conf.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { pollInt = defaultMinInterval log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, conf.Dataplane.Status.PollInterval) } - if conf.DisplayName == "" { conf.DisplayName = dps.env.GetHostname() log.Infof("setting displayName to %s", conf.DisplayName) } // Update DataPlaneStatus with relevant config info + dps.structMu.Lock() + dps.interval = pollInt dps.tags = &conf.Tags dps.configDirs = conf.ConfigDirs + + dps.structMu.Unlock() } diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 55238474c4..97b0de01f1 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -71,7 +71,9 @@ func (m *Metrics) Init(pipeline core.MessagePipeInterface) { } func (m *Metrics) Close() { + m.collectorsMutex.Lock() m.collectors = nil + m.collectorsMutex.Unlock() log.Info("Metrics is wrapping up") } @@ -336,6 +338,8 @@ func createCollectorConfigsMap(config *config.Config, binary core.NginxBinary, p } func (m *Metrics) updateCollectorsConfig() { + m.collectorsMutex.Lock() + defer m.collectorsMutex.Unlock() log.Trace("Updating collector config") for _, collector := range m.collectors { if nginxCollector, ok := collector.(*collectors.NginxCollector); ok { diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 249ff15ef2..91983146b2 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -106,7 +106,6 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } collection := metrics.SaveCollections(*r.metricsCollections[report.Type], report) r.metricsCollections[report.Type] = &collection - log.Debugf("MetricsThrottle: Metrics collection saved [Type: %d]", report.Type) } } r.mu.Unlock() diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go index e4aca09014..b8ed3aa10a 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -19,6 +19,7 @@ import ( "reflect" "sort" "strings" + "sync" "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" @@ -50,6 +51,7 @@ const ( var ( Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) MigratedEnv = false + cfgMu = sync.Mutex{} ) func SetVersion(version, commit string) { @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) { } func GetConfig(clientId string) (*Config, error) { - extensions := []string{} + var extensions []string for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) { if agent_config.IsKnownExtension(extension) { @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) { // overwritten or not. func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) { // Get current config on disk + cfgMu.Lock() + defer cfgMu.Unlock() config, err := GetConfig(systemId) if err != nil { log.Errorf("Failed to register config: %v", err) diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go index 5bb0355aa3..3e1b719760 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go @@ -20,6 +20,7 @@ import ( "regexp" "runtime" "strings" + "sync" "syscall" "github.com/google/uuid" @@ -43,7 +44,6 @@ import ( //go:generate mv fake_environment_fixed.go fake_environment_test.go type Environment interface { NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo - // NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo GetHostname() (hostname string) GetSystemUUID() (hostId string) ReadDirectory(dir string, ext string) ([]string, error) @@ -70,6 +70,7 @@ type EnvironmentType struct { host *proto.HostInfo virtualizationFunc func(ctx context.Context) (string, string, error) isContainerFunc func() bool + hostMu sync.Mutex } type Process struct { @@ -106,6 +107,7 @@ const ( IsContainerKey = "isContainer" GetContainerIDKey = "GetContainerID" GetSystemUUIDKey = "GetSystemUUIDKey" + ReleaseInfoFile = "/etc/os-release" ) var ( @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { defer ctx.Done() + env.hostMu.Lock() + defer env.hostMu.Unlock() // temp cache measure if env.host == nil || clearCache { hostInformation, err := host.InfoWithContext(ctx) @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer Partitons: disks, Network: env.networks(), Processor: env.processors(hostInformation.KernelArch), - Release: releaseInfo("/etc/os-release"), + Release: releaseInfo(ReleaseInfoFile), Tags: *tags, AgentAccessibleDirs: configDirs, } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 577abc6c05..334df03610 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -37,10 +37,8 @@ const ( ) var ( - logMutex sync.Mutex - unpackMutex sync.Mutex - re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) - plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) + re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) + plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) ) type NginxBinary interface { @@ -55,26 +53,28 @@ type NginxBinary interface { UpdateNginxDetailsFromProcesses(nginxProcesses []*Process) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApply, error) ReadConfig(path, nginxId, systemId string) (*proto.NginxConfig, error) - UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool + UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string GetAccessLogs() map[string]string GetErrorLogs() map[string]string GetChildProcesses() map[string][]*proto.NginxDetails } type NginxBinaryType struct { - detailsMapMutex sync.Mutex - workersMapMutex sync.Mutex - statusUrlMutex sync.RWMutex - env Environment - config *config.Config - nginxDetailsMap map[string]*proto.NginxDetails - nginxWorkersMap map[string][]*proto.NginxDetails - nginxInfoMap map[string]*nginxInfo - accessLogs map[string]string - errorLogs map[string]string - statusUrls map[string]string - accessLogsUpdated bool - errorLogsUpdated bool + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex + logMutex sync.Mutex + logWriteMutex sync.Mutex + unpackMutex sync.Mutex + mapMutex sync.Mutex + statusUrlMutex sync.RWMutex + env Environment + config *config.Config + nginxDetailsMap map[string]*proto.NginxDetails + nginxWorkersMap map[string][]*proto.NginxDetails + nginxInfoMap map[string]*nginxInfo + accessLogs map[string]string + errorLogs map[string]string + statusUrls map[string]string } type nginxInfo struct { @@ -408,8 +408,8 @@ func (n *NginxBinaryType) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApp return nil, fmt.Errorf("config directory %s not allowed", filepath.Dir(details.ConfPath)) } - unpackMutex.Lock() - defer unpackMutex.Unlock() + n.unpackMutex.Lock() + defer n.unpackMutex.Unlock() log.Info("Updating NGINX config") var configApply *sdk.ConfigApply @@ -661,29 +661,22 @@ func (n *NginxBinaryType) ReadConfig(confFile, nginxId, systemId string) (*proto return nil, err } - // get access logs list for analysis - accessLogs := AccessLogs(configPayload) - // get error logs list for analysis - errorLogs := ErrorLogs(configPayload) + n.logWriteMutex.Lock() + defer n.logWriteMutex.Unlock() + n.accessLogs = n.UpdateLogs(n.GetAccessLogs(), AccessLogs(configPayload)) - logMutex.Lock() - defer logMutex.Unlock() - - n.accessLogsUpdated = n.UpdateLogs(n.accessLogs, accessLogs) - n.errorLogsUpdated = n.UpdateLogs(n.errorLogs, errorLogs) + n.errorLogs = n.UpdateLogs(n.GetErrorLogs(), ErrorLogs(configPayload)) return configPayload, nil } func (n *NginxBinaryType) GetAccessLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() return n.accessLogs } func (n *NginxBinaryType) GetErrorLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() + n.logMutex.Lock() + defer n.logMutex.Unlock() return n.errorLogs } @@ -802,6 +795,9 @@ func parseConfigureArguments(line string) (result map[string]interface{}, flags } func (n *NginxBinaryType) getNginxInfoFrom(ngxExe string) *nginxInfo { + n.mapMutex.Lock() + defer n.mapMutex.Unlock() + if ngxExe == "" { return &nginxInfo{} } @@ -905,27 +901,29 @@ func (n *NginxBinaryType) parseModulePath(dir string) ([]string, error) { return result, nil } -func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool { - logUpdated := false +func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string { + log.Debug("UpdateLogs") + + copiedLogs := make(map[string]string, len(existingLogs)) + // Copy each key-value pair from the original map to the new map + for logFile, logFormat := range existingLogs { + copiedLogs[logFile] = logFormat + } for logFile, logFormat := range newLogs { if !(strings.HasPrefix(logFile, "syslog:") || n.SkipLog(logFile)) { - if _, found := existingLogs[logFile]; !found || existingLogs[logFile] != logFormat { - logUpdated = true - } - existingLogs[logFile] = logFormat + copiedLogs[logFile] = logFormat } } // delete old logs - for logFile := range existingLogs { + for logFile := range copiedLogs { if _, found := newLogs[logFile]; !found { - delete(existingLogs, logFile) - logUpdated = true + delete(copiedLogs, logFile) } } - return logUpdated + return copiedLogs } func parseNginxVersion(line string) (version, plusVersion string) { diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go index 47311e79dc..481da7f36e 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -37,6 +37,7 @@ type MessagePipe struct { extensionPlugins []ExtensionPlugin ctx context.Context mu sync.RWMutex + regMu sync.Mutex bus message_bus.MessageBus } @@ -72,7 +73,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } @@ -82,7 +85,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.extensionPlugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go index dfb498adf5..73a64e226d 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go @@ -133,9 +133,9 @@ func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) return args.Get(0).(*proto.NginxDetails) } -func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) bool { +func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) map[string]string { args := m.Called(existing, newLogs) - return args.Bool(0) + return args.Get(0).(map[string]string) } func (m *MockNginxBinary) GetAccessLogs() map[string]string { diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go index e4aca09014..b8ed3aa10a 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -19,6 +19,7 @@ import ( "reflect" "sort" "strings" + "sync" "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" @@ -50,6 +51,7 @@ const ( var ( Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) MigratedEnv = false + cfgMu = sync.Mutex{} ) func SetVersion(version, commit string) { @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) { } func GetConfig(clientId string) (*Config, error) { - extensions := []string{} + var extensions []string for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) { if agent_config.IsKnownExtension(extension) { @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) { // overwritten or not. func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) { // Get current config on disk + cfgMu.Lock() + defer cfgMu.Unlock() config, err := GetConfig(systemId) if err != nil { log.Errorf("Failed to register config: %v", err) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go index 5bb0355aa3..3e1b719760 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go @@ -20,6 +20,7 @@ import ( "regexp" "runtime" "strings" + "sync" "syscall" "github.com/google/uuid" @@ -43,7 +44,6 @@ import ( //go:generate mv fake_environment_fixed.go fake_environment_test.go type Environment interface { NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo - // NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo GetHostname() (hostname string) GetSystemUUID() (hostId string) ReadDirectory(dir string, ext string) ([]string, error) @@ -70,6 +70,7 @@ type EnvironmentType struct { host *proto.HostInfo virtualizationFunc func(ctx context.Context) (string, string, error) isContainerFunc func() bool + hostMu sync.Mutex } type Process struct { @@ -106,6 +107,7 @@ const ( IsContainerKey = "isContainer" GetContainerIDKey = "GetContainerID" GetSystemUUIDKey = "GetSystemUUIDKey" + ReleaseInfoFile = "/etc/os-release" ) var ( @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { defer ctx.Done() + env.hostMu.Lock() + defer env.hostMu.Unlock() // temp cache measure if env.host == nil || clearCache { hostInformation, err := host.InfoWithContext(ctx) @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer Partitons: disks, Network: env.networks(), Processor: env.processors(hostInformation.KernelArch), - Release: releaseInfo("/etc/os-release"), + Release: releaseInfo(ReleaseInfoFile), Tags: *tags, AgentAccessibleDirs: configDirs, } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go index 98a3847a2d..3bf7eafb5c 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go @@ -14,7 +14,6 @@ import ( "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" - log "github.com/sirupsen/logrus" ) const MOUNT_POINT = "mount_point" @@ -49,8 +48,6 @@ func (c *Disk) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrapper "in_use": float64(usage.UsedPercentage), }) - log.Debugf("disk metrics collected: %v", len(simpleMetrics)) - select { case <-ctx.Done(): return diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go index e3ef91cd83..13fcc2f24b 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go @@ -12,11 +12,12 @@ import ( "fmt" "sync" + log "github.com/sirupsen/logrus" + "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" "github.com/shirou/gopsutil/v3/net" - log "github.com/sirupsen/logrus" ) const NETWORK_INTERFACE = "network_interface" @@ -82,7 +83,6 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap } simpleMetrics := nio.convertSamplesToSimpleMetrics(v) - log.Debugf("net IO stats count: %d", len(simpleMetrics)) select { case <-ctx.Done(): @@ -109,6 +109,7 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap simpleMetrics := nio.convertSamplesToSimpleMetrics(totalStats) m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{}, simpleMetrics, proto.MetricsReport_SYSTEM) + log.Debugf("net IO stats: %v", currentNetIOStats) nio.netIOStats = currentNetIOStats } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go index 46c50b5830..b52a758428 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go @@ -182,7 +182,6 @@ func (c *NginxAccessLog) Stop() { fn() delete(c.logs, f) } - log.Debugf("Stopping NginxAccessLog source for nginx id: %v", c.baseDimensions.NginxId) } func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { @@ -329,11 +328,11 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string mu.Unlock() case <-tick.C: + mu.Lock() + c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() - if len(requestLengths) > 0 { httpCounters["request.length"] = getAverageMetricValue(requestLengths) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go index 06368eb93a..52eb63ca4d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go @@ -116,7 +116,6 @@ func (c *NginxErrorLog) Stop() { func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.mu.Lock() - defer c.mu.Unlock() c.baseDimensions = dimensions @@ -129,6 +128,7 @@ func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *met // add, remove or update existing log trailers c.syncLogs() } + c.mu.Unlock() } func (c *NginxErrorLog) recreateLogs() { @@ -178,7 +178,7 @@ func (c *NginxErrorLog) stopTailer(logFile string, cancelFunction context.Cancel delete(c.logFormats, logFile) } -func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { +func (c *NginxErrorLog) collectLogStats(_ context.Context, m chan<- *metrics.StatsEntityWrapper) { c.mu.Lock() defer c.mu.Unlock() @@ -227,10 +227,10 @@ func (c *NginxErrorLog) logStats(ctx context.Context, logFile string) { mu.Unlock() case <-tick.C: + mu.Lock() c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() simpleMetrics := c.convertSamplesToSimpleMetrics(counters) log.Tracef("Error log metrics collected: %v", simpleMetrics) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 577abc6c05..334df03610 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -37,10 +37,8 @@ const ( ) var ( - logMutex sync.Mutex - unpackMutex sync.Mutex - re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) - plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) + re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) + plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) ) type NginxBinary interface { @@ -55,26 +53,28 @@ type NginxBinary interface { UpdateNginxDetailsFromProcesses(nginxProcesses []*Process) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApply, error) ReadConfig(path, nginxId, systemId string) (*proto.NginxConfig, error) - UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool + UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string GetAccessLogs() map[string]string GetErrorLogs() map[string]string GetChildProcesses() map[string][]*proto.NginxDetails } type NginxBinaryType struct { - detailsMapMutex sync.Mutex - workersMapMutex sync.Mutex - statusUrlMutex sync.RWMutex - env Environment - config *config.Config - nginxDetailsMap map[string]*proto.NginxDetails - nginxWorkersMap map[string][]*proto.NginxDetails - nginxInfoMap map[string]*nginxInfo - accessLogs map[string]string - errorLogs map[string]string - statusUrls map[string]string - accessLogsUpdated bool - errorLogsUpdated bool + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex + logMutex sync.Mutex + logWriteMutex sync.Mutex + unpackMutex sync.Mutex + mapMutex sync.Mutex + statusUrlMutex sync.RWMutex + env Environment + config *config.Config + nginxDetailsMap map[string]*proto.NginxDetails + nginxWorkersMap map[string][]*proto.NginxDetails + nginxInfoMap map[string]*nginxInfo + accessLogs map[string]string + errorLogs map[string]string + statusUrls map[string]string } type nginxInfo struct { @@ -408,8 +408,8 @@ func (n *NginxBinaryType) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApp return nil, fmt.Errorf("config directory %s not allowed", filepath.Dir(details.ConfPath)) } - unpackMutex.Lock() - defer unpackMutex.Unlock() + n.unpackMutex.Lock() + defer n.unpackMutex.Unlock() log.Info("Updating NGINX config") var configApply *sdk.ConfigApply @@ -661,29 +661,22 @@ func (n *NginxBinaryType) ReadConfig(confFile, nginxId, systemId string) (*proto return nil, err } - // get access logs list for analysis - accessLogs := AccessLogs(configPayload) - // get error logs list for analysis - errorLogs := ErrorLogs(configPayload) + n.logWriteMutex.Lock() + defer n.logWriteMutex.Unlock() + n.accessLogs = n.UpdateLogs(n.GetAccessLogs(), AccessLogs(configPayload)) - logMutex.Lock() - defer logMutex.Unlock() - - n.accessLogsUpdated = n.UpdateLogs(n.accessLogs, accessLogs) - n.errorLogsUpdated = n.UpdateLogs(n.errorLogs, errorLogs) + n.errorLogs = n.UpdateLogs(n.GetErrorLogs(), ErrorLogs(configPayload)) return configPayload, nil } func (n *NginxBinaryType) GetAccessLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() return n.accessLogs } func (n *NginxBinaryType) GetErrorLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() + n.logMutex.Lock() + defer n.logMutex.Unlock() return n.errorLogs } @@ -802,6 +795,9 @@ func parseConfigureArguments(line string) (result map[string]interface{}, flags } func (n *NginxBinaryType) getNginxInfoFrom(ngxExe string) *nginxInfo { + n.mapMutex.Lock() + defer n.mapMutex.Unlock() + if ngxExe == "" { return &nginxInfo{} } @@ -905,27 +901,29 @@ func (n *NginxBinaryType) parseModulePath(dir string) ([]string, error) { return result, nil } -func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool { - logUpdated := false +func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string { + log.Debug("UpdateLogs") + + copiedLogs := make(map[string]string, len(existingLogs)) + // Copy each key-value pair from the original map to the new map + for logFile, logFormat := range existingLogs { + copiedLogs[logFile] = logFormat + } for logFile, logFormat := range newLogs { if !(strings.HasPrefix(logFile, "syslog:") || n.SkipLog(logFile)) { - if _, found := existingLogs[logFile]; !found || existingLogs[logFile] != logFormat { - logUpdated = true - } - existingLogs[logFile] = logFormat + copiedLogs[logFile] = logFormat } } // delete old logs - for logFile := range existingLogs { + for logFile := range copiedLogs { if _, found := newLogs[logFile]; !found { - delete(existingLogs, logFile) - logUpdated = true + delete(copiedLogs, logFile) } } - return logUpdated + return copiedLogs } func parseNginxVersion(line string) (version, plusVersion string) { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go index 47311e79dc..481da7f36e 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -37,6 +37,7 @@ type MessagePipe struct { extensionPlugins []ExtensionPlugin ctx context.Context mu sync.RWMutex + regMu sync.Mutex bus message_bus.MessageBus } @@ -72,7 +73,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } @@ -82,7 +85,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.extensionPlugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go index be11792834..582796de8d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go @@ -102,8 +102,6 @@ func (c *Commander) agentBackoff(agentConfig *proto.AgentConfig) { func (c *Commander) agentRegistered(cmd *proto.Command) { switch commandData := cmd.GetData().(type) { case *proto.Command_AgentConnectResponse: - log.Infof("config command %v", commandData) - if agtCfg := commandData.AgentConnectResponse.AgentConfig; agtCfg != nil && agtCfg.Configs != nil && len(agtCfg.Configs.Configs) > 0 { for _, config := range agtCfg.Configs.Configs { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go index eb9606eac9..978f01b7f6 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go @@ -27,6 +27,7 @@ type ConfigReader struct { messagePipeline core.MessagePipeInterface config *config.Config mu sync.RWMutex + detailsMu sync.RWMutex } func NewConfigReader(config *config.Config) *ConfigReader { @@ -85,7 +86,10 @@ func (r *ConfigReader) Subscriptions() []string { } func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) { - if payloadAgentConfig != nil && payloadAgentConfig.Details != nil { + r.mu.Lock() + defer r.mu.Unlock() + + if payloadAgentConfig.Details != nil { onDiskAgentConfig, err := config.GetConfig(r.config.ClientID) if err != nil { log.Errorf("Failed to update Agent config - %v", err) @@ -96,21 +100,29 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags := false if payloadAgentConfig.Details.Features != nil { - + r.detailsMu.Lock() for index, feature := range payloadAgentConfig.Details.Features { payloadAgentConfig.Details.Features[index] = strings.Replace(feature, "features_", "", 1) } sort.Strings(onDiskAgentConfig.Features) sort.Strings(payloadAgentConfig.Details.Features) + r.detailsMu.Unlock() + + r.detailsMu.RLock() synchronizeFeatures = !reflect.DeepEqual(payloadAgentConfig.Details.Features, onDiskAgentConfig.Features) + r.detailsMu.RUnlock() } else { + r.detailsMu.Lock() payloadAgentConfig.Details.Features = onDiskAgentConfig.Features + r.detailsMu.Unlock() } if payloadAgentConfig.Details.Tags == nil { + r.detailsMu.Lock() payloadAgentConfig.Details.Tags = []string{} + r.detailsMu.Unlock() } sort.Strings(onDiskAgentConfig.Tags) @@ -118,7 +130,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags = !reflect.DeepEqual(payloadAgentConfig.Details.Tags, onDiskAgentConfig.Tags) if synchronizeFeatures || synchronizeTags { - configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, payloadAgentConfig.Details.Tags, payloadAgentConfig.Details.Features) + tags := payloadAgentConfig.Details.Tags + features := payloadAgentConfig.Details.Features + configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, tags, features) if err != nil { log.Errorf("Failed updating Agent config - %v", err) } @@ -142,25 +156,22 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) } r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig)) - } } func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) { if r.config != nil { + r.detailsMu.RLock() for _, feature := range r.config.Features { if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync { - r.mu.Lock() r.deRegisterPlugin(feature) - r.mu.Unlock() } } + r.detailsMu.RUnlock() } if agtCfg.Details != nil { - r.mu.Lock() r.messagePipeline.Process(core.NewMessage(core.EnableFeature, agtCfg.Details.Features)) - r.mu.Unlock() } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go index 6b6014a5ff..99f1ffac72 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go @@ -4,7 +4,6 @@ * This source code is licensed under the Apache License, Version 2.0 license found in the * LICENSE file in the root directory of this source tree. */ - package plugins import ( @@ -15,13 +14,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/payloads" + log "github.com/sirupsen/logrus" ) type DataPlaneStatus struct { @@ -42,6 +40,7 @@ type DataPlaneStatus struct { softwareDetails map[string]*proto.DataplaneSoftwareDetails nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus softwareDetailsMutex sync.RWMutex + structMu sync.RWMutex processes []*core.Process } @@ -67,7 +66,6 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core tags: &config.Tags, configDirs: config.ConfigDirs, reportInterval: config.Dataplane.Status.ReportInterval, - softwareDetailsMutex: sync.RWMutex{}, nginxConfigActivityStatuses: make(map[string]*proto.AgentActivityStatus), softwareDetails: make(map[string]*proto.DataplaneSoftwareDetails), processes: processes, @@ -84,11 +82,9 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) { func (dps *DataPlaneStatus) Close() { log.Info("DataPlaneStatus is wrapping up") dps.nginxConfigActivityStatuses = nil - dps.softwareDetailsMutex.Lock() dps.softwareDetails = nil dps.softwareDetailsMutex.Unlock() - dps.healthTicker.Stop() dps.sendStatus <- true } @@ -103,7 +99,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) // If the agent config on disk changed update DataPlaneStatus with relevant config info dps.syncAgentConfigChange() - case msg.Exact(core.DataplaneSoftwareDetailsUpdated): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -112,7 +107,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { dps.softwareDetails[data.GetPluginName()] = data.GetDataplaneSoftwareDetails() dps.softwareDetailsMutex.Unlock() } - case msg.Exact(core.NginxConfigValidationPending): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -121,7 +115,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } - case msg.Exact(core.NginxConfigApplyFailed) || msg.Exact(core.NginxConfigApplySucceeded): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -132,7 +125,9 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } case msg.Exact(core.NginxDetailProcUpdate): + dps.structMu.Lock() dps.processes = msg.Data().([]*core.Process) + dps.structMu.Unlock() } } @@ -193,15 +188,12 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses { agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus) } - dps.softwareDetailsMutex.Lock() defer dps.softwareDetailsMutex.Unlock() - dataplaneSoftwareDetails := []*proto.DataplaneSoftwareDetails{} for _, softwareDetail := range dps.softwareDetails { dataplaneSoftwareDetails = append(dataplaneSoftwareDetails, softwareDetail) } - dataplaneStatus := &proto.DataplaneStatus{ Host: dps.hostInfo(forceDetails), Details: dps.detailsForProcess(dps.processes, forceDetails), @@ -214,6 +206,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send + dps.structMu.Lock() + defer dps.structMu.Unlock() hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil @@ -227,7 +221,6 @@ func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bool) (details []*proto.NginxDetails) { log.Tracef("detailsForProcess processes: %v", processes) - nowUTC := time.Now().UTC() // this sets send if we are forcing details, or it has been 24 hours since the last send for _, p := range processes { @@ -246,7 +239,9 @@ func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bo return nil } + dps.structMu.Lock() dps.lastSendDetails = nowUTC + dps.structMu.Unlock() return details } @@ -255,7 +250,6 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails := make(map[string]*proto.NginxHealth) instanceProcessCount := make(map[string]int) log.Tracef("healthForProcess processes: %v", processes) - for _, p := range processes { instanceID := dps.binary.GetNginxIDForProcess(p) log.Tracef("Process: %v instanceID %s", p, instanceID) @@ -277,10 +271,8 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails[instanceID].NginxStatus = proto.NginxHealth_DEGRADED } } - for instanceID, health := range heathDetails { log.Tracef("instanceID: %s health: %s", instanceID, health) - if instanceProcessCount[instanceID] <= 1 { reason := "does not have enough children" if heathDetails[instanceID].NginxStatus == proto.NginxHealth_DEGRADED { @@ -301,20 +293,22 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() { return } log.Debugf("DataPlaneStatus is updating to a new config - %v", conf) - pollInt := conf.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { pollInt = defaultMinInterval log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, conf.Dataplane.Status.PollInterval) } - if conf.DisplayName == "" { conf.DisplayName = dps.env.GetHostname() log.Infof("setting displayName to %s", conf.DisplayName) } // Update DataPlaneStatus with relevant config info + dps.structMu.Lock() + dps.interval = pollInt dps.tags = &conf.Tags dps.configDirs = conf.ConfigDirs + + dps.structMu.Unlock() } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 55238474c4..97b0de01f1 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -71,7 +71,9 @@ func (m *Metrics) Init(pipeline core.MessagePipeInterface) { } func (m *Metrics) Close() { + m.collectorsMutex.Lock() m.collectors = nil + m.collectorsMutex.Unlock() log.Info("Metrics is wrapping up") } @@ -336,6 +338,8 @@ func createCollectorConfigsMap(config *config.Config, binary core.NginxBinary, p } func (m *Metrics) updateCollectorsConfig() { + m.collectorsMutex.Lock() + defer m.collectorsMutex.Unlock() log.Trace("Updating collector config") for _, collector := range m.collectors { if nginxCollector, ok := collector.(*collectors.NginxCollector); ok { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 249ff15ef2..91983146b2 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -106,7 +106,6 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } collection := metrics.SaveCollections(*r.metricsCollections[report.Type], report) r.metricsCollections[report.Type] = &collection - log.Debugf("MetricsThrottle: Metrics collection saved [Type: %d]", report.Type) } } r.mu.Unlock() diff --git a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go index dfb498adf5..73a64e226d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go @@ -133,9 +133,9 @@ func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) return args.Get(0).(*proto.NginxDetails) } -func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) bool { +func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) map[string]string { args := m.Called(existing, newLogs) - return args.Bool(0) + return args.Get(0).(map[string]string) } func (m *MockNginxBinary) GetAccessLogs() map[string]string { diff --git a/test/utils/nginx.go b/test/utils/nginx.go index dfb498adf5..73a64e226d 100644 --- a/test/utils/nginx.go +++ b/test/utils/nginx.go @@ -133,9 +133,9 @@ func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) return args.Get(0).(*proto.NginxDetails) } -func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) bool { +func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) map[string]string { args := m.Called(existing, newLogs) - return args.Bool(0) + return args.Get(0).(map[string]string) } func (m *MockNginxBinary) GetAccessLogs() map[string]string { diff --git a/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil }