Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions src/Wolverine.Grpc.Tests/inline_request_reply_grpc.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Grpc;
using Wolverine.Runtime.RemoteInvocation;
using Xunit;

namespace Wolverine.Grpc.Tests;

// GH-2967: inline request/reply over the gRPC transport. The sender configures NO listening endpoint
// and NO reply tracker wiring, so InvokeAsync<T> can only return by reading the reply envelope straight
// off the unary Call(WolverineMessage) response — otherwise it would time out. Mirrors the HTTP test.
[Collection(GrpcSerialTestsCollection.Name)]
public class inline_request_reply_grpc
{
private const int ReceiverPort = 5188;

private static Task<IHost> startReceiverAsync()
{
return Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.ListenAtGrpcPort(ReceiverPort);
opts.Discovery.DisableConventionalDiscovery();
opts.Discovery.IncludeType<GrpcInlineHandler>();
})
.StartAsync();
}

private static Task<IHost> startSenderAsync()
{
return Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Deliberately NO ListenAtGrpcPort(...).UseForReplies() — the reply rides the gRPC response.
opts.Discovery.DisableConventionalDiscovery();
opts.PublishAllMessages().ToGrpcEndpoint("localhost", ReceiverPort);
})
.StartAsync();
}

[Fact]
public async Task invoke_reads_reply_from_the_grpc_response_slot()
{
using var receiver = await startReceiverAsync();
using var sender = await startSenderAsync();

var response = await sender.MessageBus().InvokeAsync<GrpcInlinePong>(new GrpcInlinePing("Rand"));

response.ShouldNotBeNull();
response.Name.ShouldBe("Rand");
}

[Fact]
public async Task handler_failure_surfaces_as_request_reply_exception()
{
using var receiver = await startReceiverAsync();
using var sender = await startSenderAsync();

var ex = await Should.ThrowAsync<WolverineRequestReplyException>(async () =>
await sender.MessageBus().InvokeAsync<GrpcInlinePong>(new GrpcInlinePing("boom")));

ex.Message.ShouldContain("boom");
}
}

public record GrpcInlinePing(string Name);

public record GrpcInlinePong(string Name);

public class GrpcInlineHandler
{
public GrpcInlinePong Handle(GrpcInlinePing ping)
{
if (ping.Name == "boom")
{
throw new InvalidOperationException("boom from the gRPC handler");
}

return new GrpcInlinePong(ping.Name);
}
}
58 changes: 56 additions & 2 deletions src/Wolverine.Grpc/GrpcEndpoint.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
using Google.Protobuf;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using Wolverine.Configuration;
using Wolverine.Grpc.Internals;
using Wolverine.Runtime;
using Wolverine.Runtime.RemoteInvocation;
using Wolverine.Runtime.Serialization;
using Wolverine.Transports;
using Wolverine.Transports.Sending;

namespace Wolverine.Grpc;

public class GrpcEndpoint : Endpoint
public class GrpcEndpoint : Endpoint, IInlineRequestReplyEndpoint
{
private readonly object _clientLock = new();
private GrpcChannel? _callChannel;
private WolverineTransport.WolverineTransportClient? _callClient;

public GrpcEndpoint(Uri uri) : base(uri, EndpointRole.Application)
{
Host = uri.Host;
Expand All @@ -21,7 +29,7 @@ public GrpcEndpoint(Uri uri) : base(uri, EndpointRole.Application)

public override async ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver)
{
var listener = new GrpcListener(Uri, Port, receiver, runtime.LoggerFactory.CreateLogger<GrpcListener>());
var listener = new GrpcListener(Uri, Port, receiver, runtime, runtime.LoggerFactory.CreateLogger<GrpcListener>());
await listener.StartAsync();
return listener;
}
Expand All @@ -35,4 +43,50 @@ protected override bool supportsMode(EndpointMode mode)
{
return mode is EndpointMode.Inline or EndpointMode.BufferedInMemory;
}

// GH-2967: gRPC's unary RPC carries the reply in the same exchange, so InvokeAsync<T> reads the
// reply envelope straight off the Call(WolverineMessage) response — no ReplyListener, no listening
// endpoint on the sender. Selected by MessageRoute.For via IInlineRequestReplyEndpoint.
public async Task<Envelope> InvokeRemoteAsync(Envelope request, IWolverineRuntime runtime, CancellationToken cancellation)
{
if (request.Data == null && request.Message != null)
{
request.Serializer ??= DefaultSerializer;
request.Data = request.Serializer!.Write(request);
request.ContentType ??= request.Serializer!.ContentType;
}

var client = resolveCallClient();
var message = new WolverineMessage { Data = ByteString.CopyFrom(EnvelopeSerializer.Serialize(request)) };

try
{
var response = await client.CallAsync(message, cancellationToken: cancellation);
return EnvelopeSerializer.Deserialize(response.Data.ToByteArray());
}
catch (global::Grpc.Core.RpcException e)
{
// Transport-level failure (the receiver couldn't even produce a reply envelope): surface as
// a failure reply so the caller gets the usual WolverineRequestReplyException.
var ack = new FailureAcknowledgement
{
RequestId = request.Id,
Message = $"Inline gRPC request/reply to {Uri} failed: {e.Status.StatusCode} {e.Status.Detail}"
};
return new Envelope { Message = ack };
}
}

private WolverineTransport.WolverineTransportClient resolveCallClient()
{
if (_callClient != null) return _callClient;

lock (_clientLock)
{
_callChannel ??= GrpcChannel.ForAddress($"http://{Host}:{Port}");
_callClient ??= new WolverineTransport.WolverineTransportClient(_callChannel);
}

return _callClient;
}
}
6 changes: 5 additions & 1 deletion src/Wolverine.Grpc/Internals/GrpcListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ namespace Wolverine.Grpc.Internals;
internal class GrpcListener : IListener
{
private readonly IReceiver _receiver;
private readonly IWolverineRuntime _runtime;
private readonly ILogger<GrpcListener> _logger;
private WebApplication? _app;

public GrpcListener(Uri address, int port, IReceiver receiver, ILogger<GrpcListener> logger)
public GrpcListener(Uri address, int port, IReceiver receiver, IWolverineRuntime runtime, ILogger<GrpcListener> logger)
{
Address = address;
Port = port;
_receiver = receiver;
_runtime = runtime;
_logger = logger;
}

Expand All @@ -41,6 +43,8 @@ internal async Task StartAsync()
builder.Services.AddGrpc();
builder.Services.AddSingleton(_receiver);
builder.Services.AddSingleton<IListener>(this);
// GH-2967: the inline-request/reply Call handler executes against the main application's runtime.
builder.Services.AddSingleton(_runtime);

_app = builder.Build();
_app.MapGrpcService<WolverineGrpcTransportService>();
Expand Down
95 changes: 94 additions & 1 deletion src/Wolverine.Grpc/Internals/WolverineGrpcTransportService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
using Google.Protobuf;
using Grpc.Core;
using JasperFx.Core;
using Wolverine.ErrorHandling;
using Wolverine.Runtime;
using Wolverine.Runtime.Handlers;
using Wolverine.Runtime.RemoteInvocation;
using Wolverine.Runtime.Serialization;
using Wolverine.Transports;

Expand All @@ -9,11 +14,13 @@ internal class WolverineGrpcTransportService : WolverineTransport.WolverineTrans
{
private readonly IReceiver _receiver;
private readonly IListener _listener;
private readonly WolverineRuntime _runtime;

public WolverineGrpcTransportService(IReceiver receiver, IListener listener)
public WolverineGrpcTransportService(IReceiver receiver, IListener listener, IWolverineRuntime runtime)
{
_receiver = receiver;
_listener = listener;
_runtime = (WolverineRuntime)runtime;
}

public override async Task<Ack> Send(WolverineMessage request, ServerCallContext context)
Expand All @@ -34,4 +41,90 @@ public override Task<Ack> Ping(PingRequest request, ServerCallContext context)
{
return Task.FromResult(new Ack { Success = true });
}

// GH-2967 inline request/reply: run the handler chain for the inbound envelope and return the reply
// envelope in the unary response. Mirrors the HTTP transport's /_wolverine/invoke executor. A
// handler failure comes back as a FailureAcknowledgement reply (the sender rethrows
// WolverineRequestReplyException), and the outbox is flushed (InvokeInlineAsync) before the response.
public override async Task<WolverineMessage> Call(WolverineMessage request, ServerCallContext context)
{
Envelope envelope;
try
{
envelope = EnvelopeSerializer.Deserialize(request.Data.ToByteArray());
}
catch (Exception e)
{
throw new RpcException(new Status(StatusCode.InvalidArgument, "Invalid envelope: " + e.Message));
}

envelope.DoNotCascadeResponse = true;
if (envelope.ContentType.IsNotEmpty())
{
envelope.Serializer = _runtime.Options.FindSerializer(envelope.ContentType!);
}

var deserializeResult = await _runtime.Pipeline.TryDeserializeEnvelope(envelope);
if (deserializeResult != NullContinuation.Instance)
{
return failureReply(envelope, deserializeResult is NoHandlerContinuation
? $"No handler for the requested message type {envelope.MessageType}"
: $"Execution error for requested message type {envelope.MessageType}");
}

if (envelope.ReplyRequested.IsNotEmpty())
{
if (_runtime.Handlers.TryFindMessageType(envelope.ReplyRequested, out var responseType))
{
envelope.ResponseType = responseType;
}
else
{
return failureReply(envelope, $"Unknown reply requested message type of {envelope.ReplyRequested}");
}
}

if (_runtime.FindInvoker(envelope.MessageType!) is not Executor executor)
{
return failureReply(envelope, $"Unable to find a message executor for {envelope.MessageType}");
}

try
{
await executor.InvokeInlineAsync(envelope, context.CancellationToken);
}
catch (Exception e)
{
return failureReply(envelope, e.Message);
}

// Mark the request complete on every success path (mirrors the HTTP executor) so tracked
// sessions quiesce — the request's record needs this terminal event.
_runtime.MessageTracking.MessageSucceeded(envelope);

if (envelope.Response != null)
{
var response = envelope.CreateForResponse(envelope.Response);
response.Serializer ??= _runtime.Options.DefaultSerializer;
response.ContentType = response.Serializer.ContentType;
response.Data = response.Serializer.WriteMessage(response.Message!);
return new WolverineMessage { Data = ByteString.CopyFrom(EnvelopeSerializer.Serialize(response)) };
}

// No reply message (Ack-style InvokeAsync): return a minimal reply envelope correlated to the
// request so the sender can complete an Acknowledgement.
var ack = new Envelope { ConversationId = envelope.Id, Data = Array.Empty<byte>() };
return new WolverineMessage { Data = ByteString.CopyFrom(EnvelopeSerializer.Serialize(ack)) };
}

private static WolverineMessage failureReply(Envelope request, string message)
{
var failure = new FailureAcknowledgement { RequestId = request.Id, Message = message };
var reply = request.CreateForResponse(failure);
reply.Serializer ??= IntrinsicSerializer.Instance;
// IntrinsicSerializer only implements Write(Envelope); WriteMessage(object) intentionally throws.
reply.Data = reply.Serializer.Write(reply);
reply.ContentType = reply.Serializer.ContentType;
return new WolverineMessage { Data = ByteString.CopyFrom(EnvelopeSerializer.Serialize(reply)) };
}
}
6 changes: 6 additions & 0 deletions src/Wolverine.Grpc/Internals/wolverine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ service WolverineTransport {
rpc Send(WolverineMessage) returns (Ack);
rpc SendBatch(WolverineMessageBatch) returns (Ack);
rpc Ping(PingRequest) returns (Ack);

// GH-2967 inline request/reply: the request WolverineMessage carries the serialized request
// envelope; the response WolverineMessage carries the serialized reply envelope (which may be a
// FailureAcknowledgement on a handler failure). Lets InvokeAsync<T> use gRPC's unary response slot
// instead of a separate brokered reply on a listening endpoint.
rpc Call(WolverineMessage) returns (WolverineMessage);
}

message WolverineMessage {
Expand Down