From 082e81b5906449ee7a1d6dc025bb512b77e6f580 Mon Sep 17 00:00:00 2001 From: oliveromahony Date: Mon, 17 Apr 2023 18:11:07 +0100 Subject: [PATCH 1/3] remove pids draining --- src/plugins/nginx.go | 72 ++------------------- src/plugins/nginx_test.go | 133 -------------------------------------- 2 files changed, 5 insertions(+), 200 deletions(-) diff --git a/src/plugins/nginx.go b/src/plugins/nginx.go index 6a1285d6a..96475c7ff 100644 --- a/src/plugins/nginx.go +++ b/src/plugins/nginx.go @@ -568,28 +568,18 @@ func (n *Nginx) monitor(processInfo []core.Process) error { errorLogs := n.nginxBinary.GetErrorLogs() logErrorChannel := make(chan string, len(errorLogs)) - pidsChannel := make(chan string) defer close(logErrorChannel) - defer close(pidsChannel) go n.monitorLogs(errorLogs, logErrorChannel) - go n.monitorPids(processInfo, pidsChannel) - // Expect to receive one message from the pidsChannel and a message for each NGINX error log file in the logErrorChannel + // Expect to receive one message from a message for each NGINX error log file in the logErrorChannel numberOfExpectedMessages := 1 + len(errorLogs) for i := 0; i < numberOfExpectedMessages; i++ { - select { - case err := <-pidsChannel: - log.Tracef("message received in pidsChannel: %s", err) - if err != "" { - errorsFound = append(errorsFound, err) - } - case err := <-logErrorChannel: - log.Tracef("message received in logErrorChannel: %s", err) - if err != "" { - errorsFound = append(errorsFound, err) - } + err := <-logErrorChannel + log.Tracef("message received in logErrorChannel: %s", err) + if err != "" { + errorsFound = append(errorsFound, err) } } @@ -613,58 +603,6 @@ func (n *Nginx) monitorLogs(errorLogs map[string]string, errorChannel chan strin } } -func (n *Nginx) monitorPids(processInfo []core.Process, errorChannel chan string) { - ticker := time.NewTicker(500 * time.Millisecond) - startingPids := parseIntList(processInfo) - // wait 200 milliseconds for process information to change - time.Sleep(200 * time.Millisecond) - timeout := time.After(n.config.Nginx.ConfigReloadMonitoringPeriod) - - for { - select { - case <-timeout: - log.Trace("Timed out monitoring PIDs") - ticker.Stop() - errorChannel <- "Timed out" - return - case tick := <-ticker.C: - log.Tracef("Monitoring Pids %v", tick) - currentList := parseIntList(n.getNginxProccessInfo()) - difference := intersection(startingPids, currentList) - // if there is one pid leftover, that's ok - if len(difference) <= 1 { - errorChannel <- "" - log.Tracef("Success monitoring PIDs") - ticker.Stop() - return - } - } - } -} - -func intersection(list1 []int, list2 []int) []int { - m := make(map[int]bool) - intersection := []int{} - - for _, item := range list1 { - m[item] = true - } - for _, item := range list2 { - if m[item] { - intersection = append(intersection, item) - } - } - return intersection -} - -func parseIntList(data []core.Process) []int { - result := make([]int, len(data)) - for index, process := range data { - result[index] = int(process.Pid) - } - return result -} - func (n *Nginx) tailLog(logFile string, errorChannel chan string) { t, err := tailer.NewTailer(logFile) if err != nil { diff --git a/src/plugins/nginx_test.go b/src/plugins/nginx_test.go index ab93f783a..c6136f900 100644 --- a/src/plugins/nginx_test.go +++ b/src/plugins/nginx_test.go @@ -1086,14 +1086,6 @@ func TestNginx_monitor(t *testing.T) { time.Sleep(1 * time.Second) - pluginUnderTest.syncProcessInfo([]core.Process{ - {Pid: 1, Name: "12345", IsMaster: true}, - {Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false}, - {Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false}, - }) - - time.Sleep(1 * time.Second) - errorsChannel := make(chan error, 1) // Validate that errors in the logs returned @@ -1101,15 +1093,8 @@ func TestNginx_monitor(t *testing.T) { errorFound := pluginUnderTest.monitor(pluginUnderTest.getNginxProccessInfo()) errorsChannel <- errorFound }() - time.Sleep(1 * time.Second) - pluginUnderTest.syncProcessInfo([]core.Process{ - {Pid: 1, Name: "12345", IsMaster: true}, - {Pid: 6, ParentPid: 1, Name: "worker-6", IsMaster: false}, - {Pid: 7, ParentPid: 1, Name: "worker-7", IsMaster: false}, - }) - _, err = errorLogFile.WriteString("2023/03/14 14:16:23 [emerg] 3871#3871: bind() to 0.0.0.0:8081 failed (98: Address already in use)") require.NoError(t, err, "Error writing data to error log file") @@ -1172,121 +1157,3 @@ func TestNginx_monitorLog(t *testing.T) { } } } - -func TestNginx_monitorPids(t *testing.T) { - tests := []struct { - name string - startingProcesses []core.Process - updatedProcesses []core.Process - errorStr string - expectedTestTimeout bool - ticker time.Duration - monitoringPeriod time.Duration - }{ - { - name: "postive case", - startingProcesses: tutils.GetProcesses(), - updatedProcesses: []core.Process{ - {Pid: 1, Name: "12345", IsMaster: true}, - {Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false}, - {Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false}, - }, - errorStr: "", - expectedTestTimeout: false, - ticker: 1 * time.Second, - monitoringPeriod: 2 * time.Second, - }, - { - name: "timeout case", - startingProcesses: tutils.GetProcesses(), - updatedProcesses: []core.Process{ - {Pid: 1, Name: "12345", IsMaster: true}, - {Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false}, - {Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false}, - }, - errorStr: "", - expectedTestTimeout: true, - ticker: 2 * time.Second, - monitoringPeriod: 2 * time.Second, - }, - { - name: "ticker timeout case", - startingProcesses: tutils.GetProcesses(), - updatedProcesses: tutils.GetProcesses(), - errorStr: "Timed out", - expectedTestTimeout: false, - ticker: 2 * time.Second, - monitoringPeriod: 100 * time.Millisecond, - }, - } - - for _, test := range tests { - t.Logf("Running test %s", test.name) - errorChannel := make(chan string, 1) - env := tutils.GetMockEnvWithProcess() - config := tutils.GetMockAgentConfig() - config.Nginx.ConfigReloadMonitoringPeriod = test.monitoringPeriod - pluginUnderTest := NewNginx(tutils.GetMockCommandClient(&proto.NginxConfig{}), tutils.NewMockNginxBinary(), env, config) - - go pluginUnderTest.monitorPids(test.startingProcesses, errorChannel) - pluginUnderTest.syncProcessInfo(test.updatedProcesses) - - select { - case err := <-errorChannel: - // Check that the function completed with expected errors - assert.Equal(t, test.errorStr, err) - case <-time.After(test.ticker): - // Timeout if the function takes too long - // t.Error("Timed out waiting for function to complete") - assert.True(t, test.expectedTestTimeout) - } - } -} - -func TestParseIntList(t *testing.T) { - processes := tutils.GetProcesses() - - pidList := parseIntList(processes) - - expectedPids := []int{1, 2, 3} - for i, expected := range expectedPids { - if pidList[i] != expected { - t.Errorf("parseIntList returned value %d at index %d, expected %d", pidList[i], i, expected) - } - } -} - -func TestIntersection(t *testing.T) { - tests := []struct { - list1 []int - list2 []int - expected []int - }{ - { - list1: []int{}, - list2: []int{}, - expected: []int{}, - }, - { - list1: []int{1, 2, 3}, - list2: []int{4, 5, 6}, - expected: []int{}, - }, - { - list1: []int{1, 2, 3}, - list2: []int{}, - expected: []int{}, - }, - { - list1: []int{1, 2, 3}, - list2: []int{2, 3, 4, 5}, - expected: []int{2, 3}, - }, - } - - for _, tt := range tests { - result := intersection(tt.list1, tt.list2) - // Check that the output matches the expected result - assert.Equal(t, tt.expected, result, "Intersection should match expected result") - } -} From 131700a30a407a204126ef9594cb460bbf96a131 Mon Sep 17 00:00:00 2001 From: oliveromahony Date: Tue, 18 Apr 2023 10:14:35 +0100 Subject: [PATCH 2/3] fixed tests --- src/plugins/nginx.go | 2 +- .../nginx/agent/v2/src/plugins/nginx.go | 74 ++----------------- 2 files changed, 7 insertions(+), 69 deletions(-) diff --git a/src/plugins/nginx.go b/src/plugins/nginx.go index 96475c7ff..8b921a984 100644 --- a/src/plugins/nginx.go +++ b/src/plugins/nginx.go @@ -573,7 +573,7 @@ func (n *Nginx) monitor(processInfo []core.Process) error { go n.monitorLogs(errorLogs, logErrorChannel) // Expect to receive one message from a message for each NGINX error log file in the logErrorChannel - numberOfExpectedMessages := 1 + len(errorLogs) + numberOfExpectedMessages := len(errorLogs) for i := 0; i < numberOfExpectedMessages; i++ { err := <-logErrorChannel diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go index 6a1285d6a..8b921a984 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go @@ -568,28 +568,18 @@ func (n *Nginx) monitor(processInfo []core.Process) error { errorLogs := n.nginxBinary.GetErrorLogs() logErrorChannel := make(chan string, len(errorLogs)) - pidsChannel := make(chan string) defer close(logErrorChannel) - defer close(pidsChannel) go n.monitorLogs(errorLogs, logErrorChannel) - go n.monitorPids(processInfo, pidsChannel) - // Expect to receive one message from the pidsChannel and a message for each NGINX error log file in the logErrorChannel - numberOfExpectedMessages := 1 + len(errorLogs) + // Expect to receive one message from a message for each NGINX error log file in the logErrorChannel + numberOfExpectedMessages := len(errorLogs) for i := 0; i < numberOfExpectedMessages; i++ { - select { - case err := <-pidsChannel: - log.Tracef("message received in pidsChannel: %s", err) - if err != "" { - errorsFound = append(errorsFound, err) - } - case err := <-logErrorChannel: - log.Tracef("message received in logErrorChannel: %s", err) - if err != "" { - errorsFound = append(errorsFound, err) - } + err := <-logErrorChannel + log.Tracef("message received in logErrorChannel: %s", err) + if err != "" { + errorsFound = append(errorsFound, err) } } @@ -613,58 +603,6 @@ func (n *Nginx) monitorLogs(errorLogs map[string]string, errorChannel chan strin } } -func (n *Nginx) monitorPids(processInfo []core.Process, errorChannel chan string) { - ticker := time.NewTicker(500 * time.Millisecond) - startingPids := parseIntList(processInfo) - // wait 200 milliseconds for process information to change - time.Sleep(200 * time.Millisecond) - timeout := time.After(n.config.Nginx.ConfigReloadMonitoringPeriod) - - for { - select { - case <-timeout: - log.Trace("Timed out monitoring PIDs") - ticker.Stop() - errorChannel <- "Timed out" - return - case tick := <-ticker.C: - log.Tracef("Monitoring Pids %v", tick) - currentList := parseIntList(n.getNginxProccessInfo()) - difference := intersection(startingPids, currentList) - // if there is one pid leftover, that's ok - if len(difference) <= 1 { - errorChannel <- "" - log.Tracef("Success monitoring PIDs") - ticker.Stop() - return - } - } - } -} - -func intersection(list1 []int, list2 []int) []int { - m := make(map[int]bool) - intersection := []int{} - - for _, item := range list1 { - m[item] = true - } - for _, item := range list2 { - if m[item] { - intersection = append(intersection, item) - } - } - return intersection -} - -func parseIntList(data []core.Process) []int { - result := make([]int, len(data)) - for index, process := range data { - result[index] = int(process.Pid) - } - return result -} - func (n *Nginx) tailLog(logFile string, errorChannel chan string) { t, err := tailer.NewTailer(logFile) if err != nil { From a4949126d8d2c96130994c4d24a28d19ab673a11 Mon Sep 17 00:00:00 2001 From: dhurley Date: Tue, 18 Apr 2023 11:51:24 +0100 Subject: [PATCH 3/3] Fix monitoring of error logs after a NGINX reload --- src/plugins/nginx.go | 3 ++- .../vendor/github.com/nginx/agent/v2/src/plugins/nginx.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/plugins/nginx.go b/src/plugins/nginx.go index 8b921a984..7b442a3cf 100644 --- a/src/plugins/nginx.go +++ b/src/plugins/nginx.go @@ -598,7 +598,7 @@ func (n *Nginx) monitorLogs(errorLogs map[string]string, errorChannel chan strin return } - for _, logFile := range errorLogs { + for logFile := range errorLogs { go n.tailLog(logFile, errorChannel) } } @@ -631,6 +631,7 @@ func (n *Nginx) tailLog(logFile string, errorChannel chan string) { } } case <-tick.C: + errorChannel <- "" return } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go index 8b921a984..7b442a3cf 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go @@ -598,7 +598,7 @@ func (n *Nginx) monitorLogs(errorLogs map[string]string, errorChannel chan strin return } - for _, logFile := range errorLogs { + for logFile := range errorLogs { go n.tailLog(logFile, errorChannel) } } @@ -631,6 +631,7 @@ func (n *Nginx) tailLog(logFile string, errorChannel chan string) { } } case <-tick.C: + errorChannel <- "" return } }