diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index a74a4bd5bbc..c816b2beff5 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1626,7 +1626,7 @@ func TestFBOtelRestartE2E(t *testing.T) { // It starts a filebeat receiver, waits for some logs and then stops it. // It then restarts the collector for the remaining of the test. // At the end it asserts that the unique number of logs in ES is equal to the number of - // lines in the input file. It is likely that there are duplicates due to the restart. + // lines in the input file. info := define.Require(t, define.Requirements{ Group: Default, Local: true, @@ -1656,7 +1656,8 @@ func TestFBOtelRestartE2E(t *testing.T) { esApiKey, err := createESApiKey(info.ESClient) require.NoError(t, err, "error creating API key") require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) - index := "logs-integration-default" + // Use a unique index to avoid conflicts with other parallel runners + index := strings.ToLower("logs-generic-default-" + randStr(8)) otelConfigTemplate := `receivers: filebeatreceiver: filebeat: @@ -1694,6 +1695,8 @@ exporters: flush_timeout: 1s mapping: mode: bodymap + logs_dynamic_id: + enabled: true service: pipelines: logs: @@ -1744,14 +1747,14 @@ service: } _, err = inputFile.Write([]byte(fmt.Sprintf(`{"id": "%d", "message": "%d"}`, i, i))) - require.NoErrorf(t, err, "failed to write line %d to temp file", i) + assert.NoErrorf(t, err, "failed to write line %d to temp file", i) _, err = inputFile.Write([]byte("\n")) - require.NoErrorf(t, err, "failed to write newline to temp file") + assert.NoError(t, err, "failed to write newline to temp file") inputLinesCounter.Add(1) time.Sleep(100 * time.Millisecond) } err = inputFile.Close() - require.NoError(t, err, "failed to close input file") + assert.NoError(t, err, "failed to close input file") }() t.Cleanup(func() { @@ -1771,7 +1774,9 @@ service: go func() { err = fixture.RunOtelWithClient(fCtx) cancel() - require.True(t, errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err) + assert.Conditionf(t, func() bool { + return err == nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) + }, "unexpected error: %v", err) close(stoppedCh) }() @@ -1832,9 +1837,7 @@ service: require.True(t, found, "expected message field in document %q", hit.Source) msg, ok := message.(string) require.True(t, ok, "expected message field to be a string, got %T", message) - if _, found := uniqueIngestedLogs[msg]; found { - t.Logf("log line %q was ingested more than once", message) - } + require.NotContainsf(t, uniqueIngestedLogs, msg, "found duplicated log message %q", msg) uniqueIngestedLogs[msg] = struct{}{} } actualHits.UniqueHits = len(uniqueIngestedLogs)