Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,24 @@ public string BuildMessageBody(Envelope envelope)

public IEnumerable<KeyValuePair<string, MessageAttributeValue>> ToAttributes(Envelope envelope)
{
yield break;
if (!string.IsNullOrEmpty(envelope.ParentId))
{
yield return new KeyValuePair<string, MessageAttributeValue>(
MassTransitHeaders.ActivityId,
new MessageAttributeValue { DataType = "String", StringValue = envelope.ParentId });
}
}

public void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary<string, MessageAttributeValue> attributes)
{
// TODO -- this could be more efficient of course
envelope.Data = Encoding.UTF8.GetBytes(messageBody);
envelope.Serializer = _serializer;

if (attributes.TryGetValue(MassTransitHeaders.ActivityId, out var activityId))
{
envelope.ParentId = activityId.StringValue;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,27 @@ public string BuildMessageBody(Envelope envelope)

public IEnumerable<KeyValuePair<string, MessageAttributeValue>> ToAttributes(Envelope envelope)
{
yield break;
if (!string.IsNullOrEmpty(envelope.ParentId))
{
yield return new KeyValuePair<string, MessageAttributeValue>(
MassTransitHeaders.ActivityId,
new MessageAttributeValue { DataType = "String", StringValue = envelope.ParentId });
}
}

public void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary<string, MessageAttributeValue> attributes)
{
// TODO -- this could be more efficient of course
envelope.Data = Encoding.UTF8.GetBytes(messageBody);

// This is the really important part
// of the mapping
envelope.Serializer = _serializer;

if (attributes.TryGetValue(MassTransitHeaders.ActivityId, out var activityId))
{
envelope.ParentId = activityId.StringValue;
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/Wolverine/AssemblyAttributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
[assembly: InternalsVisibleTo("Wolverine.RabbitMq.Tests")]
[assembly: InternalsVisibleTo("Wolverine.AzureServiceBus")]
[assembly: InternalsVisibleTo("Wolverine.AmazonSqs")]
[assembly: InternalsVisibleTo("Wolverine.AmazonSns")]
[assembly: InternalsVisibleTo("Wolverine.ConfluentKafka")]
[assembly: InternalsVisibleTo("Wolverine.AzureServiceBus.Tests")]
[assembly: InternalsVisibleTo("PersistenceTests")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ internal static class MassTransitHeaders
/// </summary>
public const string InitiatingConversationId = "MT-InitiatingConversationId";

/// <summary>
/// The W3C Activity Id used for distributed trace propagation
/// </summary>
public const string ActivityId = "MT-Activity-Id";

/// <summary>
/// MessageId - <see cref="MessageEnvelope" />
/// </summary>
Expand Down
5 changes: 3 additions & 2 deletions src/Wolverine/Transports/EnvelopeMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ public void InteropWithMassTransit(Action<IMassTransitInterop>? configure = null
{
var serializer = new MassTransitJsonSerializer(e);
configure?.Invoke(serializer);

MapPropertyToHeader(x => x.MessageType!, MassTransitHeaders.MessageType);

MapPropertyToHeader(x => x.ParentId!, MassTransitHeaders.ActivityId);

_endpoint.DefaultSerializer = serializer;

var replyUri = new Lazy<string>(() => e.MassTransitReplyUri()?.ToString() ?? string.Empty);
Expand Down