diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index e42178a2005..2e98850b670 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1099,3 +1099,168 @@ service: fixtureWg.Wait() require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) } + +func TestOtelFBReceiverBatchSplitE2E(t *testing.T) { + // This test asserts that the batcher configuration from the elasticseachexporter + // splits batches to avoid a 413 Request Entity Too Large error from Elasticsearch. + info := define.Require(t, define.Requirements{ + Group: Default, + Local: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + tmpDir := t.TempDir() + + // Batcher flush::bytes is set to 5MB by default, so we need to make + // sure we ingest more than that in a single batch. + lineSize := 5 << 20 // 5MB + // Elasticsearch has a default limit for http.max_content_length of 100MB, + // We shouldn't hit this limit as the batcher should flush before that. + numEvents := 100/lineSize + 1 + + // Create the data file to ingest + inputFile, err := os.CreateTemp(tmpDir, "input.txt") + require.NoError(t, err, "failed to create temp file to hold data to ingest") + inputFilePath := inputFile.Name() + for i := 0; i < numEvents; i++ { + _, err = inputFile.Write([]byte(fmt.Sprintf("%s\n", strings.Repeat("a", lineSize)))) + require.NoErrorf(t, err, "failed to write line %d to temp file", i) + } + err = inputFile.Close() + require.NoError(t, err, "failed to close data temp file") + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(inputFilePath) + if err != nil { + t.Logf("no data file to import at %s", inputFilePath) + return + } + } + }) + + // Create the otel configuration file + type otelConfigOptions struct { + InputPath string + HomeDir string + ESEndpoint string + ESApiKey string + Index string + } + esEndpoint, err := getESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + index := "logs-integration-default" + otelConfigTemplate := `receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-end-to-end + enabled: true + paths: + - {{.InputPath}} + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + path.home: {{.HomeDir}} + queue.mem.flush.timeout: 0s +exporters: + elasticsearch/log: + endpoints: + - {{.ESEndpoint}} + api_key: {{.ESApiKey}} + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - elasticsearch/log +` + otelConfigPath := filepath.Join(tmpDir, "otel.yml") + var otelConfigBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer, + otelConfigOptions{ + InputPath: inputFilePath, + HomeDir: tmpDir, + ESEndpoint: esEndpoint, + ESApiKey: esApiKey.Encoded, + Index: index, + })) + require.NoError(t, os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600)) + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(otelConfigPath) + if err != nil { + t.Logf("No otel configuration file at %s", otelConfigPath) + return + } + t.Logf("Contents of otel config file:\n%s\n", string(contents)) + } + }) + // Now we can actually create the fixture and run it + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", otelConfigPath})) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx, fakeComponent) + require.NoError(t, err) + + var fixtureWg sync.WaitGroup + fixtureWg.Add(1) + go func() { + defer fixtureWg.Done() + err = fixture.RunOtelWithClient(ctx) + }() + + // Make sure find all the logs + actualHits := &struct { + Hits int + Docs []estools.ESDoc + }{} + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]interface{}{ + "log.file.path": inputFilePath, + }) + require.NoError(t, err) + + actualHits.Hits = docs.Hits.Total.Value + actualHits.Docs = docs.Hits.Hits + + return actualHits.Hits == numEvents + }, + 2*time.Minute, 1*time.Second, + "Expected %d logs, got %v", numEvents, actualHits) + + // Check that the logs were not truncated + require.Len(t, actualHits.Docs, numEvents) + for i, hit := range actualHits.Docs { + require.NotEmpty(t, hit.Source, "expected a non-empty doc") + require.Len(t, hit.Source["message"], lineSize, "message size mismatch for log line %d", i) + } + + cancel() + fixtureWg.Wait() + require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) +}