Skip to content

Commit

Permalink
[exporter/elasticsearch] Add OTel mapping mode for traces
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Aug 8, 2024
1 parent c9bd3ef commit e7f41d2
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 71 deletions.
6 changes: 4 additions & 2 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (e *elasticsearchExporter) pushTraceData(
spans := scopeSpan.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if err := e.pushTraceRecord(ctx, resource, span, scope, session); err != nil {
if err := e.pushTraceRecord(ctx, resource, il.SchemaUrl(), span, scope, scopeSpan.SchemaUrl(), session); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand All @@ -366,8 +366,10 @@ func (e *elasticsearchExporter) pushTraceData(
func (e *elasticsearchExporter) pushTraceRecord(
ctx context.Context,
resource pcommon.Resource,
resourceSchemaURL string,
span ptrace.Span,
scope pcommon.InstrumentationScope,
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
) error {
fIndex := e.index
Expand All @@ -383,7 +385,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
fIndex = formattedIndex
}

document, err := e.model.encodeSpan(resource, span, scope)
document, err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL)
if err != nil {
return fmt.Errorf("failed to encode trace record: %w", err)
}
Expand Down
62 changes: 62 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,68 @@ func TestExporterTraces(t *testing.T) {
))
rec.WaitItems(1)
})

t.Run("otel mode", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) {
cfg.TracesDynamicIndex.Enabled = true
cfg.Mapping.Mode = "otel"
})

traces := ptrace.NewTraces()
resourceSpans := traces.ResourceSpans()
rs := resourceSpans.AppendEmpty()

span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetName("name")
span.SetTraceID(pcommon.NewTraceIDEmpty())
span.SetSpanID(pcommon.NewSpanIDEmpty())
span.SetFlags(1)
span.SetDroppedAttributesCount(2)
span.SetDroppedEventsCount(3)
span.SetDroppedLinksCount(4)
span.TraceState().FromRaw("foo")
span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(7200, 0)))

scopeAttr := span.Attributes()
fillResourceAttributeMap(scopeAttr, map[string]string{
"attr.foo": "attr.bar",
})

resAttr := rs.Resource().Attributes()
fillResourceAttributeMap(resAttr, map[string]string{
"resource.foo": "resource.bar",
})

spanLink := span.Links().AppendEmpty()
spanLink.SetTraceID(pcommon.NewTraceIDEmpty())
spanLink.SetSpanID(pcommon.NewSpanIDEmpty())
spanLink.SetFlags(10)
spanLink.SetDroppedAttributesCount(11)
spanLink.TraceState().FromRaw("bar")
fillResourceAttributeMap(spanLink.Attributes(), map[string]string{
"link.attr.foo": "link.attr.bar",
})

mustSendTraces(t, exporter, traces)

rec.WaitItems(1)

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"traces-generic.otel-default"}}`),
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","attributes":{"attr.foo":"attr.bar"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"traces"},"dropped_attributes_count":2,"dropped_events_count":3,"dropped_links_count":4,"duration":3600000000000,"kind":"SPAN_KIND_UNSPECIFIED","links":[{"attributes":{"link.attr.foo":"link.attr.bar"},"dropped_attributes_count":11,"span_id":"","trace_flags":10,"trace_id":"","trace_state":"bar"}],"name":"name","resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0,"schema_url":""},"status":{"code":"Unset","message":""},"trace_flags":1,"trace_state":"foo"}`),
},
}

assertItemsEqual(t, expected, rec.Items(), false)
})
}

// TestExporterAuth verifies that the Elasticsearch exporter supports
Expand Down
200 changes: 132 additions & 68 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var resourceAttrsToPreserve = map[string]bool{

type mappingModel interface {
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error)
encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error)
upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, dataPoint, pcommon.Value) error
encodeDocument(objmodel.Document) ([]byte, error)
}
Expand Down Expand Up @@ -130,7 +130,29 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo
return document
}

var datastreamKeys = []string{dataStreamType, dataStreamDataset, dataStreamNamespace}
func forEachDataStreamKey(fn func(key string)) {
for _, key := range []string{dataStreamType, dataStreamDataset, dataStreamNamespace} {
fn(key)
}
}

// addDataStreamAttributes adds data_stream.* attributes to document
func addDataStreamAttributes(document *objmodel.Document, attr pcommon.Map) {
forEachDataStreamKey(func(key string) {
if value, exists := attr.Get(key); exists {
document.AddAttribute(key, value)
}
})
}

// stripDataStreamAttributes removes data_stream.* attributes from map
func stripDataStreamAttributes(attr pcommon.Map) {
forEachDataStreamKey(func(key string) {
if _, exists := attr.Get(key); exists {
attr.Remove(key)
}
})
}

func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document {
var document objmodel.Document
Expand All @@ -154,67 +176,12 @@ func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchem
// updated by the router.
// Move them to the top of the document and remove them from the record
attributeMap := record.Attributes()

forEachDataStreamKey := func(fn func(key string)) {
for _, key := range datastreamKeys {
fn(key)
}
}

forEachDataStreamKey(func(key string) {
if value, exists := attributeMap.Get(key); exists {
document.AddAttribute(key, value)
attributeMap.Remove(key)
}
})

addDataStreamAttributes(&document, attributeMap)
stripDataStreamAttributes(attributeMap)
document.AddAttributes("attributes", attributeMap)

// Resource
resourceMapVal := pcommon.NewValueMap()
resourceMap := resourceMapVal.Map()
resourceMap.PutStr("schema_url", resourceSchemaURL)
resourceMap.PutInt("dropped_attributes_count", int64(resource.DroppedAttributesCount()))
resourceAttrMap := resourceMap.PutEmptyMap("attributes")

resource.Attributes().CopyTo(resourceAttrMap)

// Remove data_stream attributes from the resources attributes if present
forEachDataStreamKey(func(key string) {
resourceAttrMap.Remove(key)
})

document.Add("resource", objmodel.ValueFromAttribute(resourceMapVal))

// Scope
scopeMapVal := pcommon.NewValueMap()
scopeMap := scopeMapVal.Map()
if scope.Name() != "" {
scopeMap.PutStr("name", scope.Name())
}
if scope.Version() != "" {
scopeMap.PutStr("version", scope.Version())
}
if scopeSchemaURL != "" {
scopeMap.PutStr("schema_url", scopeSchemaURL)
}
if scope.DroppedAttributesCount() > 0 {
scopeMap.PutInt("dropped_attributes_count", int64(scope.DroppedAttributesCount()))
}
scopeAttributes := scope.Attributes()
if scopeAttributes.Len() > 0 {
scopeAttrMap := scopeMap.PutEmptyMap("attributes")
scopeAttributes.CopyTo(scopeAttrMap)

// Remove data_stream attributes from the scope attributes if present
forEachDataStreamKey(func(key string) {
scopeAttrMap.Remove(key)
})
}

if scopeMap.Len() > 0 {
document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal))
}
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)

// Body
setOTelLogBody(&document, record.Body())
Expand Down Expand Up @@ -385,7 +352,109 @@ func numberToValue(dp pmetric.NumberDataPoint) (pcommon.Value, error) {
return pcommon.Value{}, errInvalidNumberDataPoint
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) {
func (m *encodeModel) encodeResourceOTelMode(document *objmodel.Document, resource pcommon.Resource, resourceSchemaURL string) {
resourceMapVal := pcommon.NewValueMap()
resourceMap := resourceMapVal.Map()
resourceMap.PutStr("schema_url", resourceSchemaURL)
resourceMap.PutInt("dropped_attributes_count", int64(resource.DroppedAttributesCount()))
resourceAttrMap := resourceMap.PutEmptyMap("attributes")
resource.Attributes().CopyTo(resourceAttrMap)
stripDataStreamAttributes(resourceAttrMap)

document.Add("resource", objmodel.ValueFromAttribute(resourceMapVal))
}

func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pcommon.InstrumentationScope, scopeSchemaURL string) {
scopeMapVal := pcommon.NewValueMap()
scopeMap := scopeMapVal.Map()
if scope.Name() != "" {
scopeMap.PutStr("name", scope.Name())
}
if scope.Version() != "" {
scopeMap.PutStr("version", scope.Version())
}
if scopeSchemaURL != "" {
scopeMap.PutStr("schema_url", scopeSchemaURL)
}
if scope.DroppedAttributesCount() > 0 {
scopeMap.PutInt("dropped_attributes_count", int64(scope.DroppedAttributesCount()))
}
scopeAttributes := scope.Attributes()
if scopeAttributes.Len() > 0 {
scopeAttrMap := scopeMap.PutEmptyMap("attributes")
scopeAttributes.CopyTo(scopeAttrMap)
stripDataStreamAttributes(scopeAttrMap)
}
if scopeMap.Len() > 0 {
document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal))
}
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) {
var document objmodel.Document
switch m.mode {
case MappingOTel:
document = m.encodeSpanOTelMode(resource, resourceSchemaURL, span, scope, scopeSchemaURL)
default:
document = m.encodeSpanDefaultMode(resource, span, scope)
}
document.Dedup()
var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
return buf.Bytes(), err
}

func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document {
var document objmodel.Document
document.AddTimestamp("@timestamp", span.StartTimestamp())
document.AddTraceID("trace_id", span.TraceID())
document.AddSpanID("span_id", span.SpanID())
document.AddString("trace_state", span.TraceState().AsRaw())
document.AddSpanID("parent_span_id", span.ParentSpanID())
document.AddInt("trace_flags", int64(span.Flags()))
document.AddString("name", span.Name())
document.AddString("kind", traceutil.SpanKindStr(span.Kind()))
document.AddInt("duration", int64(span.EndTimestamp()-span.StartTimestamp()))

attributeMap := span.Attributes()
addDataStreamAttributes(&document, attributeMap)
stripDataStreamAttributes(attributeMap)
document.AddAttributes("attributes", attributeMap)

document.AddInt("dropped_attributes_count", int64(span.DroppedAttributesCount()))
document.AddInt("dropped_events_count", int64(span.DroppedEventsCount()))

links := pcommon.NewValueSlice()
linkSlice := links.SetEmptySlice()
spanLinks := span.Links()
for i := 0; i < spanLinks.Len(); i++ {
linkMap := linkSlice.AppendEmpty().SetEmptyMap()
spanLink := spanLinks.At(i)
linkMap.PutStr("trace_id", spanLink.TraceID().String())
linkMap.PutStr("span_id", spanLink.SpanID().String())
linkMap.PutStr("trace_state", spanLink.TraceState().AsRaw())
mAttr := linkMap.PutEmptyMap("attributes")
spanLink.Attributes().CopyTo(mAttr)
linkMap.PutInt("dropped_attributes_count", int64(spanLink.DroppedAttributesCount()))
linkMap.PutInt("trace_flags", int64(spanLink.Flags()))
}
document.AddAttribute("links", links)

document.AddInt("dropped_links_count", int64(span.DroppedLinksCount()))
status := pcommon.NewMap()
status.PutStr("message", span.Status().Message())
status.PutStr("code", span.Status().Code().String())
document.AddAttributes("status", status)

m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)

// TODO: add span events to log data streams

return document
}

func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) objmodel.Document {
var document objmodel.Document
document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTimestamp("EndTimestamp", span.EndTimestamp())
Expand All @@ -402,12 +471,7 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, sc
m.encodeEvents(&document, span.Events())
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
document.AddAttributes("Scope", scopeToAttributes(scope))
document.Dedup()

var buf bytes.Buffer
// OTel serialization is not supported for traces yet
err := document.Serialize(&buf, m.dedot, false)
return buf.Bytes(), err
return document
}

func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) {
Expand Down
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var expectedLogBodyDeDottedWithEmptyTimestamp = `{"@timestamp":"1970-01-01T00:00
func TestEncodeSpan(t *testing.T) {
model := &encodeModel{dedot: false}
td := mockResourceSpans()
spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope())
spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), "", td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(), "")
assert.NoError(t, err)
assert.Equal(t, expectedSpanBody, string(spanByte))
}
Expand Down

0 comments on commit e7f41d2

Please sign in to comment.