diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index 122f4f954e3..f68d52dc7fb 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -510,8 +510,11 @@ func translateEsOutputToExporter(cfg *config.C, logger *logp.Logger) (map[string // we also want to use dynamic log ids esConfig["logs_dynamic_id"] = map[string]any{"enabled": true} - // for compatibility with beats, we want bodymap mapping - esConfig["mapping"] = map[string]any{"mode": "bodymap"} + // logs failed documents at debug level + esConfig["telemetry"] = map[string]any{ + "log_failed_docs_input": true, + } + return esConfig, nil } diff --git a/internal/pkg/otel/translate/otelconfig_test.go b/internal/pkg/otel/translate/otelconfig_test.go index ac453abe955..51a86d3ad25 100644 --- a/internal/pkg/otel/translate/otelconfig_test.go +++ b/internal/pkg/otel/translate/otelconfig_test.go @@ -303,6 +303,9 @@ func TestGetOtelConfig(t *testing.T) { "logs_dynamic_id": map[string]any{ "enabled": true, }, + "telemetry": map[string]any{ + "log_failed_docs_input": true, + }, "auth": map[string]any{ "authenticator": "beatsauth/_agent-component/" + outputName, }, diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index bb9e45d771e..81010fbce3b 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -669,6 +669,7 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error { // Exec provides a way of performing subcommand on the prepared Elastic Agent binary. func (f *Fixture) Exec(ctx context.Context, args []string, opts ...process.CmdOption) ([]byte, error) { + f.t.Helper() err := f.EnsurePrepared(ctx) if err != nil { return nil, fmt.Errorf("failed to prepare before exec: %w", err) diff --git a/testing/integration/ess/beat_receivers_test.go b/testing/integration/ess/beat_receivers_test.go index 3777e13569c..c7531f9b8d7 100644 --- a/testing/integration/ess/beat_receivers_test.go +++ b/testing/integration/ess/beat_receivers_test.go @@ -13,6 +13,7 @@ import ( "fmt" "io" "net/http" + "os" "os/exec" "runtime" "strings" @@ -21,6 +22,7 @@ import ( "time" "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/go-elasticsearch/v8" "github.com/gofrs/uuid/v5" "gopkg.in/yaml.v2" @@ -980,3 +982,257 @@ func genIgnoredFields(goos string) []string { } } } + +// TestSensitiveLogsESExporter tests sensitive logs from ex-exporter are not sent to fleet +func TestSensitiveLogsESExporter(t *testing.T) { + // The ES exporter logs the original document on indexing failure only if + // the "telemetry::log_failed_docs_input" setting is enabled and the log level is set to debug. + info := define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + tmpDir := t.TempDir() + numEvents := 50 + // Create the data file to ingest + inputFile, err := os.CreateTemp(tmpDir, "input.txt") + require.NoError(t, err, "failed to create temp file to hold data to ingest") + inputFilePath := inputFile.Name() + + // these messages will fail to index as message is expected to be of integer type + for i := 0; i < numEvents; i++ { + _, err = inputFile.Write([]byte(fmt.Sprintf("Line %d\n", i))) + require.NoErrorf(t, err, "failed to write line %d to temp file", i) + } + err = inputFile.Close() + require.NoError(t, err, "failed to close data temp file") + + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + // Create the otel configuration file + type otelConfigOptions struct { + InputPath string + ESEndpoint string + ESApiKey string + Namespace string + } + esEndpoint, err := integration.GetESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + decodedApiKey, err := getDecodedApiKey(esApiKey) + require.NoError(t, err) + + configTemplate := ` +inputs: + - type: filestream + id: filestream-e2e + use_output: default + _runtime_experimental: otel + streams: + - id: e2e + data_stream: + dataset: sensitive + namespace: {{ .Namespace }} + paths: + - {{.InputPath}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ +outputs: + default: + type: elasticsearch + hosts: [{{.ESEndpoint}}] + api_key: "{{.ESApiKey}}" +agent: + monitoring: + enabled: true + metrics: false + logs: true + _runtime_experimental: otel +agent.logging.level: debug +agent.logging.stderr: true +` + index := "logs-sensitive-" + info.Namespace + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + otelConfigOptions{ + InputPath: inputFilePath, + ESEndpoint: esEndpoint, + ESApiKey: decodedApiKey, + Namespace: info.Namespace, + })) + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute) + defer cancel() + err = fixture.Prepare(ctx) + require.NoError(t, err) + + err = fixture.Configure(ctx, configBuffer.Bytes()) + require.NoError(t, err) + + cmd, err := fixture.PrepareAgentCommand(ctx, nil) + require.NoError(t, err, "cannot prepare Elastic-Agent command: %w", err) + + err = setStrictMapping(info.ESClient, index) + require.NoError(t, err, "could not set strict mapping due to %v", err) + + timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + + output := strings.Builder{} + cmd.Stderr = &output + cmd.Stdout = &output + + err = cmd.Start() + require.NoError(t, err) + + // Make sure the Elastic-Agent process is not running before + // exiting the test + t.Cleanup(func() { + // Ignore the error because we cancelled the context, + // and that always returns an error + _ = cmd.Wait() + if t.Failed() { + t.Log("Elastic-Agent output:") + t.Log(output.String()) + } + }) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + var statusErr error + status, statusErr := fixture.ExecStatus(ctx) + assert.NoError(collect, statusErr) + assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 2) + }, 1*time.Minute, 1*time.Second) + + // Check 1: + // Ensure sensitive logs from ES exporter are not shipped to ES + rawQuery := map[string]any{ + "query": map[string]any{ + "bool": map[string]any{ + "must": map[string]any{ + "match_phrase": map[string]any{ + // this message comes from ES exporter + "message": "failed to index document; input may contain sensitive data", + }, + }, + "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}}, + }, + }, + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + + var monitoringDoc estools.Documents + assert.EventuallyWithT(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + monitoringDoc, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, "logs-elastic_agent-default*", info.ESClient) + require.NoError(ct, err) + + assert.GreaterOrEqual(ct, monitoringDoc.Hits.Total.Value, 1) + }, + 2*time.Minute, 5*time.Second, + "Expected at least %d log, got %d", 1, monitoringDoc.Hits.Total.Value) + + inputField := monitoringDoc.Hits.Hits[0].Source["input"] + inputFieldStr, ok := inputField.(string) + if ok { + // we check if it contains the original message line + assert.NotContains(t, inputFieldStr, "message: Line", "monitoring logs contain original input") + } + + // Check 2: + // Ensure event logs from elastic owned components is not shipped i.e drop_processor works correctly + rawQuery = map[string]any{ + "query": map[string]any{ + "bool": map[string]any{ + "must": map[string]any{ + "match": map[string]any{ + // event logs contain a special field on them + "log.type": "event", + }, + }, + "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}}, + }, + }, + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, "logs-elastic_agent*", map[string]interface{}{ + "log.type": "event", + }) + + assert.NoError(t, err) + assert.Zero(t, docs.Hits.Total.Value) +} + +// setStrictMapping takes es client and index name +// and sets strict mapping for that index. +// Useful to reproduce mapping conflicts required for testing +func setStrictMapping(client *elasticsearch.Client, index string) error { + // Define the body + body := map[string]interface{}{ + "index_patterns": []string{index + "*"}, + "template": map[string]interface{}{ + "mappings": map[string]interface{}{ + "dynamic": "strict", + "properties": map[string]interface{}{ + "@timestamp": map[string]string{"type": "date"}, + "message": map[string]string{"type": "integer"}, // we set message type to integer to cause mapping conflict + }, + }, + }, + "priority": 500, + } + + // Marshal body to JSON + jsonData, err := json.Marshal(body) + if err != nil { + panic(err) + } + + esEndpoint, err := integration.GetESHost() + if err != nil { + return fmt.Errorf("error getting elasticsearch endpoint: %v", err) + } + + // Create a context + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Build request + req, err := http.NewRequestWithContext(ctx, http.MethodPut, + esEndpoint+"/_index_template/no-dynamic-template", + bytes.NewReader(jsonData)) + if err != nil { + return fmt.Errorf("could not create http request to ES server: %v", err) + } + + // Set content type header + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Perform(req) + if err != nil { + return fmt.Errorf("error performing request: %v", err) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("incorrect response code: %v", err) + } + return nil +}