Skip to content

Commit

Permalink
otel trace (apache#1774)
Browse files Browse the repository at this point in the history
  • Loading branch information
pherzheyu committed May 10, 2022
1 parent b27ec53 commit 43e2705
Show file tree
Hide file tree
Showing 12 changed files with 1,147 additions and 0 deletions.
2 changes: 2 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ const (
TpsLimitFilterKey = "tps"
TracingFilterKey = "tracing"
XdsCircuitBreakerKey = "xds_circuit_reaker"
OTELServerTraceKey = "otelServerTrace"
OTELClientTraceKey = "otelClientTrace"
)

const (
Expand Down
88 changes: 88 additions & 0 deletions filter/otel/trace/attachment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package trace

import (
"context"
)

import (
"go.opentelemetry.io/otel/baggage"

"go.opentelemetry.io/otel/propagation"

"go.opentelemetry.io/otel/trace"
)

type metadataSupplier struct {
metadata *map[string]interface{}
}

var _ propagation.TextMapCarrier = &metadataSupplier{}

func (s *metadataSupplier) Get(key string) string {
if s.metadata == nil {
return ""
}
item, ok := (*s.metadata)[key]
if !ok {
return ""
}
value, ok := item.([]string)
if !ok {
return ""
}
if len(value) == 0 {
return ""
}
return value[0]
}

func (s *metadataSupplier) Set(key string, value string) {
if s.metadata == nil {
s.metadata = &map[string]interface{}{}
}
(*s.metadata)[key] = value
}

func (s *metadataSupplier) Keys() []string {
out := make([]string, 0, len(*s.metadata))
for key := range *s.metadata {
out = append(out, key)
}
return out
}

// Inject injects correlation context and span context into the dubbo
// metadata object. This function is meant to be used on outgoing
// requests.
func Inject(ctx context.Context, metadata *map[string]interface{}, propagators propagation.TextMapPropagator) {
propagators.Inject(ctx, &metadataSupplier{
metadata: metadata,
})
}

// Extract returns the correlation context and span context that
// another service encoded in the dubbo metadata object with Inject.
// This function is meant to be used on incoming requests.
func Extract(ctx context.Context, metadata *map[string]interface{}, propagators propagation.TextMapPropagator) (baggage.Baggage, trace.SpanContext) {
ctx = propagators.Extract(ctx, &metadataSupplier{
metadata: metadata,
})
return baggage.FromContext(ctx), trace.SpanContextFromContext(ctx)
}
131 changes: 131 additions & 0 deletions filter/otel/trace/attachment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package trace

import (
"reflect"
"testing"
)

func Test_metadataSupplier_Keys(t *testing.T) {
tests := []struct {
name string
metadata *map[string]interface{}
want []string
}{
{
name: "test",
metadata: &map[string]interface{}{
"key1": nil,
},
want: []string{"key1"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &metadataSupplier{
metadata: tt.metadata,
}
if got := s.Keys(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Keys() = %v, want %v", got, tt.want)
}
})
}
}

func Test_metadataSupplier_Set(t *testing.T) {
tests := []struct {
name string
metadata *map[string]interface{}
key string
value string
}{
{
name: "test",
metadata: nil,
key: "key",
value: "value",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &metadataSupplier{
metadata: tt.metadata,
}
s.Set(tt.key, tt.value)
})
}
}

func Test_metadataSupplier_Get(t *testing.T) {
tests := []struct {
name string
metadata *map[string]interface{}
key string
want string
}{
{
name: "nil metadata",
metadata: nil,
key: "key",
want: "",
},
{
name: "not exist",
metadata: &map[string]interface{}{
"k": nil,
},
key: "key",
want: "",
},
{
name: "nil slice",
metadata: &map[string]interface{}{
"key": nil,
},
key: "key",
want: "",
},
{
name: "empty slice",
metadata: &map[string]interface{}{
"key": []string{},
},
key: "key",
want: "",
},
{
name: "test",
metadata: &map[string]interface{}{
"key": []string{"test"},
},
key: "key",
want: "test",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &metadataSupplier{
metadata: tt.metadata,
}
if got := s.Get(tt.key); got != tt.want {
t.Errorf("Get() = %v, want %v", got, tt.want)
}
})
}
}
20 changes: 20 additions & 0 deletions filter/otel/trace/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package trace instruments dubbogo with open-telemetry
// (https://github.com/open-telemetry/opentelemetry-go).
package trace
141 changes: 141 additions & 0 deletions filter/otel/trace/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package trace

import (
"context"
)

import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"go.opentelemetry.io/otel/trace"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

const (
instrumentationName = "dubbo.apache.org/dubbo-go/v3/oteldubbo"
)

func init() {
extension.SetFilter(constant.OTELServerTraceKey, func() filter.Filter {
return &otelServerFilter{
Propagators: otel.GetTextMapPropagator(),
TracerProvider: otel.GetTracerProvider(),
}
})
extension.SetFilter(constant.OTELClientTraceKey, func() filter.Filter {
return &otelClientFilter{
Propagators: otel.GetTextMapPropagator(),
TracerProvider: otel.GetTracerProvider(),
}
})
}

type otelServerFilter struct {
Propagators propagation.TextMapPropagator
TracerProvider trace.TracerProvider
}

func (f *otelServerFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, protocol protocol.Invocation) protocol.Result {
return result
}

func (f *otelServerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
attachments := invocation.Attachments()
bags, spanCtx := Extract(ctx, &attachments, f.Propagators)
ctx = baggage.ContextWithBaggage(ctx, bags)

tracer := f.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

ctx, span := tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, spanCtx),
invocation.ActualMethodName(),
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(
RPCSystemDubbo,
semconv.RPCServiceKey.String(invoker.GetURL().ServiceKey()),
semconv.RPCMethodKey.String(invocation.MethodName()),
),
)
defer span.End()

result := invoker.Invoke(ctx, invocation)

if result.Error() != nil {
span.SetStatus(codes.Error, result.Error().Error())
} else {
span.SetStatus(codes.Ok, codes.Ok.String())
}
return result
}

type otelClientFilter struct {
Propagators propagation.TextMapPropagator
TracerProvider trace.TracerProvider
}

func (f *otelClientFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, protocol protocol.Invocation) protocol.Result {
return result
}

func (f *otelClientFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
tracer := f.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

var span trace.Span
ctx, span = tracer.Start(
ctx,
invocation.ActualMethodName(),
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(
RPCSystemDubbo,
semconv.RPCServiceKey.String(invoker.GetURL().ServiceKey()),
semconv.RPCMethodKey.String(invocation.MethodName()),
),
)
defer span.End()

attachments := make(map[string]interface{})
Inject(ctx, &(attachments), f.Propagators)
for k, v := range attachments {
invocation.SetAttachment(k, v)
}
result := invoker.Invoke(ctx, invocation)

if result.Error() != nil {
span.SetStatus(codes.Error, result.Error().Error())
} else {
span.SetStatus(codes.Ok, codes.Ok.String())
}
return result
}
Loading

0 comments on commit 43e2705

Please sign in to comment.