Skip to content

[beatreceiver] Drop failed doc inputs from ES exporter#8738

Closed
khushijain21 wants to merge 4 commits intoelastic:mainfrom
khushijain21:eventLogs
Closed

[beatreceiver] Drop failed doc inputs from ES exporter#8738
khushijain21 wants to merge 4 commits intoelastic:mainfrom
khushijain21:eventLogs

Conversation

@khushijain21
Copy link
Contributor

@khushijain21 khushijain21 commented Jun 30, 2025

What does this PR do?

This PR adds support to redirect rejected documents from ES exporter to -event.log file.
Note: Documents rejected by ES exporter are logged at debug level only. Unlike our own code where we log it at warn level Ref

Why is it important?

Checklist

  • I have read and understood the pull request guidelines of this project.
  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in ./changelog/fragments using the changelog tool
  • I have added an integration test or an E2E test

How to test this PR locally

  1. Build elastic agent with this commit
  2. Run following from Kibana's Dev Console. This enables strict dynamic mapping
PUT _index_template/no-dynamic-template
{
  "index_patterns": ["logs-test-*"],
  "template": {
    "mappings": {
      "dynamic": "strict",
      "properties": {
        "@timestamp": { "type": "date" },
        "message":     { "type": "text" },
        "user": {
          "properties": {
            "age": { "type": "integer" }
          }
        }
      }
    }
  },
  "priority": 500
}
  1. Run following elastic-agent.yml
elastic-agent.yml
agent.grpc:
  port: 6799

inputs:
  - type: filestream
    # Input ID allowing Elastic Agent to track the state of this input. Must be unique.
    id: your-input-id
    _runtime_experimental: otel
    streams:
      # Stream ID for this data stream allowing Filebeat to track the state of the ingested files. Must be unique.
      # Each filestream data stream creates a separate instance of the Filebeat filestream input.
      - id: your-filestream-stream-id
        paths:
          - /var/log/*.log
        data_stream:
          dataset: test
        processors:
          - add_fields:
              target: ''
              fields:
                "user.age": "20-"          

agent.monitoring:
   enabled: true
   logs: true
   metrics: false

agent.logging.level: debug

outputs:
  default:
    type: elasticsearch
    hosts: [127.0.0.1:9200]

    username: "admin"
    password: "testing"

    # Performance preset for elasticsearch outputs. One of "balanced", "throughput",
    # "scale", "latency" and "custom".
    # The default if unspecified is "custom".
    preset: balanced
  1. Observe rejected documents in log.ndjson file and you can observe it is not indexed in logs-elastic_agent.filebeat.default index

For example

{
  "log.level": "debug",
  "@timestamp": "2025-06-26T11:23:26.408+0530",
  "log.origin": {
    "function": "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter.flushBulkIndexer",
    "file.name": "elasticsearchexporter@v0.127.0/bulkindexer.go",
    "file.line": 384
  },
  "message": "failed to index document; input may contain sensitive data",
  "log": {
    "source": "elastic-agent"
  },
  "resource": {},
  "otelcol.component.id": "elasticsearch/_agent-component/default",
  "otelcol.component.kind": "exporter",
  "otelcol.signal": "logs",
  "index": "logs-test-default",
  "error.type": "strict_dynamic_mapping_exception",
  "error.reason": "",
  "input": "{\n  \"create\": {\n    \"_index\": \"logs-test-default\"\n  }\n}\n{\n  \"event\": {\n    \"dataset\": \"test\"\n  },\n  \"elastic_agent\": {\n    \"id\": \"9ee42a04-1662-496e-9b88-e29cea822668\",\n    \"snapshot\": false,\n    \"version\": \"9.1.0\"\n  },\n  \"agent\": {\n    \"name\": \"Khushis-MacBook-Pro.local\",\n    \"type\": \"filebeat\",\n    \"id\": \"9ee42a04-1662-496e-9b88-e29cea822668\",\n    \"version\": \"9.1.0\",\n    \"ephemeral_id\": \"e54150e8-ae12-4415-84cd-e5ee8ef7915e\"\n  },\n  \"ecs\": {\n    \"version\": \"8.0.0\"\n  },\n  \"host\": {\n    \"name\": \"khushis-macbook-pro.local\",\n    \"hostname\": \"Khushis-MacBook-Pro.local\",\n    \"architecture\": \"arm64\",\n    \"os\": {\n      \"build\": \"24F74\",\n      \"type\": \"macos\",\n      \"platform\": \"darwin\",\n      \"version\": \"15.5\",\n      \"family\": \"darwin\",\n      \"name\": \"macOS\",\n      \"kernel\": \"24.5.0\"\n    },\n    \"id\": \"194CE554-9A32-579D-B00E-029E6412C7D4\",\n    \"ip\": [\n      \"fe80::8:efd2:c3b2:2502\",\n      \"192.168.29.114\",\n      \"2405:201:d00a:e8b1:4b:c2c3:30e0:8045\",\n      \"2405:201:d00a:e8b1:90b1:f578:e9ee:5f3c\",\n      \"fe80::642b:2bff:fe93:7ceb\",\n      \"fe80::642b:2bff:fe93:7ceb\",\n      \"fe80::4977:48e6:7d97:df8f\",\n      \"192.168.64.1\",\n      \"fe80::7cf3:4dff:feae:ef64\",\n      \"fd63:b2ca:f9c5:50e6:14:bb00:f72a:a174\",\n      \"fe80::21d6:b5bb:76e9:b890\",\n      \"fe80::8cd3:24f3:733e:e444\",\n      \"fe80::ce81:b1c:bd2c:69e\"\n    ],\n    \"mac\": [\n      \"1E-2B-07-D9-EC-0B\",\n      \"1E-2B-07-D9-EC-0C\",\n      \"1E-2B-07-D9-EC-0D\",\n      \"1E-2B-07-D9-EC-EB\",\n      \"1E-2B-07-D9-EC-EC\",\n      \"1E-2B-07-D9-EC-ED\",\n      \"36-B4-B1-E5-79-80\",\n      \"36-B4-B1-E5-79-84\",\n      \"36-B4-B1-E5-79-88\",\n      \"66-2B-2B-93-7C-EB\",\n      \"7E-F3-4D-AE-EF-64\",\n      \"96-3B-EB-76-C1-59\",\n      \"B2-41-D2-A9-2F-4E\",\n      \"DE-ED-2B-E5-DA-82\"\n    ]\n  },\n  \"message\": \"Thu Jun 26 11:22:56.476 [airport]/399 @[1879405.433913] (airportdMain.m:1730) Health check alive, checkin@[1879405.433850], count[15642] \",\n  \"user\": {\n    \"age\": \"20-\"\n  },\n  \"@timestamp\": \"2025-06-26T05:53:16.401Z\",\n  \"log\": {\n    \"offset\": 875412,\n    \"file\": {\n      \"path\": \"/var/log/wifi.log\",\n      \"device_id\": \"16777229\",\n      \"inode\": \"64177493\",\n      \"fingerprint\": \"6e11d2344496827f1629678a95e339364fbb0063ad58b1af715338b9d16ed44c\"\n    }\n  },\n  \"input\": {\n    \"type\": \"filestream\"\n  },\n  \"data_stream\": {\n    \"type\": \"logs\",\n    \"dataset\": \"test\",\n    \"namespace\": \"default\"\n  }\n}",
  "log.type": "event",
  "ecs.version": "1.6.0"
}

Related issues

@mergify
Copy link
Contributor

mergify bot commented Jun 30, 2025

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b eventLogs upstream/eventLogs
git merge upstream/main
git push upstream eventLogs

@mergify
Copy link
Contributor

mergify bot commented Jun 30, 2025

This pull request does not have a backport label. Could you fix it @khushijain21? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-./d./d is the label that automatically backports to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"regexp": map[string]interface{}{
"input": `\{"create":\{"_index":.*`,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ES exporter always uses a create action when indexing log records. If a document fails for any reason and the original input is part of the response - the log message will always contain

"input":"{\"create\":{\"_index\": some-index-name. .... // the rest of the input

We drop this event to ensure sensitive data is not sent to fleet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the collector is a sub-process, we could also look at directing these logs into our event log using a similar pattern to preserve the existing behavior where logs from events can't clutter the main logs. We could perhaps do this now I just know once we are a sub-process we'll be consuming the logs from the stdout/stderr stream instead of via our own in-process logger.

CC @pkoutsovasilis.

Copy link
Contributor Author

@khushijain21 khushijain21 Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that sounds good and is possible to do.
This is the code change we would need to do when otel collector runs as sub process.

https://github.com/elastic/elastic-agent/compare/main...khushijain21:elastic-agent:collectorEvent?expand=

that would make the current changeset unnecessary - maybe we should wait until the collector runs as a subprocess by default. wdyt?

Copy link
Contributor Author

@khushijain21 khushijain21 Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a quick testing and ndjson parser in filebeat monitoring https://github.com/khushijain21/elastic-agent/blob/eventLogs/internal/pkg/agent/application/monitoring/v1_monitor.go#L579 does not parse json recursively.

"input":"{\"create\":{\"_index\": some-index-name. .... // the rest of the input

This field is never parsed and indexed. We should be good even without dropping this event explicitly :))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc : @cmacknz

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is never parsed and indexed. We should be good even without dropping this event explicitly :))

OK this is nice, I think we should have an explicit test for this behavior as it is a security concern (in agent, that these errors cannot be indexed). Can you add one as a follow up?

We have similar tests of the event log functionality.

@elastic-sonarqube
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[beat_receivers] Redact or drop logs that could contain event data in the monitoring filestream instance

3 participants