[exporter/elasticsearch] OTel mode serialization #33290

Merged

35 commits
3f2f30f
Implement OTel mode serialization
aleksmaus May 29, 2024
d00a835
Enforce flattened attributes
aleksmaus May 29, 2024
51e0c62
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus May 30, 2024
f308e88
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 3, 2024
5d80a84
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 4, 2024
bd189d2
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 5, 2024
87d9929
Add support for data_stream.* routing
aleksmaus Jun 24, 2024
eedd947
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 24, 2024
3fba3f7
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 28, 2024
3efdbec
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 28, 2024
ca8a2ec
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 1, 2024
460138c
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 10, 2024
26a374b
Adjust to the latest datastream routing implementation with added ote…
aleksmaus Jul 10, 2024
7362669
Fix typos in the comments
aleksmaus Jul 11, 2024
174f6d8
Updated the data_stream attributes serialization for OTel logs
aleksmaus Jul 11, 2024
8d8ce19
Always append otel suffix
aleksmaus Jul 11, 2024
018cce9
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 11, 2024
76824b2
Updated a comment
aleksmaus Jul 12, 2024
59e272d
Add unit tests coverage for OTel model serialization
aleksmaus Jul 13, 2024
39b9cb4
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 13, 2024
e914547
Add changelog
aleksmaus Jul 13, 2024
5b0b4ba
Update exporter/elasticsearchexporter/README.md
aleksmaus Jul 15, 2024
dd78d3f
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 15, 2024
2537852
Update .chloggen/feature_elasticsearch_otel_model.yaml
aleksmaus Jul 16, 2024
182eb37
Update .chloggen/feature_elasticsearch_otel_model.yaml
aleksmaus Jul 16, 2024
64d0bd5
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 16, 2024
54a52f0
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 17, 2024
5bcdf7e
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 18, 2024
e24cd4c
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 19, 2024
74c1d77
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
f411f0d
Return the set of prefixes that need to stay flattened
aleksmaus Jul 23, 2024
10370e9
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
8f027f5
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
b7a5def
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
abd9159
Fix linter failures due to naming convention
aleksmaus Jul 23, 2024
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/README.md
@@ -126,6 +126,9 @@ behaviours, which may be configured through the following settings:
- `mode` (default=none): The field naming mode. Valid modes are:
- `none`: Use original fields and event structure from the OTLP event.
- `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS]
- `otel`: Try to map fields defined in the
[OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/semantic-conventions) (version 1.22.0)
to Elastic's preferred "OTel-native" convention. :warning: This mode's behavior is unstable; it is currently experimental and undergoing changes
- `raw`: Omit the `Attributes.` prefix from field names for log and
span attributes, as well as the `Events.` prefix from field names for
span events.
7 changes: 6 additions & 1 deletion exporter/elasticsearchexporter/config.go
@@ -81,7 +81,8 @@ type LogstashFormatSettings struct {
}

type DynamicIndexSetting struct {
Enabled bool `mapstructure:"enabled"`
Enabled bool `mapstructure:"enabled"`
Mode string `mapstructure:"mode"` // prefix_suffix or data_stream
}

// AuthenticationSettings defines user authentication related settings.
@@ -176,6 +177,7 @@ type MappingMode int
const (
MappingNone MappingMode = iota
MappingECS
MappingOTel
MappingRaw
)

@@ -190,6 +192,8 @@ func (m MappingMode) String() string {
return ""
case MappingECS:
return "ecs"
case MappingOTel:
return "otel"
case MappingRaw:
return "raw"
default:
@@ -202,6 +206,7 @@ var mappingModes = func() map[string]MappingMode {
for _, m := range []MappingMode{
MappingNone,
MappingECS,
MappingOTel,
MappingRaw,
} {
table[strings.ToLower(m.String())] = m
64 changes: 51 additions & 13 deletions exporter/elasticsearchexporter/exporter.go
@@ -24,13 +24,39 @@ type elasticsearchExporter struct {
component.TelemetrySettings
userAgent string

config *Config
index string
logstashFormat LogstashFormatSettings
dynamicIndex bool
model mappingModel
config *Config
index string
logstashFormat LogstashFormatSettings
dynamicIndex bool
dynamicIndexMode dynIdxMode

bulkIndexer *esBulkIndexerCurrent
model mappingModel
mode MappingMode
}

type dynIdxMode int // dynamic index mode

const (
dynIdxModePrefixSuffix dynIdxMode = iota
dynIdxModeDataStream
)

const (
sDynIdxModePrefixSuffix = "prefix_suffix"
sDynIdxModedimDataStream = "data_stream"
)

var errUnsupportedDynamicIndexMappingMode = errors.New("unsupported dynamic indexing mode")

func parseDIMode(s string) (dynIdxMode, error) {
switch s {
case "", sDynIdxModePrefixSuffix:
return dynIdxModePrefixSuffix, nil
case sDynIdxModedimDataStream:
return dynIdxModeDataStream, nil
}
return dynIdxModePrefixSuffix, errUnsupportedDynamicIndexMappingMode
}
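
For readers skimming the new config surface, a hypothetical, self-contained snippet (re-stating `parseDIMode` above purely for illustration, not part of this PR) shows which values the new dynamic index `mode` setting accepts and what each resolves to:

```go
package main

import (
	"errors"
	"fmt"
)

type dynIdxMode int

const (
	dynIdxModePrefixSuffix dynIdxMode = iota // existing prefix/suffix based routing
	dynIdxModeDataStream                     // new data_stream.* based routing
)

var errUnsupported = errors.New("unsupported dynamic indexing mode")

// parseDIMode mirrors the function above: empty or "prefix_suffix" keeps the
// current behaviour, "data_stream" opts into data stream routing, anything
// else is rejected.
func parseDIMode(s string) (dynIdxMode, error) {
	switch s {
	case "", "prefix_suffix":
		return dynIdxModePrefixSuffix, nil
	case "data_stream":
		return dynIdxModeDataStream, nil
	}
	return dynIdxModePrefixSuffix, errUnsupported
}

func main() {
	for _, s := range []string{"", "prefix_suffix", "data_stream", "bogus"} {
		m, err := parseDIMode(s)
		fmt.Printf("%q -> mode=%d err=%v\n", s, m, err)
	}
}
```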

func newExporter(
@@ -43,6 +69,11 @@ func newExporter(
return nil, err
}

dimMode, err := parseDIMode(cfg.LogsDynamicIndex.Mode)
if err != nil {
return nil, fmt.Errorf("%w: %s", err, cfg.LogsDynamicIndex.Mode)
}

model := &encodeModel{
dedup: cfg.Mapping.Dedup,
dedot: cfg.Mapping.Dedot,
@@ -61,11 +92,13 @@ func newExporter(
TelemetrySettings: set.TelemetrySettings,
userAgent: userAgent,

config: cfg,
index: index,
dynamicIndex: dynamicIndex,
model: model,
logstashFormat: cfg.LogstashFormat,
config: cfg,
index: index,
dynamicIndex: dynamicIndex,
dynamicIndexMode: dimMode,
model: model,
mode: cfg.MappingMode(),
logstashFormat: cfg.LogstashFormat,
}, nil
}

@@ -102,7 +135,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
scope := ill.Scope()
logs := ill.LogRecords()
for k := 0; k < logs.Len(); k++ {
if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil {
if err := e.pushLogRecord(ctx, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl()); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
@@ -116,7 +149,12 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
return errors.Join(errs...)
}

func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error {
func (e *elasticsearchExporter) pushLogRecord(ctx context.Context,
resource pcommon.Resource,
resourceSchemaUrl string,
record plog.LogRecord,
scope pcommon.InstrumentationScope,
scopeSchemaUrl string) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeLogRecord(record, scope, resource, fIndex)
@@ -130,7 +168,7 @@ func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcom
fIndex = formattedIndex
}

document, err := e.model.encodeLog(resource, record, scope)
document, err := e.model.encodeLog(resource, resourceSchemaUrl, record, scope, scopeSchemaUrl)
if err != nil {
return fmt.Errorf("failed to encode log event: %w", err)
}
41 changes: 29 additions & 12 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
@@ -245,19 +245,19 @@ func (doc *Document) Dedup() {
// Serialize writes the document to the given writer. The serializer will create nested objects if dedot is true.
//
// NOTE: The document MUST be sorted if dedot is true.
func (doc *Document) Serialize(w io.Writer, dedot bool) error {
func (doc *Document) Serialize(w io.Writer, dedot bool, otel bool) error {
Contributor:

Do we have to change objmodel for otel mode? Is there a reason why it isn't done like https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/model.go#L124 where we just add the attributes under a key?

aleksmaus (Contributor Author), Jul 10, 2024:

As far as I remember, at the time of implementing this feature I stumbled on an issue where the document could only be serialized either completely flat from the root or fully "dedotted" (the default, https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/factory.go#L80): all the dot-separated properties were unwrapped into nested objects, which was not stored correctly "flattened" with the passthrough attributes in ES.

With OTel-native serialization the document structure is a mix: only the ".attributes*" keys need to stay flattened, while the rest of the document from the root is not flattened.

Let me know if anything changed in that area recently; this PR has been open for some time.

Contributor:

Thanks for the context. I'm just learning how the passthrough field type works, and what you describe here makes sense. I wonder if it could be structured better in the code, but I don't have any good ideas at the moment and it is not a blocker.

Contributor:

Maybe it makes sense to revisit this once we remove the dedot option: #33772.
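
To make the mixed layout described in this thread concrete, here is a minimal, self-contained sketch of the idea. It is not the exporter's actual code path; the helpers `nest`, `matchPrefix`, and `insert` are made up for illustration, and only the prefix list mirrors the `otelPrefixSet` added in this diff. Keys under an OTel attribute prefix keep their dotted remainder flattened, while every other key is expanded into nested objects:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Prefixes kept flattened in otel mode (mirrors otelPrefixSet in the diff below).
var otelPrefixes = []string{"attributes.", "resource.attributes.", "scope.attributes."}

// matchPrefix returns the OTel prefix that key starts with, or "" if none matches.
func matchPrefix(key string) string {
	for _, p := range otelPrefixes {
		if strings.HasPrefix(key, p) {
			return p
		}
	}
	return ""
}

// insert walks (and creates) nested maps along path and sets the last element to v.
func insert(m map[string]any, path []string, v any) {
	for _, p := range path[:len(path)-1] {
		child, ok := m[p].(map[string]any)
		if !ok {
			child = map[string]any{}
			m[p] = child
		}
		m = child
	}
	m[path[len(path)-1]] = v
}

// nest dedots every key except the remainder after an OTel prefix,
// which stays behind as a single flattened (dotted) key.
func nest(flat map[string]any) map[string]any {
	out := map[string]any{}
	for k, v := range flat {
		if p := matchPrefix(k); p != "" {
			path := strings.Split(strings.TrimSuffix(p, "."), ".")
			insert(out, append(path, strings.TrimPrefix(k, p)), v)
			continue
		}
		insert(out, strings.Split(k, "."), v)
	}
	return out
}

func main() {
	doc := nest(map[string]any{
		"@timestamp":                   "2024-03-18T21:09:53.645578000Z",
		"attributes.auditd.log.op":     "PAM:session_open",
		"resource.attributes.blah.str": "something",
		"scope.attributes.bar.one":     "boo",
	})
	b, _ := json.Marshal(doc)
	fmt.Println(string(b))
	// {"@timestamp":"2024-03-18T21:09:53.645578000Z",
	//  "attributes":{"auditd.log.op":"PAM:session_open"},
	//  "resource":{"attributes":{"blah.str":"something"}},
	//  "scope":{"attributes":{"bar.one":"boo"}}}
}
```

Running it prints a document shaped like the one asserted in `TestDocument_Serialize_Otel` further down, with `attributes`, `resource.attributes`, and `scope.attributes` preserved as flattened (dotted-key) objects suitable for Elasticsearch passthrough fields.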

v := json.NewVisitor(w)
return doc.iterJSON(v, dedot)
return doc.iterJSON(v, dedot, otel)
}

func (doc *Document) iterJSON(v *json.Visitor, dedot bool) error {
func (doc *Document) iterJSON(v *json.Visitor, dedot bool, otel bool) error {
if dedot {
return doc.iterJSONDedot(v)
return doc.iterJSONDedot(v, otel)
}
return doc.iterJSONFlat(v)
return doc.iterJSONFlat(v, otel)
}

func (doc *Document) iterJSONFlat(w *json.Visitor) error {
func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error {
err := w.OnObjectStart(-1, structform.AnyType)
if err != nil {
return err
@@ -276,15 +276,22 @@ func (doc *Document) iterJSONFlat(w *json.Visitor) error {
return err
}

if err := fld.value.iterJSON(w, true); err != nil {
if err := fld.value.iterJSON(w, true, otel); err != nil {
return err
}
}

return nil
}

func (doc *Document) iterJSONDedot(w *json.Visitor) error {
// Set of prefixes for the OTel attributes that need to stay flattened
var otelPrefixSet = map[string]struct{}{
"attributes.": struct{}{},
"resource.attributes.": struct{}{},
"scope.attributes.": struct{}{},
}

func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error {
objPrefix := ""
level := 0

@@ -336,6 +343,16 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error {

// increase object level up to current field
for {

// Otel mode serialization
if otel {
// Check the prefix
_, isOtelPrefix := otelPrefixSet[objPrefix]
if isOtelPrefix {
break
}
}

start := len(objPrefix)
idx := strings.IndexByte(key[start:], '.')
if idx < 0 {
@@ -358,7 +375,7 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error {
if err := w.OnKey(fieldName); err != nil {
return err
}
if err := fld.value.iterJSON(w, true); err != nil {
if err := fld.value.iterJSON(w, true, otel); err != nil {
return err
}
}
@@ -462,7 +479,7 @@ func (v *Value) IsEmpty() bool {
}
}

func (v *Value) iterJSON(w *json.Visitor, dedot bool) error {
func (v *Value) iterJSON(w *json.Visitor, dedot bool, otel bool) error {
switch v.kind {
case KindNil:
return w.OnNil()
@@ -485,13 +502,13 @@ func (v *Value) iterJSON(w *json.Visitor, dedot bool) error {
if len(v.doc.fields) == 0 {
return w.OnNil()
}
return v.doc.iterJSON(w, dedot)
return v.doc.iterJSON(w, dedot, otel)
case KindArr:
if err := w.OnArrayStart(-1, structform.AnyType); err != nil {
return err
}
for i := range v.arr {
if err := v.arr[i].iterJSON(w, dedot); err != nil {
if err := v.arr[i].iterJSON(w, dedot, otel); err != nil {
return err
}
}
exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go
@@ -320,7 +320,7 @@ func TestDocument_Serialize_Flat(t *testing.T) {
assert.NoError(t, m.FromRaw(test.attrs))
doc := DocumentFromAttributes(m)
doc.Dedup()
err := doc.Serialize(&buf, false)
err := doc.Serialize(&buf, false, false)
require.NoError(t, err)

assert.Equal(t, test.want, buf.String())
@@ -381,14 +381,50 @@ func TestDocument_Serialize_Dedot(t *testing.T) {
assert.NoError(t, m.FromRaw(test.attrs))
doc := DocumentFromAttributes(m)
doc.Dedup()
err := doc.Serialize(&buf, true)
err := doc.Serialize(&buf, true, false)
require.NoError(t, err)

assert.Equal(t, test.want, buf.String())
})
}
}

func TestDocument_Serialize_Otel(t *testing.T) {
tests := map[string]struct {
attrs map[string]any
want string
}{
"otel": {
attrs: map[string]any{
"@timestamp": "2024-03-18T21:09:53.645578000Z",
"attributes.auditd.log.op": "PAM:session_open",
"attributes.auditd.log.record_type": "USER_START",
"attributes.auditd.log.sequence": 6082,
"attributes.auditd.log.subj": "unconfined",
"attributes.auditd.log.uid": "1000",
"scope.attributes.bar.one": "boo",
"scope.attributes.foo.two": "bar",
"resource.attributes.blah.num": 234,
"resource.attributes.blah.str": "something",
},
want: `{"@timestamp":"2024-03-18T21:09:53.645578000Z","attributes":{"auditd.log.op":"PAM:session_open","auditd.log.record_type":"USER_START","auditd.log.sequence":6082,"auditd.log.subj":"unconfined","auditd.log.uid":"1000"},"resource":{"attributes":{"blah.num":234,"blah.str":"something"}},"scope":{"attributes":{"bar.one":"boo","foo.two":"bar"}}}`,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
var buf strings.Builder
m := pcommon.NewMap()
assert.NoError(t, m.FromRaw(test.attrs))
doc := DocumentFromAttributes(m)
doc.Dedup() // Call Dedup for predictable order
err := doc.Serialize(&buf, true, true)
require.NoError(t, err)
assert.Equal(t, test.want, buf.String())
})
}
}

func TestValue_Serialize(t *testing.T) {
tests := map[string]struct {
value Value
@@ -427,7 +463,7 @@ func TestValue_Serialize(t *testing.T) {
for name, test := range tests {
t.Run(name, func(t *testing.T) {
var buf strings.Builder
err := test.value.iterJSON(json.NewVisitor(&buf), false)
err := test.value.iterJSON(json.NewVisitor(&buf), false, false)
require.NoError(t, err)
assert.Equal(t, test.want, buf.String())
})