diff --git a/src/Wolverine.Grpc.Tests/inline_request_reply_grpc.cs b/src/Wolverine.Grpc.Tests/inline_request_reply_grpc.cs new file mode 100644 index 000000000..aa591de40 --- /dev/null +++ b/src/Wolverine.Grpc.Tests/inline_request_reply_grpc.cs @@ -0,0 +1,81 @@ +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Grpc; +using Wolverine.Runtime.RemoteInvocation; +using Xunit; + +namespace Wolverine.Grpc.Tests; + +// GH-2967: inline request/reply over the gRPC transport. The sender configures NO listening endpoint +// and NO reply tracker wiring, so InvokeAsync can only return by reading the reply envelope straight +// off the unary Call(WolverineMessage) response — otherwise it would time out. Mirrors the HTTP test. +[Collection(GrpcSerialTestsCollection.Name)] +public class inline_request_reply_grpc +{ + private const int ReceiverPort = 5188; + + private static Task startReceiverAsync() + { + return Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ListenAtGrpcPort(ReceiverPort); + opts.Discovery.DisableConventionalDiscovery(); + opts.Discovery.IncludeType(); + }) + .StartAsync(); + } + + private static Task startSenderAsync() + { + return Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Deliberately NO ListenAtGrpcPort(...).UseForReplies() — the reply rides the gRPC response. + opts.Discovery.DisableConventionalDiscovery(); + opts.PublishAllMessages().ToGrpcEndpoint("localhost", ReceiverPort); + }) + .StartAsync(); + } + + [Fact] + public async Task invoke_reads_reply_from_the_grpc_response_slot() + { + using var receiver = await startReceiverAsync(); + using var sender = await startSenderAsync(); + + var response = await sender.MessageBus().InvokeAsync(new GrpcInlinePing("Rand")); + + response.ShouldNotBeNull(); + response.Name.ShouldBe("Rand"); + } + + [Fact] + public async Task handler_failure_surfaces_as_request_reply_exception() + { + using var receiver = await startReceiverAsync(); + using var sender = await startSenderAsync(); + + var ex = await Should.ThrowAsync(async () => + await sender.MessageBus().InvokeAsync(new GrpcInlinePing("boom"))); + + ex.Message.ShouldContain("boom"); + } +} + +public record GrpcInlinePing(string Name); + +public record GrpcInlinePong(string Name); + +public class GrpcInlineHandler +{ + public GrpcInlinePong Handle(GrpcInlinePing ping) + { + if (ping.Name == "boom") + { + throw new InvalidOperationException("boom from the gRPC handler"); + } + + return new GrpcInlinePong(ping.Name); + } +} diff --git a/src/Wolverine.Grpc/GrpcEndpoint.cs b/src/Wolverine.Grpc/GrpcEndpoint.cs index 497032fbe..a26b26e80 100644 --- a/src/Wolverine.Grpc/GrpcEndpoint.cs +++ b/src/Wolverine.Grpc/GrpcEndpoint.cs @@ -1,14 +1,22 @@ +using Google.Protobuf; +using Grpc.Net.Client; using Microsoft.Extensions.Logging; using Wolverine.Configuration; using Wolverine.Grpc.Internals; using Wolverine.Runtime; +using Wolverine.Runtime.RemoteInvocation; +using Wolverine.Runtime.Serialization; using Wolverine.Transports; using Wolverine.Transports.Sending; namespace Wolverine.Grpc; -public class GrpcEndpoint : Endpoint +public class GrpcEndpoint : Endpoint, IInlineRequestReplyEndpoint { + private readonly object _clientLock = new(); + private GrpcChannel? _callChannel; + private WolverineTransport.WolverineTransportClient? _callClient; + public GrpcEndpoint(Uri uri) : base(uri, EndpointRole.Application) { Host = uri.Host; @@ -21,7 +29,7 @@ public GrpcEndpoint(Uri uri) : base(uri, EndpointRole.Application) public override async ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) { - var listener = new GrpcListener(Uri, Port, receiver, runtime.LoggerFactory.CreateLogger()); + var listener = new GrpcListener(Uri, Port, receiver, runtime, runtime.LoggerFactory.CreateLogger()); await listener.StartAsync(); return listener; } @@ -35,4 +43,50 @@ protected override bool supportsMode(EndpointMode mode) { return mode is EndpointMode.Inline or EndpointMode.BufferedInMemory; } + + // GH-2967: gRPC's unary RPC carries the reply in the same exchange, so InvokeAsync reads the + // reply envelope straight off the Call(WolverineMessage) response — no ReplyListener, no listening + // endpoint on the sender. Selected by MessageRoute.For via IInlineRequestReplyEndpoint. + public async Task InvokeRemoteAsync(Envelope request, IWolverineRuntime runtime, CancellationToken cancellation) + { + if (request.Data == null && request.Message != null) + { + request.Serializer ??= DefaultSerializer; + request.Data = request.Serializer!.Write(request); + request.ContentType ??= request.Serializer!.ContentType; + } + + var client = resolveCallClient(); + var message = new WolverineMessage { Data = ByteString.CopyFrom(EnvelopeSerializer.Serialize(request)) }; + + try + { + var response = await client.CallAsync(message, cancellationToken: cancellation); + return EnvelopeSerializer.Deserialize(response.Data.ToByteArray()); + } + catch (global::Grpc.Core.RpcException e) + { + // Transport-level failure (the receiver couldn't even produce a reply envelope): surface as + // a failure reply so the caller gets the usual WolverineRequestReplyException. + var ack = new FailureAcknowledgement + { + RequestId = request.Id, + Message = $"Inline gRPC request/reply to {Uri} failed: {e.Status.StatusCode} {e.Status.Detail}" + }; + return new Envelope { Message = ack }; + } + } + + private WolverineTransport.WolverineTransportClient resolveCallClient() + { + if (_callClient != null) return _callClient; + + lock (_clientLock) + { + _callChannel ??= GrpcChannel.ForAddress($"http://{Host}:{Port}"); + _callClient ??= new WolverineTransport.WolverineTransportClient(_callChannel); + } + + return _callClient; + } } diff --git a/src/Wolverine.Grpc/Internals/GrpcListener.cs b/src/Wolverine.Grpc/Internals/GrpcListener.cs index dad0e3001..2beab1857 100644 --- a/src/Wolverine.Grpc/Internals/GrpcListener.cs +++ b/src/Wolverine.Grpc/Internals/GrpcListener.cs @@ -11,14 +11,16 @@ namespace Wolverine.Grpc.Internals; internal class GrpcListener : IListener { private readonly IReceiver _receiver; + private readonly IWolverineRuntime _runtime; private readonly ILogger _logger; private WebApplication? _app; - public GrpcListener(Uri address, int port, IReceiver receiver, ILogger logger) + public GrpcListener(Uri address, int port, IReceiver receiver, IWolverineRuntime runtime, ILogger logger) { Address = address; Port = port; _receiver = receiver; + _runtime = runtime; _logger = logger; } @@ -41,6 +43,8 @@ internal async Task StartAsync() builder.Services.AddGrpc(); builder.Services.AddSingleton(_receiver); builder.Services.AddSingleton(this); + // GH-2967: the inline-request/reply Call handler executes against the main application's runtime. + builder.Services.AddSingleton(_runtime); _app = builder.Build(); _app.MapGrpcService(); diff --git a/src/Wolverine.Grpc/Internals/WolverineGrpcTransportService.cs b/src/Wolverine.Grpc/Internals/WolverineGrpcTransportService.cs index 354d75aa8..5da826d96 100644 --- a/src/Wolverine.Grpc/Internals/WolverineGrpcTransportService.cs +++ b/src/Wolverine.Grpc/Internals/WolverineGrpcTransportService.cs @@ -1,5 +1,10 @@ using Google.Protobuf; using Grpc.Core; +using JasperFx.Core; +using Wolverine.ErrorHandling; +using Wolverine.Runtime; +using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.RemoteInvocation; using Wolverine.Runtime.Serialization; using Wolverine.Transports; @@ -9,11 +14,13 @@ internal class WolverineGrpcTransportService : WolverineTransport.WolverineTrans { private readonly IReceiver _receiver; private readonly IListener _listener; + private readonly WolverineRuntime _runtime; - public WolverineGrpcTransportService(IReceiver receiver, IListener listener) + public WolverineGrpcTransportService(IReceiver receiver, IListener listener, IWolverineRuntime runtime) { _receiver = receiver; _listener = listener; + _runtime = (WolverineRuntime)runtime; } public override async Task Send(WolverineMessage request, ServerCallContext context) @@ -34,4 +41,90 @@ public override Task Ping(PingRequest request, ServerCallContext context) { return Task.FromResult(new Ack { Success = true }); } + + // GH-2967 inline request/reply: run the handler chain for the inbound envelope and return the reply + // envelope in the unary response. Mirrors the HTTP transport's /_wolverine/invoke executor. A + // handler failure comes back as a FailureAcknowledgement reply (the sender rethrows + // WolverineRequestReplyException), and the outbox is flushed (InvokeInlineAsync) before the response. + public override async Task Call(WolverineMessage request, ServerCallContext context) + { + Envelope envelope; + try + { + envelope = EnvelopeSerializer.Deserialize(request.Data.ToByteArray()); + } + catch (Exception e) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, "Invalid envelope: " + e.Message)); + } + + envelope.DoNotCascadeResponse = true; + if (envelope.ContentType.IsNotEmpty()) + { + envelope.Serializer = _runtime.Options.FindSerializer(envelope.ContentType!); + } + + var deserializeResult = await _runtime.Pipeline.TryDeserializeEnvelope(envelope); + if (deserializeResult != NullContinuation.Instance) + { + return failureReply(envelope, deserializeResult is NoHandlerContinuation + ? $"No handler for the requested message type {envelope.MessageType}" + : $"Execution error for requested message type {envelope.MessageType}"); + } + + if (envelope.ReplyRequested.IsNotEmpty()) + { + if (_runtime.Handlers.TryFindMessageType(envelope.ReplyRequested, out var responseType)) + { + envelope.ResponseType = responseType; + } + else + { + return failureReply(envelope, $"Unknown reply requested message type of {envelope.ReplyRequested}"); + } + } + + if (_runtime.FindInvoker(envelope.MessageType!) is not Executor executor) + { + return failureReply(envelope, $"Unable to find a message executor for {envelope.MessageType}"); + } + + try + { + await executor.InvokeInlineAsync(envelope, context.CancellationToken); + } + catch (Exception e) + { + return failureReply(envelope, e.Message); + } + + // Mark the request complete on every success path (mirrors the HTTP executor) so tracked + // sessions quiesce — the request's record needs this terminal event. + _runtime.MessageTracking.MessageSucceeded(envelope); + + if (envelope.Response != null) + { + var response = envelope.CreateForResponse(envelope.Response); + response.Serializer ??= _runtime.Options.DefaultSerializer; + response.ContentType = response.Serializer.ContentType; + response.Data = response.Serializer.WriteMessage(response.Message!); + return new WolverineMessage { Data = ByteString.CopyFrom(EnvelopeSerializer.Serialize(response)) }; + } + + // No reply message (Ack-style InvokeAsync): return a minimal reply envelope correlated to the + // request so the sender can complete an Acknowledgement. + var ack = new Envelope { ConversationId = envelope.Id, Data = Array.Empty() }; + return new WolverineMessage { Data = ByteString.CopyFrom(EnvelopeSerializer.Serialize(ack)) }; + } + + private static WolverineMessage failureReply(Envelope request, string message) + { + var failure = new FailureAcknowledgement { RequestId = request.Id, Message = message }; + var reply = request.CreateForResponse(failure); + reply.Serializer ??= IntrinsicSerializer.Instance; + // IntrinsicSerializer only implements Write(Envelope); WriteMessage(object) intentionally throws. + reply.Data = reply.Serializer.Write(reply); + reply.ContentType = reply.Serializer.ContentType; + return new WolverineMessage { Data = ByteString.CopyFrom(EnvelopeSerializer.Serialize(reply)) }; + } } diff --git a/src/Wolverine.Grpc/Internals/wolverine.proto b/src/Wolverine.Grpc/Internals/wolverine.proto index 45314da11..6c150098a 100644 --- a/src/Wolverine.Grpc/Internals/wolverine.proto +++ b/src/Wolverine.Grpc/Internals/wolverine.proto @@ -6,6 +6,12 @@ service WolverineTransport { rpc Send(WolverineMessage) returns (Ack); rpc SendBatch(WolverineMessageBatch) returns (Ack); rpc Ping(PingRequest) returns (Ack); + + // GH-2967 inline request/reply: the request WolverineMessage carries the serialized request + // envelope; the response WolverineMessage carries the serialized reply envelope (which may be a + // FailureAcknowledgement on a handler failure). Lets InvokeAsync use gRPC's unary response slot + // instead of a separate brokered reply on a listening endpoint. + rpc Call(WolverineMessage) returns (WolverineMessage); } message WolverineMessage {