Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions internal/pkg/otel/translate/otelconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,11 @@ func translateEsOutputToExporter(cfg *config.C, logger *logp.Logger) (map[string
// we also want to use dynamic log ids
esConfig["logs_dynamic_id"] = map[string]any{"enabled": true}

// for compatibility with beats, we want bodymap mapping
esConfig["mapping"] = map[string]any{"mode": "bodymap"}
// logs failed documents at debug level
esConfig["telemetry"] = map[string]any{
"log_failed_docs_input": true,
}

return esConfig, nil
}

Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/otel/translate/otelconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ func TestGetOtelConfig(t *testing.T) {
"logs_dynamic_id": map[string]any{
"enabled": true,
},
"telemetry": map[string]any{
"log_failed_docs_input": true,
},
"auth": map[string]any{
"authenticator": "beatsauth/_agent-component/" + outputName,
},
Expand Down
1 change: 1 addition & 0 deletions pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error {

// Exec provides a way of running a subcommand on the prepared Elastic Agent binary.
func (f *Fixture) Exec(ctx context.Context, args []string, opts ...process.CmdOption) ([]byte, error) {
f.t.Helper()
err := f.EnsurePrepared(ctx)
if err != nil {
return nil, fmt.Errorf("failed to prepare before exec: %w", err)
Expand Down
256 changes: 256 additions & 0 deletions testing/integration/ess/beat_receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"os/exec"
"runtime"
"strings"
Expand All @@ -21,6 +22,7 @@ import (
"time"

"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/go-elasticsearch/v8"

"github.com/gofrs/uuid/v5"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -980,3 +982,257 @@ func genIgnoredFields(goos string) []string {
}
}
}

// TestSensitiveLogsESExporter tests that sensitive logs from the ES exporter
// are not shipped to Elasticsearch via agent monitoring.
//
// The ES exporter logs the original document on indexing failure only if the
// "telemetry::log_failed_docs_input" setting is enabled and the log level is
// set to debug. The test forces indexing failures by installing a strict index
// template that maps "message" to an integer, ingests text lines, and then
// checks the monitoring indices:
//
//  1. the "failed to index document" log is shipped, but without the original
//     input attached;
//  2. no event logs (log.type: event) are shipped at all.
func TestSensitiveLogsESExporter(t *testing.T) {
	info := define.Require(t, define.Requirements{
		Group: integration.Default,
		Local: true,
		OS: []define.OS{
			{Type: define.Windows},
			{Type: define.Linux},
			{Type: define.Darwin},
		},
		Stack: &define.Stack{},
	})
	tmpDir := t.TempDir()
	numEvents := 50

	// Create the data file to ingest.
	inputFile, err := os.CreateTemp(tmpDir, "input.txt")
	require.NoError(t, err, "failed to create temp file to hold data to ingest")
	inputFilePath := inputFile.Name()

	// These messages will fail to index because the index template installed
	// below expects "message" to be an integer.
	for i := 0; i < numEvents; i++ {
		_, err = fmt.Fprintf(inputFile, "Line %d\n", i)
		require.NoErrorf(t, err, "failed to write line %d to temp file", i)
	}
	err = inputFile.Close()
	require.NoError(t, err, "failed to close data temp file")

	fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
	require.NoError(t, err)

	// Values substituted into the agent configuration template.
	type otelConfigOptions struct {
		InputPath  string
		ESEndpoint string
		ESApiKey   string
		Namespace  string
	}
	esEndpoint, err := integration.GetESHost()
	require.NoError(t, err, "error getting elasticsearch endpoint")
	esApiKey, err := createESApiKey(info.ESClient)
	require.NoError(t, err, "error creating API key")
	require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)
	decodedApiKey, err := getDecodedApiKey(esApiKey)
	require.NoError(t, err)

	configTemplate := `
inputs:
  - type: filestream
    id: filestream-e2e
    use_output: default
    _runtime_experimental: otel
    streams:
      - id: e2e
        data_stream:
          dataset: sensitive
          namespace: {{ .Namespace }}
        paths:
          - {{.InputPath}}
        prospector.scanner.fingerprint.enabled: false
        file_identity.native: ~
outputs:
  default:
    type: elasticsearch
    hosts: [{{.ESEndpoint}}]
    api_key: "{{.ESApiKey}}"
agent:
  monitoring:
    enabled: true
    metrics: false
    logs: true
    _runtime_experimental: otel
agent.logging.level: debug
agent.logging.stderr: true
`
	index := "logs-sensitive-" + info.Namespace
	var configBuffer bytes.Buffer
	require.NoError(t,
		template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
			otelConfigOptions{
				InputPath:  inputFilePath,
				ESEndpoint: esEndpoint,
				ESApiKey:   decodedApiKey,
				Namespace:  info.Namespace,
			}))

	ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute)
	defer cancel()
	err = fixture.Prepare(ctx)
	require.NoError(t, err)

	err = fixture.Configure(ctx, configBuffer.Bytes())
	require.NoError(t, err)

	cmd, err := fixture.PrepareAgentCommand(ctx, nil)
	require.NoError(t, err, "cannot prepare Elastic-Agent command")

	// The strict mapping must be in place before the agent starts ingesting,
	// otherwise the data stream would be created with dynamic mappings.
	err = setStrictMapping(info.ESClient, index)
	require.NoError(t, err, "could not set strict mapping")

	// Only documents created after this instant are relevant to the checks.
	timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")

	output := strings.Builder{}
	cmd.Stderr = &output
	cmd.Stdout = &output

	err = cmd.Start()
	require.NoError(t, err)

	// Make sure the Elastic-Agent process is not running before
	// exiting the test
	t.Cleanup(func() {
		// Ignore the error because we cancelled the context,
		// and that always returns an error
		_ = cmd.Wait()
		if t.Failed() {
			t.Log("Elastic-Agent output:")
			t.Log(output.String())
		}
	})

	require.EventuallyWithT(t, func(collect *assert.CollectT) {
		status, statusErr := fixture.ExecStatus(ctx)
		assert.NoError(collect, statusErr)
		assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 2)
	}, 1*time.Minute, 1*time.Second)

	// Check 1:
	// Ensure sensitive logs from ES exporter are not shipped to ES
	failedDocQuery := map[string]any{
		"query": map[string]any{
			"bool": map[string]any{
				"must": map[string]any{
					"match_phrase": map[string]any{
						// this message comes from ES exporter
						"message": "failed to index document; input may contain sensitive data",
					},
				},
				"filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}},
			},
		},
		"sort": []map[string]any{
			{"@timestamp": map[string]any{"order": "asc"}},
		},
	}

	var monitoringDoc estools.Documents
	// require (not assert): the code below indexes into the hits, so the test
	// must stop here if no document ever shows up.
	require.EventuallyWithT(t,
		func(ct *assert.CollectT) {
			findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second)
			defer findCancel()

			found, queryErr := estools.PerformQueryForRawQuery(findCtx, failedDocQuery, "logs-elastic_agent-default*", info.ESClient)
			if !assert.NoError(ct, queryErr) {
				return
			}
			monitoringDoc = found

			assert.GreaterOrEqual(ct, found.Hits.Total.Value, 1)
		},
		2*time.Minute, 5*time.Second,
		"expected at least one monitoring log reporting a failed document")
	require.NotEmpty(t, monitoringDoc.Hits.Hits, "query reported hits but returned no documents")

	inputField := monitoringDoc.Hits.Hits[0].Source["input"]
	// The field is only inspected when present as a string; an absent field
	// means the original input was not shipped.
	if inputFieldStr, ok := inputField.(string); ok {
		// we check if it contains the original message line
		assert.NotContains(t, inputFieldStr, "message: Line", "monitoring logs contain original input")
	}

	// Check 2:
	// Ensure event logs from elastic owned components is not shipped i.e drop_processor works correctly
	eventLogQuery := map[string]any{
		"query": map[string]any{
			"bool": map[string]any{
				"must": map[string]any{
					"match": map[string]any{
						// event logs contain a special field on them
						"log.type": "event",
					},
				},
				"filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}},
			},
		},
		"sort": []map[string]any{
			{"@timestamp": map[string]any{"order": "asc"}},
		},
	}

	findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second)
	defer findCancel()

	docs, err := estools.PerformQueryForRawQuery(findCtx, eventLogQuery, "logs-elastic_agent*", info.ESClient)
	require.NoError(t, err)
	assert.Zero(t, docs.Hits.Total.Value)
}

// setStrictMapping installs an index template named "no-dynamic-template"
// that applies to every index matching index+"*". The template uses strict
// dynamic mapping and declares "message" as an integer, so that documents
// carrying a textual message are rejected by Elasticsearch.
// Useful to reproduce mapping conflicts required for testing.
//
// It returns an error if the request cannot be built or performed, or if
// Elasticsearch answers with anything other than 200 OK.
func setStrictMapping(client *elasticsearch.Client, index string) error {
	body := map[string]any{
		"index_patterns": []string{index + "*"},
		"template": map[string]any{
			"mappings": map[string]any{
				"dynamic": "strict",
				"properties": map[string]any{
					"@timestamp": map[string]string{"type": "date"},
					"message":    map[string]string{"type": "integer"}, // we set message type to integer to cause mapping conflict
				},
			},
		},
		"priority": 500,
	}

	jsonData, err := json.Marshal(body)
	if err != nil {
		return fmt.Errorf("could not marshal index template body: %w", err)
	}

	esEndpoint, err := integration.GetESHost()
	if err != nil {
		return fmt.Errorf("error getting elasticsearch endpoint: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodPut,
		esEndpoint+"/_index_template/no-dynamic-template",
		bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("could not create http request to ES server: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Perform(req)
	if err != nil {
		return fmt.Errorf("error performing request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body only enriches the error message, so a read
		// failure here is deliberately ignored.
		respBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("incorrect response code %d: %s", resp.StatusCode, respBody)
	}
	return nil
}