From 00a376a99a1d9e0c3b58fc5470162390e9a9972c Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Tue, 6 Feb 2024 16:40:09 -0800 Subject: [PATCH 1/2] [chore][pkg/stanza/operator/transformer/recombine] Enable goleak check --- .../transformer/recombine/package_test.go | 14 ++++++++++++++ .../transformer/recombine/recombine_test.go | 18 ++++++++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) create mode 100644 pkg/stanza/operator/transformer/recombine/package_test.go diff --git a/pkg/stanza/operator/transformer/recombine/package_test.go b/pkg/stanza/operator/transformer/recombine/package_test.go new file mode 100644 index 000000000000..76aa4014d0d1 --- /dev/null +++ b/pkg/stanza/operator/transformer/recombine/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package recombine + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/pkg/stanza/operator/transformer/recombine/recombine_test.go b/pkg/stanza/operator/transformer/recombine/recombine_test.go index fffa50ce4c89..eee500354ca1 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine_test.go +++ b/pkg/stanza/operator/transformer/recombine/recombine_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "strings" + "sync" "testing" "time" @@ -504,6 +505,7 @@ func TestTransformer(t *testing.T) { op, err := tc.config.Build(testutil.Logger(t)) require.NoError(t, err) require.NoError(t, op.Start(testutil.NewUnscopedMockPersister())) + defer func() { require.NoError(t, op.Stop()) }() recombine := op.(*Transformer) fake := testutil.NewFakeOutput(t) @@ -709,13 +711,23 @@ func TestTimeoutWhenAggregationKeepHappen(t *testing.T) { require.NoError(t, recombine.Start(nil)) require.NoError(t, recombine.Process(ctx, e)) + quit := make(chan int) + var stopWG sync.WaitGroup + stopWG.Add(1) go func() { + defer stopWG.Done() + next := entry.New() next.Timestamp = time.Now() next.Body = "next" for { - time.Sleep(cfg.ForceFlushTimeout / 2) - require.NoError(t, recombine.Process(ctx, next)) + select { + case <-quit: + return + case <-time.After(cfg.ForceFlushTimeout / 2): + require.NoError(t, recombine.Process(ctx, next)) + + } } }() @@ -726,6 +738,8 @@ func TestTimeoutWhenAggregationKeepHappen(t *testing.T) { t.FailNow() } require.NoError(t, recombine.Stop()) + quit <- 0 + stopWG.Wait() } func TestSourceBatchDelete(t *testing.T) { From c99106b111f60c02b88d4b8a5bf6a0d5d0d2b363 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Wed, 7 Feb 2024 08:25:34 -0800 Subject: [PATCH 2/2] Cleanup channel and timer usage --- .../transformer/recombine/recombine_test.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/pkg/stanza/operator/transformer/recombine/recombine_test.go b/pkg/stanza/operator/transformer/recombine/recombine_test.go index eee500354ca1..28ec0b8a8e0a 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine_test.go +++ b/pkg/stanza/operator/transformer/recombine/recombine_test.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "strings" - "sync" "testing" "time" @@ -711,20 +710,18 @@ func TestTimeoutWhenAggregationKeepHappen(t *testing.T) { require.NoError(t, recombine.Start(nil)) require.NoError(t, recombine.Process(ctx, e)) - quit := make(chan int) - var stopWG sync.WaitGroup - stopWG.Add(1) + done := make(chan struct{}) + ticker := time.NewTicker(cfg.ForceFlushTimeout / 2) go func() { - defer stopWG.Done() - next := entry.New() next.Timestamp = time.Now() next.Body = "next" for { select { - case <-quit: + case <-done: + ticker.Stop() return - case <-time.After(cfg.ForceFlushTimeout / 2): + case <-ticker.C: require.NoError(t, recombine.Process(ctx, next)) } @@ -738,8 +735,7 @@ func TestTimeoutWhenAggregationKeepHappen(t *testing.T) { t.FailNow() } require.NoError(t, recombine.Stop()) - quit <- 0 - stopWG.Wait() + close(done) } func TestSourceBatchDelete(t *testing.T) {