Skip to content

[Observability Onboarding] Wired streams onboarding#249705

Merged
rStelmach merged 63 commits into
elastic:mainfrom
rStelmach:301-wired-streams-onboarding
Feb 27, 2026
Merged

[Observability Onboarding] Wired streams onboarding#249705
rStelmach merged 63 commits into
elastic:mainfrom
rStelmach:301-wired-streams-onboarding

Conversation

@rStelmach
Copy link
Copy Markdown
Contributor

@rStelmach rStelmach commented Jan 20, 2026

closes: https://github.com/elastic/streams-program/issues/301

Summary

This PR adds an opt-in Wired Streams ingestion mode to onboarding quickstart flows while keeping classic ingestion as default. It also fixes a document_parsing_exception for ECS streams caused by a field mapping conflict between stream (a string field from Kubernetes container logs) and stream.name (an object-expanded field used by the streams platform).

Flows Covered

Flow Implementation
OTel Host (direct ES exporter) Inject logs_index: logs under elasticsearch/otel via sed/PowerShell
OTel Host (Managed OTLP) Inject resource/wired_streams processor with elasticsearch.index: logs and attach it only to logs/platformlogs pipeline
OTel Kubernetes (direct ES exporter) Helm --set 'collectors.gateway.config.exporters.elasticsearch\/otel.logs_index=logs'
OTel Kubernetes (Managed OTLP) Helm --set for resource/wired_streams processor (elasticsearch.index=logs) and add it to logs pipeline
Elastic Agent Kubernetes Use _write_to_logs_streams=true + @metadata.raw_index=logs on container logs processor settings
Auto-Detect Route only custom logs to Wired Streams (@metadata.raw_index=logs); keep registry integrations on classic routing

What Was Added

  1. Ingestion selector to switch between Classic ingestion and Wired Streams.
  2. Auto-enable with confirmation modal when Wired Streams is selected and not enabled yet; on failure, flow falls back to Classic and shows a toast.
  3. Telemetry (observability_onboarding_wired_streams_auto_enabled) with:
    • flow_type
    • success
    • error_message (on failure)
  4. Flow-specific routing behavior so we avoid breaking known dashboard/standard processing paths (e.g. registry integrations remain classic in auto-detect).

ECS Stream Mapping Fix (subobjects: false)

Problem

When Kubernetes container logs are ingested into ECS wired streams (logs.ecs), the documents contain a top-level stream field (with values like stdout/stderr). The streams platform also sets stream.name via an ingest pipeline script. Elasticsearch interprets stream.name as a nested object property under stream, conflicting with the existing stream string value and causing a document_parsing_exception:

object mapping for [stream] tried to parse field [stream] as object, but found a concrete value

The OTel path avoids this because the OTel Collector renames stream to log.iostream before indexing.

Fix

Set subobjects: false on ECS stream component template mappings. This tells Elasticsearch to treat dotted field names (like stream.name) as flat literal keys rather than expanding them into nested object hierarchies. With this setting, stream (string) and stream.name (constant_keyword) coexist without conflict.

Changes

  • generate_layer.ts: Conditionally apply subobjects: false to ECS stream component templates using ...(isEcsStream && { subobjects: false as const }).
  • generate_layer.test.ts: Added assertions verifying subobjects: false is present for both ECS root and child streams.
  • basic.ts (API integration tests):
    • Updated _source assertions for ECS streams: stream.name is now a flat dotted key (accessed as result._source['stream.name']) instead of a nested property (result._source.stream.name).
    • Updated expectFields helper to handle ECS mappings where dotted field names are literal property keys rather than nested properties paths.

Why Managed OTLP Needed A Different Approach

logs_index works with direct elasticsearch/otel exporter config, but Managed OTLP sends to MOTel (otlp/ingest).
For Managed OTLP, the working mechanism is dynamic routing via resource attribute:

resource/wired_streams:
  attributes:
    - action: upsert
      key: elasticsearch.index
      value: logs

This is applied only to logs pipelines in onboarding-generated snippets/commands.

Validation

  • Unit tests updated and passing for OTel Host/Kubernetes command builders, including Managed OTLP wired-streams injection behavior.
  • Unit tests updated and passing for generate_layer component template generation, verifying subobjects: false for ECS streams.
  • API integration tests updated and passing for ECS stream flows (basic.ts), accounting for flat dotted key _source representation and mapping structure under subobjects: false.
  • Manual serverless validation confirms:
    • with elasticsearch.index=logs -> documents land in logs stream/index.
    • without that attribute -> documents land in classic logs-generic.otel-default.

How To Test

Please follow the onboarding flow test guide:
https://github.com/elastic/observability-dev/tree/main/docs/obs-onboarding/onboarding-flows

Demo

(skip the middle part while waiting for data)

new-demo.md.mov

@rStelmach rStelmach added backport:skip This PR does not require backporting release_note:feature Makes this part of the condensed release notes Team:obs-onboarding Observability Onboarding Team Feature: Observability Onboarding Feature:Streams This is the label for the Streams Project labels Jan 23, 2026
@rStelmach rStelmach marked this pull request as ready for review January 23, 2026 11:18
@rStelmach rStelmach requested a review from a team as a code owner January 23, 2026 11:18
@elasticmachine
Copy link
Copy Markdown
Contributor

Pinging @elastic/obs-onboarding-team (Team:obs-onboarding)

@rStelmach rStelmach changed the title Wired streams onboarding [Observability Onboarding] Wired streams onboarding Jan 23, 2026
@rStelmach rStelmach added release_note:skip Skip the PR/issue when compiling release notes and removed release_note:feature Makes this part of the condensed release notes labels Jan 23, 2026
@rStelmach
Copy link
Copy Markdown
Contributor Author

/ci

@flash1293
Copy link
Copy Markdown
Contributor

@rStelmach about the workaround you had to take for auto-detect - did you check with the fleets team? They might now about another approach. I don't think it's too bad, but I think it's worth asking.

@rStelmach
Copy link
Copy Markdown
Contributor Author

@flash1293

Pushed some changes to make onboarding flows work with 'logs' endpoint split. Tested each flow, seems to be working. Ready for another review

Copy link
Copy Markdown
Contributor

@flash1293 flash1293 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Elastic agent tested on linux, works as expected
  • OTel host flow still sends you to logs,logs.* - should be logs.otel,logs.otel.* now
  • Elastic agent k8s flow fails with "INSTALLATION FAILED: chart "elastic-agent" matching 9.3.1 not found in elastic index." - is this a general problem? We should check
  • Otel k8s flow also sends you to logs,logs.* Not all panels of the k8s standard dashboard work, but that's probably unrelated:
Image

const wiredStreamsConfig = (() => {
if (!useWiredStreams) return '';

// Route container logs (and APM logs when available) to wired streams by
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the APM logs out of wired streams, we have an issue to support this properly but we aren't there yet.

@rStelmach rStelmach requested a review from a team as a code owner February 27, 2026 15:08
stream.processors.unshift({
add_fields: {
target: '@metadata',
fields: { raw_index: 'logs' },
Copy link
Copy Markdown
Contributor

@mohamedhamed-ahmed mohamedhamed-ahmed Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with this part, but what does the raw_index here do exactly and should it really be logs or logs.ecs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@metadata.raw_index is a metadata field that Elastic Agent uses to override default index routing. Instead of sending documents to the usual logs-{dataset}-{namespace}, they get routed to whichever index is specified, in this case the root logs wired stream. From there, the stream platform's normalize_for_stream processor inspects each document and routes it to logs.ecs or logs.otel based on its shape.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, I think Mohamed is right, this sounds wrong. It should either go to logs.ecs or logs.otel directly.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not really sure how normalize_for_stream does that, I don't think we have internal routing in this case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, it doesn't. logs is deprecated, we shouldn't send data there

try {
const result = await esClient.search({
index: ['logs-*', 'metrics-*'],
index: ['logs-*', 'metrics-*', 'logs', 'logs.*'],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need logs here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add it because without these additional index patterns, the onboarding "waiting for data" check never detected incoming documents when the user selected Wired Streams mode, and the flow appeared stuck.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't they be going to logs.otel or logs.ecs and both are present as part of logs.* ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, we shouldn't rely on the old logs,logs.* anywhere

expect(stream.processors![0]).toEqual({
add_fields: {
target: '@metadata',
fields: { raw_index: 'logs' },
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might have already noticed it but just in case, we need to fix this test as well.

try {
const result = await esClient.search({
index: ['logs-*', 'metrics-*', 'logs', 'logs.*'],
index: ['logs-*', 'metrics-*', 'logs.*'],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but @flash1293 do you think logs.* is alright here or should we rather explicitly put logs.ecs & logs.otel ?

Copy link
Copy Markdown
Contributor

@flash1293 flash1293 Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this check it's OK I'd say - it's just a behind the scenes thing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rStelmach will look into this anyway as part of the new onboarding project

Copy link
Copy Markdown
Contributor

@mohamedhamed-ahmed mohamedhamed-ahmed left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks for the fixes and for the great work here

stream.processors.unshift({
add_fields: {
target: '@metadata',
fields: { raw_index: 'logs.ecs' },
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flash1293 I assumed that we want logs.ecs here since fleet integrations use ECS fields if I am not wrong. That said, I guess they might just work fine with either logs.ecs or logs.ote wdyt? shoudl we keep it ecs here as suggested?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logs.ecs seems right to me. Yeah, these are used by Elastic agent, so they ship ECS data

@elasticmachine
Copy link
Copy Markdown
Contributor

⏳ Build in-progress

  • Buildkite Build
  • Commit: 3975b7d
  • Kibana Serverless Image: docker.elastic.co/kibana-ci/kibana-serverless:pr-249705-3975b7d0bd63

History

@rStelmach rStelmach merged commit 51258f6 into elastic:main Feb 27, 2026
16 checks passed
qn895 pushed a commit to qn895/kibana that referenced this pull request Mar 11, 2026
closes: elastic/streams-program#301

## Summary

This PR adds an opt-in Wired Streams ingestion mode to onboarding
quickstart flows while keeping classic ingestion as default. It also
fixes a `document_parsing_exception` for ECS streams caused by a field
mapping conflict between `stream` (a string field from Kubernetes
container logs) and `stream.name` (an object-expanded field used by the
streams platform).

## Flows Covered

| Flow | Implementation |
| ---------------------------------------- |
-----------------------------------------------------------------------------------------------------------------------------
|
| **OTel Host (direct ES exporter)** | Inject `logs_index: logs` under
`elasticsearch/otel` via `sed`/PowerShell |
| **OTel Host (Managed OTLP)** | Inject `resource/wired_streams`
processor with `elasticsearch.index: logs` and attach it only to
`logs/platformlogs` pipeline |
| **OTel Kubernetes (direct ES exporter)** | Helm `--set
'collectors.gateway.config.exporters.elasticsearch\/otel.logs_index=logs'`
|
| **OTel Kubernetes (Managed OTLP)** | Helm `--set` for
`resource/wired_streams` processor (`elasticsearch.index=logs`) and add
it to logs pipeline |
| **Elastic Agent Kubernetes** | Use `_write_to_logs_streams=true` +
`@metadata.raw_index=logs` on container logs processor settings |
| **Auto-Detect** | Route only custom logs to Wired Streams
(`@metadata.raw_index=logs`); keep registry integrations on classic
routing |

## What Was Added

1. **Ingestion selector** to switch between `Classic ingestion` and
`Wired Streams`.
2. **Auto-enable with confirmation modal** when Wired Streams is
selected and not enabled yet; on failure, flow falls back to Classic and
shows a toast.
3. **Telemetry** (`observability_onboarding_wired_streams_auto_enabled`)
with:
   - `flow_type`
   - `success`
   - `error_message` (on failure)
4. **Flow-specific routing behavior** so we avoid breaking known
dashboard/standard processing paths (e.g. registry integrations remain
classic in auto-detect).

## ECS Stream Mapping Fix (`subobjects: false`)

### Problem

When Kubernetes container logs are ingested into ECS wired streams
(`logs.ecs`), the documents contain a top-level `stream` field (with
values like `stdout`/`stderr`). The streams platform also sets
`stream.name` via an ingest pipeline script. Elasticsearch interprets
`stream.name` as a nested object property under `stream`, conflicting
with the existing `stream` string value and causing a
`document_parsing_exception`:

```
object mapping for [stream] tried to parse field [stream] as object, but found a concrete value
```

The OTel path avoids this because the OTel Collector renames `stream` to
`log.iostream` before indexing.

### Fix

Set `subobjects: false` on ECS stream component template mappings. This
tells Elasticsearch to treat dotted field names (like `stream.name`) as
flat literal keys rather than expanding them into nested object
hierarchies. With this setting, `stream` (string) and `stream.name`
(constant_keyword) coexist without conflict.

### Changes

- **`generate_layer.ts`**: Conditionally apply `subobjects: false` to
ECS stream component templates using `...(isEcsStream && { subobjects:
false as const })`.
- **`generate_layer.test.ts`**: Added assertions verifying `subobjects:
false` is present for both ECS root and child streams.
- **`basic.ts` (API integration tests)**:
- Updated `_source` assertions for ECS streams: `stream.name` is now a
flat dotted key (accessed as `result._source['stream.name']`) instead of
a nested property (`result._source.stream.name`).
- Updated `expectFields` helper to handle ECS mappings where dotted
field names are literal property keys rather than nested `properties`
paths.

## Why Managed OTLP Needed A Different Approach

`logs_index` works with direct `elasticsearch/otel` exporter config, but
Managed OTLP sends to MOTel (`otlp/ingest`).
For Managed OTLP, the working mechanism is dynamic routing via resource
attribute:

```yaml
resource/wired_streams:
  attributes:
    - action: upsert
      key: elasticsearch.index
      value: logs
```

This is applied only to logs pipelines in onboarding-generated
snippets/commands.

## Validation

- **Unit tests** updated and passing for OTel Host/Kubernetes command
builders, including Managed OTLP wired-streams injection behavior.
- **Unit tests** updated and passing for `generate_layer` component
template generation, verifying `subobjects: false` for ECS streams.
- **API integration tests** updated and passing for ECS stream flows
(`basic.ts`), accounting for flat dotted key `_source` representation
and mapping structure under `subobjects: false`.
- **Manual serverless validation** confirms:
- with `elasticsearch.index=logs` -> documents land in `logs`
stream/index.
- without that attribute -> documents land in classic
`logs-generic.otel-default`.

## How To Test

Please follow the onboarding flow test guide:  

https://github.com/elastic/observability-dev/tree/main/docs/obs-onboarding/onboarding-flows


## Demo

(skip the middle part while waiting for data)


https://github.com/user-attachments/assets/080c5543-59c6-4b61-92b6-7e4ff15fd4d4

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
rStelmach added a commit that referenced this pull request Apr 7, 2026
… onboarding flows (#257870)

closes: [elastic/observability-dev#5296
](elastic/observability-dev#5296)
## Summary 📚 

This PR adds data detection and loading indicators to all observability
onboarding flows. Previously, flows either had no data detection (OTel
Host, OTel APM, Cloud Forwarder) or lacked separation between logs and
metrics arrival (OTel K8s). Now there is a progress indicator while data
is being ingested, troubleshooting guidance if data doesn't arrive, and
contextual action links once the relevant data types are confirmed.


### Detection strategies per flow

| Flow | Detection | Polling trigger | Index patterns | Correlation |
|---|---|---|---|---|
| **OTel Host** | Time-window | Blur (user runs command) |
`logs-*.otel-*`, `logs.otel`, `metrics-*.otel-*` | `@timestamp >=
sessionStart` |
| **OTel APM** | Time-window | Blur (user runs command) | `traces-apm*`,
`logs-apm*`, `metrics-apm*`, `*-*.otel-*`, `apm-*` | `@timestamp >=
sessionStart` |
| **Cloud Forwarder** | Time-window | Button click (launches
CloudFormation) | `logs-aws.{vpcflow,elbaccess,cloudtrail}.otel-*` |
`@timestamp >= sessionStart` |
| **OTel K8s** | Correlation ID | Blur (user runs command) | `logs-*`,
`logs.*`, `metrics-*`, `metrics.*` | `onboarding.id` via Helm `--set`
processor |
| **K8s EA** | Correlation ID | Blur (existing) | `logs-*`, `logs.*`,
`metrics-*`, `metrics.*` | `fields.onboarding_id` (existing) |

### What's new

**Client-side**

- **`useTimeWindowDataDetection` hook:** shared hook replacing ~150
lines of duplicated polling/state/telemetry logic across OTel Host, OTel
APM, and Cloud Forwarder flows. Handles configurable polling interval,
telemetry on data received, and troubleshooting visibility after a
delay. Treats fetch failures as "no data" to prevent infinite spinner.
- **`DataIngestStatus` generalized:** now accepts `actionLinks` with a
`requires` field (`'any' | 'logs' | 'metrics' | 'traces'`) for
conditional link visibility. Separates telemetry (fires on any data)
from step completion (fires when all required data types arrive). Drives
both K8s EA and OTel K8s flows.
- **`ActionLink.requires`** on `GetStartedPanel`: action links declare
which data type they need, so Dashboard links wait for metrics while
Explore Logs links appear as soon as logs arrive.
- **Step status indicators:** all flows now show `current` → `complete`
step status based on actual data arrival.

**Server-side**

- **4 new `has-data` routes:** `/cloudforwarder/has-data`,
`/otel_apm/has-data`, `/otel_host/has-data`, and enhanced
`/kubernetes/has-data` (now returns `hasLogs`/`hasMetrics` separately).
- **Shared error handler** (`isNoShardsAvailableError` /
`throwHasDataSearchError`): deduplicates identical catch blocks across
all has-data routes.
- **K8s route improvements:** removed deprecated bare `logs`/`metrics`
root indices (per #249705), added runtime field with `._rt` suffix to
avoid shadowing indexed fields on classic streams, queries both
`resource.attributes.onboarding.id` and `labels.onboarding_id` for broad
correlation coverage.
- **Cloudforwarder `logType` validation:** `t.keyof({ vpcflow,
elbaccess, cloudtrail })` replaces unsafe `t.string` + `as LogType`
cast.

**OTel K8s Helm command**

- Injects `resource/onboarding_id` processor via `--set` flags to tag
data with `onboarding.id` resource attribute.
- Processor index management documented: base values file uses
`processors[0..7]`, custom processors start at index 8.

**E2E Ensemble tests**

- Updated OTel Host and OTel K8s specs to dispatch blur + wait for data
indicator instead of static timeouts.
- Updated POMs with correct `data-test-subj` selectors for the new
`GetStartedPanel`-based action links.

## Notes 📓 

### Why not inject `onboarding.id` into every flow for exact
correlation?

Injecting a correlation ID (like `onboarding.id`) as an OTel resource
attribute is permanent, it stays on all events from that collector for
its entire lifetime, not just during onboarding. For K8s OTel this
trade-off is acceptable because the Helm chart is an explicit deployment
that can be reconfigured, and multi-cluster disambiguation benefits from
a stable ID. For OTel Host, OTel APM, and Cloud Forwarder, we chose
time-window detection (`@timestamp >= start`) to avoid persisting a
non-functional field in production data.

### Known edge cases of the detection strategies used in this PR:
- **Time-window false positives (OTel Host, APM, Cloud Forwarder):** If
the cluster already has data in the queried indices (e.g.
`logs-*.otel-*`, `logs.otel`) from a previous setup, the `@timestamp >=
sessionStart` query will match pre-existing data and report "ready"
before the new collector actually sends anything.
- **Correlation ID persistence (K8s OTel):** The `onboarding.id`
resource attribute injected via Helm is permanent, it stays on all
events from that collector for its entire lifetime, not just during
onboarding. This was an intentional trade-off for exact correlation (no
false positives), but it means a non-functional field persists in
production data.
- **K8s EA `fields.onboarding_id` (existing, unchanged):** Same
persistence concern - the Elastic Agent integration injects
`onboarding_id` into all events permanently.


## How to test 🧪 
Use this guide to test each flow:
https://github.com/elastic/observability-dev/tree/main/docs/obs-onboarding/onboarding-flows

## Demos 🎥 




https://github.com/user-attachments/assets/9cfa710b-8fbc-4112-bfac-d93fcf27027f




https://github.com/user-attachments/assets/351768a6-e034-4d33-b895-b049d0a0c514



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Event-driven data monitoring with live progress indicators and
conditional troubleshooting guidance
* New data-availability APIs for Cloud Forwarder, Kubernetes, APM, and
Host flows
* Onboarding identifier surfaced and propagated to improve tracking and
deep links

* **Refactor**
* Generalized data ingest/status UI to support multiple onboarding flows
and richer action links
  * Unified time-window data detection and monitoring logic across flows
  * Improved handling of transient Elasticsearch shard unavailability

* **Tests**
* E2E tests updated to use event-driven checks instead of fixed timeouts
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport:skip This PR does not require backporting ci:project-deploy-observability Create an Observability project Feature: Observability Onboarding Feature:Streams This is the label for the Streams Project release_note:skip Skip the PR/issue when compiling release notes Team:Fleet Team label for Observability Data Collection Fleet team Team:obs-onboarding Observability Onboarding Team v9.4.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants