diff --git a/.chloggen/kafkaexporter_zipkin_encoding.yaml b/.chloggen/kafkaexporter_zipkin_encoding.yaml new file mode 100755 index 000000000000..0e6c488eec17 --- /dev/null +++ b/.chloggen/kafkaexporter_zipkin_encoding.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/kafkaexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adding Zipkin encoding option for traces to kafkaexporter + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21102] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index e03e44f0afa0..e30afbf02a6d 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -29,7 +29,9 @@ The following settings can be optionally configured: - `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs. - The following encodings are valid *only* for **traces**. - `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID. - - `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.\ + - `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID. + - `zipkin_proto`: the payload is serialized to Zipkin v2 proto Span. + - `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span. - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. - `auth` diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index 8b1c1b23dbf2..874f8e44b6eb 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -10,6 +10,8 @@ require ( github.com/jaegertracing/jaeger v1.41.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.80.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.80.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.80.0 + github.com/openzipkin/zipkin-go v0.4.1 github.com/stretchr/testify v1.8.4 github.com/xdg-go/scram v1.1.2 go.opentelemetry.io/collector/component v0.80.1-0.20230629144634-c3f70bd1f8ea @@ -32,7 +34,7 @@ require ( github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect diff --git a/exporter/kafkaexporter/go.sum b/exporter/kafkaexporter/go.sum index 7143d6973d60..38456e136fe5 100644 --- a/exporter/kafkaexporter/go.sum +++ b/exporter/kafkaexporter/go.sum @@ -130,8 +130,9 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/consul/api v1.13.0/go.mod h1:ZlVrynguJKcYr54zGaDbaL3fOvKC9m72FhPvA8T35KQ= github.com/hashicorp/consul/sdk v0.8.0/go.mod h1:GBvyrGALthsZObzUGsfgHZQDXjg4lOjagTIwIR1vPms= -github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI= @@ -253,8 +254,12 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.80.0 h1:iyiSDzzDuQKX2BC04bZcYo85gIEXiJVw7AzQf4QSUM4= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.80.0/go.mod h1:WvSStezNID55991drm0t9CJ+1beZPEozE29sgvlYtuo= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A= +github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= diff --git a/exporter/kafkaexporter/marshaler.go b/exporter/kafkaexporter/marshaler.go index b2df7084f472..0b6000219ab1 100644 --- a/exporter/kafkaexporter/marshaler.go +++ b/exporter/kafkaexporter/marshaler.go @@ -8,6 +8,8 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2" ) // TracesMarshaler marshals traces into Message array. @@ -41,11 +43,15 @@ type LogsMarshaler interface { func tracesMarshalers() map[string]TracesMarshaler { otlpPb := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding) otlpJSON := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json") + zipkinProto := newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto") + zipkinJSON := newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json") jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}} jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()} return map[string]TracesMarshaler{ otlpPb.Encoding(): otlpPb, otlpJSON.Encoding(): otlpJSON, + zipkinProto.Encoding(): zipkinProto, + zipkinJSON.Encoding(): zipkinJSON, jaegerProto.Encoding(): jaegerProto, jaegerJSON.Encoding(): jaegerJSON, } diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index d1a70f47a04d..9e2e9a7ee94f 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + zipkin "github.com/openzipkin/zipkin-go/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" @@ -20,6 +21,8 @@ func TestDefaultTracesMarshalers(t *testing.T) { expectedEncodings := []string{ "otlp_proto", "otlp_json", + "zipkin_proto", + "zipkin_json", "jaeger_proto", "jaeger_json", } @@ -84,27 +87,17 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { ils.Spans().AppendEmpty() span := ils.Spans().At(0) - span.SetKind(ptrace.SpanKindInternal) - span.SetName(t.Name()) + span.SetKind(ptrace.SpanKindServer) + span.SetName("foo") span.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) span.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second))) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}) span.SetParentSpanID([8]byte{8, 9, 10, 11, 12, 13, 14}) - marshaler, ok := tracesMarshalers()["otlp_json"] - require.True(t, ok, "Must have otlp json marshaller") - - msg, err := marshaler.Marshal(traces, t.Name()) - require.NoError(t, err, "Must have marshaled the data without error") - require.Len(t, msg, 1, "Must have one entry in the message") - - data, err := msg[0].Value.Encode() - require.NoError(t, err, "Must not error when encoding value") - require.NotNil(t, data, "Must have valid data to test") - // Since marshaling json is not guaranteed to be in order // within a string, using a map to compare that the expected values are there - expectedJSON := map[string]interface{}{ + otlpJSON := map[string]interface{}{ "resourceSpans": []interface{}{ map[string]interface{}{ "resource": map[string]interface{}{}, @@ -113,11 +106,11 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { "scope": map[string]interface{}{}, "spans": []interface{}{ map[string]interface{}{ - "traceId": "", + "traceId": "0102030405060708090a0b0c0d0e0f10", "spanId": "0001020304050607", "parentSpanId": "08090a0b0c0d0e00", - "name": t.Name(), - "kind": float64(ptrace.SpanKindInternal), + "name": "foo", + "kind": float64(ptrace.SpanKindServer), "startTimeUnixNano": fmt.Sprint(now.UnixNano()), "endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()), "status": map[string]interface{}{}, @@ -131,9 +124,45 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { }, } - var final map[string]interface{} - err = json.Unmarshal(data, &final) - require.NoError(t, err, "Must not error marshaling expected data") + zipkinJSON := []interface{}{ + map[string]interface{}{ + "traceId": "0102030405060708090a0b0c0d0e0f10", + "id": "0001020304050607", + "parentId": "08090a0b0c0d0e00", + "name": "foo", + "timestamp": float64(time.Second.Microseconds()), + "duration": float64(time.Second.Microseconds()), + "kind": string(zipkin.Server), + "localEndpoint": map[string]interface{}{"serviceName": "otlpresourcenoservicename"}, + }, + } + + tests := []struct { + encoding string + expectedJSON interface{} + unmarshaled interface{} + }{ + {encoding: "otlp_json", expectedJSON: otlpJSON, unmarshaled: map[string]interface{}{}}, + {encoding: "zipkin_json", expectedJSON: zipkinJSON, unmarshaled: []map[string]interface{}{}}, + } + + for _, test := range tests { + + marshaler, ok := tracesMarshalers()[test.encoding] + require.True(t, ok, fmt.Sprintf("Must have %s marshaller", test.encoding)) + + msg, err := marshaler.Marshal(traces, t.Name()) + require.NoError(t, err, "Must have marshaled the data without error") + require.Len(t, msg, 1, "Must have one entry in the message") + + data, err := msg[0].Value.Encode() + require.NoError(t, err, "Must not error when encoding value") + require.NotNil(t, data, "Must have valid data to test") - assert.Equal(t, expectedJSON, final, "Must match the expected value") + err = json.Unmarshal(data, &test.unmarshaled) + require.NoError(t, err, "Must not error marshaling expected data") + + assert.Equal(t, test.expectedJSON, test.unmarshaled, "Must match the expected value") + + } }