Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .chloggen/es-exporter-ecs-mode-dynamic-templates.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add per-document `dynamic_templates` for metrics in ECS mapping mode"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
# Replace 0 with your PR number once the PR is opened.
issues: [46499]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Each bulk index action for ECS metrics now includes dynamic_templates so Elasticsearch can apply the correct
mapping (e.g. histogram_metrics, summary_metrics, double_metrics) for the ECS mapping mode. The OTel mapping mode already sent dynamic_templates.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
16 changes: 16 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,22 @@ The metric types supported are:
- Exponential histogram (Delta temporality only)
- Summary

### Metrics dynamic templates

For metrics, the exporter sends **per-document `dynamic_templates`** with each bulk index action so that Elasticsearch can apply the correct mapping to metric fields. It uses the [bulk API `dynamic_templates` parameter](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk):

> A map from the full name of fields to the name of dynamic templates. It defaults to an empty map. If a name matches a dynamic template, that template will be applied regardless of other match predicates defined in the template. If a field is already defined in the mapping, then this parameter won't be used.

The index template must define dynamic templates whose names match the values sent by the exporter. Behavior depends on the mapping mode:

| Mapping mode | Field path in document | Template names sent | Notes |
| ------------ | ---------------------- | -------------------- | ----- |
| **OTel** | `metrics.<metric name>` | `histogram`, `summary`, `gauge_double`, `gauge_long`, `counter_double`, `counter_long` | The OTel data plugin defines more specific templates. |
| **ECS** | `metric.<metric name>` | `histogram_metrics`, `summary_metrics`, `double_metrics` | Relies on core templates in [metrics@mappings](https://github.com/elastic/elasticsearch/blob/8.15/x-pack/plugin/core/template-resources/src/main/resources/metrics%40mappings.json). Intended to match the [APM metrics ingest pipeline](https://github.com/elastic/elasticsearch/blob/b34960a2b450869aee2866e91c647e0026dd6953/x-pack/plugin/apm-data/src/main/resources/ingest-pipelines/metrics-apm%40pipeline.yaml). |

- **OTel**: Each metric is written under the `metrics` object; the bulk action maps full field names (e.g. `metrics.my_metric`) to one of the OTel template names above based on metric type (histogram, summary, gauge, or counter) and value type.
- **ECS**: Each metric is written as a top-level field `metric.<name>`; the bulk action maps that field name to one of the ECS/APM template names (`histogram_metrics`, `summary_metrics`, or `double_metrics` for gauges and counters).

## Exporting profiles

Profiles support is currently in development, and should not be used in
Expand Down
115 changes: 106 additions & 9 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,11 +1149,11 @@ func TestExporterMetrics(t *testing.T) {

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
Action: []byte(`{"create":{"_index":"metrics-generic-default","dynamic_templates":{"metric.foo":"histogram_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3.0]}}}`),
},
{
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
Action: []byte(`{"create":{"_index":"metrics-generic-default","dynamic_templates":{"metric.foo":"histogram_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"counts":[4,5,6,7],"values":[2.0,4.5,5.5,6.0]}}}`),
},
}
Expand Down Expand Up @@ -1192,7 +1192,7 @@ func TestExporterMetrics(t *testing.T) {

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
Action: []byte(`{"create":{"_index":"metrics-generic-default","dynamic_templates":{"metric.foo":"histogram_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"counts":[1,1,2,1,1],"values":[-24.0,-3.0,0.0,6.0,12.0]}}}`),
},
}
Expand Down Expand Up @@ -1295,11 +1295,11 @@ func TestExporterMetrics(t *testing.T) {

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
Action: []byte(`{"create":{"_index":"metrics-generic-default","dynamic_templates":{"metric.bar":"double_metrics","metric.foo":"histogram_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"bar":1.0}}`),
},
{
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
Action: []byte(`{"create":{"_index":"metrics-generic-default","dynamic_templates":{"metric.foo":"histogram_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"counts":[4,5,6,7],"values":[2.0,4.5,5.5,6.0]}}}`),
},
}
Expand Down Expand Up @@ -1633,17 +1633,114 @@ func TestExporterMetrics(t *testing.T) {

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
Action: []byte(`{"create":{"_index":"metrics-generic-default","dynamic_templates":{"metric.foo":"summary_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"sum":1.5,"value_count":1}}}`),
},
{
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
Action: []byte(`{"create":{"_index":"metrics-generic-default","dynamic_templates":{"metric.foo":"summary_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"sum":2.0,"value_count":3}}}`),
},
}

assertRecordedItems(t, expected, rec, false)
})

t.Run("ecs mode document with only invalid data points sends action without dynamic_templates", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL)

ts := pcommon.NewTimestampFromTime(time.Unix(0, 0))
metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty()
metricSlice := scopeMetrics.Metrics()

// Gauge with NaN causes Value() to return error; no attribute or dynamic template is added.
invalidGauge := metricSlice.AppendEmpty()
invalidGauge.SetName("invalid.gauge")
invalidDp := invalidGauge.SetEmptyGauge().DataPoints().AppendEmpty()
invalidDp.SetTimestamp(ts)
invalidDp.SetDoubleValue(math.NaN())

ctx := client.NewContext(t.Context(), client.Info{Metadata: client.NewMetadata(map[string][]string{"X-Elastic-Mapping-Mode": {"ecs"}})})
mustSendMetricsWithCtx(ctx, t, exporter, metrics)

docs := rec.WaitItems(1)
require.Len(t, docs, 1)
// Action must not contain dynamic_templates (or have empty map); document has no metric fields.
var actionMap map[string]any
require.NoError(t, json.Unmarshal(docs[0].Action, &actionMap))
create, ok := actionMap["create"].(map[string]any)
require.True(t, ok)
_, hasDynamicTemplates := create["dynamic_templates"]
assert.False(t, hasDynamicTemplates, "action should omit dynamic_templates when no metrics are valid")
assert.JSONEq(t,
`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"}}`,
string(docs[0].Document),
)
})

t.Run("ecs mode histogram summary gauge counter with dynamic_templates", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL)

ts := pcommon.NewTimestampFromTime(time.Unix(0, 0))
metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty()
metricSlice := scopeMetrics.Metrics()

gaugeMetric := metricSlice.AppendEmpty()
gaugeMetric.SetName("ecs.gauge")
gaugeDp := gaugeMetric.SetEmptyGauge().DataPoints().AppendEmpty()
gaugeDp.SetTimestamp(ts)
gaugeDp.SetDoubleValue(1.0)

counterMetric := metricSlice.AppendEmpty()
counterMetric.SetName("ecs.counter")
counterMetric.SetEmptySum().SetIsMonotonic(true)
counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
counterDp := counterMetric.Sum().DataPoints().AppendEmpty()
counterDp.SetTimestamp(ts)
counterDp.SetDoubleValue(2.0)

histogramMetric := metricSlice.AppendEmpty()
histogramMetric.SetName("ecs.histogram")
histogram := histogramMetric.SetEmptyHistogram()
histogram.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
histogramDp := histogram.DataPoints().AppendEmpty()
histogramDp.SetTimestamp(ts)
histogramDp.ExplicitBounds().FromRaw([]float64{1.0, 2.0, 3.0})
histogramDp.BucketCounts().FromRaw([]uint64{1, 2, 3, 4})

summaryMetric := metricSlice.AppendEmpty()
summaryMetric.SetName("ecs.summary")
summaryDp := summaryMetric.SetEmptySummary().DataPoints().AppendEmpty()
summaryDp.SetTimestamp(ts)
summaryDp.SetSum(5.0)
summaryDp.SetCount(10)

ctx := client.NewContext(t.Context(), client.Info{Metadata: client.NewMetadata(map[string][]string{"X-Elastic-Mapping-Mode": {"ecs"}})})
mustSendMetricsWithCtx(ctx, t, exporter, metrics)

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic-default","dynamic_templates":{"ecs.counter":"double_metrics","ecs.gauge":"double_metrics","ecs.histogram":"histogram_metrics","ecs.summary":"summary_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"ecs":{"counter":2.0,"gauge":1.0,"histogram":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3.0]},"summary":{"sum":5.0,"value_count":10}}}`),
},
}
assertRecordedItems(t, expected, rec, false)
})
}

func TestExporterMetrics_Grouping(t *testing.T) {
Expand Down Expand Up @@ -1887,11 +1984,11 @@ func TestExporterMetrics_Grouping(t *testing.T) {
mustSendMetricsWithCtx(ctx, t, exporter, metrics)
expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
Action: []byte(`{"create":{"_index":"metrics-generic-default","dynamic_templates":{"foo":"double_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"foo":1.0,"a":"c"}`),
},
{
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
Action: []byte(`{"create":{"_index":"metrics-generic-default","dynamic_templates":{"bar":"double_metrics","baz":"double_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"bar":1.0,"baz":1.0,"a":"b"}`),
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,23 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
)

// DynamicTemplateMode selects which dynamic template names are returned by DynamicTemplate.
// OTel mode returns names matching Elasticsearch OTel metrics mapping; ECS mode returns
// names matching the APM metrics ingest pipeline (e.g. histogram_metrics, summary_metrics, double_metrics).
type DynamicTemplateMode int

const (
DynamicTemplateModeOTel DynamicTemplateMode = iota
DynamicTemplateModeECS
)

// DataPoint is an interface that allows specifying behavior for each type of data point
type DataPoint interface {
Timestamp() pcommon.Timestamp
StartTimestamp() pcommon.Timestamp
Attributes() pcommon.Map
Value() (pcommon.Value, error)
DynamicTemplate(pmetric.Metric) string
DynamicTemplate(metric pmetric.Metric, mode DynamicTemplateMode) string
DocCount() uint64
HasMappingHint(elasticsearch.MappingHint) bool
Metric() pmetric.Metric
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ func (dp ExponentialHistogram) Value() (pcommon.Value, error) {
return vm, nil
}

func (dp ExponentialHistogram) DynamicTemplate(_ pmetric.Metric) string {
func (dp ExponentialHistogram) DynamicTemplate(_ pmetric.Metric, mode DynamicTemplateMode) string {
if mode == DynamicTemplateModeECS {
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
return "summary_metrics"
}
return "histogram_metrics"
}
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
return "summary"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ func (dp Histogram) Value() (pcommon.Value, error) {
return histogramToValue(dp.HistogramDataPoint, dp.metric)
}

func (dp Histogram) DynamicTemplate(_ pmetric.Metric) string {
func (dp Histogram) DynamicTemplate(_ pmetric.Metric, mode DynamicTemplateMode) string {
if mode == DynamicTemplateModeECS {
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
return "summary_metrics"
}
return "histogram_metrics"
}

if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
return "summary"
}
Expand Down
15 changes: 11 additions & 4 deletions exporter/elasticsearchexporter/internal/datapoints/number.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,23 @@ func (dp Number) Value() (pcommon.Value, error) {
return pcommon.Value{}, fmt.Errorf("invalid number data point %q, wrong ValueType %s", dp.metric.Name(), dp.ValueType())
}

func (dp Number) DynamicTemplate(metric pmetric.Metric) string {
func (dp Number) DynamicTemplate(metric pmetric.Metric, mode DynamicTemplateMode) string {
if mode == DynamicTemplateModeECS {
return "double_metrics"
}

switch metric.Type() {
case pmetric.MetricTypeSum:
sum := metric.Sum()
isCounter := sum.IsMonotonic() && sum.AggregationTemporality() == pmetric.AggregationTemporalityCumulative
switch dp.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
if metric.Sum().IsMonotonic() && metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
if isCounter {
return "counter_double"
}
return "gauge_double"
case pmetric.NumberDataPointValueTypeInt:
if metric.Sum().IsMonotonic() && metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
if isCounter {
return "counter_long"
}
return "gauge_long"
Expand All @@ -67,8 +73,9 @@ func (dp Number) DynamicTemplate(metric pmetric.Metric) string {
default:
return "" // NumberDataPointValueTypeEmpty should already be discarded in numberToValue
}
default:
return ""
}
return ""
}

func (Number) DocCount() uint64 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ func (dp Summary) Value() (pcommon.Value, error) {
return vm, nil
}

func (Summary) DynamicTemplate(pmetric.Metric) string {
func (Summary) DynamicTemplate(_ pmetric.Metric, mode DynamicTemplateMode) string {
if mode == DynamicTemplateModeECS {
return "summary_metrics"
}
return "summary"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func serializeDataPoints(v *json.Visitor, dataPoints []datapoints.DataPoint, val
// so that the field is indexed into Elasticsearch with the correct mapping. The name should correspond to a
// dynamic template that is defined in ES mapping, e.g.
// https://github.com/elastic/elasticsearch/blob/8.15/x-pack/plugin/core/template-resources/src/main/resources/metrics%40mappings.json
dynamicTemplates["metrics."+metric.Name()] = dp.DynamicTemplate(metric)
dynamicTemplates["metrics."+metric.Name()] = dp.DynamicTemplate(metric, datapoints.DynamicTemplateModeOTel)
}
_ = v.OnObjectFinished()
if docCount != 0 {
Expand Down
7 changes: 6 additions & 1 deletion exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,12 @@ func (ecsDataPointsEncoder) encodeMetrics(
*validationErrors = append(*validationErrors, err)
continue
}
document.AddAttribute(dp.Metric().Name(), value)
metric := dp.Metric()
metricName := metric.Name()
document.AddAttribute(metricName, value)
if name := dp.DynamicTemplate(metric, datapoints.DynamicTemplateModeECS); name != "" {
document.AddDynamicTemplate(metricName, name)
}
}

err := document.Serialize(buf, true, metricsProtectedFields)
Expand Down
Loading
Loading