diff --git a/pkg/stanza/operator/transformer/recombine/package_test.go b/pkg/stanza/operator/transformer/recombine/package_test.go new file mode 100644 index 000000000000..76aa4014d0d1 --- /dev/null +++ b/pkg/stanza/operator/transformer/recombine/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package recombine + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/pkg/stanza/operator/transformer/recombine/recombine_test.go b/pkg/stanza/operator/transformer/recombine/recombine_test.go index fffa50ce4c89..28ec0b8a8e0a 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine_test.go +++ b/pkg/stanza/operator/transformer/recombine/recombine_test.go @@ -504,6 +504,7 @@ func TestTransformer(t *testing.T) { op, err := tc.config.Build(testutil.Logger(t)) require.NoError(t, err) require.NoError(t, op.Start(testutil.NewUnscopedMockPersister())) + defer func() { require.NoError(t, op.Stop()) }() recombine := op.(*Transformer) fake := testutil.NewFakeOutput(t) @@ -709,13 +710,21 @@ func TestTimeoutWhenAggregationKeepHappen(t *testing.T) { require.NoError(t, recombine.Start(nil)) require.NoError(t, recombine.Process(ctx, e)) + done := make(chan struct{}) + ticker := time.NewTicker(cfg.ForceFlushTimeout / 2) go func() { next := entry.New() next.Timestamp = time.Now() next.Body = "next" for { - time.Sleep(cfg.ForceFlushTimeout / 2) - require.NoError(t, recombine.Process(ctx, next)) + select { + case <-done: + ticker.Stop() + return + case <-ticker.C: + require.NoError(t, recombine.Process(ctx, next)) + + } } }() @@ -726,6 +735,7 @@ func TestTimeoutWhenAggregationKeepHappen(t *testing.T) { t.FailNow() } require.NoError(t, recombine.Stop()) + close(done) } func TestSourceBatchDelete(t *testing.T) {