Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ func TestFBOtelRestartE2E(t *testing.T) {
// It starts a filebeat receiver, waits for some logs and then stops it.
// It then restarts the collector for the remaining of the test.
// At the end it asserts that the unique number of logs in ES is equal to the number of
// lines in the input file. It is likely that there are duplicates due to the restart.
// lines in the input file.
info := define.Require(t, define.Requirements{
Group: Default,
Local: true,
Expand Down Expand Up @@ -1656,7 +1656,8 @@ func TestFBOtelRestartE2E(t *testing.T) {
esApiKey, err := createESApiKey(info.ESClient)
require.NoError(t, err, "error creating API key")
require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)
index := "logs-integration-default"
// Use a unique index to avoid conflicts with other parallel runners
index := strings.ToLower("logs-generic-default-" + randStr(8))
otelConfigTemplate := `receivers:
filebeatreceiver:
filebeat:
Expand Down Expand Up @@ -1694,6 +1695,8 @@ exporters:
flush_timeout: 1s
mapping:
mode: bodymap
logs_dynamic_id:
enabled: true
service:
pipelines:
logs:
Expand Down Expand Up @@ -1744,14 +1747,14 @@ service:
}

_, err = inputFile.Write([]byte(fmt.Sprintf(`{"id": "%d", "message": "%d"}`, i, i)))
require.NoErrorf(t, err, "failed to write line %d to temp file", i)
assert.NoErrorf(t, err, "failed to write line %d to temp file", i)
_, err = inputFile.Write([]byte("\n"))
require.NoErrorf(t, err, "failed to write newline to temp file")
assert.NoError(t, err, "failed to write newline to temp file")
inputLinesCounter.Add(1)
time.Sleep(100 * time.Millisecond)
}
err = inputFile.Close()
require.NoError(t, err, "failed to close input file")
assert.NoError(t, err, "failed to close input file")
}()

t.Cleanup(func() {
Expand All @@ -1771,7 +1774,9 @@ service:
go func() {
err = fixture.RunOtelWithClient(fCtx)
cancel()
require.True(t, errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err)
assert.Conditionf(t, func() bool {
return err == nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)
}, "unexpected error: %v", err)
close(stoppedCh)
}()

Expand Down Expand Up @@ -1832,9 +1837,7 @@ service:
require.True(t, found, "expected message field in document %q", hit.Source)
msg, ok := message.(string)
require.True(t, ok, "expected message field to be a string, got %T", message)
if _, found := uniqueIngestedLogs[msg]; found {
t.Logf("log line %q was ingested more than once", message)
}
require.NotContainsf(t, uniqueIngestedLogs, msg, "found duplicated log message %q", msg)
uniqueIngestedLogs[msg] = struct{}{}
}
actualHits.UniqueHits = len(uniqueIngestedLogs)
Expand Down