Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .chloggen/elasticsearchexporter-refactor-encoder.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: drop support for metrics for none, raw, and bodymap mapping modes

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37928]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Metrics support is in development, and was added for "ecs" and "otel" mapping modes.
Support was unintentionally added for the other mapping modes, defaulting to the same
behaviour as "ecs" mode. While metrics support is still in development, drop support
from these mapping modes and require users to use the intended mapping modes.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
67 changes: 65 additions & 2 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,78 @@ behaviours, which may be configured through the following settings:
body is serialized to JSON as-is and becomes a separate document for ingestion.
If the log record body is not a map, the exporter will log a warning and drop the log record.

#### OTel mapping mode

In `otel` mapping mode, the Elasticsearch Exporter stores documents in an OTel-native schema.

| Signal | Supported |
| --------- | ------------------ |
| Logs | :white_check_mark: |
| Traces | :white_check_mark: |
| Metrics | :white_check_mark: |
| Profiles | :white_check_mark: |

#### ECS mapping mode

> [!WARNING]
> The ECS mode mapping mode is currently undergoing changes, and its behaviour is unstable.

In ECS mapping mode, the Elasticsearch Exporter attempts to map fields from
[OpenTelemetry Semantic Conventions][SemConv] (version 1.22.0) to [Elastic Common Schema][ECS].
In `ecs` mapping mode, the Elasticsearch Exporter maps fields from
[OpenTelemetry Semantic Conventions][SemConv] (version 1.22.0) to [Elastic Common Schema][ECS] where possible.
This mode may be used for compatibility with existing dashboards that work with ECS.

| Signal | `ecs` |

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe consolidating these separate tables into a single table could help? This could be done as a follow-up pull request.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah maybe, I was on the fence - I started doing it in a single table and ended up splitting it when I added new sections. I don't have a strong opinion. WDYT @carsonip?

@carsonip carsonip Feb 21, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks fine to me now. We can refactor the docs later, especially when we start documenting OTel field mapping

| --------- | ------------------ |
| Logs | :white_check_mark: |
| Traces | :white_check_mark: |
| Metrics | :white_check_mark: |
| Profiles | :no_entry_sign: |

#### Bodymap mapping mode

> [!WARNING]
> The Bodymap mode mapping mode is currently undergoing changes, and its behaviour is unstable.

In `bodymap` mapping mode, the Elasticsearch Exporter supports only logs and will take the "body"
of a log record as the exact content of the Elasticsearch document without any transformation.
This mapping mode is intended for use cases where the client wishes to have complete control over
the Elasticsearch document structure.

| Signal | `bodymap` |
| --------- | ------------------ |
| Logs | :white_check_mark: |
| Traces | :no_entry_sign: |
| Metrics | :no_entry_sign: |
| Profiles | :no_entry_sign: |

#### Default (none) mapping mode

In the `none` mapping mode the Elasticsearhc Exporter produces documents with the original
field names of from the OTLP data structures.

| Signal | `none` |
| --------- | ------------------ |
| Logs | :white_check_mark: |
| Traces | :white_check_mark: |
| Metrics | :no_entry_sign: |
| Profiles | :no_entry_sign: |

#### Raw mapping mode

The `raw` mapping mode is identical to `none`, except for two differences:

- In `none` mode attributes are mapped with an `Attributes.` prefix,
while in `raw` mode they are not.
- In `none` mode span events are mapped with an `Events.` prefix,
while in `raw` mode they are not.

| Signal | `raw ` |
| --------- | ------------------ |
| Logs | :white_check_mark: |
| Traces | :white_check_mark: |
| Metrics | :no_entry_sign: |
| Profiles | :no_entry_sign: |

### Elasticsearch ingest pipeline

Documents may be optionally passed through an [Elasticsearch Ingest pipeline] prior to indexing.
Expand Down
128 changes: 83 additions & 45 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ type elasticsearchExporter struct {
component.TelemetrySettings
userAgent string

config *Config
index string
dynamicIndex bool
model mappingModel
config *Config
index string
dynamicIndex bool
logstashFormat LogstashFormatSettings

wg sync.WaitGroup // active sessions
bulkIndexer bulkIndexer
Expand All @@ -58,10 +58,6 @@ func newExporter(
index string,
dynamicIndex bool,
) *elasticsearchExporter {
model := &encodeModel{
mode: cfg.MappingMode(),
}

userAgent := fmt.Sprintf(
"%s/%s (%s/%s)",
set.BuildInfo.Description,
Expand All @@ -74,11 +70,11 @@ func newExporter(
TelemetrySettings: set.TelemetrySettings,
userAgent: userAgent,

config: cfg,
index: index,
dynamicIndex: dynamicIndex,
model: model,
bufferPool: pool.NewBufferPool(),
config: cfg,
index: index,
dynamicIndex: dynamicIndex,
logstashFormat: cfg.LogstashFormat,
bufferPool: pool.NewBufferPool(),
}
}

Expand Down Expand Up @@ -159,6 +155,10 @@ func (e *elasticsearchExporter) Shutdown(ctx context.Context) error {
func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
mappingMode := e.config.MappingMode()
router := newDocumentRouter(mappingMode, e.dynamicIndex, e.index, e.config)
encoder, err := newEncoder(mappingMode)
if err != nil {
return err
}

e.wg.Add(1)
defer e.wg.Done()
Expand All @@ -178,9 +178,16 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
for j := 0; j < ills.Len(); j++ {
ill := ills.At(j)
scope := ill.Scope()
ec := encodingContext{
resource: resource,
resourceSchemaURL: rl.SchemaUrl(),
scope: scope,
scopeSchemaURL: ill.SchemaUrl(),
}

logs := ill.LogRecords()
for k := 0; k < logs.Len(); k++ {
if err := e.pushLogRecord(ctx, router, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl(), session); err != nil {
if err := e.pushLogRecord(ctx, router, encoder, ec, logs.At(k), session); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand Down Expand Up @@ -208,21 +215,19 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
func (e *elasticsearchExporter) pushLogRecord(
ctx context.Context,
router documentRouter,
resource pcommon.Resource,
resourceSchemaURL string,
encoder documentEncoder,
ec encodingContext,
record plog.LogRecord,
scope pcommon.InstrumentationScope,
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
) error {
index, err := router.routeLogRecord(resource, scope, record.Attributes())
index, err := router.routeLogRecord(ec.resource, ec.scope, record.Attributes())
if err != nil {
return err
}

buf := e.bufferPool.NewPooledBuffer()
docID := e.extractDocumentIDAttribute(record.Attributes())
if err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, index, buf.Buffer); err != nil {
if err := encoder.encodeLog(ec, record, index, buf.Buffer); err != nil {
buf.Recycle()
return fmt.Errorf("failed to encode log event: %w", err)
}
Expand Down Expand Up @@ -250,6 +255,10 @@ func (e *elasticsearchExporter) pushMetricsData(
mappingMode := e.config.MappingMode()
router := newDocumentRouter(mappingMode, e.dynamicIndex, e.index, e.config)
hasher := newDataPointHasher(mappingMode)
encoder, err := newEncoder(mappingMode)
if err != nil {
return err
}

e.wg.Add(1)
defer e.wg.Done()
Expand Down Expand Up @@ -361,10 +370,17 @@ func (e *elasticsearchExporter) pushMetricsData(
for index, groupedDataPoints := range groupedDataPointsByIndex {
for _, dpGroup := range groupedDataPoints {
buf := e.bufferPool.NewPooledBuffer()
dynamicTemplates, err := e.model.encodeMetrics(
dpGroup.resource, dpGroup.resourceSchemaURL,
dpGroup.scope, dpGroup.scopeSchemaURL,
dpGroup.dataPoints, &validationErrs, index, buf.Buffer,
dynamicTemplates, err := encoder.encodeMetrics(
encodingContext{
resource: dpGroup.resource,
resourceSchemaURL: dpGroup.resourceSchemaURL,
scope: dpGroup.scope,
scopeSchemaURL: dpGroup.scopeSchemaURL,
},
dpGroup.dataPoints,
&validationErrs,
index,
buf.Buffer,
)
if err != nil {
buf.Recycle()
Expand Down Expand Up @@ -399,6 +415,10 @@ func (e *elasticsearchExporter) pushTraceData(
) error {
mappingMode := e.config.MappingMode()
router := newDocumentRouter(mappingMode, e.dynamicIndex, e.index, e.config)
encoder, err := newEncoder(mappingMode)
if err != nil {
return err
}

e.wg.Add(1)
defer e.wg.Done()
Expand All @@ -418,18 +438,25 @@ func (e *elasticsearchExporter) pushTraceData(
for j := 0; j < scopeSpans.Len(); j++ {
scopeSpan := scopeSpans.At(j)
scope := scopeSpan.Scope()
ec := encodingContext{
resource: resource,
resourceSchemaURL: il.SchemaUrl(),
scope: scope,
scopeSchemaURL: scopeSpan.SchemaUrl(),
}

spans := scopeSpan.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if err := e.pushTraceRecord(ctx, router, resource, il.SchemaUrl(), span, scope, scopeSpan.SchemaUrl(), session); err != nil {
if err := e.pushTraceRecord(ctx, router, encoder, ec, span, session); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
errs = append(errs, err)
}
for ii := 0; ii < span.Events().Len(); ii++ {
spanEvent := span.Events().At(ii)
if err := e.pushSpanEvent(ctx, router, resource, il.SchemaUrl(), span, spanEvent, scope, scopeSpan.SchemaUrl(), session); err != nil {
if err := e.pushSpanEvent(ctx, router, encoder, ec, span, spanEvent, session); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -449,20 +476,18 @@ func (e *elasticsearchExporter) pushTraceData(
func (e *elasticsearchExporter) pushTraceRecord(
ctx context.Context,
router documentRouter,
resource pcommon.Resource,
resourceSchemaURL string,
encoder documentEncoder,
ec encodingContext,
span ptrace.Span,
scope pcommon.InstrumentationScope,
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
) error {
index, err := router.routeSpan(resource, scope, span.Attributes())
index, err := router.routeSpan(ec.resource, ec.scope, span.Attributes())
if err != nil {
return err
}

buf := e.bufferPool.NewPooledBuffer()
if err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, index, buf.Buffer); err != nil {
if err := encoder.encodeSpan(ec, span, index, buf.Buffer); err != nil {
buf.Recycle()
return fmt.Errorf("failed to encode trace record: %w", err)
}
Expand All @@ -473,24 +498,21 @@ func (e *elasticsearchExporter) pushTraceRecord(
func (e *elasticsearchExporter) pushSpanEvent(
ctx context.Context,
router documentRouter,
resource pcommon.Resource,
resourceSchemaURL string,
encoder documentEncoder,
ec encodingContext,
span ptrace.Span,
spanEvent ptrace.SpanEvent,
scope pcommon.InstrumentationScope,
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
) error {
index, err := router.routeSpanEvent(resource, scope, spanEvent.Attributes())
index, err := router.routeSpanEvent(ec.resource, ec.scope, spanEvent.Attributes())
if err != nil {
return err
}

buf := e.bufferPool.NewPooledBuffer()
e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, index, buf.Buffer)
if buf.Buffer.Len() == 0 {
if err := encoder.encodeSpanEvent(ec, span, spanEvent, index, buf.Buffer); err != nil || buf.Buffer.Len() == 0 {
buf.Recycle()
return nil
return err
}
// not recycling after Add returns an error as we don't know if it's already recycled
return bulkIndexerSession.Add(ctx, index.Index, "", buf, nil, docappender.ActionCreate)
Expand All @@ -509,6 +531,13 @@ func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string
}

func (e *elasticsearchExporter) pushProfilesData(ctx context.Context, pd pprofile.Profiles) error {
// TODO add support for routing profiles to different data_stream.namespaces?
mappingMode := e.config.MappingMode()
encoder, err := newEncoder(mappingMode)
if err != nil {
return err
}

e.wg.Add(1)
defer e.wg.Done()

Expand Down Expand Up @@ -549,7 +578,16 @@ func (e *elasticsearchExporter) pushProfilesData(ctx context.Context, pd pprofil
scope := sp.Scope()
p := sp.Profiles()
for k := 0; k < p.Len(); k++ {
if err := e.pushProfileRecord(ctx, resource, p.At(k), scope, defaultSession, eventsSession, stackTracesSession, stackFramesSession, executablesSession); err != nil {
ec := encodingContext{
resource: resource,
resourceSchemaURL: rp.SchemaUrl(),
scope: scope,
scopeSchemaURL: sp.SchemaUrl(),
}
if err := e.pushProfileRecord(
ctx, encoder, ec, p.At(k), defaultSession, eventsSession,
stackTracesSession, stackFramesSession, executablesSession,
); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand Down Expand Up @@ -601,12 +639,12 @@ func (e *elasticsearchExporter) pushProfilesData(ctx context.Context, pd pprofil

func (e *elasticsearchExporter) pushProfileRecord(
ctx context.Context,
resource pcommon.Resource,
record pprofile.Profile,
scope pcommon.InstrumentationScope,
encoder documentEncoder,
ec encodingContext,
profile pprofile.Profile,
defaultSession, eventsSession, stackTracesSession, stackFramesSession, executablesSession bulkIndexerSession,
) error {
return e.model.encodeProfile(resource, scope, record, func(buf *bytes.Buffer, docID, index string) error {
return encoder.encodeProfile(ec, profile, func(buf *bytes.Buffer, docID, index string) error {
switch index {
case otelserializer.StackTraceIndex:
return stackTracesSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
enabled: false
sending_queue:
enabled: true
mapping:
mode: otel
retry:
enabled: true
initial_interval: 100ms
Expand Down
Loading