Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,5 @@ Wilhuff
Wunder
xunit
Expando
traceparent
tracestate
Original file line number Diff line number Diff line change
Expand Up @@ -262,24 +262,20 @@ private async ValueTask SendAsync(
CancellationToken cancellationToken)
{
Activity? activity = null;
var traceId = envelope.Headers?.Get(MessageHeaders.TraceId);
var traceState = envelope.Headers?.Get(MessageHeaders.TraceState);
var spanId = envelope.Headers?.Get(MessageHeaders.SpanId);
var traceparent = envelope.Headers?.Get(MessageHeaders.Traceparent);

if (!string.IsNullOrEmpty(traceId) && !string.IsNullOrEmpty(spanId))
if (!string.IsNullOrEmpty(traceparent))
{
var parentContext = new ActivityContext(
ActivityTraceId.CreateFromString(traceId),
ActivitySpanId.CreateFromString(spanId),
ActivityTraceFlags.Recorded,
traceState);

activity = OpenTelemetry.Source.CreateActivity(
$"outbox send {envelope.MessageId}",
ActivityKind.Client,
parentContext);

activity?.Start();
var tracestate = envelope.Headers?.Get(MessageHeaders.Tracestate);
if (ActivityContext.TryParse(traceparent, tracestate, out var parentContext))
{
activity = OpenTelemetry.Source.CreateActivity(
$"outbox send {envelope.MessageId}",
ActivityKind.Client,
parentContext);

activity?.Start();
}
}

var context = _contextPool.Get();
Expand Down
18 changes: 4 additions & 14 deletions src/Mocha/src/Mocha/Headers/MessageHeaders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,14 @@ namespace Mocha;
internal static class MessageHeaders
{
/// <summary>
/// The distributed trace identifier, propagated from <see cref="System.Diagnostics.Activity.TraceId"/>.
/// The W3C Trace Context <c>traceparent</c> header (version-traceId-spanId-traceFlags).
/// </summary>
public static readonly ContextDataKey<string?> TraceId = new("trace-id");
public static readonly ContextDataKey<string?> Traceparent = new("traceparent");

/// <summary>
/// The span identifier, propagated from <see cref="System.Diagnostics.Activity.SpanId"/>.
/// The W3C Trace Context <c>tracestate</c> header carrying vendor-specific trace data.
/// </summary>
public static readonly ContextDataKey<string?> SpanId = new("span-id");

/// <summary>
/// The W3C trace state string, propagated from <see cref="System.Diagnostics.Activity.TraceStateString"/>.
/// </summary>
public static readonly ContextDataKey<string?> TraceState = new("trace-state");

/// <summary>
/// The parent activity identifier, propagated from <see cref="System.Diagnostics.Activity.ParentId"/>.
/// </summary>
public static readonly ContextDataKey<string?> ParentId = new("parent-id");
public static readonly ContextDataKey<string?> Tracestate = new("tracestate");

/// <summary>
/// Indicates the kind of message <see cref="MessageKind"/> it is.
Expand Down
14 changes: 10 additions & 4 deletions src/Mocha/src/Mocha/Observability/OpenTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@ public static IHeaders WithActivity(this IHeaders headers)
return headers;
}

headers.TryAdd(MessageHeaders.TraceId, activity.TraceId.ToHexString());
headers.TryAdd(MessageHeaders.SpanId, activity.SpanId.ToHexString());
headers.TryAdd(MessageHeaders.TraceState, activity.TraceStateString);
headers.TryAdd(MessageHeaders.ParentId, activity.ParentId);
var traceparent = TraceparentHelper.FormatTraceparent(activity);
if (traceparent is not null)
{
headers.TryAdd(MessageHeaders.Traceparent, traceparent);
}

if (!string.IsNullOrEmpty(activity.TraceStateString))
{
headers.TryAdd(MessageHeaders.Tracestate, activity.TraceStateString);
}

return headers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,22 @@ private ReceiveActivity(IReceiveContext context)
{
_context = context;

var traceId = context.Headers.Get(MessageHeaders.TraceId);
var traceState = context.Headers.Get(MessageHeaders.TraceState);
var spanId = context.Headers.Get(MessageHeaders.SpanId);
var traceparent = context.Headers.Get(MessageHeaders.Traceparent);

Activity? activity = null;

if (!string.IsNullOrEmpty(traceId) && !string.IsNullOrEmpty(spanId))
if (!string.IsNullOrEmpty(traceparent))
{
var parentContext = new ActivityContext(
ActivityTraceId.CreateFromString(traceId),
ActivitySpanId.CreateFromString(spanId),
ActivityTraceFlags.Recorded,
traceState);

activity = OpenTelemetry.Source.CreateActivity(
$"receive {context.Endpoint.Address}",
ActivityKind.Client,
parentContext);

activity?.Start();
var traceState = context.Headers.Get(MessageHeaders.Tracestate);
if (ActivityContext.TryParse(traceparent, traceState, out var parentContext))
{
activity = OpenTelemetry.Source.CreateActivity(
$"receive {context.Endpoint.Address}",
ActivityKind.Client,
parentContext);

activity?.Start();
}
}

activity ??= OpenTelemetry.Source.StartActivity($"receive {context.Endpoint.Address}", ActivityKind.Client);
Expand Down
62 changes: 62 additions & 0 deletions src/Mocha/src/Mocha/Observability/TraceparentHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System.Diagnostics;

namespace Mocha;

internal static class TraceparentHelper
{
private const int TraceparentLength = 55;

/// <summary>
/// Formats a W3C traceparent header value from the given activity.
/// Returns null if the activity has no valid trace or span ID.
/// Format: "00-{traceId 32 hex}-{spanId 16 hex}-{flags 2 hex}"
/// </summary>
internal static string? FormatTraceparent(Activity activity)
{
var traceId = activity.TraceId;
var spanId = activity.SpanId;

if (traceId == default || spanId == default)
{
return null;
}

return FormatTraceparent(traceId, spanId, activity.ActivityTraceFlags);
}

internal static string FormatTraceparent(
ActivityTraceId traceId,
ActivitySpanId spanId,
ActivityTraceFlags flags)
{
Span<byte> traceIdBytes = stackalloc byte[16];
Span<byte> spanIdBytes = stackalloc byte[8];
traceId.CopyTo(traceIdBytes);
spanId.CopyTo(spanIdBytes);

Span<char> buffer = stackalloc char[TraceparentLength];
buffer[0] = '0';
buffer[1] = '0';
buffer[2] = '-';
HexEncode(traceIdBytes, buffer[3..]);
buffer[35] = '-';
HexEncode(spanIdBytes, buffer[36..]);
buffer[52] = '-';
((byte)flags).TryFormat(buffer[53..], out _, "x2");

return new string(buffer);
}

private static void HexEncode(ReadOnlySpan<byte> bytes, Span<char> destination)
{
for (var i = 0; i < bytes.Length; i++)
{
var b = bytes[i];
destination[i * 2] = ToHexChar(b >> 4);
destination[i * 2 + 1] = ToHexChar(b & 0xF);
}
}

private static char ToHexChar(int value)
=> (char)(value < 10 ? '0' + value : 'a' + value - 10);
}
74 changes: 45 additions & 29 deletions src/Mocha/test/Mocha.Tests/Telemetry/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public async Task MessageBus_Should_ProcessMessages_When_NoListenerRegistered()
}

[Fact]
public void WithActivity_Sets_Trace_Headers_From_Current_Activity()
public void WithActivity_Sets_Traceparent_Header_From_Current_Activity()
{
// arrange
using var source = new ActivitySource("test-source");
Expand All @@ -166,20 +166,17 @@ public void WithActivity_Sets_Trace_Headers_From_Current_Activity()
headers.WithActivity();

// assert
Assert.True(headers.ContainsKey(MessageHeaders.TraceId.Key));
Assert.True(headers.ContainsKey(MessageHeaders.SpanId.Key));
Assert.True(headers.ContainsKey(MessageHeaders.Traceparent.Key));

var traceId = headers.GetValue(MessageHeaders.TraceId.Key) as string;
var spanId = headers.GetValue(MessageHeaders.SpanId.Key) as string;
var traceparent = headers.GetValue(MessageHeaders.Traceparent.Key) as string;
Assert.NotNull(traceparent);

Assert.NotNull(traceId);
Assert.NotNull(spanId);
Assert.Equal(activity.TraceId.ToHexString(), traceId);
Assert.Equal(activity.SpanId.ToHexString(), spanId);
var expected = $"00-{activity.TraceId.ToHexString()}-{activity.SpanId.ToHexString()}-01";
Assert.Equal(expected, traceparent);
}

[Fact]
public void WithActivity_Sets_TraceState_When_Activity_Has_TraceState()
public void WithActivity_Sets_Tracestate_When_Activity_Has_TraceState()
{
// arrange
using var source = new ActivitySource("test-source");
Expand All @@ -198,13 +195,13 @@ public void WithActivity_Sets_TraceState_When_Activity_Has_TraceState()
headers.WithActivity();

// assert
Assert.True(headers.ContainsKey(MessageHeaders.TraceState.Key));
var traceState = headers.GetValue(MessageHeaders.TraceState.Key) as string;
Assert.Equal("key1=value1,key2=value2", traceState);
Assert.True(headers.ContainsKey(MessageHeaders.Tracestate.Key));
var tracestate = headers.GetValue(MessageHeaders.Tracestate.Key) as string;
Assert.Equal("key1=value1,key2=value2", tracestate);
}

[Fact]
public void WithActivity_Sets_ParentId_When_Activity_Has_Parent()
public void WithActivity_Does_Not_Set_Tracestate_When_Activity_Has_No_TraceState()
{
// arrange
using var source = new ActivitySource("test-source");
Expand All @@ -213,22 +210,17 @@ public void WithActivity_Sets_ParentId_When_Activity_Has_Parent()
listener.Sample = (ref _) => ActivitySamplingResult.AllDataAndRecorded;
ActivitySource.AddActivityListener(listener);

using var parentActivity = source.StartActivity("parent-operation");
using var childActivity = source.StartActivity("child-operation");
Assert.NotNull(childActivity);
using var activity = source.StartActivity("test-operation");
Assert.NotNull(activity);

var headers = new Headers();

// act
headers.WithActivity();

// assert
if (childActivity.ParentId != null)
{
Assert.True(headers.ContainsKey(MessageHeaders.ParentId.Key));
var parentId = headers.GetValue(MessageHeaders.ParentId.Key) as string;
Assert.Equal(childActivity.ParentId, parentId);
}
Assert.True(headers.ContainsKey(MessageHeaders.Traceparent.Key));
Assert.False(headers.ContainsKey(MessageHeaders.Tracestate.Key));
}

[Fact]
Expand All @@ -246,13 +238,13 @@ public void WithActivity_With_No_Current_Activity_Returns_Headers_Unchanged()

// assert
Assert.Same(headers, result);
Assert.False(headers.ContainsKey(MessageHeaders.TraceId.Key));
Assert.False(headers.ContainsKey(MessageHeaders.SpanId.Key));
Assert.False(headers.ContainsKey(MessageHeaders.Traceparent.Key));
Assert.False(headers.ContainsKey(MessageHeaders.Tracestate.Key));
Assert.Equal("existing-value", headers.GetValue("existing-header"));
}

[Fact]
public void WithActivity_Does_Not_Overwrite_Existing_Trace_Headers()
public void WithActivity_Does_Not_Overwrite_Existing_Traceparent_Header()
{
// arrange
using var source = new ActivitySource("test-source");
Expand All @@ -265,14 +257,38 @@ public void WithActivity_Does_Not_Overwrite_Existing_Trace_Headers()
Assert.NotNull(activity);

var headers = new Headers();
headers.Set(MessageHeaders.TraceId.Key, "existing-trace-id");
headers.Set(MessageHeaders.Traceparent.Key, "00-existing-trace-id-span-01");

// act
headers.WithActivity();

// assert - TryAdd should not overwrite existing value
var traceId = headers.GetValue(MessageHeaders.TraceId.Key) as string;
Assert.Equal("existing-trace-id", traceId);
var traceparent = headers.GetValue(MessageHeaders.Traceparent.Key) as string;
Assert.Equal("00-existing-trace-id-span-01", traceparent);
}

[Fact]
public void WithActivity_Sets_Unsampled_TraceFlags_When_Activity_Not_Recorded()
{
// arrange
using var source = new ActivitySource("test-source");
using var listener = new ActivityListener();
listener.ShouldListenTo = _ => true;
listener.Sample = (ref _) => ActivitySamplingResult.PropagationData;
ActivitySource.AddActivityListener(listener);

using var activity = source.StartActivity("test-operation");
Assert.NotNull(activity);

var headers = new Headers();

// act
headers.WithActivity();

// assert
var traceparent = headers.GetValue(MessageHeaders.Traceparent.Key) as string;
Assert.NotNull(traceparent);
Assert.EndsWith("-00", traceparent);
}

[Fact]
Expand Down
Loading
Loading