Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] OTel mode serialization #33290

Merged
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
3f2f30f
Implement OTel mode serialization
aleksmaus May 29, 2024
d00a835
Enforce flattened attributes
aleksmaus May 29, 2024
51e0c62
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus May 30, 2024
f308e88
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 3, 2024
5d80a84
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 4, 2024
bd189d2
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 5, 2024
87d9929
Add support for data_stream.* routing
aleksmaus Jun 24, 2024
eedd947
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 24, 2024
3fba3f7
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 28, 2024
3efdbec
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 28, 2024
ca8a2ec
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 1, 2024
460138c
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 10, 2024
26a374b
Adjust to the latest datastream routing implementation with added ote…
aleksmaus Jul 10, 2024
7362669
Fix typos in the comments
aleksmaus Jul 11, 2024
174f6d8
Updated the data_stream attributes serialization for OTel logs
aleksmaus Jul 11, 2024
8d8ce19
Always append otel suffix
aleksmaus Jul 11, 2024
018cce9
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 11, 2024
76824b2
Updated a comment
aleksmaus Jul 12, 2024
59e272d
Add unit tests coverage for OTel model serialization
aleksmaus Jul 13, 2024
39b9cb4
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 13, 2024
e914547
Add changelog
aleksmaus Jul 13, 2024
5b0b4ba
Update exporter/elasticsearchexporter/README.md
aleksmaus Jul 15, 2024
dd78d3f
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 15, 2024
2537852
Update .chloggen/feature_elasticsearch_otel_model.yaml
aleksmaus Jul 16, 2024
182eb37
Update .chloggen/feature_elasticsearch_otel_model.yaml
aleksmaus Jul 16, 2024
64d0bd5
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 16, 2024
54a52f0
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 17, 2024
5bcdf7e
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 18, 2024
e24cd4c
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 19, 2024
74c1d77
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
f411f0d
Return the set of prefixes that need to stay flattened
aleksmaus Jul 23, 2024
10370e9
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
8f027f5
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
b7a5def
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
abd9159
Fix linter failures due to naming convention
aleksmaus Jul 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/feature_elasticsearch_otel_model.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introduce an experimental OTel native mapping mode for logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33290]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ behaviours, which may be configured through the following settings:
- `mode` (default=none): The fields naming mode. Valid modes are:
- `none`: Use original fields and event structure from the OTLP event.
- `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS]
- `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event.
:warning: This mode's behavior is unstable; it is currently experimental and undergoing changes.
There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.

- `raw`: Omit the `Attributes.` string prefixed to field names for log and
span attributes as well as omit the `Events.` string prefixed to
field names for span events.
Expand Down
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type MappingMode int
const (
MappingNone MappingMode = iota
MappingECS
MappingOTel
MappingRaw
)

Expand All @@ -193,6 +194,8 @@ func (m MappingMode) String() string {
return ""
case MappingECS:
return "ecs"
case MappingOTel:
return "otel"
case MappingRaw:
return "raw"
default:
Expand All @@ -205,6 +208,7 @@ var mappingModes = func() map[string]MappingMode {
for _, m := range []MappingMode{
MappingNone,
MappingECS,
MappingOTel,
MappingRaw,
} {
table[strings.ToLower(m.String())] = m
Expand Down
18 changes: 15 additions & 3 deletions exporter/elasticsearchexporter/data_stream_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace strin
pcommon.Map,
pcommon.Map,
string,
bool,
) string {
return func(
recordAttr pcommon.Map,
scopeAttr pcommon.Map,
resourceAttr pcommon.Map,
fIndex string,
otel bool,
) string {
// Order:
// 1. read data_stream.* from attributes
Expand All @@ -37,6 +39,13 @@ func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace strin
return fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}
}

// The naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]".
// This is in order to match the soon to be built-in logs-*.otel-* index template.
if otel {
dataset += ".otel"
}

recordAttr.PutStr(dataStreamDataset, dataset)
aleksmaus marked this conversation as resolved.
Show resolved Hide resolved
recordAttr.PutStr(dataStreamNamespace, namespace)
recordAttr.PutStr(dataStreamType, defaultDSType)
Expand All @@ -51,9 +60,10 @@ func routeLogRecord(
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeLogs, defaultDataStreamDataset, defaultDataStreamNamespace)
return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}

// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes.
Expand All @@ -63,9 +73,10 @@ func routeDataPoint(
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeMetrics, defaultDataStreamDataset, defaultDataStreamNamespace)
return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}

// routeSpan returns the name of the index to send the span to according to data stream routing attributes.
Expand All @@ -75,7 +86,8 @@ func routeSpan(
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeTraces, defaultDataStreamDataset, defaultDataStreamNamespace)
return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}
78 changes: 78 additions & 0 deletions exporter/elasticsearchexporter/data_stream_router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// routeTestInfo describes one data stream routing test case: the sub-test
// name, whether the OTel mapping mode is enabled, and the expected index
// (data stream) name produced by the route* function under test.
type routeTestInfo struct {
	name string // sub-test name passed to t.Run
	otel bool   // true to exercise the OTel mapping mode (".otel" dataset suffix)
	want string // expected data stream name, e.g. "logs-generic.otel-default"
}

// createRouteTests builds the routing test matrix for one data stream type
// ("logs", "metrics", or "traces"): a default-mode case and an OTel-mode
// case. In OTel mode the dataset carries a ".otel" suffix to match the
// "<type>-<dataset>.otel-<namespace>" index template naming convention.
func createRouteTests(dsType string) []routeTestInfo {
	// Expected index names for the two mapping modes, using the exporter's
	// default dataset and namespace.
	defaultWant := fmt.Sprintf("%s-%s-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace)
	otelWant := fmt.Sprintf("%s-%s.otel-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace)

	return []routeTestInfo{
		{name: "default", otel: false, want: defaultWant},
		{name: "otel", otel: true, want: otelWant},
	}
}

// TestRouteLogRecord verifies that log records are routed to the expected
// data stream name in both the default and the OTel mapping modes.
func TestRouteLogRecord(t *testing.T) {
	for _, tc := range createRouteTests(defaultDataStreamTypeLogs) {
		t.Run(tc.name, func(t *testing.T) {
			record := plog.NewLogRecord()
			scope := plog.NewScopeLogs().Scope()
			resource := plog.NewResourceLogs().Resource()
			got := routeLogRecord(record, scope, resource, "", tc.otel)
			assert.Equal(t, tc.want, got)
		})
	}
}

// TestRouteDataPoint verifies that metric data points are routed to the
// expected data stream name in both the default and the OTel mapping modes.
func TestRouteDataPoint(t *testing.T) {
	tests := createRouteTests(defaultDataStreamTypeMetrics)

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// Build the scope/resource fixtures from pmetric (not plog) so
			// they match the metrics signal actually being routed.
			ds := routeDataPoint(pmetric.NewNumberDataPoint(), pmetric.NewScopeMetrics().Scope(), pmetric.NewResourceMetrics().Resource(), "", tc.otel)
			assert.Equal(t, tc.want, ds)
		})
	}
}

// TestRouteSpan verifies that spans are routed to the expected data stream
// name in both the default and the OTel mapping modes.
func TestRouteSpan(t *testing.T) {
	tests := createRouteTests(defaultDataStreamTypeTraces)

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// Build the scope/resource fixtures from ptrace (not plog) so
			// they match the traces signal actually being routed.
			ds := routeSpan(ptrace.NewSpan(), ptrace.NewScopeSpans().Scope(), ptrace.NewResourceSpans().Resource(), "", tc.otel)
			assert.Equal(t, tc.want, ds)
		})
	}
}
16 changes: 11 additions & 5 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type elasticsearchExporter struct {
logstashFormat LogstashFormatSettings
dynamicIndex bool
model mappingModel
otel bool

bulkIndexer bulkIndexer
}
Expand All @@ -49,6 +50,8 @@ func newExporter(
mode: cfg.MappingMode(),
}

otel := model.mode == MappingOTel

userAgent := fmt.Sprintf(
"%s/%s (%s/%s)",
set.BuildInfo.Description,
Expand All @@ -66,6 +69,7 @@ func newExporter(
dynamicIndex: dynamicIndex,
model: model,
logstashFormat: cfg.LogstashFormat,
otel: otel,
}, nil
}

Expand Down Expand Up @@ -107,7 +111,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
scope := ill.Scope()
logs := ill.LogRecords()
for k := 0; k < logs.Len(); k++ {
if err := e.pushLogRecord(ctx, resource, logs.At(k), scope, session); err != nil {
if err := e.pushLogRecord(ctx, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl(), session); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand All @@ -130,13 +134,15 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
func (e *elasticsearchExporter) pushLogRecord(
ctx context.Context,
resource pcommon.Resource,
resourceSchemaUrl string,
record plog.LogRecord,
scope pcommon.InstrumentationScope,
scopeSchemaUrl string,
bulkIndexerSession bulkIndexerSession,
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeLogRecord(record, scope, resource, fIndex)
fIndex = routeLogRecord(record, scope, resource, fIndex, e.otel)
}

if e.logstashFormat.Enabled {
Expand All @@ -147,7 +153,7 @@ func (e *elasticsearchExporter) pushLogRecord(
fIndex = formattedIndex
}

document, err := e.model.encodeLog(resource, record, scope)
document, err := e.model.encodeLog(resource, resourceSchemaUrl, record, scope, scopeSchemaUrl)
if err != nil {
return fmt.Errorf("failed to encode log event: %w", err)
}
Expand Down Expand Up @@ -279,7 +285,7 @@ func (e *elasticsearchExporter) getMetricDataPointIndex(
) (string, error) {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeDataPoint(dataPoint, scope, resource, fIndex)
fIndex = routeDataPoint(dataPoint, scope, resource, fIndex, e.otel)
}

if e.logstashFormat.Enabled {
Expand Down Expand Up @@ -342,7 +348,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeSpan(span, scope, resource, fIndex)
fIndex = routeSpan(span, scope, resource, fIndex, e.otel)
}

if e.logstashFormat.Enabled {
Expand Down
30 changes: 18 additions & 12 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,19 +244,19 @@ func (doc *Document) Dedup() {
// Serialize writes the document to the given writer. The serializer will create nested objects if dedot is true.
//
// NOTE: The documented MUST be sorted if dedot is true.
func (doc *Document) Serialize(w io.Writer, dedot bool) error {
func (doc *Document) Serialize(w io.Writer, dedot bool, otel bool) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to change objmodel for otel mode? Is there a reason why it isn't done like https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/model.go#L124 where we just add the attributes under a key?

Copy link
Contributor Author

@aleksmaus aleksmaus Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I remember, at the time of implementing this feature, I stumbled on the issue where the document could only be serialized as completely flat from the root or "dedotted" (by default https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/factory.go#L80), all the comma separated properties were unwrapped into the nested object, which was not stored correctly "flattened" with the passthrough attributes in ES.

With OTel-native serialization the serialized document structure is a mix, needed to keep only the ".attributes*" flattened, while the rest of the document from the root is not flattened.

Let me know if anything changed in that area recently, this PR was open for some time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the context. I'm just learning about how passthrough field type works, and what you describe here makes sense. I wonder if it could be structured better in the code, but I don't have any good ideas at the moment and it is not a blocker.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it makes sense to revisit this once we remove the dedot option: #33772.

v := json.NewVisitor(w)
return doc.iterJSON(v, dedot)
return doc.iterJSON(v, dedot, otel)
}

func (doc *Document) iterJSON(v *json.Visitor, dedot bool) error {
func (doc *Document) iterJSON(v *json.Visitor, dedot bool, otel bool) error {
if dedot {
return doc.iterJSONDedot(v)
return doc.iterJSONDedot(v, otel)
}
return doc.iterJSONFlat(v)
return doc.iterJSONFlat(v, otel)
}

func (doc *Document) iterJSONFlat(w *json.Visitor) error {
func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error {
err := w.OnObjectStart(-1, structform.AnyType)
if err != nil {
return err
Expand All @@ -275,15 +275,15 @@ func (doc *Document) iterJSONFlat(w *json.Visitor) error {
return err
}

if err := fld.value.iterJSON(w, true); err != nil {
if err := fld.value.iterJSON(w, true, otel); err != nil {
return err
}
}

return nil
}

func (doc *Document) iterJSONDedot(w *json.Visitor) error {
func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error {
objPrefix := ""
level := 0

Expand Down Expand Up @@ -335,6 +335,12 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error {

// increase object level up to current field
for {

// Otel mode serialization
if otel && strings.HasPrefix(objPrefix, "attributes.") {
break
}

start := len(objPrefix)
idx := strings.IndexByte(key[start:], '.')
if idx < 0 {
Expand All @@ -357,7 +363,7 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error {
if err := w.OnKey(fieldName); err != nil {
return err
}
if err := fld.value.iterJSON(w, true); err != nil {
if err := fld.value.iterJSON(w, true, otel); err != nil {
return err
}
}
Expand Down Expand Up @@ -460,7 +466,7 @@ func (v *Value) IsEmpty() bool {
}
}

func (v *Value) iterJSON(w *json.Visitor, dedot bool) error {
func (v *Value) iterJSON(w *json.Visitor, dedot bool, otel bool) error {
switch v.kind {
case KindNil:
return w.OnNil()
Expand All @@ -483,13 +489,13 @@ func (v *Value) iterJSON(w *json.Visitor, dedot bool) error {
if len(v.doc.fields) == 0 {
return w.OnNil()
}
return v.doc.iterJSON(w, dedot)
return v.doc.iterJSON(w, dedot, otel)
case KindArr:
if err := w.OnArrayStart(-1, structform.AnyType); err != nil {
return err
}
for i := range v.arr {
if err := v.arr[i].iterJSON(w, dedot); err != nil {
if err := v.arr[i].iterJSON(w, dedot, otel); err != nil {
return err
}
}
Expand Down
Loading
Loading