diff --git a/src/Transports/AWS/Wolverine.AmazonSns/Internal/MassTransitMapper.cs b/src/Transports/AWS/Wolverine.AmazonSns/Internal/MassTransitMapper.cs index 4a10c21bc..a5e1f985c 100644 --- a/src/Transports/AWS/Wolverine.AmazonSns/Internal/MassTransitMapper.cs +++ b/src/Transports/AWS/Wolverine.AmazonSns/Internal/MassTransitMapper.cs @@ -25,7 +25,12 @@ public string BuildMessageBody(Envelope envelope) public IEnumerable> ToAttributes(Envelope envelope) { - yield break; + if (!string.IsNullOrEmpty(envelope.ParentId)) + { + yield return new KeyValuePair( + MassTransitHeaders.ActivityId, + new MessageAttributeValue { DataType = "String", StringValue = envelope.ParentId }); + } } public void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary attributes) @@ -33,6 +38,11 @@ public void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary< // TODO -- this could be more efficient of course envelope.Data = Encoding.UTF8.GetBytes(messageBody); envelope.Serializer = _serializer; + + if (attributes.TryGetValue(MassTransitHeaders.ActivityId, out var activityId)) + { + envelope.ParentId = activityId.StringValue; + } } } \ No newline at end of file diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/MassTransitMapper.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/MassTransitMapper.cs index db7f22c80..42d8980c3 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/MassTransitMapper.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/MassTransitMapper.cs @@ -28,17 +28,27 @@ public string BuildMessageBody(Envelope envelope) public IEnumerable> ToAttributes(Envelope envelope) { - yield break; + if (!string.IsNullOrEmpty(envelope.ParentId)) + { + yield return new KeyValuePair( + MassTransitHeaders.ActivityId, + new MessageAttributeValue { DataType = "String", StringValue = envelope.ParentId }); + } } public void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary attributes) { // TODO -- this could be more efficient of course envelope.Data = Encoding.UTF8.GetBytes(messageBody); - + // This is the really important part // of the mapping envelope.Serializer = _serializer; + + if (attributes.TryGetValue(MassTransitHeaders.ActivityId, out var activityId)) + { + envelope.ParentId = activityId.StringValue; + } } } diff --git a/src/Wolverine/AssemblyAttributes.cs b/src/Wolverine/AssemblyAttributes.cs index c93031734..b61a321ec 100644 --- a/src/Wolverine/AssemblyAttributes.cs +++ b/src/Wolverine/AssemblyAttributes.cs @@ -19,6 +19,7 @@ [assembly: InternalsVisibleTo("Wolverine.RabbitMq.Tests")] [assembly: InternalsVisibleTo("Wolverine.AzureServiceBus")] [assembly: InternalsVisibleTo("Wolverine.AmazonSqs")] +[assembly: InternalsVisibleTo("Wolverine.AmazonSns")] [assembly: InternalsVisibleTo("Wolverine.ConfluentKafka")] [assembly: InternalsVisibleTo("Wolverine.AzureServiceBus.Tests")] [assembly: InternalsVisibleTo("PersistenceTests")] diff --git a/src/Wolverine/Runtime/Interop/MassTransit/MassTransitHeaders.cs b/src/Wolverine/Runtime/Interop/MassTransit/MassTransitHeaders.cs index 944567510..a3ff910c0 100644 --- a/src/Wolverine/Runtime/Interop/MassTransit/MassTransitHeaders.cs +++ b/src/Wolverine/Runtime/Interop/MassTransit/MassTransitHeaders.cs @@ -83,6 +83,11 @@ internal static class MassTransitHeaders /// public const string InitiatingConversationId = "MT-InitiatingConversationId"; + /// + /// The W3C Activity Id used for distributed trace propagation + /// + public const string ActivityId = "MT-Activity-Id"; + /// /// MessageId - /// diff --git a/src/Wolverine/Transports/EnvelopeMapper.cs b/src/Wolverine/Transports/EnvelopeMapper.cs index 4120cc891..2f7f35c10 100644 --- a/src/Wolverine/Transports/EnvelopeMapper.cs +++ b/src/Wolverine/Transports/EnvelopeMapper.cs @@ -119,9 +119,10 @@ public void InteropWithMassTransit(Action? configure = null { var serializer = new MassTransitJsonSerializer(e); configure?.Invoke(serializer); - + MapPropertyToHeader(x => x.MessageType!, MassTransitHeaders.MessageType); - + MapPropertyToHeader(x => x.ParentId!, MassTransitHeaders.ActivityId); + _endpoint.DefaultSerializer = serializer; var replyUri = new Lazy(() => e.MassTransitReplyUri()?.ToString() ?? string.Empty);