Skip to content
7 changes: 5 additions & 2 deletions internal/pkg/otel/translate/otelconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,11 @@ func translateEsOutputToExporter(cfg *config.C, logger *logp.Logger) (map[string
// we also want to use dynamic log ids
esConfig["logs_dynamic_id"] = map[string]any{"enabled": true}

// for compatibility with beats, we want bodymap mapping
esConfig["mapping"] = map[string]any{"mode": "bodymap"}
// logs failed documents at debug level
esConfig["telemetry"] = map[string]any{
"log_failed_docs_input": true,
}

return esConfig, nil
}

Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/otel/translate/otelconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ func TestGetOtelConfig(t *testing.T) {
"logs_dynamic_id": map[string]any{
"enabled": true,
},
"telemetry": map[string]any{
"log_failed_docs_input": true,
},
"auth": map[string]any{
"authenticator": "beatsauth/_agent-component/" + outputName,
},
Expand Down
1 change: 1 addition & 0 deletions pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error {

// Exec provides a way of performing subcommand on the prepared Elastic Agent binary.
func (f *Fixture) Exec(ctx context.Context, args []string, opts ...process.CmdOption) ([]byte, error) {
f.t.Helper()
err := f.EnsurePrepared(ctx)
if err != nil {
return nil, fmt.Errorf("failed to prepare before exec: %w", err)
Expand Down
256 changes: 256 additions & 0 deletions testing/integration/ess/beat_receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"os/exec"
"runtime"
"strings"
Expand All @@ -21,6 +22,7 @@ import (
"time"

"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/go-elasticsearch/v8"

"github.com/gofrs/uuid/v5"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -999,3 +1001,257 @@ func genIgnoredFields(goos string) []string {
}
}
}

// TestSensitiveLogsESExporter tests sensitive logs from ex-exporter are not sent to fleet
func TestSensitiveLogsESExporter(t *testing.T) {
// The ES exporter logs the original document on indexing failure only if
// the "telemetry::log_failed_docs_input" setting is enabled and the log level is set to debug.
info := define.Require(t, define.Requirements{
Group: integration.Default,
Local: true,
OS: []define.OS{
{Type: define.Windows},
{Type: define.Linux},
{Type: define.Darwin},
},
Stack: &define.Stack{},
})
tmpDir := t.TempDir()
numEvents := 50
// Create the data file to ingest
inputFile, err := os.CreateTemp(tmpDir, "input.txt")
require.NoError(t, err, "failed to create temp file to hold data to ingest")
inputFilePath := inputFile.Name()

// these messages will fail to index as message is expected to be of integer type
for i := 0; i < numEvents; i++ {
_, err = inputFile.Write([]byte(fmt.Sprintf("Line %d\n", i)))
require.NoErrorf(t, err, "failed to write line %d to temp file", i)
}
err = inputFile.Close()
require.NoError(t, err, "failed to close data temp file")

fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)

// Create the otel configuration file
type otelConfigOptions struct {
InputPath string
ESEndpoint string
ESApiKey string
Namespace string
}
esEndpoint, err := integration.GetESHost()
require.NoError(t, err, "error getting elasticsearch endpoint")
esApiKey, err := createESApiKey(info.ESClient)
require.NoError(t, err, "error creating API key")
require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)
decodedApiKey, err := getDecodedApiKey(esApiKey)
require.NoError(t, err)

configTemplate := `
inputs:
- type: filestream
id: filestream-e2e
use_output: default
_runtime_experimental: otel
streams:
- id: e2e
data_stream:
dataset: sensitive
namespace: {{ .Namespace }}
paths:
- {{.InputPath}}
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
outputs:
default:
type: elasticsearch
hosts: [{{.ESEndpoint}}]
api_key: "{{.ESApiKey}}"
agent:
monitoring:
enabled: true
metrics: false
logs: true
_runtime_experimental: otel
agent.logging.level: debug
agent.logging.stderr: true
`
index := "logs-sensitive-" + info.Namespace
var configBuffer bytes.Buffer
require.NoError(t,
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
otelConfigOptions{
InputPath: inputFilePath,
ESEndpoint: esEndpoint,
ESApiKey: decodedApiKey,
Namespace: info.Namespace,
}))

ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute)
defer cancel()
err = fixture.Prepare(ctx)
require.NoError(t, err)

err = fixture.Configure(ctx, configBuffer.Bytes())
require.NoError(t, err)

cmd, err := fixture.PrepareAgentCommand(ctx, nil)
require.NoError(t, err, "cannot prepare Elastic-Agent command: %w", err)

err = setStrictMapping(info.ESClient, index)
require.NoError(t, err, "could not set strict mapping due to %v", err)

timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")

output := strings.Builder{}
cmd.Stderr = &output
cmd.Stdout = &output

err = cmd.Start()
require.NoError(t, err)

// Make sure the Elastic-Agent process is not running before
// exiting the test
t.Cleanup(func() {
// Ignore the error because we cancelled the context,
// and that always returns an error
_ = cmd.Wait()
if t.Failed() {
t.Log("Elastic-Agent output:")
t.Log(output.String())
}
})

require.EventuallyWithT(t, func(collect *assert.CollectT) {
var statusErr error
status, statusErr := fixture.ExecStatus(ctx)
assert.NoError(collect, statusErr)
assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 2)
}, 1*time.Minute, 1*time.Second)

// Check 1:
// Ensure sensitive logs from ES exporter are not shipped to ES
rawQuery := map[string]any{
"query": map[string]any{
"bool": map[string]any{
"must": map[string]any{
"match_phrase": map[string]any{
// this message comes from ES exporter
"message": "failed to index document; input may contain sensitive data",
},
},
"filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}},
},
},
"sort": []map[string]any{
{"@timestamp": map[string]any{"order": "asc"}},
},
}

var monitoringDoc estools.Documents
assert.EventuallyWithT(t,
func(ct *assert.CollectT) {
findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second)
defer findCancel()

monitoringDoc, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, "logs-elastic_agent-default*", info.ESClient)
require.NoError(ct, err)

assert.GreaterOrEqual(ct, monitoringDoc.Hits.Total.Value, 1)
},
2*time.Minute, 5*time.Second,
"Expected at least %d log, got %d", 1, monitoringDoc.Hits.Total.Value)

inputField := monitoringDoc.Hits.Hits[0].Source["input"]
inputFieldStr, ok := inputField.(string)
if ok {
// we check if it contains the original message line
assert.NotContains(t, inputFieldStr, "message: Line", "monitoring logs contain original input")
}

// Check 2:
// Ensure event logs from elastic owned components is not shipped i.e drop_processor works correctly
rawQuery = map[string]any{
"query": map[string]any{
"bool": map[string]any{
"must": map[string]any{
"match": map[string]any{
// event logs contain a special field on them
"log.type": "event",
},
},
"filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}},
},
},
"sort": []map[string]any{
{"@timestamp": map[string]any{"order": "asc"}},
},
}

findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second)
defer findCancel()

docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, "logs-elastic_agent*", map[string]interface{}{
"log.type": "event",
})

assert.NoError(t, err)
assert.Zero(t, docs.Hits.Total.Value)
}

// setStrictMapping takes es client and index name
// and sets strict mapping for that index.
// Useful to reproduce mapping conflicts required for testing
func setStrictMapping(client *elasticsearch.Client, index string) error {
// Define the body
body := map[string]interface{}{
"index_patterns": []string{index + "*"},
"template": map[string]interface{}{
"mappings": map[string]interface{}{
"dynamic": "strict",
"properties": map[string]interface{}{
"@timestamp": map[string]string{"type": "date"},
"message": map[string]string{"type": "integer"}, // we set message type to integer to cause mapping conflict
},
},
},
"priority": 500,
}

// Marshal body to JSON
jsonData, err := json.Marshal(body)
if err != nil {
panic(err)
}

esEndpoint, err := integration.GetESHost()
if err != nil {
return fmt.Errorf("error getting elasticsearch endpoint: %v", err)
}

// Create a context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Build request
req, err := http.NewRequestWithContext(ctx, http.MethodPut,
esEndpoint+"/_index_template/no-dynamic-template",
bytes.NewReader(jsonData))
if err != nil {
return fmt.Errorf("could not create http request to ES server: %v", err)
}

// Set content type header
req.Header.Set("Content-Type", "application/json")

resp, err := client.Perform(req)
if err != nil {
return fmt.Errorf("error performing request: %v", err)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("incorrect response code: %v", err)
}
return nil
}