165 changes: 165 additions & 0 deletions testing/integration/otel_test.go
@@ -1099,3 +1099,168 @@ service:
fixtureWg.Wait()
require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error())
}

func TestOtelFBReceiverBatchSplitE2E(t *testing.T) {
// This test asserts that the batcher configuration from the elasticsearchexporter
// splits batches to avoid a 413 Request Entity Too Large error from Elasticsearch.
info := define.Require(t, define.Requirements{
Comment on lines +1104 to +1106
Member:

Just double checking why we need this to be an integration test that runs the entire collector. What additional test coverage is this adding? Why can't this be covered by a unit test in the exporter?

I like integration tests; they catch a lot of problems, but they are also driving our CI execution time to infinity :)

Member Author:

Agree, I think this is better suited to be in the exporter. Actually, it might as well be covered in the exporter already by open-telemetry/opentelemetry-collector-contrib#36396. I will double check.

Group: Default,
Local: true,
OS: []define.OS{
{Type: define.Windows},
{Type: define.Linux},
{Type: define.Darwin},
},
Stack: &define.Stack{},
})
tmpDir := t.TempDir()

// Batcher flush::bytes is set to 5MB by default, so we need to make
// sure we ingest more than that in a single batch.
lineSize := 5 << 20 // 5MB
// Elasticsearch has a default limit for http.max_content_length of 100MB.
// We shouldn't hit this limit, as the batcher should flush before that,
// but ingest more than 100MB in total so an unsplit request would fail.
numEvents := (100<<20)/lineSize + 1
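// (100 << 20)/lineSize + 1 = 21 events of 5 MiB each, ~105 MiB in total:
// enough that a single unsplit bulk request would draw a 413 from Elasticsearch.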

// Create the data file to ingest
inputFile, err := os.CreateTemp(tmpDir, "input.txt")
require.NoError(t, err, "failed to create temp file to hold data to ingest")
inputFilePath := inputFile.Name()
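// Fill the input file: each line becomes one log event of lineSize bytes.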
for i := 0; i < numEvents; i++ {
_, err = inputFile.Write([]byte(fmt.Sprintf("%s\n", strings.Repeat("a", lineSize))))
require.NoErrorf(t, err, "failed to write line %d to temp file", i)
}
err = inputFile.Close()
require.NoError(t, err, "failed to close data temp file")
t.Cleanup(func() {
if t.Failed() {
contents, err := os.ReadFile(inputFilePath)
if err != nil {
t.Logf("no data file to import at %s", inputFilePath)
return
}
// The input file is over 100MB of repeated characters, so log only its size.
t.Logf("data file %s contains %d bytes", inputFilePath, len(contents))
}
})

// Create the otel configuration file
type otelConfigOptions struct {
InputPath string
HomeDir string
ESEndpoint string
ESApiKey string
Index string
}
esEndpoint, err := getESHost()
require.NoError(t, err, "error getting elasticsearch endpoint")
esApiKey, err := createESApiKey(info.ESClient)
require.NoError(t, err, "error creating API key")
require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)
index := "logs-integration-default"
otelConfigTemplate := `receivers:
  filebeatreceiver:
    filebeat:
      inputs:
        - type: filestream
          id: filestream-end-to-end
          enabled: true
          paths:
            - {{.InputPath}}
    output:
      otelconsumer:
    logging:
      level: info
      selectors:
        - '*'
    path.home: {{.HomeDir}}
    queue.mem.flush.timeout: 0s
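    # a flush timeout of 0s publishes events to the output immediately instead of buffering them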
exporters:
  elasticsearch/log:
    endpoints:
      - {{.ESEndpoint}}
    api_key: {{.ESApiKey}}
    logs_index: {{.Index}}
    batcher:
      enabled: true
      flush_timeout: 1s
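      # pending events are flushed at least once per second, even below the size threshold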
    mapping:
      mode: bodymap
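      # bodymap indexes the event body directly as the Elasticsearch document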
service:
  pipelines:
    logs:
      receivers:
        - filebeatreceiver
      exporters:
        - elasticsearch/log
`
otelConfigPath := filepath.Join(tmpDir, "otel.yml")
var otelConfigBuffer bytes.Buffer
require.NoError(t,
template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer,
otelConfigOptions{
InputPath: inputFilePath,
HomeDir: tmpDir,
ESEndpoint: esEndpoint,
ESApiKey: esApiKey.Encoded,
Index: index,
}))
require.NoError(t, os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600))
t.Cleanup(func() {
if t.Failed() {
contents, err := os.ReadFile(otelConfigPath)
if err != nil {
t.Logf("No otel configuration file at %s", otelConfigPath)
return
}
t.Logf("Contents of otel config file:\n%s\n", string(contents))
}
})
// Now we can actually create the fixture and run it
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", otelConfigPath}))
require.NoError(t, err)

ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
defer cancel()
err = fixture.Prepare(ctx, fakeComponent)
require.NoError(t, err)

var fixtureWg sync.WaitGroup
fixtureWg.Add(1)
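// Run the collector in the background; err is only read again after
// fixtureWg.Wait() below, so the goroutine's write to it cannot race.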
go func() {
defer fixtureWg.Done()
err = fixture.RunOtelWithClient(ctx)
}()

// Make sure we find all the logs
actualHits := &struct {
Hits int
Docs []estools.ESDoc
}{}
require.Eventually(t,
func() bool {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

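// The logs index is backed by a data stream, so search its backing ".ds-" indices.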
docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]interface{}{
"log.file.path": inputFilePath,
})
require.NoError(t, err)

actualHits.Hits = docs.Hits.Total.Value
actualHits.Docs = docs.Hits.Hits

return actualHits.Hits == numEvents
},
2*time.Minute, 1*time.Second,
"Expected %d logs, got %v", numEvents, actualHits)

// Check that the logs were not truncated
require.Len(t, actualHits.Docs, numEvents)
for i, hit := range actualHits.Docs {
require.NotEmpty(t, hit.Source, "expected a non-empty doc")
require.Len(t, hit.Source["message"], lineSize, "message size mismatch for log line %d", i)
}

cancel()
fixtureWg.Wait()
require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %v", err)
}