Skip to content

Commit

Permalink
remove pids draining (#292) (#294)
Browse files Browse the repository at this point in the history
* remove pids draining and fixed monitoring of error logs after a NGINX reload

---------

Co-authored-by: Oliver O'Mahony <[email protected]>
Co-authored-by: dhurley <[email protected]>
  • Loading branch information
3 people authored Apr 18, 2023
1 parent 9d46505 commit f76f3fc
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 271 deletions.
77 changes: 8 additions & 69 deletions src/plugins/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,28 +568,18 @@ func (n *Nginx) monitor(processInfo []core.Process) error {
errorLogs := n.nginxBinary.GetErrorLogs()

logErrorChannel := make(chan string, len(errorLogs))
pidsChannel := make(chan string)
defer close(logErrorChannel)
defer close(pidsChannel)

go n.monitorLogs(errorLogs, logErrorChannel)
go n.monitorPids(processInfo, pidsChannel)

// Expect to receive one message from the pidsChannel and a message for each NGINX error log file in the logErrorChannel
numberOfExpectedMessages := 1 + len(errorLogs)
// Expect to receive one message from a message for each NGINX error log file in the logErrorChannel
numberOfExpectedMessages := len(errorLogs)

for i := 0; i < numberOfExpectedMessages; i++ {
select {
case err := <-pidsChannel:
log.Tracef("message received in pidsChannel: %s", err)
if err != "" {
errorsFound = append(errorsFound, err)
}
case err := <-logErrorChannel:
log.Tracef("message received in logErrorChannel: %s", err)
if err != "" {
errorsFound = append(errorsFound, err)
}
err := <-logErrorChannel
log.Tracef("message received in logErrorChannel: %s", err)
if err != "" {
errorsFound = append(errorsFound, err)
}
}

Expand All @@ -608,63 +598,11 @@ func (n *Nginx) monitorLogs(errorLogs map[string]string, errorChannel chan strin
return
}

for _, logFile := range errorLogs {
for logFile := range errorLogs {
go n.tailLog(logFile, errorChannel)
}
}

func (n *Nginx) monitorPids(processInfo []core.Process, errorChannel chan string) {
ticker := time.NewTicker(500 * time.Millisecond)
startingPids := parseIntList(processInfo)
// wait 200 milliseconds for process information to change
time.Sleep(200 * time.Millisecond)
timeout := time.After(n.config.Nginx.ConfigReloadMonitoringPeriod)

for {
select {
case <-timeout:
log.Trace("Timed out monitoring PIDs")
ticker.Stop()
errorChannel <- "Timed out"
return
case tick := <-ticker.C:
log.Tracef("Monitoring Pids %v", tick)
currentList := parseIntList(n.getNginxProccessInfo())
difference := intersection(startingPids, currentList)
// if there is one pid leftover, that's ok
if len(difference) <= 1 {
errorChannel <- ""
log.Tracef("Success monitoring PIDs")
ticker.Stop()
return
}
}
}
}

func intersection(list1 []int, list2 []int) []int {
m := make(map[int]bool)
intersection := []int{}

for _, item := range list1 {
m[item] = true
}
for _, item := range list2 {
if m[item] {
intersection = append(intersection, item)
}
}
return intersection
}

func parseIntList(data []core.Process) []int {
result := make([]int, len(data))
for index, process := range data {
result[index] = int(process.Pid)
}
return result
}

func (n *Nginx) tailLog(logFile string, errorChannel chan string) {
t, err := tailer.NewTailer(logFile)
if err != nil {
Expand Down Expand Up @@ -693,6 +631,7 @@ func (n *Nginx) tailLog(logFile string, errorChannel chan string) {
}
}
case <-tick.C:
errorChannel <- ""
return
}
}
Expand Down
133 changes: 0 additions & 133 deletions src/plugins/nginx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,30 +1086,15 @@ func TestNginx_monitor(t *testing.T) {

time.Sleep(1 * time.Second)

pluginUnderTest.syncProcessInfo([]core.Process{
{Pid: 1, Name: "12345", IsMaster: true},
{Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false},
{Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false},
})

time.Sleep(1 * time.Second)

errorsChannel := make(chan error, 1)

// Validate that errors in the logs returned
go func() {
errorFound := pluginUnderTest.monitor(pluginUnderTest.getNginxProccessInfo())
errorsChannel <- errorFound
}()

time.Sleep(1 * time.Second)

pluginUnderTest.syncProcessInfo([]core.Process{
{Pid: 1, Name: "12345", IsMaster: true},
{Pid: 6, ParentPid: 1, Name: "worker-6", IsMaster: false},
{Pid: 7, ParentPid: 1, Name: "worker-7", IsMaster: false},
})

_, err = errorLogFile.WriteString("2023/03/14 14:16:23 [emerg] 3871#3871: bind() to 0.0.0.0:8081 failed (98: Address already in use)")
require.NoError(t, err, "Error writing data to error log file")

Expand Down Expand Up @@ -1172,121 +1157,3 @@ func TestNginx_monitorLog(t *testing.T) {
}
}
}

func TestNginx_monitorPids(t *testing.T) {
tests := []struct {
name string
startingProcesses []core.Process
updatedProcesses []core.Process
errorStr string
expectedTestTimeout bool
ticker time.Duration
monitoringPeriod time.Duration
}{
{
name: "postive case",
startingProcesses: tutils.GetProcesses(),
updatedProcesses: []core.Process{
{Pid: 1, Name: "12345", IsMaster: true},
{Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false},
{Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false},
},
errorStr: "",
expectedTestTimeout: false,
ticker: 1 * time.Second,
monitoringPeriod: 2 * time.Second,
},
{
name: "timeout case",
startingProcesses: tutils.GetProcesses(),
updatedProcesses: []core.Process{
{Pid: 1, Name: "12345", IsMaster: true},
{Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false},
{Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false},
},
errorStr: "",
expectedTestTimeout: true,
ticker: 2 * time.Second,
monitoringPeriod: 2 * time.Second,
},
{
name: "ticker timeout case",
startingProcesses: tutils.GetProcesses(),
updatedProcesses: tutils.GetProcesses(),
errorStr: "Timed out",
expectedTestTimeout: false,
ticker: 2 * time.Second,
monitoringPeriod: 100 * time.Millisecond,
},
}

for _, test := range tests {
t.Logf("Running test %s", test.name)
errorChannel := make(chan string, 1)
env := tutils.GetMockEnvWithProcess()
config := tutils.GetMockAgentConfig()
config.Nginx.ConfigReloadMonitoringPeriod = test.monitoringPeriod
pluginUnderTest := NewNginx(tutils.GetMockCommandClient(&proto.NginxConfig{}), tutils.NewMockNginxBinary(), env, config)

go pluginUnderTest.monitorPids(test.startingProcesses, errorChannel)
pluginUnderTest.syncProcessInfo(test.updatedProcesses)

select {
case err := <-errorChannel:
// Check that the function completed with expected errors
assert.Equal(t, test.errorStr, err)
case <-time.After(test.ticker):
// Timeout if the function takes too long
// t.Error("Timed out waiting for function to complete")
assert.True(t, test.expectedTestTimeout)
}
}
}

func TestParseIntList(t *testing.T) {
processes := tutils.GetProcesses()

pidList := parseIntList(processes)

expectedPids := []int{1, 2, 3}
for i, expected := range expectedPids {
if pidList[i] != expected {
t.Errorf("parseIntList returned value %d at index %d, expected %d", pidList[i], i, expected)
}
}
}

func TestIntersection(t *testing.T) {
tests := []struct {
list1 []int
list2 []int
expected []int
}{
{
list1: []int{},
list2: []int{},
expected: []int{},
},
{
list1: []int{1, 2, 3},
list2: []int{4, 5, 6},
expected: []int{},
},
{
list1: []int{1, 2, 3},
list2: []int{},
expected: []int{},
},
{
list1: []int{1, 2, 3},
list2: []int{2, 3, 4, 5},
expected: []int{2, 3},
},
}

for _, tt := range tests {
result := intersection(tt.list1, tt.list2)
// Check that the output matches the expected result
assert.Equal(t, tt.expected, result, "Intersection should match expected result")
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f76f3fc

Please sign in to comment.