From c84e574497f3bbcb38c0fd0b7575efb4597c2542 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Sat, 14 Dec 2024 13:14:31 -0500 Subject: [PATCH 1/5] fix(api): send to closed channel in mergeLogStreams (#7006) Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- server/application/logs_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/server/application/logs_test.go b/server/application/logs_test.go index 76bd5df134ae9..ad47c6d94fc2a 100644 --- a/server/application/logs_test.go +++ b/server/application/logs_test.go @@ -75,3 +75,33 @@ func TestMergeLogStreams(t *testing.T) { assert.Equal(t, []string{"1", "2", "3", "4"}, lines) } + +func TestMergeLogStreams_RaceCondition(t *testing.T) { + // Test for regression of this issue: https://github.com/argoproj/argo-cd/issues/7006 + for i := 0; i < 100; i++ { + first := make(chan logEntry) + second := make(chan logEntry) + + go func() { + parseLogsStream("first", io.NopCloser(strings.NewReader(`2021-02-09T00:00:01Z 1`)), first) + time.Sleep(time.Duration(i%5) * time.Millisecond) + close(first) + }() + + go func() { + parseLogsStream("second", io.NopCloser(strings.NewReader(`2021-02-09T00:00:02Z 2`)), second) + time.Sleep(time.Duration((i+2)%5) * time.Millisecond) + close(second) + }() + + merged := mergeLogStreams([]chan logEntry{first, second}, 1*time.Millisecond) + + var lines []string + for entry := range merged { + lines = append(lines, entry.line) + } + + expected := []string{"1", "2"} + assert.Equal(t, expected, lines) + } +} From e38d948f3e4151a34f11cd79a3fff476dbc79b28 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Sat, 14 Dec 2024 13:45:17 -0500 Subject: [PATCH 2/5] more intense test Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- server/application/logs_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/application/logs_test.go b/server/application/logs_test.go index ad47c6d94fc2a..92e89f036391a 100644 --- a/server/application/logs_test.go +++ b/server/application/logs_test.go @@ -78,23 +78,23 @@ func TestMergeLogStreams(t *testing.T) { func TestMergeLogStreams_RaceCondition(t *testing.T) { // Test for regression of this issue: https://github.com/argoproj/argo-cd/issues/7006 - for i := 0; i < 100; i++ { + for i := 0; i < 1000; i++ { first := make(chan logEntry) second := make(chan logEntry) go func() { parseLogsStream("first", io.NopCloser(strings.NewReader(`2021-02-09T00:00:01Z 1`)), first) - time.Sleep(time.Duration(i%5) * time.Millisecond) + time.Sleep(time.Duration(i%10) * time.Millisecond) close(first) }() go func() { parseLogsStream("second", io.NopCloser(strings.NewReader(`2021-02-09T00:00:02Z 2`)), second) - time.Sleep(time.Duration((i+2)%5) * time.Millisecond) + time.Sleep(time.Duration((i+5)%10) * time.Millisecond) close(second) }() - merged := mergeLogStreams([]chan logEntry{first, second}, 1*time.Millisecond) + merged := mergeLogStreams([]chan logEntry{first, second}, 5*time.Millisecond) var lines []string for entry := range merged { From d23d5b115b5ab47a45ada1c71aaf94ec86145bd1 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Sat, 14 Dec 2024 14:34:10 -0500 Subject: [PATCH 3/5] even more intense Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- server/application/logs_test.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/server/application/logs_test.go b/server/application/logs_test.go index 92e89f036391a..b90605b3f86a7 100644 --- a/server/application/logs_test.go +++ b/server/application/logs_test.go @@ -78,30 +78,27 @@ func TestMergeLogStreams(t *testing.T) { func TestMergeLogStreams_RaceCondition(t *testing.T) { // Test for regression of this issue: https://github.com/argoproj/argo-cd/issues/7006 - for i := 0; i < 1000; i++ { + for i := 0; i < 5000; i++ { first := make(chan logEntry) second := make(chan logEntry) go func() { parseLogsStream("first", io.NopCloser(strings.NewReader(`2021-02-09T00:00:01Z 1`)), first) - time.Sleep(time.Duration(i%10) * time.Millisecond) + time.Sleep(time.Duration(i%3) * time.Millisecond) // Reduce the variability in timing close(first) }() go func() { parseLogsStream("second", io.NopCloser(strings.NewReader(`2021-02-09T00:00:02Z 2`)), second) - time.Sleep(time.Duration((i+5)%10) * time.Millisecond) + time.Sleep(time.Duration((i+1)%3) * time.Millisecond) close(second) }() - merged := mergeLogStreams([]chan logEntry{first, second}, 5*time.Millisecond) + merged := mergeLogStreams([]chan logEntry{first, second}, 1*time.Millisecond) var lines []string for entry := range merged { lines = append(lines, entry.line) } - - expected := []string{"1", "2"} - assert.Equal(t, expected, lines) } } From d31a228757d433f793046d500dd96c608d3cea23 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Sat, 14 Dec 2024 14:36:41 -0500 Subject: [PATCH 4/5] remove unnecessary comment Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- server/application/logs_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/application/logs_test.go b/server/application/logs_test.go index b90605b3f86a7..d182b535bebd1 100644 --- a/server/application/logs_test.go +++ b/server/application/logs_test.go @@ -84,7 +84,7 @@ func TestMergeLogStreams_RaceCondition(t *testing.T) { go func() { parseLogsStream("first", io.NopCloser(strings.NewReader(`2021-02-09T00:00:01Z 1`)), first) - time.Sleep(time.Duration(i%3) * time.Millisecond) // Reduce the variability in timing + time.Sleep(time.Duration(i%3) * time.Millisecond) close(first) }() From a3a0156de132c607a93c92f5175fc010d00aa937 Mon Sep 17 00:00:00 2001 From: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Date: Sat, 14 Dec 2024 14:49:32 -0500 Subject: [PATCH 5/5] fix the race condition Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- server/application/logs.go | 27 +++++++++++++++++++-------- server/application/logs_test.go | 9 ++++++--- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/server/application/logs.go b/server/application/logs.go index 778f04edec66e..b52eef81e2e77 100644 --- a/server/application/logs.go +++ b/server/application/logs.go @@ -120,16 +120,22 @@ func mergeLogStreams(streams []chan logEntry, bufferingDuration time.Duration) c var sentAt time.Time ticker := time.NewTicker(bufferingDuration) + done := make(chan struct{}) go func() { - for range ticker.C { - sentAtLock.Lock() - // waited long enough for logs from each streams, send everything accumulated - if sentAt.Add(bufferingDuration).Before(time.Now()) { - _ = send(true) - sentAt = time.Now() - } + for { + select { + case <-done: + return + case <-ticker.C: + sentAtLock.Lock() + // waited long enough for logs from each streams, send everything accumulated + if sentAt.Add(bufferingDuration).Before(time.Now()) { + _ = send(true) + sentAt = time.Now() + } - sentAtLock.Unlock() + sentAtLock.Unlock() + } } }() @@ -145,6 +151,11 @@ func mergeLogStreams(streams []chan logEntry, bufferingDuration time.Duration) c _ = send(true) ticker.Stop() + // ticker.Stop() does not close the channel, and it does not wait for the channel to be drained. So we need to + // explicitly prevent the gorountine from leaking by closing the channel. We also need to prevent the goroutine + // from calling `send` again, because `send` pushes to the `merged` channel which we're about to close. + // This describes the approach nicely: https://stackoverflow.com/questions/17797754/ticker-stop-behaviour-in-golang + done <- struct{}{} close(merged) }() return merged diff --git a/server/application/logs_test.go b/server/application/logs_test.go index d182b535bebd1..7a565e37efa79 100644 --- a/server/application/logs_test.go +++ b/server/application/logs_test.go @@ -96,9 +96,12 @@ func TestMergeLogStreams_RaceCondition(t *testing.T) { merged := mergeLogStreams([]chan logEntry{first, second}, 1*time.Millisecond) - var lines []string - for entry := range merged { - lines = append(lines, entry.line) + // Drain the channel + for range merged { } + + // This test intentionally doesn't test the order of the output. Under these intense conditions, the test would + // fail often due to out of order entries. This test is only meant to reproduce a race between a channel writer + // and channel closer. } }