Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics;
using System.Globalization;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
Expand Down Expand Up @@ -262,24 +263,29 @@ private async ValueTask SendAsync(
CancellationToken cancellationToken)
{
Activity? activity = null;
var traceId = envelope.Headers?.Get(MessageHeaders.TraceId);
var traceState = envelope.Headers?.Get(MessageHeaders.TraceState);
var spanId = envelope.Headers?.Get(MessageHeaders.SpanId);
var traceparent = envelope.Headers?.Get(MessageHeaders.Traceparent);

if (!string.IsNullOrEmpty(traceId) && !string.IsNullOrEmpty(spanId))
if (!string.IsNullOrEmpty(traceparent))
{
var parentContext = new ActivityContext(
ActivityTraceId.CreateFromString(traceId),
ActivitySpanId.CreateFromString(spanId),
ActivityTraceFlags.Recorded,
traceState);

activity = OpenTelemetry.Source.CreateActivity(
$"outbox send {envelope.MessageId}",
ActivityKind.Client,
parentContext);

activity?.Start();
ReadOnlySpan<char> span = traceparent;
Span<Range> ranges = stackalloc Range[5];
var count = span.Split(ranges, '-');

if (count >= 4)
{
var parentContext = new ActivityContext(
ActivityTraceId.CreateFromString(span[ranges[1]]),
ActivitySpanId.CreateFromString(span[ranges[2]]),
(ActivityTraceFlags)byte.Parse(span[ranges[3]], NumberStyles.HexNumber),
envelope.Headers?.Get(MessageHeaders.Tracestate));

Comment thread
PascalSenn marked this conversation as resolved.
Outdated
activity = OpenTelemetry.Source.CreateActivity(
$"outbox send {envelope.MessageId}",
ActivityKind.Client,
parentContext);

activity?.Start();
}
}

var context = _contextPool.Get();
Expand Down
18 changes: 4 additions & 14 deletions src/Mocha/src/Mocha/Headers/MessageHeaders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,14 @@ namespace Mocha;
internal static class MessageHeaders
{
/// <summary>
/// The distributed trace identifier, propagated from <see cref="System.Diagnostics.Activity.TraceId"/>.
/// The W3C Trace Context <c>traceparent</c> header (version-traceId-spanId-traceFlags).
/// </summary>
public static readonly ContextDataKey<string?> TraceId = new("trace-id");
public static readonly ContextDataKey<string?> Traceparent = new("traceparent");

/// <summary>
/// The span identifier, propagated from <see cref="System.Diagnostics.Activity.SpanId"/>.
/// The W3C Trace Context <c>tracestate</c> header carrying vendor-specific trace data.
/// </summary>
public static readonly ContextDataKey<string?> SpanId = new("span-id");

/// <summary>
/// The W3C trace state string, propagated from <see cref="System.Diagnostics.Activity.TraceStateString"/>.
/// </summary>
public static readonly ContextDataKey<string?> TraceState = new("trace-state");

/// <summary>
/// The parent activity identifier, propagated from <see cref="System.Diagnostics.Activity.ParentId"/>.
/// </summary>
public static readonly ContextDataKey<string?> ParentId = new("parent-id");
public static readonly ContextDataKey<string?> Tracestate = new("tracestate");

/// <summary>
/// Indicates the kind of message <see cref="MessageKind"/> it is.
Expand Down
10 changes: 6 additions & 4 deletions src/Mocha/src/Mocha/Observability/OpenTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ public static IHeaders WithActivity(this IHeaders headers)
return headers;
}

headers.TryAdd(MessageHeaders.TraceId, activity.TraceId.ToHexString());
headers.TryAdd(MessageHeaders.SpanId, activity.SpanId.ToHexString());
headers.TryAdd(MessageHeaders.TraceState, activity.TraceStateString);
headers.TryAdd(MessageHeaders.ParentId, activity.ParentId);
headers.TryAdd(MessageHeaders.Traceparent, activity.Id);

Comment thread
PascalSenn marked this conversation as resolved.
Outdated
if (!string.IsNullOrEmpty(activity.TraceStateString))
{
headers.TryAdd(MessageHeaders.Tracestate, activity.TraceStateString);
}

return headers;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics;
using System.Globalization;
using Mocha.Middlewares;

namespace Mocha;
Expand Down Expand Up @@ -61,26 +62,32 @@ private ReceiveActivity(IReceiveContext context)
{
_context = context;

var traceId = context.Headers.Get(MessageHeaders.TraceId);
var traceState = context.Headers.Get(MessageHeaders.TraceState);
var spanId = context.Headers.Get(MessageHeaders.SpanId);
var traceparent = context.Headers.Get(MessageHeaders.Traceparent);

Activity? activity = null;

if (!string.IsNullOrEmpty(traceId) && !string.IsNullOrEmpty(spanId))
if (!string.IsNullOrEmpty(traceparent))
{
var parentContext = new ActivityContext(
ActivityTraceId.CreateFromString(traceId),
ActivitySpanId.CreateFromString(spanId),
ActivityTraceFlags.Recorded,
traceState);

activity = OpenTelemetry.Source.CreateActivity(
$"receive {context.Endpoint.Address}",
ActivityKind.Client,
parentContext);

activity?.Start();
ReadOnlySpan<char> span = traceparent;
Span<Range> ranges = stackalloc Range[5];
var count = span.Split(ranges, '-');

if (count >= 4)
{
var traceId = ActivityTraceId.CreateFromString(span[ranges[1]]);
var spanId = ActivitySpanId.CreateFromString(span[ranges[2]]);
var flags = (ActivityTraceFlags)byte.Parse(span[ranges[3]], NumberStyles.HexNumber);
var traceState = context.Headers.Get(MessageHeaders.Tracestate);

var parentContext = new ActivityContext(traceId, spanId, flags, traceState);

Comment thread
PascalSenn marked this conversation as resolved.
Outdated
activity = OpenTelemetry.Source.CreateActivity(
$"receive {context.Endpoint.Address}",
ActivityKind.Client,
parentContext);

activity?.Start();
}
Comment thread
PascalSenn marked this conversation as resolved.
Outdated
}

activity ??= OpenTelemetry.Source.StartActivity($"receive {context.Endpoint.Address}", ActivityKind.Client);
Expand Down
74 changes: 45 additions & 29 deletions src/Mocha/test/Mocha.Tests/Telemetry/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public async Task MessageBus_Should_ProcessMessages_When_NoListenerRegistered()
}

[Fact]
public void WithActivity_Sets_Trace_Headers_From_Current_Activity()
public void WithActivity_Sets_Traceparent_Header_From_Current_Activity()
{
// arrange
using var source = new ActivitySource("test-source");
Expand All @@ -166,20 +166,17 @@ public void WithActivity_Sets_Trace_Headers_From_Current_Activity()
headers.WithActivity();

// assert
Assert.True(headers.ContainsKey(MessageHeaders.TraceId.Key));
Assert.True(headers.ContainsKey(MessageHeaders.SpanId.Key));
Assert.True(headers.ContainsKey(MessageHeaders.Traceparent.Key));

var traceId = headers.GetValue(MessageHeaders.TraceId.Key) as string;
var spanId = headers.GetValue(MessageHeaders.SpanId.Key) as string;
var traceparent = headers.GetValue(MessageHeaders.Traceparent.Key) as string;
Assert.NotNull(traceparent);

Assert.NotNull(traceId);
Assert.NotNull(spanId);
Assert.Equal(activity.TraceId.ToHexString(), traceId);
Assert.Equal(activity.SpanId.ToHexString(), spanId);
var expected = $"00-{activity.TraceId.ToHexString()}-{activity.SpanId.ToHexString()}-01";
Assert.Equal(expected, traceparent);
}

[Fact]
public void WithActivity_Sets_TraceState_When_Activity_Has_TraceState()
public void WithActivity_Sets_Tracestate_When_Activity_Has_TraceState()
{
// arrange
using var source = new ActivitySource("test-source");
Expand All @@ -198,13 +195,13 @@ public void WithActivity_Sets_TraceState_When_Activity_Has_TraceState()
headers.WithActivity();

// assert
Assert.True(headers.ContainsKey(MessageHeaders.TraceState.Key));
var traceState = headers.GetValue(MessageHeaders.TraceState.Key) as string;
Assert.Equal("key1=value1,key2=value2", traceState);
Assert.True(headers.ContainsKey(MessageHeaders.Tracestate.Key));
var tracestate = headers.GetValue(MessageHeaders.Tracestate.Key) as string;
Assert.Equal("key1=value1,key2=value2", tracestate);
}

[Fact]
public void WithActivity_Sets_ParentId_When_Activity_Has_Parent()
public void WithActivity_Does_Not_Set_Tracestate_When_Activity_Has_No_TraceState()
{
// arrange
using var source = new ActivitySource("test-source");
Expand All @@ -213,22 +210,17 @@ public void WithActivity_Sets_ParentId_When_Activity_Has_Parent()
listener.Sample = (ref _) => ActivitySamplingResult.AllDataAndRecorded;
ActivitySource.AddActivityListener(listener);

using var parentActivity = source.StartActivity("parent-operation");
using var childActivity = source.StartActivity("child-operation");
Assert.NotNull(childActivity);
using var activity = source.StartActivity("test-operation");
Assert.NotNull(activity);

var headers = new Headers();

// act
headers.WithActivity();

// assert
if (childActivity.ParentId != null)
{
Assert.True(headers.ContainsKey(MessageHeaders.ParentId.Key));
var parentId = headers.GetValue(MessageHeaders.ParentId.Key) as string;
Assert.Equal(childActivity.ParentId, parentId);
}
Assert.True(headers.ContainsKey(MessageHeaders.Traceparent.Key));
Assert.False(headers.ContainsKey(MessageHeaders.Tracestate.Key));
}

[Fact]
Expand All @@ -246,13 +238,13 @@ public void WithActivity_With_No_Current_Activity_Returns_Headers_Unchanged()

// assert
Assert.Same(headers, result);
Assert.False(headers.ContainsKey(MessageHeaders.TraceId.Key));
Assert.False(headers.ContainsKey(MessageHeaders.SpanId.Key));
Assert.False(headers.ContainsKey(MessageHeaders.Traceparent.Key));
Assert.False(headers.ContainsKey(MessageHeaders.Tracestate.Key));
Assert.Equal("existing-value", headers.GetValue("existing-header"));
}

[Fact]
public void WithActivity_Does_Not_Overwrite_Existing_Trace_Headers()
public void WithActivity_Does_Not_Overwrite_Existing_Traceparent_Header()
{
// arrange
using var source = new ActivitySource("test-source");
Expand All @@ -265,14 +257,38 @@ public void WithActivity_Does_Not_Overwrite_Existing_Trace_Headers()
Assert.NotNull(activity);

var headers = new Headers();
headers.Set(MessageHeaders.TraceId.Key, "existing-trace-id");
headers.Set(MessageHeaders.Traceparent.Key, "00-existing-trace-id-span-01");

// act
headers.WithActivity();

// assert - TryAdd should not overwrite existing value
var traceId = headers.GetValue(MessageHeaders.TraceId.Key) as string;
Assert.Equal("existing-trace-id", traceId);
var traceparent = headers.GetValue(MessageHeaders.Traceparent.Key) as string;
Assert.Equal("00-existing-trace-id-span-01", traceparent);
}

[Fact]
public void WithActivity_Sets_Unsampled_TraceFlags_When_Activity_Not_Recorded()
{
// arrange
using var source = new ActivitySource("test-source");
using var listener = new ActivityListener();
listener.ShouldListenTo = _ => true;
listener.Sample = (ref _) => ActivitySamplingResult.PropagationData;
ActivitySource.AddActivityListener(listener);

using var activity = source.StartActivity("test-operation");
Assert.NotNull(activity);

var headers = new Headers();

// act
headers.WithActivity();

// assert
var traceparent = headers.GetValue(MessageHeaders.Traceparent.Key) as string;
Assert.NotNull(traceparent);
Assert.EndsWith("-00", traceparent);
}

[Fact]
Expand Down
6 changes: 3 additions & 3 deletions website/src/docs/mocha/v1/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ In the Aspire Dashboard, you will see the `publish` span from the publishing ser

# How trace context propagates

When the dispatch instrumentation middleware runs, it writes the current `Activity`'s trace context into the outgoing message headers. The receive instrumentation middleware on the other side reads those headers and restores the parent context, linking the two spans into a single trace.
When the dispatch instrumentation middleware runs, it writes the current `Activity`'s trace context into the outgoing message headers using the [W3C Trace Context](https://www.w3.org/TR/trace-context/) standard (`traceparent` and `tracestate`). The receive instrumentation middleware on the other side reads those headers and restores the parent context, linking the two spans into a single trace. This is the same propagation format used by ASP.NET Core, HttpClient, and the broader OpenTelemetry ecosystem, so Mocha traces connect seamlessly with spans from other frameworks.

```mermaid
sequenceDiagram
Expand All @@ -72,8 +72,8 @@ sequenceDiagram
participant B as Service B

A->>A: dispatch span (Producer)
Note over A: Writes trace-id, span-id to headers
A->>Broker: Message + trace headers
Note over A: Writes traceparent, tracestate to headers
A->>Broker: Message + W3C trace headers
Broker->>B: Deliver message
B->>B: receive span (Client, linked to parent)
B->>B: consumer span (Consumer)
Expand Down
Loading