Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions .github/workflows/grpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: grpc

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:

env:
config: Release
disable_test_parallelization: true

jobs:
test:
runs-on: ubuntu-latest
timeout-minutes: 15

steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0

- name: Setup .NET 8
uses: actions/setup-dotnet@v5
with:
dotnet-version: 8.0.x

- name: Setup .NET 9
uses: actions/setup-dotnet@v5
with:
dotnet-version: 9.0.x

- name: Setup .NET 10
uses: actions/setup-dotnet@v5
with:
dotnet-version: 10.0.x

- name: Run gRPC Tests
run: ./build.sh CIGrpc --framework net9.0
11 changes: 11 additions & 0 deletions build/CITargets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,17 @@ void BuildTestProjectsWithFramework(string frameworkOverride, params AbsolutePat
RunSingleProjectOneClassAtATime(leaderElectionTests);
});

Target CIGrpc => _ => _
.ProceedAfterFailure()
.Executes(() =>
{
var tests = RootDirectory / "src" / "Wolverine.Grpc.Tests" / "Wolverine.Grpc.Tests.csproj";

BuildTestProjects(tests);

RunSingleProjectOneClassAtATime(tests);
});

Target CIAzureServiceBus => _ => _
.ProceedAfterFailure()
.Executes(() =>
Expand Down
1 change: 1 addition & 0 deletions build/build.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ partial class Build : NukeBuild
Solution.Transports.Redis.Wolverine_Redis,
Solution.Transports.SignalR.Wolverine_SignalR,
Solution.Transports.NATS.Wolverine_Nats,
Solution.Grpc.Wolverine_Grpc,
Solution.Persistence.EFCore.Wolverine_EntityFrameworkCore,
Solution.Persistence.Polecat.Wolverine_Polecat
};
Expand Down
33 changes: 33 additions & 0 deletions src/Wolverine.Grpc.Tests/GrpcTransportCompliance.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Wolverine.ComplianceTests.Compliance;
using Wolverine.Grpc;
using Xunit;

public class GrpcComplianceFixture : TransportComplianceFixture, IAsyncLifetime
{
public const int ReceiverPort = 5150;
public const int SenderPort = 5151;

public GrpcComplianceFixture() : base(new Uri($"grpc://localhost:{ReceiverPort}"), 30)
{
}

public async Task InitializeAsync()
{
OutboundAddress = new Uri($"grpc://localhost:{ReceiverPort}");

await ReceiverIs(opts =>
{
opts.ListenAtGrpcPort(ReceiverPort);
});

await SenderIs(opts =>
{
opts.ListenAtGrpcPort(SenderPort).UseForReplies();
opts.PublishAllMessages().ToGrpcEndpoint("localhost", ReceiverPort);
});
}

public new Task DisposeAsync() => Task.CompletedTask;
}

public class GrpcTransportCompliance : TransportCompliance<GrpcComplianceFixture>;
1 change: 1 addition & 0 deletions src/Wolverine.Grpc.Tests/Wolverine.Grpc.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
<ProjectReference Include="..\Wolverine.Grpc\Wolverine.Grpc.csproj"/>
<ProjectReference Include="..\Testing\Wolverine.ComplianceTests\Wolverine.ComplianceTests.csproj"/>
<ProjectReference Include="..\Extensions\Wolverine.FluentValidation\Wolverine.FluentValidation.csproj"/>
<ProjectReference Include="..\Extensions\Wolverine.FluentValidation.Grpc\Wolverine.FluentValidation.Grpc.csproj"/>
</ItemGroup>
Expand Down
37 changes: 37 additions & 0 deletions src/Wolverine.Grpc/GrpcEndpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Microsoft.Extensions.Logging;
using Wolverine.Configuration;
using Wolverine.Grpc.Internals;
using Wolverine.Runtime;
using Wolverine.Transports;
using Wolverine.Transports.Sending;

namespace Wolverine.Grpc;

public class GrpcEndpoint : Endpoint
{
public GrpcEndpoint(Uri uri) : base(uri, EndpointRole.Application)
{
Host = uri.Host;
Port = uri.IsDefaultPort ? 5000 : uri.Port;
}

public string Host { get; }
public int Port { get; }

public override async ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver)
{
var listener = new GrpcListener(Uri, Port, receiver, runtime.LoggerFactory.CreateLogger<GrpcListener>());
await listener.StartAsync();
return listener;
}

protected override ISender CreateSender(IWolverineRuntime runtime)
{
return new GrpcSender(Uri, Host, Port, runtime.LoggerFactory.CreateLogger<GrpcSender>());
}

protected override bool supportsMode(EndpointMode mode)
{
return mode is EndpointMode.Inline or EndpointMode.BufferedInMemory;
}
}
10 changes: 10 additions & 0 deletions src/Wolverine.Grpc/GrpcListenerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Wolverine.Configuration;

namespace Wolverine.Grpc;

public class GrpcListenerConfiguration : ListenerConfiguration<GrpcListenerConfiguration, GrpcEndpoint>
{
public GrpcListenerConfiguration(GrpcEndpoint endpoint) : base(endpoint)
{
}
}
10 changes: 10 additions & 0 deletions src/Wolverine.Grpc/GrpcSubscriberConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Wolverine.Configuration;

namespace Wolverine.Grpc;

public class GrpcSubscriberConfiguration : SubscriberConfiguration<GrpcSubscriberConfiguration, GrpcEndpoint>
{
public GrpcSubscriberConfiguration(GrpcEndpoint endpoint) : base(endpoint)
{
}
}
42 changes: 42 additions & 0 deletions src/Wolverine.Grpc/GrpcTransport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using JasperFx.Core;
using Wolverine.Configuration;
using Wolverine.Runtime;
using Wolverine.Transports;

namespace Wolverine.Grpc;

public class GrpcTransport : TransportBase<GrpcEndpoint>
{
private readonly LightweightCache<Uri, GrpcEndpoint> _endpoints;

public GrpcTransport() : base("grpc", "gRPC Transport", [])
{
_endpoints = new LightweightCache<Uri, GrpcEndpoint>(uri => new GrpcEndpoint(uri));
}

protected override IEnumerable<GrpcEndpoint> endpoints() => _endpoints;

protected override GrpcEndpoint findEndpointByUri(Uri uri) => _endpoints[uri];

public GrpcEndpoint EndpointForLocalPort(int port)
{
var uri = new Uri($"grpc://localhost:{port}");
return _endpoints[uri];
}

public GrpcEndpoint EndpointFor(string host, int port)
{
var uri = new Uri($"grpc://{host}:{port}");
return _endpoints[uri];
}

public override ValueTask InitializeAsync(IWolverineRuntime runtime)
{
foreach (var endpoint in _endpoints)
{
endpoint.Compile(runtime);
}

return ValueTask.CompletedTask;
}
}
34 changes: 34 additions & 0 deletions src/Wolverine.Grpc/GrpcTransportExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using JasperFx.Core.Reflection;
using Wolverine.Configuration;

namespace Wolverine.Grpc;

public static class GrpcTransportExtensions
{
/// <summary>
/// Configure Wolverine to listen for gRPC messages on the specified port.
/// The listener starts an embedded gRPC server bound to all interfaces.
/// </summary>
public static GrpcListenerConfiguration ListenAtGrpcPort(this WolverineOptions options, int port)
{
var transport = options.Transports.GetOrCreate<GrpcTransport>();
var endpoint = transport.EndpointForLocalPort(port);
endpoint.IsListener = true;
return new GrpcListenerConfiguration(endpoint);
}

/// <summary>
/// Publish messages to the specified gRPC endpoint (host:port).
/// </summary>
public static GrpcSubscriberConfiguration ToGrpcEndpoint(
this IPublishToExpression publishing,
string host,
int port)
{
var transports = publishing.As<PublishingExpression>().Parent.Transports;
var transport = transports.GetOrCreate<GrpcTransport>();
var endpoint = transport.EndpointFor(host, port);
publishing.To(endpoint.Uri);
return new GrpcSubscriberConfiguration(endpoint);
}
}
74 changes: 74 additions & 0 deletions src/Wolverine.Grpc/Internals/GrpcListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Wolverine.Runtime;
using Wolverine.Transports;

namespace Wolverine.Grpc.Internals;

internal class GrpcListener : IListener
{
private readonly IReceiver _receiver;
private readonly ILogger<GrpcListener> _logger;
private WebApplication? _app;

public GrpcListener(Uri address, int port, IReceiver receiver, ILogger<GrpcListener> logger)
{
Address = address;
Port = port;
_receiver = receiver;
_logger = logger;
}

public int Port { get; }

public IHandlerPipeline? Pipeline => null;

public Uri Address { get; }

internal async Task StartAsync()
{
var builder = WebApplication.CreateSlimBuilder();
builder.Logging.ClearProviders();

builder.WebHost.ConfigureKestrel(opts =>
{
opts.ListenAnyIP(Port, o => o.Protocols = HttpProtocols.Http2);
});

builder.Services.AddGrpc();
builder.Services.AddSingleton(_receiver);
builder.Services.AddSingleton<IListener>(this);

_app = builder.Build();
_app.MapGrpcService<WolverineGrpcTransportService>();

await _app.StartAsync();
_logger.LogInformation("gRPC transport listener started on port {Port}", Port);
}

public ValueTask CompleteAsync(Envelope envelope) => ValueTask.CompletedTask;

public ValueTask DeferAsync(Envelope envelope) => ValueTask.CompletedTask;

public async ValueTask StopAsync()
{
if (_app != null)
{
_logger.LogInformation("Stopping gRPC transport listener on port {Port}", Port);
await _app.StopAsync();
}
}

public async ValueTask DisposeAsync()
{
if (_app != null)
{
await _app.StopAsync();
await _app.DisposeAsync();
_app = null;
}
}
}
52 changes: 52 additions & 0 deletions src/Wolverine.Grpc/Internals/GrpcSender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using Google.Protobuf;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using Wolverine.Runtime.Serialization;
using Wolverine.Transports.Sending;

namespace Wolverine.Grpc.Internals;

internal class GrpcSender : ISender, IDisposable
{
private readonly ILogger<GrpcSender> _logger;
private readonly GrpcChannel _channel;
private readonly WolverineTransport.WolverineTransportClient _client;

public GrpcSender(Uri destination, string host, int port, ILogger<GrpcSender> logger)
{
Destination = destination;
_logger = logger;
_channel = GrpcChannel.ForAddress($"http://{host}:{port}");
_client = new WolverineTransport.WolverineTransportClient(_channel);
}

public bool SupportsNativeScheduledSend => false;

public Uri Destination { get; }

public async Task<bool> PingAsync()
{
try
{
var result = await _client.PingAsync(new PingRequest());
return result.Success;
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Ping to {Destination} failed", Destination);
return false;
}
}

public async ValueTask SendAsync(Envelope envelope)
{
var data = EnvelopeSerializer.Serialize(envelope);
var message = new WolverineMessage { Data = ByteString.CopyFrom(data) };
await _client.SendAsync(message);
}

public void Dispose()
{
_channel.Dispose();
}
}
Loading
Loading