Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] OTel mode serialization #33290

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
3f2f30f
Implement OTel mode serialization
aleksmaus May 29, 2024
d00a835
Enforce flattened attributes
aleksmaus May 29, 2024
51e0c62
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus May 30, 2024
f308e88
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 3, 2024
5d80a84
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 4, 2024
bd189d2
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 5, 2024
87d9929
Add support for data_stream.* routing
aleksmaus Jun 24, 2024
eedd947
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 24, 2024
3fba3f7
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 28, 2024
3efdbec
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jun 28, 2024
ca8a2ec
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 1, 2024
460138c
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 10, 2024
26a374b
Adjust to the latest datastream routing implementation with added ote…
aleksmaus Jul 10, 2024
7362669
Fix typos in the comments
aleksmaus Jul 11, 2024
174f6d8
Updated the data_stream attributes serialization for OTel logs
aleksmaus Jul 11, 2024
8d8ce19
Always append otel suffix
aleksmaus Jul 11, 2024
018cce9
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 11, 2024
76824b2
Updated a comment
aleksmaus Jul 12, 2024
59e272d
Add unit tests coverage for OTel model serialization
aleksmaus Jul 13, 2024
39b9cb4
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 13, 2024
e914547
Add changelog
aleksmaus Jul 13, 2024
5b0b4ba
Update exporter/elasticsearchexporter/README.md
aleksmaus Jul 15, 2024
dd78d3f
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 15, 2024
2537852
Update .chloggen/feature_elasticsearch_otel_model.yaml
aleksmaus Jul 16, 2024
182eb37
Update .chloggen/feature_elasticsearch_otel_model.yaml
aleksmaus Jul 16, 2024
64d0bd5
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 16, 2024
54a52f0
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 17, 2024
5bcdf7e
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 18, 2024
e24cd4c
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 19, 2024
74c1d77
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
f411f0d
Return the set of prefixes that need to stay flattened
aleksmaus Jul 23, 2024
10370e9
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
8f027f5
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
b7a5def
Merge branch 'main' into feature/elasticsearch_otel_model
aleksmaus Jul 23, 2024
abd9159
Fix linter failures due to naming convention
aleksmaus Jul 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 11 additions & 49 deletions exporter/elasticsearchexporter/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,57 +7,19 @@ import "go.opentelemetry.io/collector/pdata/pcommon"

// dynamic index attribute key constants
const (
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"

dataStreamType = "data_stream.type"
dataStreamDataset = "data_stream.dataset"
dataStreamNamespace = "data_stream.namespace"

defaultDataStreamType = "logs"
defaultDataStreamNamespace = "default"
defaultDataStreamDataset = "generic"
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
dataStreamDataset = "data_stream.dataset"
dataStreamNamespace = "data_stream.namespace"
dataStreamType = "data_stream.type"
defaultDataStreamDataset = "generic"
defaultDataStreamNamespace = "default"
defaultDataStreamTypeLogs = "logs"
defaultDataStreamTypeMetrics = "metrics"
defaultDataStreamTypeTraces = "traces"
)

// resource is higher priotized than record attribute
type attrGetter interface {
Attributes() pcommon.Map
}

// retrieve attribute out of resource, scope, and record (span or log, if not found in resource)
// Deprecated: Use getFromAttributesNew instead.
func getFromAttributes(name string, resource, scope, record attrGetter) string {
var str string
val, exist := resource.Attributes().Get(name)
if !exist {
val, exist = scope.Attributes().Get(name)
if !exist {
val, exist = record.Attributes().Get(name)
if exist {
str = val.AsString()
}
}
if exist {
str = val.AsString()
}
}
if exist {
str = val.AsString()
}
return str
}

// retrieve attribute out of resource, scope, and record (span or log, if not found in resource)
// returns default value if result is empty string
func getFromAttributesWithDefault(name string, resource, scope, record attrGetter, defaultValue string) string {
s := getFromAttributes(name, resource, scope, record)
if s == "" {
s = defaultValue
}
return s
}

func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) string {
func getFromAttributes(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) {
for _, attributeMap := range attributeMaps {
if value, exists := attributeMap.Get(name); exists {
return value.AsString(), true
Expand Down
20 changes: 1 addition & 19 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"runtime"
"strings"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -158,24 +157,7 @@ func (e *elasticsearchExporter) pushLogRecord(ctx context.Context,
scopeSchemaUrl string) error {
fIndex := e.index
if e.dynamicIndex {
switch e.dynamicIndexMode {
case dynIdxModePrefixSuffix:
prefix := getFromAttributes(indexPrefix, resource, scope, record)
suffix := getFromAttributes(indexSuffix, resource, scope, record)
fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
case dynIdxModeDataStream:
const otelSuffix = ".otel"
aleksmaus marked this conversation as resolved.
Show resolved Hide resolved
typ := getFromAttributesWithDefault(dataStreamType, resource, scope, record, defaultDataStreamType)
dataset := getFromAttributesWithDefault(dataStreamDataset, resource, scope, record, defaultDataStreamDataset)
namespace := getFromAttributesWithDefault(dataStreamNamespace, resource, scope, record, defaultDataStreamNamespace)

// The naming convension for datastream is expected to be the following
// "logs-[dataset].otel-[namespace]"
if e.mode == MappingOTel && !strings.HasSuffix(dataset, otelSuffix) {
dataset += otelSuffix
}
fIndex = fmt.Sprintf("%s-%s-%s", typ, dataset, namespace)
}
fIndex = routeLogRecord(record, scope, resource, fIndex)
}

if e.logstashFormat.Enabled {
Expand Down
27 changes: 11 additions & 16 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ var resourceAttrsToPreserve = map[string]bool{

type mappingModel interface {
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error)
encodeMetrics(pcommon.Resource, pmetric.MetricSlice, pcommon.InstrumentationScope) ([][]byte, error)
encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
upsertMetricDataPoint(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, pmetric.NumberDataPoint) error
encodeDocument(objmodel.Document) ([]byte, error)
Expand Down Expand Up @@ -274,27 +273,23 @@ func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error)
}

var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot)
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

for _, doc := range docs {
if m.dedup {
doc.Dedup()
} else if m.dedot {
doc.Sort()
}

var buf bytes.Buffer
err := doc.Serialize(&buf, m.dedot, m.mode == MappingOTel)
if err != nil {
return nil, err
}

res = append(res, buf.Bytes())
func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp pmetric.NumberDataPoint) error {
hash := metricHash(dp.Timestamp(), dp.Attributes())
var (
document objmodel.Document
ok bool
)
if document, ok = documents[hash]; !ok {
encodeAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap, resourceAttrsToPreserve)
document.AddTimestamp("@timestamp", dp.Timestamp())
document.AddAttributes("", dp.Attributes())
}

switch dp.ValueType() {
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.