Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][adjuster] Implement otlp to model translator with post processing #6394

Merged
merged 14 commits into from
Dec 23, 2024
Merged
8 changes: 8 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ linters-settings:
files:
- "**_test.go"

# TODO: enable after import is not used anywhere
# disallow-otel-contrib-translator:
# deny:
# - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger
# desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger"
# files:
# - "!**/jptrace/**"

goimports:
local-prefixes: github.com/jaegertracing/jaeger
gosec:
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/config/confignet"
Expand All @@ -23,6 +22,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/pkg/tenancy"
)

Expand Down Expand Up @@ -108,7 +108,7 @@ type consumerDelegate struct {
}

func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error {
batches := otlp2jaeger.ProtoFromTraces(td)
batches := jptrace.ProtoFromTraces(td)
for _, batch := range batches {
err := c.batchConsumer.consume(ctx, batch)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"context"
"fmt"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
)
Expand Down Expand Up @@ -65,7 +65,7 @@ func (tp *traceProcessor) close(context.Context) error {
}

func (tp *traceProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
batches := otlp2jaeger.ProtoFromTraces(td)
batches := jptrace.ProtoFromTraces(td)
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/otlp_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ package app
import (
"fmt"

model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
)

Expand All @@ -18,7 +18,7 @@ func otlp2traces(otlpSpans []byte) ([]*model.Trace, error) {
if err != nil {
return nil, fmt.Errorf("cannot unmarshal OTLP : %w", err)
}
jaegerBatches := model2otel.ProtoFromTraces(otlpTraces)
jaegerBatches := jptrace.ProtoFromTraces(otlpTraces)
var traces []*model.Trace
traceMap := make(map[model.TraceID]*model.Trace)
for _, batch := range jaegerBatches {
Expand Down
58 changes: 58 additions & 0 deletions internal/jptrace/translator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package jptrace

import (
jaegerTranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
)

// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces)
// to Jaeger model batches ([]*model.Batch).
func ProtoFromTraces(traces ptrace.Traces) []*model.Batch {
batches := jaegerTranslator.ProtoFromTraces(traces)
spanMap := createSpanMapFromBatches(batches)
transferWarnings(traces, spanMap)
return batches
}

func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span {
spanMap := make(map[model.SpanID]*model.Span)
for _, batch := range batches {
for _, span := range batch.Spans {
spanMap[span.SpanID] = span
}
}
return spanMap
}

func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) {
resources := traces.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
scopes := resources.At(i).ScopeSpans()
for j := 0; j < scopes.Len(); j++ {
spans := scopes.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
otelSpan := spans.At(k)
warnings := GetWarnings(otelSpan)
span := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]
span.Warnings = append(span.Warnings, warnings...)
// filter out the warning tag
span.Tags = filterTags(span.Tags, warningsAttribute)
}
}
}
}

func filterTags(tags []model.KeyValue, keyToRemove string) []model.KeyValue {
var filteredTags []model.KeyValue
for _, tag := range tags {
if tag.Key != keyToRemove {
filteredTags = append(filteredTags, tag)
}
}
return filteredTags
}
55 changes: 55 additions & 0 deletions internal/jptrace/translator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package jptrace

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
)

func TestProtoFromTraces_AddsWarnings(t *testing.T) {
traces := ptrace.NewTraces()
rs1 := traces.ResourceSpans().AppendEmpty()
ss1 := rs1.ScopeSpans().AppendEmpty()
span1 := ss1.Spans().AppendEmpty()
span1.SetName("test-span-1")
span1.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
AddWarning(span1, "test-warning-1")
AddWarning(span1, "test-warning-2")
span1.Attributes().PutStr("key", "value")

ss2 := rs1.ScopeSpans().AppendEmpty()
span2 := ss2.Spans().AppendEmpty()
span2.SetName("test-span-2")
span2.SetSpanID(pcommon.SpanID([8]byte{9, 10, 11, 12, 13, 14, 15, 16}))

rs2 := traces.ResourceSpans().AppendEmpty()
ss3 := rs2.ScopeSpans().AppendEmpty()
span3 := ss3.Spans().AppendEmpty()
span3.SetName("test-span-3")
span3.SetSpanID(pcommon.SpanID([8]byte{17, 18, 19, 20, 21, 22, 23, 24}))
AddWarning(span3, "test-warning-3")

batches := ProtoFromTraces(traces)

assert.Len(t, batches, 2)

assert.Len(t, batches[0].Spans, 2)
assert.Equal(t, "test-span-1", batches[0].Spans[0].OperationName)
assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, batches[0].Spans[0].Warnings)
assert.Equal(t, []model.KeyValue{{Key: "key", VStr: "value"}}, batches[0].Spans[0].Tags)
assert.Equal(t, "test-span-2", batches[0].Spans[1].OperationName)
assert.Empty(t, batches[0].Spans[1].Warnings)
assert.Empty(t, batches[0].Spans[1].Tags)

assert.Len(t, batches[1].Spans, 1)
assert.Equal(t, "test-span-3", batches[1].Spans[0].OperationName)
assert.Equal(t, []string{"test-warning-3"}, batches[1].Spans[0].Warnings)
assert.Empty(t, batches[1].Spans[0].Tags)
}
16 changes: 16 additions & 0 deletions model/ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,19 @@ func (s *SpanID) UnmarshalJSON(data []byte) error {
func (s *SpanID) UnmarshalJSONPB(_ *jsonpb.Unmarshaler, b []byte) error {
return s.UnmarshalJSON(b)
}

// ToOTELSpanID converts the SpanID to OTEL's representation of a span identitfier.
// This was taken from
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go.
func (s SpanID) ToOTELSpanID() pcommon.SpanID {
spanID := [8]byte{}
binary.BigEndian.PutUint64(spanID[:], uint64(s))
return pcommon.SpanID(spanID)
}

// ToOTELSpanID converts OTEL's SpanID to the model representation of a span identitfier.
// This was taken from
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go.
func SpanIDFromOTEL(spanID pcommon.SpanID) SpanID {
return SpanID(binary.BigEndian.Uint64(spanID[:]))
}
52 changes: 52 additions & 0 deletions model/ids_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,55 @@ func TestTraceIDFromOTEL(t *testing.T) {
}
require.Equal(t, expected, model.TraceIDFromOTEL(otelTraceID))
}

func TestToOTELSpanID(t *testing.T) {
tests := []struct {
name string
spanID model.SpanID
expected pcommon.SpanID
}{
{
name: "zero span ID",
spanID: model.NewSpanID(0),
expected: pcommon.NewSpanIDEmpty(),
},
{
name: "non-zero span ID",
spanID: model.NewSpanID(1),
expected: pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actual := test.spanID.ToOTELSpanID()
assert.Equal(t, test.expected, actual)
})
}
}

func TestSpanIDFromOTEL(t *testing.T) {
tests := []struct {
name string
otelSpanID pcommon.SpanID
expected model.SpanID
}{
{
name: "zero span ID",
otelSpanID: pcommon.NewSpanIDEmpty(),
expected: model.NewSpanID(0),
},
{
name: "non-zero span ID",
otelSpanID: pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}),
expected: model.NewSpanID(1),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actual := model.SpanIDFromOTEL(test.otelSpanID)
assert.Equal(t, test.expected, actual)
})
}
}
4 changes: 2 additions & 2 deletions storage_v2/v1adapter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"context"
"errors"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)
Expand All @@ -26,7 +26,7 @@ func NewTraceWriter(spanWriter spanstore.Writer) tracestore.Writer {

// WriteTraces implements tracestore.Writer.
func (t *TraceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error {
batches := otlp2jaeger.ProtoFromTraces(td)
batches := jptrace.ProtoFromTraces(td)
var errs []error
for _, batch := range batches {
for _, span := range batch.Spans {
Expand Down
Loading