diff --git a/.github/workflows/grpc.yml b/.github/workflows/grpc.yml new file mode 100644 index 000000000..e9b40e50d --- /dev/null +++ b/.github/workflows/grpc.yml @@ -0,0 +1,41 @@ +name: grpc + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + workflow_dispatch: + +env: + config: Release + disable_test_parallelization: true + +jobs: + test: + runs-on: ubuntu-latest + timeout-minutes: 15 + + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup .NET 8 + uses: actions/setup-dotnet@v5 + with: + dotnet-version: 8.0.x + + - name: Setup .NET 9 + uses: actions/setup-dotnet@v5 + with: + dotnet-version: 9.0.x + + - name: Setup .NET 10 + uses: actions/setup-dotnet@v5 + with: + dotnet-version: 10.0.x + + - name: Run gRPC Tests + run: ./build.sh CIGrpc --framework net9.0 diff --git a/build/CITargets.cs b/build/CITargets.cs index aea8c5d68..8eb9b5acc 100644 --- a/build/CITargets.cs +++ b/build/CITargets.cs @@ -437,6 +437,17 @@ void BuildTestProjectsWithFramework(string frameworkOverride, params AbsolutePat RunSingleProjectOneClassAtATime(leaderElectionTests); }); + Target CIGrpc => _ => _ + .ProceedAfterFailure() + .Executes(() => + { + var tests = RootDirectory / "src" / "Wolverine.Grpc.Tests" / "Wolverine.Grpc.Tests.csproj"; + + BuildTestProjects(tests); + + RunSingleProjectOneClassAtATime(tests); + }); + Target CIAzureServiceBus => _ => _ .ProceedAfterFailure() .Executes(() => diff --git a/build/build.cs b/build/build.cs index 914520cbb..4c13b6104 100644 --- a/build/build.cs +++ b/build/build.cs @@ -350,6 +350,7 @@ partial class Build : NukeBuild Solution.Transports.Redis.Wolverine_Redis, Solution.Transports.SignalR.Wolverine_SignalR, Solution.Transports.NATS.Wolverine_Nats, + Solution.Grpc.Wolverine_Grpc, Solution.Persistence.EFCore.Wolverine_EntityFrameworkCore, Solution.Persistence.Polecat.Wolverine_Polecat }; diff --git a/src/Wolverine.Grpc.Tests/GrpcTransportCompliance.cs b/src/Wolverine.Grpc.Tests/GrpcTransportCompliance.cs new file mode 100644 index 000000000..1d14f2532 --- /dev/null +++ b/src/Wolverine.Grpc.Tests/GrpcTransportCompliance.cs @@ -0,0 +1,33 @@ +using Wolverine.ComplianceTests.Compliance; +using Wolverine.Grpc; +using Xunit; + +public class GrpcComplianceFixture : TransportComplianceFixture, IAsyncLifetime +{ + public const int ReceiverPort = 5150; + public const int SenderPort = 5151; + + public GrpcComplianceFixture() : base(new Uri($"grpc://localhost:{ReceiverPort}"), 30) + { + } + + public async Task InitializeAsync() + { + OutboundAddress = new Uri($"grpc://localhost:{ReceiverPort}"); + + await ReceiverIs(opts => + { + opts.ListenAtGrpcPort(ReceiverPort); + }); + + await SenderIs(opts => + { + opts.ListenAtGrpcPort(SenderPort).UseForReplies(); + opts.PublishAllMessages().ToGrpcEndpoint("localhost", ReceiverPort); + }); + } + + public new Task DisposeAsync() => Task.CompletedTask; +} + +public class GrpcTransportCompliance : TransportCompliance; diff --git a/src/Wolverine.Grpc.Tests/Wolverine.Grpc.Tests.csproj b/src/Wolverine.Grpc.Tests/Wolverine.Grpc.Tests.csproj index 5ecfb2f37..baf788a15 100644 --- a/src/Wolverine.Grpc.Tests/Wolverine.Grpc.Tests.csproj +++ b/src/Wolverine.Grpc.Tests/Wolverine.Grpc.Tests.csproj @@ -8,6 +8,7 @@ + diff --git a/src/Wolverine.Grpc/GrpcEndpoint.cs b/src/Wolverine.Grpc/GrpcEndpoint.cs new file mode 100644 index 000000000..0aa8faa91 --- /dev/null +++ b/src/Wolverine.Grpc/GrpcEndpoint.cs @@ -0,0 +1,37 @@ +using Microsoft.Extensions.Logging; +using Wolverine.Configuration; +using Wolverine.Grpc.Internals; +using Wolverine.Runtime; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Grpc; + +public class GrpcEndpoint : Endpoint +{ + public GrpcEndpoint(Uri uri) : base(uri, EndpointRole.Application) + { + Host = uri.Host; + Port = uri.IsDefaultPort ? 5000 : uri.Port; + } + + public string Host { get; } + public int Port { get; } + + public override async ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) + { + var listener = new GrpcListener(Uri, Port, receiver, runtime.LoggerFactory.CreateLogger()); + await listener.StartAsync(); + return listener; + } + + protected override ISender CreateSender(IWolverineRuntime runtime) + { + return new GrpcSender(Uri, Host, Port, runtime.LoggerFactory.CreateLogger()); + } + + protected override bool supportsMode(EndpointMode mode) + { + return mode is EndpointMode.Inline or EndpointMode.BufferedInMemory; + } +} diff --git a/src/Wolverine.Grpc/GrpcListenerConfiguration.cs b/src/Wolverine.Grpc/GrpcListenerConfiguration.cs new file mode 100644 index 000000000..8bf209dc4 --- /dev/null +++ b/src/Wolverine.Grpc/GrpcListenerConfiguration.cs @@ -0,0 +1,10 @@ +using Wolverine.Configuration; + +namespace Wolverine.Grpc; + +public class GrpcListenerConfiguration : ListenerConfiguration +{ + public GrpcListenerConfiguration(GrpcEndpoint endpoint) : base(endpoint) + { + } +} diff --git a/src/Wolverine.Grpc/GrpcSubscriberConfiguration.cs b/src/Wolverine.Grpc/GrpcSubscriberConfiguration.cs new file mode 100644 index 000000000..267bb5c72 --- /dev/null +++ b/src/Wolverine.Grpc/GrpcSubscriberConfiguration.cs @@ -0,0 +1,10 @@ +using Wolverine.Configuration; + +namespace Wolverine.Grpc; + +public class GrpcSubscriberConfiguration : SubscriberConfiguration +{ + public GrpcSubscriberConfiguration(GrpcEndpoint endpoint) : base(endpoint) + { + } +} diff --git a/src/Wolverine.Grpc/GrpcTransport.cs b/src/Wolverine.Grpc/GrpcTransport.cs new file mode 100644 index 000000000..9ab6a278c --- /dev/null +++ b/src/Wolverine.Grpc/GrpcTransport.cs @@ -0,0 +1,42 @@ +using JasperFx.Core; +using Wolverine.Configuration; +using Wolverine.Runtime; +using Wolverine.Transports; + +namespace Wolverine.Grpc; + +public class GrpcTransport : TransportBase +{ + private readonly LightweightCache _endpoints; + + public GrpcTransport() : base("grpc", "gRPC Transport", []) + { + _endpoints = new LightweightCache(uri => new GrpcEndpoint(uri)); + } + + protected override IEnumerable endpoints() => _endpoints; + + protected override GrpcEndpoint findEndpointByUri(Uri uri) => _endpoints[uri]; + + public GrpcEndpoint EndpointForLocalPort(int port) + { + var uri = new Uri($"grpc://localhost:{port}"); + return _endpoints[uri]; + } + + public GrpcEndpoint EndpointFor(string host, int port) + { + var uri = new Uri($"grpc://{host}:{port}"); + return _endpoints[uri]; + } + + public override ValueTask InitializeAsync(IWolverineRuntime runtime) + { + foreach (var endpoint in _endpoints) + { + endpoint.Compile(runtime); + } + + return ValueTask.CompletedTask; + } +} diff --git a/src/Wolverine.Grpc/GrpcTransportExtensions.cs b/src/Wolverine.Grpc/GrpcTransportExtensions.cs new file mode 100644 index 000000000..52120821c --- /dev/null +++ b/src/Wolverine.Grpc/GrpcTransportExtensions.cs @@ -0,0 +1,34 @@ +using JasperFx.Core.Reflection; +using Wolverine.Configuration; + +namespace Wolverine.Grpc; + +public static class GrpcTransportExtensions +{ + /// + /// Configure Wolverine to listen for gRPC messages on the specified port. + /// The listener starts an embedded gRPC server bound to all interfaces. + /// + public static GrpcListenerConfiguration ListenAtGrpcPort(this WolverineOptions options, int port) + { + var transport = options.Transports.GetOrCreate(); + var endpoint = transport.EndpointForLocalPort(port); + endpoint.IsListener = true; + return new GrpcListenerConfiguration(endpoint); + } + + /// + /// Publish messages to the specified gRPC endpoint (host:port). + /// + public static GrpcSubscriberConfiguration ToGrpcEndpoint( + this IPublishToExpression publishing, + string host, + int port) + { + var transports = publishing.As().Parent.Transports; + var transport = transports.GetOrCreate(); + var endpoint = transport.EndpointFor(host, port); + publishing.To(endpoint.Uri); + return new GrpcSubscriberConfiguration(endpoint); + } +} diff --git a/src/Wolverine.Grpc/Internals/GrpcListener.cs b/src/Wolverine.Grpc/Internals/GrpcListener.cs new file mode 100644 index 000000000..dad0e3001 --- /dev/null +++ b/src/Wolverine.Grpc/Internals/GrpcListener.cs @@ -0,0 +1,74 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Wolverine.Runtime; +using Wolverine.Transports; + +namespace Wolverine.Grpc.Internals; + +internal class GrpcListener : IListener +{ + private readonly IReceiver _receiver; + private readonly ILogger _logger; + private WebApplication? _app; + + public GrpcListener(Uri address, int port, IReceiver receiver, ILogger logger) + { + Address = address; + Port = port; + _receiver = receiver; + _logger = logger; + } + + public int Port { get; } + + public IHandlerPipeline? Pipeline => null; + + public Uri Address { get; } + + internal async Task StartAsync() + { + var builder = WebApplication.CreateSlimBuilder(); + builder.Logging.ClearProviders(); + + builder.WebHost.ConfigureKestrel(opts => + { + opts.ListenAnyIP(Port, o => o.Protocols = HttpProtocols.Http2); + }); + + builder.Services.AddGrpc(); + builder.Services.AddSingleton(_receiver); + builder.Services.AddSingleton(this); + + _app = builder.Build(); + _app.MapGrpcService(); + + await _app.StartAsync(); + _logger.LogInformation("gRPC transport listener started on port {Port}", Port); + } + + public ValueTask CompleteAsync(Envelope envelope) => ValueTask.CompletedTask; + + public ValueTask DeferAsync(Envelope envelope) => ValueTask.CompletedTask; + + public async ValueTask StopAsync() + { + if (_app != null) + { + _logger.LogInformation("Stopping gRPC transport listener on port {Port}", Port); + await _app.StopAsync(); + } + } + + public async ValueTask DisposeAsync() + { + if (_app != null) + { + await _app.StopAsync(); + await _app.DisposeAsync(); + _app = null; + } + } +} diff --git a/src/Wolverine.Grpc/Internals/GrpcSender.cs b/src/Wolverine.Grpc/Internals/GrpcSender.cs new file mode 100644 index 000000000..f8eae9f0d --- /dev/null +++ b/src/Wolverine.Grpc/Internals/GrpcSender.cs @@ -0,0 +1,52 @@ +using Google.Protobuf; +using Grpc.Net.Client; +using Microsoft.Extensions.Logging; +using Wolverine.Runtime.Serialization; +using Wolverine.Transports.Sending; + +namespace Wolverine.Grpc.Internals; + +internal class GrpcSender : ISender, IDisposable +{ + private readonly ILogger _logger; + private readonly GrpcChannel _channel; + private readonly WolverineTransport.WolverineTransportClient _client; + + public GrpcSender(Uri destination, string host, int port, ILogger logger) + { + Destination = destination; + _logger = logger; + _channel = GrpcChannel.ForAddress($"http://{host}:{port}"); + _client = new WolverineTransport.WolverineTransportClient(_channel); + } + + public bool SupportsNativeScheduledSend => false; + + public Uri Destination { get; } + + public async Task PingAsync() + { + try + { + var result = await _client.PingAsync(new PingRequest()); + return result.Success; + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Ping to {Destination} failed", Destination); + return false; + } + } + + public async ValueTask SendAsync(Envelope envelope) + { + var data = EnvelopeSerializer.Serialize(envelope); + var message = new WolverineMessage { Data = ByteString.CopyFrom(data) }; + await _client.SendAsync(message); + } + + public void Dispose() + { + _channel.Dispose(); + } +} diff --git a/src/Wolverine.Grpc/Internals/WolverineGrpcTransportService.cs b/src/Wolverine.Grpc/Internals/WolverineGrpcTransportService.cs new file mode 100644 index 000000000..354d75aa8 --- /dev/null +++ b/src/Wolverine.Grpc/Internals/WolverineGrpcTransportService.cs @@ -0,0 +1,37 @@ +using Google.Protobuf; +using Grpc.Core; +using Wolverine.Runtime.Serialization; +using Wolverine.Transports; + +namespace Wolverine.Grpc.Internals; + +internal class WolverineGrpcTransportService : WolverineTransport.WolverineTransportBase +{ + private readonly IReceiver _receiver; + private readonly IListener _listener; + + public WolverineGrpcTransportService(IReceiver receiver, IListener listener) + { + _receiver = receiver; + _listener = listener; + } + + public override async Task Send(WolverineMessage request, ServerCallContext context) + { + var envelope = EnvelopeSerializer.Deserialize(request.Data.ToByteArray()); + await _receiver.ReceivedAsync(_listener, envelope); + return new Ack { Success = true }; + } + + public override async Task SendBatch(WolverineMessageBatch request, ServerCallContext context) + { + var envelopes = EnvelopeSerializer.ReadMany(request.Data.ToByteArray()); + await _receiver.ReceivedAsync(_listener, envelopes); + return new Ack { Success = true }; + } + + public override Task Ping(PingRequest request, ServerCallContext context) + { + return Task.FromResult(new Ack { Success = true }); + } +} diff --git a/src/Wolverine.Grpc/Internals/wolverine.proto b/src/Wolverine.Grpc/Internals/wolverine.proto new file mode 100644 index 000000000..45314da11 --- /dev/null +++ b/src/Wolverine.Grpc/Internals/wolverine.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; +option csharp_namespace = "Wolverine.Grpc.Internals"; +package wolverine_grpc; + +service WolverineTransport { + rpc Send(WolverineMessage) returns (Ack); + rpc SendBatch(WolverineMessageBatch) returns (Ack); + rpc Ping(PingRequest) returns (Ack); +} + +message WolverineMessage { + bytes data = 1; +} + +message WolverineMessageBatch { + bytes data = 1; +} + +message PingRequest {} + +message Ack { + bool success = 1; +} diff --git a/src/Wolverine.Grpc/Wolverine.Grpc.csproj b/src/Wolverine.Grpc/Wolverine.Grpc.csproj index 95a74dc91..2b479042b 100644 --- a/src/Wolverine.Grpc/Wolverine.Grpc.csproj +++ b/src/Wolverine.Grpc/Wolverine.Grpc.csproj @@ -1,19 +1,34 @@ - Code-first and proto-first gRPC service support for Wolverine + Code-first and proto-first gRPC service support for Wolverine, including gRPC message transport WolverineFx.Grpc + + + <_Parameter1>Wolverine.Grpc.Tests + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + +