diff --git a/src/plugins/nginx.go b/src/plugins/nginx.go index 6a1285d6a..7b442a3cf 100644 --- a/src/plugins/nginx.go +++ b/src/plugins/nginx.go @@ -568,28 +568,18 @@ func (n *Nginx) monitor(processInfo []core.Process) error { errorLogs := n.nginxBinary.GetErrorLogs() logErrorChannel := make(chan string, len(errorLogs)) - pidsChannel := make(chan string) defer close(logErrorChannel) - defer close(pidsChannel) go n.monitorLogs(errorLogs, logErrorChannel) - go n.monitorPids(processInfo, pidsChannel) - // Expect to receive one message from the pidsChannel and a message for each NGINX error log file in the logErrorChannel - numberOfExpectedMessages := 1 + len(errorLogs) + // Expect to receive one message from a message for each NGINX error log file in the logErrorChannel + numberOfExpectedMessages := len(errorLogs) for i := 0; i < numberOfExpectedMessages; i++ { - select { - case err := <-pidsChannel: - log.Tracef("message received in pidsChannel: %s", err) - if err != "" { - errorsFound = append(errorsFound, err) - } - case err := <-logErrorChannel: - log.Tracef("message received in logErrorChannel: %s", err) - if err != "" { - errorsFound = append(errorsFound, err) - } + err := <-logErrorChannel + log.Tracef("message received in logErrorChannel: %s", err) + if err != "" { + errorsFound = append(errorsFound, err) } } @@ -608,63 +598,11 @@ func (n *Nginx) monitorLogs(errorLogs map[string]string, errorChannel chan strin return } - for _, logFile := range errorLogs { + for logFile := range errorLogs { go n.tailLog(logFile, errorChannel) } } -func (n *Nginx) monitorPids(processInfo []core.Process, errorChannel chan string) { - ticker := time.NewTicker(500 * time.Millisecond) - startingPids := parseIntList(processInfo) - // wait 200 milliseconds for process information to change - time.Sleep(200 * time.Millisecond) - timeout := time.After(n.config.Nginx.ConfigReloadMonitoringPeriod) - - for { - select { - case <-timeout: - log.Trace("Timed out monitoring PIDs") - ticker.Stop() - errorChannel <- "Timed out" - return - case tick := <-ticker.C: - log.Tracef("Monitoring Pids %v", tick) - currentList := parseIntList(n.getNginxProccessInfo()) - difference := intersection(startingPids, currentList) - // if there is one pid leftover, that's ok - if len(difference) <= 1 { - errorChannel <- "" - log.Tracef("Success monitoring PIDs") - ticker.Stop() - return - } - } - } -} - -func intersection(list1 []int, list2 []int) []int { - m := make(map[int]bool) - intersection := []int{} - - for _, item := range list1 { - m[item] = true - } - for _, item := range list2 { - if m[item] { - intersection = append(intersection, item) - } - } - return intersection -} - -func parseIntList(data []core.Process) []int { - result := make([]int, len(data)) - for index, process := range data { - result[index] = int(process.Pid) - } - return result -} - func (n *Nginx) tailLog(logFile string, errorChannel chan string) { t, err := tailer.NewTailer(logFile) if err != nil { @@ -693,6 +631,7 @@ func (n *Nginx) tailLog(logFile string, errorChannel chan string) { } } case <-tick.C: + errorChannel <- "" return } } diff --git a/src/plugins/nginx_test.go b/src/plugins/nginx_test.go index ab93f783a..c6136f900 100644 --- a/src/plugins/nginx_test.go +++ b/src/plugins/nginx_test.go @@ -1086,14 +1086,6 @@ func TestNginx_monitor(t *testing.T) { time.Sleep(1 * time.Second) - pluginUnderTest.syncProcessInfo([]core.Process{ - {Pid: 1, Name: "12345", IsMaster: true}, - {Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false}, - {Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false}, - }) - - time.Sleep(1 * time.Second) - errorsChannel := make(chan error, 1) // Validate that errors in the logs returned @@ -1101,15 +1093,8 @@ func TestNginx_monitor(t *testing.T) { errorFound := pluginUnderTest.monitor(pluginUnderTest.getNginxProccessInfo()) errorsChannel <- errorFound }() - time.Sleep(1 * time.Second) - pluginUnderTest.syncProcessInfo([]core.Process{ - {Pid: 1, Name: "12345", IsMaster: true}, - {Pid: 6, ParentPid: 1, Name: "worker-6", IsMaster: false}, - {Pid: 7, ParentPid: 1, Name: "worker-7", IsMaster: false}, - }) - _, err = errorLogFile.WriteString("2023/03/14 14:16:23 [emerg] 3871#3871: bind() to 0.0.0.0:8081 failed (98: Address already in use)") require.NoError(t, err, "Error writing data to error log file") @@ -1172,121 +1157,3 @@ func TestNginx_monitorLog(t *testing.T) { } } } - -func TestNginx_monitorPids(t *testing.T) { - tests := []struct { - name string - startingProcesses []core.Process - updatedProcesses []core.Process - errorStr string - expectedTestTimeout bool - ticker time.Duration - monitoringPeriod time.Duration - }{ - { - name: "postive case", - startingProcesses: tutils.GetProcesses(), - updatedProcesses: []core.Process{ - {Pid: 1, Name: "12345", IsMaster: true}, - {Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false}, - {Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false}, - }, - errorStr: "", - expectedTestTimeout: false, - ticker: 1 * time.Second, - monitoringPeriod: 2 * time.Second, - }, - { - name: "timeout case", - startingProcesses: tutils.GetProcesses(), - updatedProcesses: []core.Process{ - {Pid: 1, Name: "12345", IsMaster: true}, - {Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false}, - {Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false}, - }, - errorStr: "", - expectedTestTimeout: true, - ticker: 2 * time.Second, - monitoringPeriod: 2 * time.Second, - }, - { - name: "ticker timeout case", - startingProcesses: tutils.GetProcesses(), - updatedProcesses: tutils.GetProcesses(), - errorStr: "Timed out", - expectedTestTimeout: false, - ticker: 2 * time.Second, - monitoringPeriod: 100 * time.Millisecond, - }, - } - - for _, test := range tests { - t.Logf("Running test %s", test.name) - errorChannel := make(chan string, 1) - env := tutils.GetMockEnvWithProcess() - config := tutils.GetMockAgentConfig() - config.Nginx.ConfigReloadMonitoringPeriod = test.monitoringPeriod - pluginUnderTest := NewNginx(tutils.GetMockCommandClient(&proto.NginxConfig{}), tutils.NewMockNginxBinary(), env, config) - - go pluginUnderTest.monitorPids(test.startingProcesses, errorChannel) - pluginUnderTest.syncProcessInfo(test.updatedProcesses) - - select { - case err := <-errorChannel: - // Check that the function completed with expected errors - assert.Equal(t, test.errorStr, err) - case <-time.After(test.ticker): - // Timeout if the function takes too long - // t.Error("Timed out waiting for function to complete") - assert.True(t, test.expectedTestTimeout) - } - } -} - -func TestParseIntList(t *testing.T) { - processes := tutils.GetProcesses() - - pidList := parseIntList(processes) - - expectedPids := []int{1, 2, 3} - for i, expected := range expectedPids { - if pidList[i] != expected { - t.Errorf("parseIntList returned value %d at index %d, expected %d", pidList[i], i, expected) - } - } -} - -func TestIntersection(t *testing.T) { - tests := []struct { - list1 []int - list2 []int - expected []int - }{ - { - list1: []int{}, - list2: []int{}, - expected: []int{}, - }, - { - list1: []int{1, 2, 3}, - list2: []int{4, 5, 6}, - expected: []int{}, - }, - { - list1: []int{1, 2, 3}, - list2: []int{}, - expected: []int{}, - }, - { - list1: []int{1, 2, 3}, - list2: []int{2, 3, 4, 5}, - expected: []int{2, 3}, - }, - } - - for _, tt := range tests { - result := intersection(tt.list1, tt.list2) - // Check that the output matches the expected result - assert.Equal(t, tt.expected, result, "Intersection should match expected result") - } -} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go index 6a1285d6a..7b442a3cf 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go @@ -568,28 +568,18 @@ func (n *Nginx) monitor(processInfo []core.Process) error { errorLogs := n.nginxBinary.GetErrorLogs() logErrorChannel := make(chan string, len(errorLogs)) - pidsChannel := make(chan string) defer close(logErrorChannel) - defer close(pidsChannel) go n.monitorLogs(errorLogs, logErrorChannel) - go n.monitorPids(processInfo, pidsChannel) - // Expect to receive one message from the pidsChannel and a message for each NGINX error log file in the logErrorChannel - numberOfExpectedMessages := 1 + len(errorLogs) + // Expect to receive one message from a message for each NGINX error log file in the logErrorChannel + numberOfExpectedMessages := len(errorLogs) for i := 0; i < numberOfExpectedMessages; i++ { - select { - case err := <-pidsChannel: - log.Tracef("message received in pidsChannel: %s", err) - if err != "" { - errorsFound = append(errorsFound, err) - } - case err := <-logErrorChannel: - log.Tracef("message received in logErrorChannel: %s", err) - if err != "" { - errorsFound = append(errorsFound, err) - } + err := <-logErrorChannel + log.Tracef("message received in logErrorChannel: %s", err) + if err != "" { + errorsFound = append(errorsFound, err) } } @@ -608,63 +598,11 @@ func (n *Nginx) monitorLogs(errorLogs map[string]string, errorChannel chan strin return } - for _, logFile := range errorLogs { + for logFile := range errorLogs { go n.tailLog(logFile, errorChannel) } } -func (n *Nginx) monitorPids(processInfo []core.Process, errorChannel chan string) { - ticker := time.NewTicker(500 * time.Millisecond) - startingPids := parseIntList(processInfo) - // wait 200 milliseconds for process information to change - time.Sleep(200 * time.Millisecond) - timeout := time.After(n.config.Nginx.ConfigReloadMonitoringPeriod) - - for { - select { - case <-timeout: - log.Trace("Timed out monitoring PIDs") - ticker.Stop() - errorChannel <- "Timed out" - return - case tick := <-ticker.C: - log.Tracef("Monitoring Pids %v", tick) - currentList := parseIntList(n.getNginxProccessInfo()) - difference := intersection(startingPids, currentList) - // if there is one pid leftover, that's ok - if len(difference) <= 1 { - errorChannel <- "" - log.Tracef("Success monitoring PIDs") - ticker.Stop() - return - } - } - } -} - -func intersection(list1 []int, list2 []int) []int { - m := make(map[int]bool) - intersection := []int{} - - for _, item := range list1 { - m[item] = true - } - for _, item := range list2 { - if m[item] { - intersection = append(intersection, item) - } - } - return intersection -} - -func parseIntList(data []core.Process) []int { - result := make([]int, len(data)) - for index, process := range data { - result[index] = int(process.Pid) - } - return result -} - func (n *Nginx) tailLog(logFile string, errorChannel chan string) { t, err := tailer.NewTailer(logFile) if err != nil { @@ -693,6 +631,7 @@ func (n *Nginx) tailLog(logFile string, errorChannel chan string) { } } case <-tick.C: + errorChannel <- "" return } }