-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
stats: opentelemetry GrpcTraceBinPropagator
refactor GRPCTraceBinPropagator to be exported externally Use context instead of metadata in CustomCarrier
- Loading branch information
1 parent
ac41314
commit d0ddcc0
Showing
4 changed files
with
551 additions
and
0 deletions.
There are no files selected for viewing
132 changes: 132 additions & 0 deletions
132
stats/opentelemetry/internal/grpc_trace_bin_propagator.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
/* | ||
* | ||
* Copyright 2024 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
// TODO: Move out of internal as part of open telemetry API | ||
package internal | ||
|
||
import ( | ||
"context" | ||
"encoding/base64" | ||
|
||
"go.opentelemetry.io/otel/propagation" | ||
"go.opentelemetry.io/otel/trace" | ||
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" | ||
) | ||
|
||
// GRPCTraceBinPropagator is TextMapPropagator to propagate cross-cutting | ||
// concerns as both text and binary key-value pairs within a carrier that | ||
// travels in-band across process boundaries. | ||
type GRPCTraceBinPropagator struct{} | ||
|
||
// Inject set cross-cutting concerns from the Context into the carrier. | ||
// | ||
// If carrier is carrier.CustomMapCarrier then SetBinary (fast path) is used, | ||
// otherwise Set (slow path) with encoding is used. | ||
func (p GRPCTraceBinPropagator) Inject(ctx context.Context, carrier propagation.TextMapCarrier) { | ||
span := trace.SpanFromContext(ctx) | ||
if !span.SpanContext().IsValid() { | ||
return | ||
} | ||
|
||
binaryData := Binary(span.SpanContext()) | ||
if binaryData == nil { | ||
return | ||
} | ||
|
||
if customCarrier, ok := carrier.(otelinternaltracing.CustomCarrier); ok { | ||
customCarrier.SetBinary(binaryData) // fast path: set the binary data without encoding | ||
return | ||
} else { | ||
carrier.Set(otelinternaltracing.GRPCTraceBinHeaderKey, base64.StdEncoding.EncodeToString(binaryData)) // slow path: set the binary data with encoding | ||
} | ||
} | ||
|
||
// Extract reads cross-cutting concerns from the carrier into a Context. | ||
// | ||
// If carrier is carrier.CustomCarrier then GetBinary (fast path) is used, | ||
// otherwise Get (slow path) with decoding is used. | ||
func (p GRPCTraceBinPropagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context { | ||
var binaryData []byte | ||
|
||
if customCarrier, ok := carrier.(otelinternaltracing.CustomCarrier); ok { | ||
binaryData, _ = customCarrier.GetBinary() | ||
} else { | ||
binaryData, _ = base64.StdEncoding.DecodeString(carrier.Get(otelinternaltracing.GRPCTraceBinHeaderKey)) | ||
} | ||
if binaryData == nil { | ||
return ctx | ||
} | ||
|
||
spanContext, ok := FromBinary([]byte(binaryData)) | ||
if !ok { | ||
return ctx | ||
} | ||
|
||
return trace.ContextWithRemoteSpanContext(ctx, spanContext) | ||
} | ||
|
||
// Fields returns the keys whose values are set with Inject. | ||
// | ||
// GRPCTraceBinPropagator will only have `grpc-trace-bin` field. | ||
func (p GRPCTraceBinPropagator) Fields() []string { | ||
return []string{otelinternaltracing.GRPCTraceBinHeaderKey} | ||
} | ||
|
||
// Binary returns the binary format representation of a SpanContext. | ||
// | ||
// If sc is the zero value, Binary returns nil. | ||
func Binary(sc trace.SpanContext) []byte { | ||
if sc.Equal(trace.SpanContext{}) { | ||
return nil | ||
} | ||
var b [29]byte | ||
traceID := trace.TraceID(sc.TraceID()) | ||
copy(b[2:18], traceID[:]) | ||
b[18] = 1 | ||
spanID := trace.SpanID(sc.SpanID()) | ||
copy(b[19:27], spanID[:]) | ||
b[27] = 2 | ||
b[28] = uint8(trace.TraceFlags(sc.TraceFlags())) | ||
return b[:] | ||
} | ||
|
||
// FromBinary returns the SpanContext represented by b. | ||
// | ||
// If b has an unsupported version ID or contains no TraceID, FromBinary | ||
// returns with ok==false. | ||
func FromBinary(b []byte) (sc trace.SpanContext, ok bool) { | ||
if len(b) == 0 || b[0] != 0 { | ||
return trace.SpanContext{}, false | ||
} | ||
b = b[1:] | ||
|
||
if len(b) >= 17 && b[0] == 0 { | ||
sc = sc.WithTraceID(trace.TraceID(b[1:17])) | ||
b = b[17:] | ||
} else { | ||
return trace.SpanContext{}, false | ||
} | ||
if len(b) >= 9 && b[0] == 1 { | ||
sc = sc.WithSpanID(trace.SpanID(b[1:9])) | ||
b = b[9:] | ||
} | ||
if len(b) >= 2 && b[0] == 2 { | ||
sc = sc.WithTraceFlags(trace.TraceFlags(b[1])) | ||
} | ||
return sc, true | ||
} |
116 changes: 116 additions & 0 deletions
116
stats/opentelemetry/internal/grpc_trace_bin_propagator_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/* | ||
* | ||
* Copyright 2024 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
// TODO: Move out of internal as part of open telemetry API | ||
package internal | ||
|
||
import ( | ||
"context" | ||
"encoding/base64" | ||
"testing" | ||
|
||
"go.opentelemetry.io/otel/propagation" | ||
"go.opentelemetry.io/otel/trace" | ||
"google.golang.org/grpc/internal/grpctest" | ||
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/stats" | ||
"google.golang.org/grpc/stats/opentelemetry/internal/tracing" | ||
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" | ||
) | ||
|
||
type s struct { | ||
grpctest.Tester | ||
} | ||
|
||
func Test(t *testing.T) { | ||
grpctest.RunSubTests(t, s{}) | ||
} | ||
|
||
func (s) TestInject(t *testing.T) { | ||
propagator := GRPCTraceBinPropagator{} | ||
spanContext := trace.NewSpanContext(trace.SpanContextConfig{ | ||
TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, | ||
SpanID: [8]byte{17, 18, 19, 20, 21, 22, 23, 24}, | ||
TraceFlags: trace.FlagsSampled, | ||
}) | ||
traceCtx, traceCancel := context.WithCancel(context.Background()) | ||
traceCtx = trace.ContextWithSpanContext(traceCtx, spanContext) | ||
|
||
t.Run("Fast path with CustomCarrier", func(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
carrier := tracing.NewCustomCarrier(metadata.NewOutgoingContext(ctx, metadata.MD{})) | ||
propagator.Inject(traceCtx, carrier) | ||
|
||
got := stats.OutgoingTrace(*carrier.Ctx) | ||
want := Binary(spanContext) | ||
if string(got) != string(want) { | ||
t.Fatalf("got = %v, want %v", got, want) | ||
} | ||
cancel() | ||
}) | ||
|
||
t.Run("Slow path with TextMapCarrier", func(t *testing.T) { | ||
carrier := propagation.MapCarrier{} | ||
propagator.Inject(traceCtx, carrier) | ||
|
||
got := carrier.Get(otelinternaltracing.GRPCTraceBinHeaderKey) | ||
want := base64.StdEncoding.EncodeToString(Binary(spanContext)) | ||
if got != want { | ||
t.Fatalf("got = %v, want %v", got, want) | ||
} | ||
}) | ||
|
||
traceCancel() | ||
} | ||
|
||
func (s) TestExtract(t *testing.T) { | ||
propagator := GRPCTraceBinPropagator{} | ||
spanContext := trace.NewSpanContext(trace.SpanContextConfig{ | ||
TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, | ||
SpanID: [8]byte{17, 18, 19, 20, 21, 22, 23, 24}, | ||
TraceFlags: trace.FlagsSampled, | ||
Remote: true, | ||
}) | ||
binaryData := Binary(spanContext) | ||
|
||
t.Run("Fast path with CustomCarrier", func(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
carrier := tracing.NewCustomCarrier(stats.SetIncomingTrace(ctx, binaryData)) | ||
traceCtx := propagator.Extract(ctx, carrier) | ||
got := trace.SpanContextFromContext(traceCtx) | ||
|
||
if !got.Equal(spanContext) { | ||
t.Fatalf("got = %v, want %v", got, spanContext) | ||
} | ||
cancel() | ||
}) | ||
|
||
t.Run("Slow path with TextMapCarrier", func(t *testing.T) { | ||
carrier := propagation.MapCarrier{ | ||
otelinternaltracing.GRPCTraceBinHeaderKey: base64.StdEncoding.EncodeToString(binaryData), | ||
} | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
traceCtx := propagator.Extract(ctx, carrier) | ||
got := trace.SpanContextFromContext(traceCtx) | ||
|
||
if !got.Equal(spanContext) { | ||
t.Fatalf("got = %v, want %v", got, spanContext) | ||
} | ||
cancel() | ||
}) | ||
} |
101 changes: 101 additions & 0 deletions
101
stats/opentelemetry/internal/tracing/custom_map_carrier.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* | ||
* Copyright 2024 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
// Package tracing implements the OpenTelemetry carrier for context propagation | ||
// in gRPC tracing. | ||
package tracing | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
|
||
"go.opentelemetry.io/otel/propagation" | ||
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/stats" | ||
) | ||
|
||
const GRPCTraceBinHeaderKey = "grpc-trace-bin" | ||
|
||
// CustomMapCarrier is a TextMapCarrier that uses gRPC context to store and | ||
// retrieve any propagated key-value pairs in text format along with binary | ||
// format for `grpc-trace-bin` header | ||
type CustomCarrier struct { | ||
propagation.TextMapCarrier | ||
|
||
Ctx *context.Context | ||
} | ||
|
||
// NewCustomCarrier creates a new CustomMapCarrier with | ||
// the given context. | ||
func NewCustomCarrier(ctx context.Context) CustomCarrier { | ||
return CustomCarrier{ | ||
Ctx: &ctx, | ||
} | ||
} | ||
|
||
// Get returns the string value associated with the passed key from the gRPC | ||
// context. | ||
func (c CustomCarrier) Get(key string) string { | ||
md, ok := metadata.FromIncomingContext(*c.Ctx) | ||
if !ok { | ||
return "" | ||
} | ||
values := md.Get(key) | ||
if len(values) == 0 { | ||
return "" | ||
} | ||
return values[0] | ||
} | ||
|
||
// Set stores the key-value pair in string format in the gRPC context. | ||
// If the key already exists, its value will be overwritten. | ||
func (c CustomCarrier) Set(key, value string) { | ||
md, ok := metadata.FromOutgoingContext(*c.Ctx) | ||
if !ok { | ||
md = metadata.MD{} | ||
} | ||
md.Set(key, value) | ||
*c.Ctx = metadata.NewOutgoingContext(*c.Ctx, md) | ||
} | ||
|
||
// GetBinary returns the binary value from the gRPC context in the incoming RPC, | ||
// associated with the header `grpc-trace-bin`. | ||
func (c CustomCarrier) GetBinary() ([]byte, error) { | ||
values := stats.Trace(*c.Ctx) | ||
if len(values) == 0 { | ||
return nil, errors.New("`grpc-trace-bin` header not found") | ||
} | ||
|
||
return values, nil | ||
} | ||
|
||
// SetBinary sets the binary value to the gRPC context, which will be sent in | ||
// the outgoing RPC with the header grpc-trace-bin. | ||
func (c CustomCarrier) SetBinary(value []byte) { | ||
*c.Ctx = stats.SetTrace(*c.Ctx, value) | ||
} | ||
|
||
// Keys lists the keys stored in the gRPC context for the outgoing RPC. | ||
func (c CustomCarrier) Keys() []string { | ||
md, _ := metadata.FromOutgoingContext(*c.Ctx) | ||
keys := make([]string, 0, len(md)) | ||
for k := range md { | ||
keys = append(keys, k) | ||
} | ||
return keys | ||
} |
Oops, something went wrong.