diff --git a/.chloggen/elasticsearchexporter_otel-mode-metrics.yaml b/.chloggen/elasticsearchexporter_otel-mode-metrics.yaml new file mode 100644 index 0000000000000..50607b3a37b26 --- /dev/null +++ b/.chloggen/elasticsearchexporter_otel-mode-metrics.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add OTel mapping mode for metrics + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34248] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 4d3ab6068ebf1..93e909cc3a3b1 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -28,7 +28,7 @@ type bulkIndexer interface { type bulkIndexerSession interface { // Add adds a document to the bulk indexing session. - Add(ctx context.Context, index string, document io.WriterTo) error + Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error // End must be called on the session object once it is no longer // needed, in order to release any associated resources. @@ -108,8 +108,8 @@ type syncBulkIndexerSession struct { } // Add adds an item to the sync bulk indexer session. -func (s *syncBulkIndexerSession) Add(_ context.Context, index string, document io.WriterTo) error { - return s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document}) +func (s *syncBulkIndexerSession) Add(_ context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error { + return s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates}) } // End is a no-op. @@ -243,10 +243,11 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error {
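[editor's note — illustrative sketch, not part of the patch] Both session implementations now thread the new `dynamicTemplates` argument into docappender's `BulkIndexerItem`, so callers can attach per-field dynamic template hints to each document. A minimal sketch of a caller, assuming `ctx` and an open `session` are in scope; the index name and template mapping mirror the OTel-mode test expectations later in this diff:

```go
// Per-field dynamic template hints; nil remains valid when no hints are needed.
doc := []byte(`{"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3]}}}`)
templates := map[string]string{"metrics.metric.foo": "histogram"}
if err := session.Add(ctx, "metrics-generic.otel-default", bytes.NewReader(doc), templates); err != nil {
	return err
}
```

// Add adds an item to the async bulk indexer session. // // Adding an item after a call to Close() will panic.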
-func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo) error { +func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error { item := docappender.BulkIndexerItem{ - Index: index, - Body: document, + Index: index, + Body: document, + DynamicTemplates: dynamicTemplates, } select { case <-ctx.Done(): diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 4082073c9c850..a70cdecc25ddb 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -67,7 +67,7 @@ func TestAsyncBulkIndexer_flushOnClose(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) assert.NoError(t, bulkIndexer.Close(context.Background())) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) } @@ -106,7 +106,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) @@ -164,7 +164,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index bff526042b948..1ea18a46e7549 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -175,7 +175,7 @@ func (e *elasticsearchExporter) pushLogRecord( if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document)) + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) } func (e *elasticsearchExporter) pushMetricsData( @@ -215,7 +215,8 @@ func (e *elasticsearchExporter) pushMetricsData( resourceDocs[fIndex] = make(map[uint32]objmodel.Document) } - if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource, scope, metric, dp, dpValue); err != nil { + if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource, + resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), metric, dp, dpValue); err != nil { return err } return nil @@ -290,7 +291,7 @@ func (e *elasticsearchExporter) pushMetricsData( errs = append(errs, err) continue } - if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes)); err != nil { + if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -397,5 +398,5 @@ func (e *elasticsearchExporter) 
pushTraceRecord( if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document)) + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index da922d9b39cd1..6d1de038abb3a 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -496,7 +496,9 @@ func TestExporterMetrics(t *testing.T) { return itemsAllOK(docs) }) - exporter := newTestMetricsExporter(t, server.URL) + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "ecs" + }) dp := pmetric.NewNumberDataPoint() dp.SetDoubleValue(123.456) dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) @@ -519,6 +521,7 @@ func TestExporterMetrics(t *testing.T) { exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { cfg.MetricsIndex = "metrics.index" + cfg.Mapping.Mode = "ecs" }) metrics := newMetricsWithAttributeAndResourceMap( map[string]string{ @@ -549,6 +552,7 @@ func TestExporterMetrics(t *testing.T) { exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { cfg.MetricsIndex = "metrics.index" + cfg.Mapping.Mode = "ecs" }) metrics := newMetricsWithAttributeAndResourceMap( map[string]string{ @@ -767,6 +771,75 @@ func TestExporterMetrics(t *testing.T) { assertItemsEqual(t, expected, rec.Items(), false) }) + t.Run("otel mode", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + scopeA := resourceMetrics.ScopeMetrics().AppendEmpty() + metricSlice := scopeA.Metrics() + fooMetric := metricSlice.AppendEmpty() + fooMetric.SetName("metric.foo") + fooDps := fooMetric.SetEmptyHistogram().DataPoints() + fooDp := fooDps.AppendEmpty() + fooDp.ExplicitBounds().FromRaw([]float64{1.0, 2.0, 3.0}) + fooDp.BucketCounts().FromRaw([]uint64{1, 2, 3, 4}) + fooOtherDp := fooDps.AppendEmpty() + fooOtherDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0))) + fooOtherDp.ExplicitBounds().FromRaw([]float64{4.0, 5.0, 6.0}) + fooOtherDp.BucketCounts().FromRaw([]uint64{4, 5, 6, 7}) + + sumMetric := metricSlice.AppendEmpty() + sumMetric.SetName("metric.sum") + sumDps := sumMetric.SetEmptySum().DataPoints() + sumDp := sumDps.AppendEmpty() + sumDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0))) + sumDp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(7200, 0))) + sumDp.SetDoubleValue(1.5) + + summaryMetric := metricSlice.AppendEmpty() + summaryMetric.SetName("metric.summary") + summaryDps := summaryMetric.SetEmptySummary().DataPoints() + summaryDp := summaryDps.AppendEmpty() + summaryDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3*3600, 0))) + summaryDp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(3*3600, 0))) + summaryDp.SetCount(1) + summaryDp.SetSum(1.5) + + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(2) + + expected := []itemRequest{ + { + Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`), + Document: 
[]byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3]}},"resource":{"dropped_attributes_count":0,"schema_url":""}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`), + Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2,4.5,5.5,6]}},"resource":{"dropped_attributes_count":0,"schema_url":""}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.sum":"gauge_double"}}}`), + Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.sum":1.5},"resource":{"dropped_attributes_count":0,"schema_url":""},"start_timestamp":"1970-01-01T02:00:00.000000000Z"}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.summary":"summary_metrics"}}}`), + Document: []byte(`{"@timestamp":"1970-01-01T03:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{"dropped_attributes_count":0,"schema_url":""},"start_timestamp":"1970-01-01T03:00:00.000000000Z"}`), + }, + } + + assertItemsEqual(t, expected, rec.Items(), false) + }) + t.Run("publish summary", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go index 1aa67c2d78e63..0907f2fe22233 100644 --- a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go @@ -34,6 +34,7 @@ package objmodel // import "github.com/open-telemetry/opentelemetry-collector-co import ( "encoding/hex" "io" + "maps" "math" "sort" "strings" @@ -48,7 +49,8 @@ import ( // Document is an intermediate representation for converting open telemetry records with arbitrary attributes // into a JSON document that can be processed by Elasticsearch. 
type Document struct { - fields []field + fields []field + dynamicTemplates map[string]string } type field struct { @@ -81,6 +83,7 @@ const ( KindObject KindTimestamp KindIgnore + KindUnflattenableObject // An unflattenable object is an object that should not be flattened at serialization time ) const tsLayout = "2006-01-02T15:04:05.000000000Z" @@ -105,13 +108,24 @@ func DocumentFromAttributesWithPath(path string, am pcommon.Map) Document { fields := make([]field, 0, am.Len()) fields = appendAttributeFields(fields, path, am) - return Document{fields} + return Document{fields: fields} } func (doc *Document) Clone() *Document { fields := make([]field, len(doc.fields)) copy(fields, doc.fields) - return &Document{fields} + return &Document{fields: fields, dynamicTemplates: maps.Clone(doc.dynamicTemplates)} +} + +func (doc *Document) AddDynamicTemplate(path, template string) { + if doc.dynamicTemplates == nil { + doc.dynamicTemplates = make(map[string]string) + } + doc.dynamicTemplates[path] = template +} + +func (doc *Document) DynamicTemplates() map[string]string { + return doc.dynamicTemplates } // AddTimestamp adds a raw timestamp value to the Document. @@ -293,6 +307,7 @@ func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error { // for current use cases and the proper fix will be slightly too complex. YAGNI. var otelPrefixSet = map[string]struct{}{ "attributes.": {}, + "metrics.": {}, } func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error { @@ -422,6 +437,12 @@ func TimestampValue(ts time.Time) Value { return Value{kind: KindTimestamp, ts: ts} } +// UnflattenableObjectValue creates an unflattenable object from a map +func UnflattenableObjectValue(m pcommon.Map) Value { + sub := DocumentFromAttributes(m) + return Value{kind: KindUnflattenableObject, doc: sub} +} +
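[editor's note — illustrative sketch, not part of the patch] A sketch of what the new kind changes at serialization time, assuming the fixtures used by the OTel-mode tests in this diff: a plain map value would be flattened into dotted leaf fields, whereas an unflattenable value keeps its object shape, which is what the `histogram` dynamic template expects.

```go
// Build a histogram-shaped value and attach it without flattening.
m := pcommon.NewMap()
counts := m.PutEmptySlice("counts")
for _, c := range []int64{1, 2, 3, 4} {
	counts.AppendEmpty().SetInt(c)
}

var doc Document
doc.Add("metrics.metric.foo", UnflattenableObjectValue(m))
// Serializes as {"metrics":{"metric.foo":{"counts":[1,2,3,4]}}} instead of
// a flattened "metrics.metric.foo.counts" leaf field.
```

// ValueFromAttribute converts an AttributeValue into a value.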
func ValueFromAttribute(attr pcommon.Value) Value { switch attr.Type() { @@ -506,6 +527,11 @@ func (v *Value) iterJSON(w *json.Visitor, dedot bool, otel bool) error { return w.OnNil() } return v.doc.iterJSON(w, dedot, otel) + case KindUnflattenableObject: + if len(v.doc.fields) == 0 { + return w.OnNil() + } + return v.doc.iterJSON(w, true, otel) case KindArr: if err := w.OnArrayStart(-1, structform.AnyType); err != nil { return err diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go index 50a7064d8fe2d..b0f7917005672 100644 --- a/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go @@ -34,7 +34,7 @@ func TestObjectModel_CreateMap(t *testing.T) { m.PutStr("str", "test") return DocumentFromAttributes(m) }, - want: Document{[]field{{"i", IntValue(42)}, {"str", StringValue("test")}}}, + want: Document{fields: []field{{"i", IntValue(42)}, {"str", StringValue("test")}}}, }, "ignores nil values": { build: func() Document { @@ -43,7 +43,7 @@ func TestObjectModel_CreateMap(t *testing.T) { m.PutStr("str", "test") return DocumentFromAttributes(m) }, - want: Document{[]field{{"str", StringValue("test")}}}, + want: Document{fields: []field{{"str", StringValue("test")}}}, }, "from map with prefix": { build: func() Document { @@ -52,7 +52,7 @@ func TestObjectModel_CreateMap(t *testing.T) { m.PutStr("str", "test") return DocumentFromAttributesWithPath("prefix", m) }, - want: Document{[]field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, + want: Document{fields: []field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, }, "add attributes with key": { build: func() (doc Document) { @@ -62,7 +62,7 @@ func TestObjectModel_CreateMap(t *testing.T) { doc.AddAttributes("prefix", m) return doc }, - want: Document{[]field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, + want: Document{fields: []field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, }, "add attribute flattens a map value": { build: func() (doc Document) { @@ -73,7 +73,7 @@ func TestObjectModel_CreateMap(t *testing.T) { doc.AddAttribute("prefix", mapVal) return doc }, - want: Document{[]field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, + want: Document{fields: []field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, }, } @@ -96,7 +96,7 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("c", 3) return doc }, - want: Document{[]field{{"a", IntValue(1)}, {"c", IntValue(3)}}}, + want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}}, }, "duplicate keys": { build: func() (doc Document) { @@ -105,7 +105,7 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("a", 2) return doc }, - want: Document{[]field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, + want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, }, "duplicate after flattening from map: namespace object at end": { build: func() Document { @@ -115,7 +115,7 @@ func TestObjectModel_Dedup(t *testing.T) { am.PutEmptyMap("namespace").PutInt("a", 23) return DocumentFromAttributes(am) }, - want: Document{[]field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}}, + want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", 
StringValue("test")}}}, }, "duplicate after flattening from map: namespace object at beginning": { build: func() Document { @@ -125,7 +125,7 @@ func TestObjectModel_Dedup(t *testing.T) { am.PutStr("toplevel", "test") return DocumentFromAttributes(am) }, - want: Document{[]field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}}, + want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}}, }, "dedup in arrays": { build: func() (doc Document) { @@ -137,7 +137,7 @@ func TestObjectModel_Dedup(t *testing.T) { doc.Add("arr", ArrValue(Value{kind: KindObject, doc: embedded})) return doc }, - want: Document{[]field{{"arr", ArrValue(Value{kind: KindObject, doc: Document{[]field{ + want: Document{fields: []field{{"arr", ArrValue(Value{kind: KindObject, doc: Document{fields: []field{ {"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}, @@ -149,7 +149,7 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("namespace.a", 2) return doc }, - want: Document{[]field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}}, + want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}}, }, "dedup removes primitive if value exists": { build: func() (doc Document) { @@ -158,7 +158,7 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("namespace.value", 3) return doc }, - want: Document{[]field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}}, + want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}}, }, } @@ -219,7 +219,7 @@ func TestValue_FromAttribute(t *testing.T) { v.Map().PutInt("a", 1) return v }(), - want: Value{kind: KindObject, doc: Document{[]field{{"a", IntValue(1)}}}}, + want: Value{kind: KindObject, doc: Document{fields: []field{{"a", IntValue(1)}}}}, }, } diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index ebb44d7850d0f..c98f412d7b038 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -66,7 +66,7 @@ var resourceAttrsToPreserve = map[string]bool{ type mappingModel interface { encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error) encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error) - upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, dataPoint, pcommon.Value) error + upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, string, pcommon.InstrumentationScope, string, pmetric.Metric, dataPoint, pcommon.Value) error encodeDocument(objmodel.Document) ([]byte, error) } @@ -83,6 +83,7 @@ type encodeModel struct { type dataPoint interface { Timestamp() pcommon.Timestamp + StartTimestamp() pcommon.Timestamp Attributes() pcommon.Map } @@ -303,8 +304,21 @@ func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) return buf.Bytes(), nil } -func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error { - hash := metricHash(dp.Timestamp(), dp.Attributes()) +// upsertMetricDataPointValue upserts a datapoint value to documents which is already hashed by resource and index 
+func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error { + switch m.mode { + case MappingOTel: + return m.upsertMetricDataPointValueOTelMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp, value) + case MappingECS: + return m.upsertMetricDataPointValueECSMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp, value) + default: + // Defaults to ECS for backward compatibility + return m.upsertMetricDataPointValueECSMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp, value) + } +} + +func (m *encodeModel) upsertMetricDataPointValueECSMode(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ string, _ pcommon.InstrumentationScope, _ string, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error { + hash := metricECSHash(dp.Timestamp(), dp.Attributes()) var ( document objmodel.Document ok bool @@ -321,6 +335,141 @@ func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.D return nil } +func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error { + // documents is per-resource. Therefore, there is no need to hash resource attributes + hash := metricOTelHash(dp, scope.Attributes(), metric.Unit()) + var ( + document objmodel.Document + ok bool + ) + if document, ok = documents[hash]; !ok { + document.AddTimestamp("@timestamp", dp.Timestamp()) + if dp.StartTimestamp() != 0 { + document.AddTimestamp("start_timestamp", dp.StartTimestamp()) + } + document.AddString("unit", metric.Unit()) + + // At this point the data_stream attributes are expected to be in the record attributes, + // updated by the router. 
+ // Move them to the top of the document and remove them from the record + attributeMap := dp.Attributes() + + forEachDataStreamKey := func(fn func(key string)) { + for _, key := range datastreamKeys { + fn(key) + } + } + + forEachDataStreamKey(func(key string) { + if val, exists := attributeMap.Get(key); exists { + document.AddAttribute(key, val) + attributeMap.Remove(key) + } + }) + + document.AddAttributes("attributes", attributeMap) + + // Resource + resourceMapVal := pcommon.NewValueMap() + resourceMap := resourceMapVal.Map() + resourceMap.PutStr("schema_url", resourceSchemaURL) + resourceMap.PutInt("dropped_attributes_count", int64(resource.DroppedAttributesCount())) + resourceAttrMap := resourceMap.PutEmptyMap("attributes") + + resource.Attributes().CopyTo(resourceAttrMap) + + // Remove data_stream attributes from the resource attributes if present + forEachDataStreamKey(func(key string) { + resourceAttrMap.Remove(key) + }) + + document.Add("resource", objmodel.ValueFromAttribute(resourceMapVal)) + + // Scope + scopeMapVal := pcommon.NewValueMap() + scopeMap := scopeMapVal.Map() + if scope.Name() != "" { + scopeMap.PutStr("name", scope.Name()) + } + if scope.Version() != "" { + scopeMap.PutStr("version", scope.Version()) + } + if scopeSchemaURL != "" { + scopeMap.PutStr("schema_url", scopeSchemaURL) + } + if scope.DroppedAttributesCount() > 0 { + scopeMap.PutInt("dropped_attributes_count", int64(scope.DroppedAttributesCount())) + } + scopeAttributes := scope.Attributes() + if scopeAttributes.Len() > 0 { + scopeAttrMap := scopeMap.PutEmptyMap("attributes") + scopeAttributes.CopyTo(scopeAttrMap) + + // Remove data_stream attributes from the scope attributes if present + forEachDataStreamKey(func(key string) { + scopeAttrMap.Remove(key) + }) + } + + if scopeMap.Len() > 0 { + document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal)) + } + } + + switch value.Type() { + case pcommon.ValueTypeMap: + m := pcommon.NewMap() + value.Map().CopyTo(m) + document.Add("metrics."+metric.Name(), objmodel.UnflattenableObjectValue(m)) + default: + document.Add("metrics."+metric.Name(), objmodel.ValueFromAttribute(value)) + } + // TODO: support quantiles + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34561 + + document.AddDynamicTemplate("metrics."+metric.Name(), metricDpToDynamicTemplate(metric, dp)) + documents[hash] = document + return nil +} + +// metricDpToDynamicTemplate returns the name of the dynamic template that applies to the metric and data point, +// so that the field is indexed into Elasticsearch with the correct mapping. The name should correspond to a +// dynamic template that is defined in the ES mapping, e.g. +// https://github.com/elastic/elasticsearch/blob/8.15/x-pack/plugin/core/template-resources/src/main/resources/metrics%40mappings.json +func metricDpToDynamicTemplate(metric pmetric.Metric, dp dataPoint) string { + switch metric.Type() { + case pmetric.MetricTypeSum: + switch dp.(pmetric.NumberDataPoint).ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + if metric.Sum().IsMonotonic() { + return "counter_double" + } + return "gauge_double" + case pmetric.NumberDataPointValueTypeInt: + if metric.Sum().IsMonotonic() { + return "counter_long" + } + return "gauge_long" + default: + return "" // NumberDataPointValueTypeEmpty should already be discarded in numberToValue + } + case pmetric.MetricTypeGauge: + switch dp.(pmetric.NumberDataPoint).ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + return "gauge_double" + case pmetric.NumberDataPointValueTypeInt: + return "gauge_long" + default: + return "" // NumberDataPointValueTypeEmpty should already be discarded in numberToValue + } + case pmetric.MetricTypeHistogram, pmetric.MetricTypeExponentialHistogram: + return "histogram" + case pmetric.MetricTypeSummary: + return "summary_metrics" + } + return "" +} + func summaryToValue(dp pmetric.SummaryDataPoint) pcommon.Value { // TODO: Add support for quantiles // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34561 @@ -584,18 +733,51 @@ func encodeLogTimestampECSMode(document *objmodel.Document, record plog.LogRecor } // TODO use https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/exp/metrics/identity -func metricHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 { +func metricECSHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 { hasher := fnv.New32a() timestampBuf := make([]byte, 8) binary.LittleEndian.PutUint64(timestampBuf, uint64(timestamp)) hasher.Write(timestampBuf) - mapHash(hasher, attributes) + mapHashExcludeDataStreamAttr(hasher, attributes) return hasher.Sum32() } +func metricOTelHash(dp dataPoint, scopeAttrs pcommon.Map, unit string) uint32 { + hasher := fnv.New32a() + + timestampBuf := make([]byte, 8) + binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp())) + hasher.Write(timestampBuf) + + binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.StartTimestamp())) + hasher.Write(timestampBuf) + + hasher.Write([]byte(unit)) + + mapHashExcludeDataStreamAttr(hasher, scopeAttrs) + mapHashExcludeDataStreamAttr(hasher, dp.Attributes()) + + return hasher.Sum32() +} +
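[editor's note — illustrative sketch, not part of the patch] The hash above decides which data points share one Elasticsearch document: only points that agree on timestamp, start timestamp, unit, scope attributes, and data point attributes are merged, while `data_stream.*` attributes are deliberately excluded because routing has already accounted for them. A small sketch of that grouping property, assuming two number data points:

```go
dps := pmetric.NewNumberDataPointSlice()
a := dps.AppendEmpty()
b := dps.AppendEmpty()
ts := pcommon.NewTimestampFromTime(time.Unix(3600, 0))
a.SetTimestamp(ts)
b.SetTimestamp(ts)
scopeAttrs := pcommon.NewMap()
// Same timestamp, start timestamp, unit, and attributes: both data points
// hash to the same document key and are upserted into one document.
same := metricOTelHash(a, scopeAttrs, "ms") == metricOTelHash(b, scopeAttrs, "ms")
_ = same // true
```

+// mapHashExcludeDataStreamAttr is mapHash, but it ignores the data_stream attributes. +// It is useful where the index has already been decided during routing, so these attributes do not need to be considered in hashing.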
+func mapHashExcludeDataStreamAttr(hasher hash.Hash, m pcommon.Map) { + m.Range(func(k string, v pcommon.Value) bool { + switch k { + case dataStreamType, dataStreamDataset, dataStreamNamespace: + return true + } + hasher.Write([]byte(k)) + valueHash(hasher, v) + + return true + }) +} + func mapHash(hasher hash.Hash, m pcommon.Map) { m.Range(func(k string, v pcommon.Value) bool { hasher.Write([]byte(k)) diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index f4e24a209698b..37b9d3788e827 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -105,7 +105,9 @@ func TestEncodeMetric(t *testing.T) { require.NoError(t, err) err = model.upsertMetricDataPointValue(docs, metrics.ResourceMetrics().At(0).Resource(), + "", metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(), + "", metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i), val)
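[editor's note — illustrative sketch, not part of the patch] For reviewers exercising the change end to end: the new mode is opt-in and is selected exactly as the tests above do. `newTestMetricsExporter`, `server.URL`, and `mustSendMetrics` are test helpers from this diff; in a deployed collector the equivalent is setting `mapping::mode: otel` in the elasticsearch exporter configuration.

```go
// Enable the OTel mapping mode the way the tests in this diff do.
exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
	cfg.Mapping.Mode = "otel"
})
mustSendMetrics(t, exporter, metrics)
// Each grouped set of data points yields one `create` action whose action
// line carries the dynamic_templates hints, as in the expected items above.
```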