diff --git a/src/Http/Wolverine.Http/Transport/HttpTransportExecutor.cs b/src/Http/Wolverine.Http/Transport/HttpTransportExecutor.cs index 709144ab5..b22a20e05 100644 --- a/src/Http/Wolverine.Http/Transport/HttpTransportExecutor.cs +++ b/src/Http/Wolverine.Http/Transport/HttpTransportExecutor.cs @@ -89,15 +89,17 @@ public async Task InvokeAsync(HttpContext httpContext) envelope.Destination = $"http://localhost{httpContext.Request.Path}".ToUri(); envelope.DoNotCascadeResponse = true; envelope.Serializer = _runtime.Options.FindSerializer(envelope.ContentType); - - if (!_runtime.Pipeline.TryDeserializeEnvelope(envelope, out var continuation)) + + var deserializeResult = await _runtime.Pipeline.TryDeserializeEnvelope(envelope); + + if (deserializeResult != NullContinuation.Instance) { - if (continuation is NoHandlerContinuation) + if (deserializeResult is NoHandlerContinuation) { return Results.Problem($"No handler for the requested message type {envelope.MessageType}", statusCode:400); } - if (continuation is MoveToErrorQueue move) + if (deserializeResult is MoveToErrorQueue move) { _logger.LogError(move.Exception, "Error executing message of type {MessageType}", envelope.MessageType); return Results.Problem($"Execution error for requested message type {envelope.MessageType}", diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs index a435907e5..b92db2b8c 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs @@ -35,7 +35,7 @@ public async Task PingAsync() public async ValueTask SendAsync(Envelope envelope) { - var message = _topic.Mapper.CreateMessage(envelope); + var message = await _topic.Mapper.CreateMessage(envelope); await _producer.ProduceAsync(envelope.TopicName ?? _topic.TopicName, message); _producer.Flush(); diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs index e783a7b00..b56c9914e 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs @@ -21,7 +21,7 @@ public async Task SendBatchAsync(ISenderCallback callback, OutgoingMessageBatch { // TODO -- separate try/catch here! - var message = _topic.Mapper.CreateMessage(envelope); + var message = await _topic.Mapper.CreateMessage(envelope); await _producer.ProduceAsync(envelope.TopicName ?? _topic.TopicName, message); } diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs index 585153553..ecb7b4d36 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs @@ -126,12 +126,13 @@ internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string return envelope; } - internal static Message CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope) + internal static async ValueTask> CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope) { + var data = await envelope.GetDataAsync(); var message = new Message { Key = !string.IsNullOrEmpty(envelope.PartitionKey) ? envelope.PartitionKey : envelope.Id.ToString(), - Value = envelope.Data, + Value = data, Headers = new Headers() }; diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqSender.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqSender.cs index 13e59a0db..a3866f829 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqSender.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqSender.cs @@ -11,11 +11,12 @@ namespace Wolverine.RabbitMQ.Internal; internal class RabbitMqSender : RabbitMqChannelAgent, ISender { private readonly RabbitMqEndpoint _endpoint; + private readonly RoutingMode _routingType; + private readonly IWolverineRuntime _runtime; private readonly CachedString _exchangeName; private readonly bool _isDurable; - private readonly string _key; + private readonly CachedString _routingKey; private readonly IRabbitMqEnvelopeMapper _mapper; - private readonly Func _toRoutingKey; public RabbitMqSender(RabbitMqEndpoint endpoint, RabbitMqTransport transport, RoutingMode routingType, IWolverineRuntime runtime) : base( @@ -26,29 +27,40 @@ public RabbitMqSender(RabbitMqEndpoint endpoint, RabbitMqTransport transport, _isDurable = endpoint.Mode == EndpointMode.Durable; _exchangeName = new CachedString(endpoint.ExchangeName); - _key = endpoint.RoutingKey(); - - _toRoutingKey = routingType == RoutingMode.Static ? _ => new CachedString(_key) : x => + + if (routingType == RoutingMode.Static) { - if (x.TopicName.IsEmpty() && x.Message == null) - { - try - { - runtime.Pipeline.TryDeserializeEnvelope(x, out var _); - } - catch (Exception e) - { - Logger.LogError(e, "Error trying to deserialize an envelope in order to determine the topic name"); - } - } - - return new CachedString(TopicRouting.DetermineTopicName(x)); - }; + _routingKey = new CachedString(endpoint.RoutingKey()); + } _mapper = endpoint.BuildMapper(runtime); _endpoint = endpoint; + _routingType = routingType; + _runtime = runtime; } + private async ValueTask ToRoutingKeyAsync(Envelope envelope) + { + if(_routingKey != null) + { + return _routingKey; + } + + if (envelope.TopicName.IsEmpty() && envelope.Message == null) + { + try + { + await _runtime.Pipeline.TryDeserializeEnvelope(envelope); + } + catch (Exception e) + { + Logger.LogError(e, "Error trying to deserialize an envelope in order to determine the topic name"); + } + } + return new CachedString(TopicRouting.DetermineTopicName(envelope)); + } + + public bool SupportsNativeScheduledSend => false; public Uri Destination { get; } @@ -75,7 +87,7 @@ public async ValueTask SendAsync(Envelope envelope) _mapper.MapEnvelopeToOutgoing(envelope, props); - var routingKey = _toRoutingKey(envelope); + var routingKey = await ToRoutingKeyAsync(envelope); await Channel.BasicPublishAsync(_exchangeName, routingKey, false, props, envelope.Data); } diff --git a/src/Wolverine/Envelope.cs b/src/Wolverine/Envelope.cs index f16242ed5..c81dfbe65 100644 --- a/src/Wolverine/Envelope.cs +++ b/src/Wolverine/Envelope.cs @@ -4,6 +4,7 @@ using JasperFx.MultiTenancy; using MassTransit; using Wolverine.Attributes; +using Wolverine.Runtime.Serialization; using Wolverine.Util; namespace Wolverine; @@ -120,6 +121,30 @@ public TimeSpan? ScheduleDelay get => _scheduleDelay; } + public async ValueTask GetDataAsync() + { + if (_data != null) + { + return _data; + } + AssertMessage(); + + if(Serializer is IAsyncMessageSerializer asyncMessaeSerializer) + { + try + { + _data = await asyncMessaeSerializer.WriteAsync(this); + } + catch (Exception e) + { + throw new WolverineSerializationException( + $"Error trying to serialize message of type {Message.GetType().FullNameInCode()} with serializer {Serializer}", e); + } + } + + return Data; + } + /// /// The raw, serialized message data /// @@ -132,10 +157,7 @@ public byte[]? Data return _data; } - if (_message == null) - { - throw new WolverineSerializationException($"Cannot ensure data is present when there is no message. The Message Type Name is '{MessageType}'"); - } + AssertMessage(); if (Serializer == null) { @@ -144,7 +166,7 @@ public byte[]? Data _data = serializable.Write(); return _data; } - + throw new WolverineSerializationException($"No data or writer is known for this envelope of message type {_message.GetType().FullNameInCode()}"); } @@ -163,6 +185,14 @@ public byte[]? Data set => _data = value; } + private void AssertMessage() + { + if (_message == null) + { + throw new WolverineSerializationException($"Cannot ensure data is present when there is no message. The Message Type Name is '{MessageType}'"); + } + } + internal int? MessagePayloadSize => _data?.Length; /// diff --git a/src/Wolverine/Runtime/HandlerPipeline.cs b/src/Wolverine/Runtime/HandlerPipeline.cs index f26db8a9a..60fbab3ce 100644 --- a/src/Wolverine/Runtime/HandlerPipeline.cs +++ b/src/Wolverine/Runtime/HandlerPipeline.cs @@ -5,6 +5,7 @@ using Wolverine.ErrorHandling; using Wolverine.Logging; using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Serialization; using Wolverine.Transports; namespace Wolverine.Runtime; @@ -106,7 +107,7 @@ public async Task InvokeAsync(Envelope envelope, IChannelCallback channel, Activ } } - public bool TryDeserializeEnvelope(Envelope envelope, out IContinuation continuation) + public async ValueTask TryDeserializeEnvelope(Envelope envelope) { // Try to deserialize try @@ -127,29 +128,31 @@ public bool TryDeserializeEnvelope(Envelope envelope, out IContinuation continua if (_graph.TryFindMessageType(envelope.MessageType, out var messageType)) { - envelope.Message = serializer.ReadFromData(messageType, envelope); + if (serializer is IAsyncMessageSerializer asyncMessageSerializer) + { + envelope.Message = await asyncMessageSerializer.ReadFromDataAsync(messageType, envelope); + } + else + { + envelope.Message = serializer.ReadFromData(messageType, envelope); + } } else { - continuation = new NoHandlerContinuation(_runtime.MissingHandlers(), _runtime); - return false; + return new NoHandlerContinuation(_runtime.MissingHandlers(), _runtime); } if (envelope.Message == null) { - continuation = new MoveToErrorQueue(new InvalidOperationException( + return new MoveToErrorQueue(new InvalidOperationException( "No message body could be de-serialized from the raw data in this envelope")); - - return false; } - continuation = NullContinuation.Instance; - return true; + return NullContinuation.Instance; } catch (Exception? e) { - continuation = new MoveToErrorQueue(e); - return false; + return new MoveToErrorQueue(e); } finally { @@ -157,19 +160,20 @@ public bool TryDeserializeEnvelope(Envelope envelope, out IContinuation continua } } - private Task executeAsync(MessageContext context, Envelope envelope, Activity? activity) + private async Task executeAsync(MessageContext context, Envelope envelope, Activity? activity) { if (envelope.IsExpired()) { - return Task.FromResult(DiscardEnvelope.Instance); + return DiscardEnvelope.Instance; } if (envelope.Message == null) { - if (!TryDeserializeEnvelope(envelope, out var serializationError)) + var deserializationResult = await TryDeserializeEnvelope(envelope); + if(deserializationResult != NullContinuation.Instance) { activity?.SetStatus(ActivityStatusCode.Error, "Serialization Failure"); - return Task.FromResult(serializationError); + return deserializationResult; } } else @@ -180,11 +184,11 @@ private Task executeAsync(MessageContext context, Envelope envelo if (envelope.IsResponse) { _runtime.Replies.Complete(envelope); - return Task.FromResult(MessageSucceededContinuation.Instance); + return MessageSucceededContinuation.Instance; } var executor = _executors[envelope.Message!.GetType()]; - return executor.ExecuteAsync(context, _cancellation); + return await executor.ExecuteAsync(context, _cancellation); } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/IHandlerPipeline.cs b/src/Wolverine/Runtime/IHandlerPipeline.cs index 834ec9d5c..a3de9a805 100644 --- a/src/Wolverine/Runtime/IHandlerPipeline.cs +++ b/src/Wolverine/Runtime/IHandlerPipeline.cs @@ -7,5 +7,5 @@ public interface IHandlerPipeline { Task InvokeAsync(Envelope envelope, IChannelCallback channel); Task InvokeAsync(Envelope envelope, IChannelCallback channel, Activity activity); - bool TryDeserializeEnvelope(Envelope envelope, out IContinuation continuation); + ValueTask TryDeserializeEnvelope(Envelope envelope); } \ No newline at end of file diff --git a/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs b/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs index 7b7c60fc7..e8cab699c 100644 --- a/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs +++ b/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs @@ -14,4 +14,14 @@ public interface IMessageSerializer object ReadFromData(byte[] data); byte[] WriteMessage(object message); +} + +/// +/// Async version of +/// +public interface IAsyncMessageSerializer : IMessageSerializer +{ + ValueTask WriteAsync(Envelope envelope); + + ValueTask ReadFromDataAsync(Type messageType, Envelope envelope); } \ No newline at end of file