Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove pids draining #292

Merged
merged 3 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 8 additions & 69 deletions src/plugins/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,28 +568,18 @@ func (n *Nginx) monitor(processInfo []core.Process) error {
errorLogs := n.nginxBinary.GetErrorLogs()

logErrorChannel := make(chan string, len(errorLogs))
pidsChannel := make(chan string)
defer close(logErrorChannel)
defer close(pidsChannel)

go n.monitorLogs(errorLogs, logErrorChannel)
go n.monitorPids(processInfo, pidsChannel)

// Expect to receive one message from the pidsChannel and a message for each NGINX error log file in the logErrorChannel
numberOfExpectedMessages := 1 + len(errorLogs)
// Expect to receive one message from a message for each NGINX error log file in the logErrorChannel
numberOfExpectedMessages := len(errorLogs)

for i := 0; i < numberOfExpectedMessages; i++ {
select {
case err := <-pidsChannel:
log.Tracef("message received in pidsChannel: %s", err)
if err != "" {
errorsFound = append(errorsFound, err)
}
case err := <-logErrorChannel:
log.Tracef("message received in logErrorChannel: %s", err)
if err != "" {
errorsFound = append(errorsFound, err)
}
err := <-logErrorChannel
log.Tracef("message received in logErrorChannel: %s", err)
if err != "" {
errorsFound = append(errorsFound, err)
}
}

Expand All @@ -608,63 +598,11 @@ func (n *Nginx) monitorLogs(errorLogs map[string]string, errorChannel chan strin
return
}

for _, logFile := range errorLogs {
for logFile := range errorLogs {
go n.tailLog(logFile, errorChannel)
}
}

func (n *Nginx) monitorPids(processInfo []core.Process, errorChannel chan string) {
ticker := time.NewTicker(500 * time.Millisecond)
startingPids := parseIntList(processInfo)
// wait 200 milliseconds for process information to change
time.Sleep(200 * time.Millisecond)
timeout := time.After(n.config.Nginx.ConfigReloadMonitoringPeriod)

for {
select {
case <-timeout:
log.Trace("Timed out monitoring PIDs")
ticker.Stop()
errorChannel <- "Timed out"
return
case tick := <-ticker.C:
log.Tracef("Monitoring Pids %v", tick)
currentList := parseIntList(n.getNginxProccessInfo())
difference := intersection(startingPids, currentList)
// if there is one pid leftover, that's ok
if len(difference) <= 1 {
errorChannel <- ""
log.Tracef("Success monitoring PIDs")
ticker.Stop()
return
}
}
}
}

func intersection(list1 []int, list2 []int) []int {
m := make(map[int]bool)
intersection := []int{}

for _, item := range list1 {
m[item] = true
}
for _, item := range list2 {
if m[item] {
intersection = append(intersection, item)
}
}
return intersection
}

func parseIntList(data []core.Process) []int {
result := make([]int, len(data))
for index, process := range data {
result[index] = int(process.Pid)
}
return result
}

func (n *Nginx) tailLog(logFile string, errorChannel chan string) {
t, err := tailer.NewTailer(logFile)
if err != nil {
Expand Down Expand Up @@ -693,6 +631,7 @@ func (n *Nginx) tailLog(logFile string, errorChannel chan string) {
}
}
case <-tick.C:
errorChannel <- ""
return
}
}
Expand Down
133 changes: 0 additions & 133 deletions src/plugins/nginx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,30 +1086,15 @@ func TestNginx_monitor(t *testing.T) {

time.Sleep(1 * time.Second)

pluginUnderTest.syncProcessInfo([]core.Process{
{Pid: 1, Name: "12345", IsMaster: true},
{Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false},
{Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false},
})

time.Sleep(1 * time.Second)

errorsChannel := make(chan error, 1)

// Validate that errors in the logs returned
go func() {
errorFound := pluginUnderTest.monitor(pluginUnderTest.getNginxProccessInfo())
errorsChannel <- errorFound
}()

time.Sleep(1 * time.Second)

pluginUnderTest.syncProcessInfo([]core.Process{
{Pid: 1, Name: "12345", IsMaster: true},
{Pid: 6, ParentPid: 1, Name: "worker-6", IsMaster: false},
{Pid: 7, ParentPid: 1, Name: "worker-7", IsMaster: false},
})

_, err = errorLogFile.WriteString("2023/03/14 14:16:23 [emerg] 3871#3871: bind() to 0.0.0.0:8081 failed (98: Address already in use)")
require.NoError(t, err, "Error writing data to error log file")

Expand Down Expand Up @@ -1172,121 +1157,3 @@ func TestNginx_monitorLog(t *testing.T) {
}
}
}

func TestNginx_monitorPids(t *testing.T) {
tests := []struct {
name string
startingProcesses []core.Process
updatedProcesses []core.Process
errorStr string
expectedTestTimeout bool
ticker time.Duration
monitoringPeriod time.Duration
}{
{
name: "postive case",
startingProcesses: tutils.GetProcesses(),
updatedProcesses: []core.Process{
{Pid: 1, Name: "12345", IsMaster: true},
{Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false},
{Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false},
},
errorStr: "",
expectedTestTimeout: false,
ticker: 1 * time.Second,
monitoringPeriod: 2 * time.Second,
},
{
name: "timeout case",
startingProcesses: tutils.GetProcesses(),
updatedProcesses: []core.Process{
{Pid: 1, Name: "12345", IsMaster: true},
{Pid: 4, ParentPid: 1, Name: "worker-4", IsMaster: false},
{Pid: 5, ParentPid: 1, Name: "worker-5", IsMaster: false},
},
errorStr: "",
expectedTestTimeout: true,
ticker: 2 * time.Second,
monitoringPeriod: 2 * time.Second,
},
{
name: "ticker timeout case",
startingProcesses: tutils.GetProcesses(),
updatedProcesses: tutils.GetProcesses(),
errorStr: "Timed out",
expectedTestTimeout: false,
ticker: 2 * time.Second,
monitoringPeriod: 100 * time.Millisecond,
},
}

for _, test := range tests {
t.Logf("Running test %s", test.name)
errorChannel := make(chan string, 1)
env := tutils.GetMockEnvWithProcess()
config := tutils.GetMockAgentConfig()
config.Nginx.ConfigReloadMonitoringPeriod = test.monitoringPeriod
pluginUnderTest := NewNginx(tutils.GetMockCommandClient(&proto.NginxConfig{}), tutils.NewMockNginxBinary(), env, config)

go pluginUnderTest.monitorPids(test.startingProcesses, errorChannel)
pluginUnderTest.syncProcessInfo(test.updatedProcesses)

select {
case err := <-errorChannel:
// Check that the function completed with expected errors
assert.Equal(t, test.errorStr, err)
case <-time.After(test.ticker):
// Timeout if the function takes too long
// t.Error("Timed out waiting for function to complete")
assert.True(t, test.expectedTestTimeout)
}
}
}

func TestParseIntList(t *testing.T) {
processes := tutils.GetProcesses()

pidList := parseIntList(processes)

expectedPids := []int{1, 2, 3}
for i, expected := range expectedPids {
if pidList[i] != expected {
t.Errorf("parseIntList returned value %d at index %d, expected %d", pidList[i], i, expected)
}
}
}

func TestIntersection(t *testing.T) {
tests := []struct {
list1 []int
list2 []int
expected []int
}{
{
list1: []int{},
list2: []int{},
expected: []int{},
},
{
list1: []int{1, 2, 3},
list2: []int{4, 5, 6},
expected: []int{},
},
{
list1: []int{1, 2, 3},
list2: []int{},
expected: []int{},
},
{
list1: []int{1, 2, 3},
list2: []int{2, 3, 4, 5},
expected: []int{2, 3},
},
}

for _, tt := range tests {
result := intersection(tt.list1, tt.list2)
// Check that the output matches the expected result
assert.Equal(t, tt.expected, result, "Intersection should match expected result")
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.