diff --git a/src/Http/Wolverine.Http.Tests/Transport/CloudEventsHttpTransportTests.cs b/src/Http/Wolverine.Http.Tests/Transport/CloudEventsHttpTransportTests.cs new file mode 100644 index 000000000..6a0edc1b0 --- /dev/null +++ b/src/Http/Wolverine.Http.Tests/Transport/CloudEventsHttpTransportTests.cs @@ -0,0 +1,155 @@ +using System.Net; +using System.Text; +using System.Text.Json; +using NSubstitute; +using Shouldly; +using Wolverine.Http.Transport; +using Wolverine.Runtime.Interop; +using Xunit; + +namespace Wolverine.Http.Tests.Transport; + +public class CloudEventsHttpTransportTests +{ + private readonly IHttpClientFactory _clientFactory; + private readonly MockHttpMessageHandler _handler; + private readonly HttpClient _httpClient; + private readonly WolverineHttpTransportClientCloudEvents _client; + + public CloudEventsHttpTransportTests() + { + _clientFactory = Substitute.For(); + _handler = new MockHttpMessageHandler(); + _httpClient = new HttpClient(_handler); + _client = new WolverineHttpTransportClientCloudEvents(_clientFactory); + } + + [Fact] + public async Task send_envelope_as_cloud_event() + { + var uri = "https://localhost:5001/cloudevents"; + _httpClient.BaseAddress = new Uri(uri); + _clientFactory.CreateClient(uri).Returns(_httpClient); + + var message = new TestMessage { Name = "test" }; + var envelope = new Envelope + { + Id = Guid.NewGuid(), + MessageType = "TestMessage", + Message = message, + Data = Encoding.UTF8.GetBytes("{\"name\":\"test\"}"), + ContentType = "application/json" + }; + + var options = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + await _client.SendAsync(uri, envelope, options); + + _handler.LastRequest.ShouldNotBeNull(); + _handler.LastRequest.Method.ShouldBe(HttpMethod.Post); + _handler.LastRequest.Content.Headers.ContentType.MediaType.ShouldBe(HttpTransport.CloudEventsContentType); + + // Verify the body contains a CloudEvents formatted JSON + var content = Encoding.UTF8.GetString(_handler.LastContent); + content.ShouldContain("specversion"); + content.ShouldContain("type"); + content.ShouldContain("source"); + content.ShouldContain("id"); + } + + [Fact] + public async Task cloud_events_envelope_contains_correct_properties() + { + var uri = "https://localhost:5001/cloudevents"; + _httpClient.BaseAddress = new Uri(uri); + _clientFactory.CreateClient(uri).Returns(_httpClient); + + var envelopeId = Guid.NewGuid(); + var message = new CloudEventsTestCommand { Value = 42 }; + var envelope = new Envelope + { + Id = envelopeId, + MessageType = "MyApp.TestCommand", + Message = message, + Data = Encoding.UTF8.GetBytes("{\"value\":42}"), + ContentType = "application/json", + Source = "test-service" + }; + + var options = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + await _client.SendAsync(uri, envelope, options); + + var content = Encoding.UTF8.GetString(_handler.LastContent); + var ce = JsonSerializer.Deserialize(content, options); + + ce.ShouldNotBeNull(); + ce.SpecVersion.ShouldBe("1.0"); + ce.Type.ShouldBe("Wolverine.Http.Tests.Transport.CloudEventsTestCommand"); + ce.Source.ShouldBe("test-service"); + ce.Id.ToString().ShouldBe(envelopeId.ToString()); + ce.DataContentType.ShouldStartWith("application/json"); + } + + [Fact] + public async Task send_batch_with_cloud_events_uses_envelope_batch_content_type() + { + var uri = "https://localhost:5001/batch"; + _httpClient.BaseAddress = new Uri("https://target"); + _clientFactory.CreateClient(uri).Returns(_httpClient); + + var envelopes = new[] + { + new Envelope { Data = new byte[] { 1 } }, + new Envelope { Data = new byte[] { 2 } } + }; + + var batch = new Wolverine.Transports.OutgoingMessageBatch(new Uri(uri), envelopes); + + await _client.SendBatchAsync(uri, batch); + + _handler.LastRequest.ShouldNotBeNull(); + // Note: CloudEvents client uses EnvelopeBatchContentType for batches + _handler.LastRequest.Content.Headers.ContentType.MediaType.ShouldBe(HttpTransport.EnvelopeBatchContentType); + } + + [Fact] + public async Task cloud_events_with_null_options_uses_default() + { + var uri = "https://localhost:5001/cloudevents"; + _httpClient.BaseAddress = new Uri(uri); + _clientFactory.CreateClient(uri).Returns(_httpClient); + + var message = new TestMessage { Name = "test" }; + var envelope = new Envelope + { + Id = Guid.NewGuid(), + MessageType = "TestMessage", + Message = message, + Data = Encoding.UTF8.GetBytes("{\"name\":\"test\"}") + }; + + // Should not throw with null options + await _client.SendAsync(uri, envelope, null); + + _handler.LastRequest.ShouldNotBeNull(); + _handler.LastRequest.Content.Headers.ContentType.MediaType.ShouldBe(HttpTransport.CloudEventsContentType); + } +} + +public class TestMessage +{ + public string Name { get; set; } +} + +public class CloudEventsTestCommand +{ + public int Value { get; set; } +} + diff --git a/src/Http/Wolverine.Http.Tests/Transport/HttpSenderTests.cs b/src/Http/Wolverine.Http.Tests/Transport/HttpSenderTests.cs new file mode 100644 index 000000000..ce1c64267 --- /dev/null +++ b/src/Http/Wolverine.Http.Tests/Transport/HttpSenderTests.cs @@ -0,0 +1,249 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Http.Transport; +using Wolverine.Runtime; +using Wolverine.Transports; +using Wolverine.Transports.Sending; +using Xunit; + +namespace Wolverine.Http.Tests.Transport; + +public class HttpSenderTests +{ + [Fact] + public async Task inline_sender_supports_native_scheduled_send_when_endpoint_configured() + { + var endpoint = new HttpEndpoint(new Uri("https://scheduler.com/api"), EndpointRole.Application) + { + SupportsNativeScheduledSend = true, + OutboundUri = "https://scheduler.com/api" + }; + + var services = new ServiceCollection(); + var mockClient = Substitute.For(); + services.AddSingleton(mockClient); + var serviceProvider = services.BuildServiceProvider(); + + var runtime = Substitute.For(); + runtime.LoggerFactory.Returns(NullLoggerFactory.Instance); + runtime.Services.Returns(serviceProvider); + + var sender = new InlineHttpSender(endpoint, runtime, serviceProvider); + + sender.SupportsNativeScheduledSend.ShouldBeTrue(); + } + + [Fact] + public async Task inline_sender_does_not_support_native_scheduled_send_by_default() + { + var endpoint = new HttpEndpoint(new Uri("https://regular.com/api"), EndpointRole.Application) + { + OutboundUri = "https://regular.com/api" + }; + + var services = new ServiceCollection(); + var mockClient = Substitute.For(); + services.AddSingleton(mockClient); + var serviceProvider = services.BuildServiceProvider(); + + var runtime = Substitute.For(); + runtime.LoggerFactory.Returns(NullLoggerFactory.Instance); + runtime.Services.Returns(serviceProvider); + + var sender = new InlineHttpSender(endpoint, runtime, serviceProvider); + + sender.SupportsNativeScheduledSend.ShouldBeFalse(); + } + + [Fact] + public async Task inline_sender_uses_wolverine_http_transport_client() + { + var endpoint = new HttpEndpoint(new Uri("https://test.com/api"), EndpointRole.Application) + { + OutboundUri = "https://test.com/api" + }; + + var services = new ServiceCollection(); + var mockClient = Substitute.For(); + services.AddSingleton(mockClient); + var serviceProvider = services.BuildServiceProvider(); + + var runtime = Substitute.For(); + runtime.LoggerFactory.Returns(NullLoggerFactory.Instance); + runtime.Services.Returns(serviceProvider); + + var sender = new InlineHttpSender(endpoint, runtime, serviceProvider); + + var envelope = new Envelope + { + Id = Guid.NewGuid(), + Data = new byte[] { 1, 2, 3 } + }; + + await sender.SendAsync(envelope); + + await mockClient.Received(1).SendAsync( + Arg.Is("https://test.com/api"), + Arg.Is(e => e.Id == envelope.Id), + Arg.Any()); + } + + [Fact] + public async Task inline_sender_handles_exceptions_gracefully() + { + var endpoint = new HttpEndpoint(new Uri("https://failing.com/api"), EndpointRole.Application) + { + OutboundUri = "https://failing.com/api" + }; + + var services = new ServiceCollection(); + var mockClient = Substitute.For(); + mockClient.SendAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(Task.FromException(new Exception("Network error"))); + services.AddSingleton(mockClient); + var serviceProvider = services.BuildServiceProvider(); + + var runtime = Substitute.For(); + runtime.LoggerFactory.Returns(NullLoggerFactory.Instance); + runtime.Services.Returns(serviceProvider); + + var sender = new InlineHttpSender(endpoint, runtime, serviceProvider); + + var envelope = new Envelope + { + Id = Guid.NewGuid(), + Data = new byte[] { 1, 2, 3 } + }; + + // Should not throw - errors are logged + await sender.SendAsync(envelope); + } + + [Fact] + public async Task inline_sender_logs_error_if_client_not_registered() + { + var endpoint = new HttpEndpoint(new Uri("https://test.com/api"), EndpointRole.Application) + { + OutboundUri = "https://test.com/api" + }; + + var services = new ServiceCollection(); + // Not registering IWolverineHttpTransportClient + var serviceProvider = services.BuildServiceProvider(); + + var runtime = Substitute.For(); + runtime.LoggerFactory.Returns(NullLoggerFactory.Instance); + runtime.Services.Returns(serviceProvider); + + var sender = new InlineHttpSender(endpoint, runtime, serviceProvider); + + var envelope = new Envelope { Id = Guid.NewGuid() }; + + // InlineHttpSender catches exceptions and logs them instead of throwing + // This should not throw - it logs the error + await sender.SendAsync(envelope); + + // Test passes if no exception is thrown + true.ShouldBeTrue(); + } + + [Fact] + public async Task batched_sender_protocol_sends_batch_via_client() + { + var endpoint = new HttpEndpoint(new Uri("https://batch.com/api"), EndpointRole.Application) + { + OutboundUri = "https://batch.com/api" + }; + + var services = new ServiceCollection(); + var mockClient = Substitute.For(); + services.AddSingleton(mockClient); + var serviceProvider = services.BuildServiceProvider(); + + var protocol = new HttpSenderProtocol(endpoint, serviceProvider); + + var envelopes = new[] + { + new Envelope { Id = Guid.NewGuid(), Data = new byte[] { 1, 2, 3 } }, + new Envelope { Id = Guid.NewGuid(), Data = new byte[] { 4, 5, 6 } } + }; + var batch = new OutgoingMessageBatch(endpoint.Uri, envelopes); + + var callback = Substitute.For(); + + await protocol.SendBatchAsync(callback, batch); + + await mockClient.Received(1).SendBatchAsync( + Arg.Is("https://batch.com/api"), + Arg.Is(b => b.Messages.Count == 2)); + } + + [Fact] + public async Task batched_sender_protocol_throws_if_client_not_registered() + { + var endpoint = new HttpEndpoint(new Uri("https://batch.com/api"), EndpointRole.Application) + { + OutboundUri = "https://batch.com/api" + }; + + var services = new ServiceCollection(); + // Not registering IWolverineHttpTransportClient + var serviceProvider = services.BuildServiceProvider(); + + var protocol = new HttpSenderProtocol(endpoint, serviceProvider); + + var batch = new OutgoingMessageBatch(endpoint.Uri, new[] { new Envelope { Data = new byte[] { 1 } } }); + var callback = Substitute.For(); + + // GetRequiredService throws InvalidOperationException when service is not found + var ex = await Should.ThrowAsync(async () => + { + await protocol.SendBatchAsync(callback, batch); + }); + + ex.Message.ShouldContain("IWolverineHttpTransportClient"); + } + + [Fact] + public async Task inline_sender_ping_returns_true() + { + var endpoint = new HttpEndpoint(new Uri("https://test.com/api"), EndpointRole.Application); + var services = new ServiceCollection(); + var mockClient = Substitute.For(); + services.AddSingleton(mockClient); + var serviceProvider = services.BuildServiceProvider(); + + var runtime = Substitute.For(); + runtime.LoggerFactory.Returns(NullLoggerFactory.Instance); + runtime.Services.Returns(serviceProvider); + + var sender = new InlineHttpSender(endpoint, runtime, serviceProvider); + + var result = await sender.PingAsync(); + result.ShouldBeTrue(); + } + + [Fact] + public void inline_sender_destination_matches_endpoint_uri() + { + var uri = new Uri("https://test.com/api"); + var endpoint = new HttpEndpoint(uri, EndpointRole.Application); + + var services = new ServiceCollection(); + var mockClient = Substitute.For(); + services.AddSingleton(mockClient); + var serviceProvider = services.BuildServiceProvider(); + + var runtime = Substitute.For(); + runtime.LoggerFactory.Returns(NullLoggerFactory.Instance); + runtime.Services.Returns(serviceProvider); + + var sender = new InlineHttpSender(endpoint, runtime, serviceProvider); + + sender.Destination.ShouldBe(uri); + } +} + diff --git a/src/Http/Wolverine.Http.Tests/Transport/HttpTransportConfigurationTests.cs b/src/Http/Wolverine.Http.Tests/Transport/HttpTransportConfigurationTests.cs new file mode 100644 index 000000000..3661cae7a --- /dev/null +++ b/src/Http/Wolverine.Http.Tests/Transport/HttpTransportConfigurationTests.cs @@ -0,0 +1,173 @@ +using System.Text.Json; +using JasperFx.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Http.Transport; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.Http.Tests.Transport; + +public class HttpTransportConfigurationTests +{ + [Fact] + public async Task to_http_endpoint_creates_endpoint_with_correct_uri() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PublishAllMessages() + .ToHttpEndpoint("https://external-service.com/api"); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + var transport = runtime.Options.Transports.GetOrCreate(); + + var endpoint = transport.EndpointFor("https://external-service.com/api"); + endpoint.ShouldNotBeNull(); + endpoint.OutboundUri.ShouldBe("https://external-service.com/api"); + } + + [Fact] + public async Task to_http_endpoint_with_native_scheduled_send_sets_flag() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PublishAllMessages() + .ToHttpEndpoint("https://scheduler.com/api", supportsNativeScheduledSend: true); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + var transport = runtime.Options.Transports.GetOrCreate(); + + var endpoint = transport.EndpointFor("https://scheduler.com/api"); + endpoint.SupportsNativeScheduledSend.ShouldBeTrue(); + } + + [Fact] + public async Task to_http_endpoint_without_native_scheduled_send_defaults_to_false() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PublishAllMessages() + .ToHttpEndpoint("https://regular.com/api"); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + var transport = runtime.Options.Transports.GetOrCreate(); + + var endpoint = transport.EndpointFor("https://regular.com/api"); + endpoint.SupportsNativeScheduledSend.ShouldBeFalse(); + } + + [Fact] + public async Task to_http_endpoint_with_cloud_events_sets_serializer_options() + { + var customOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseUpper, + WriteIndented = true + }; + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PublishAllMessages() + .ToHttpEndpoint("https://cloudevents.com/api", + useCloudEvents: true, + options: customOptions); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + var transport = runtime.Options.Transports.GetOrCreate(); + + var endpoint = transport.EndpointFor("https://cloudevents.com/api"); + endpoint.SerializerOptions.ShouldBeSameAs(customOptions); + endpoint.SerializerOptions.PropertyNamingPolicy.ShouldBe(JsonNamingPolicy.SnakeCaseUpper); + endpoint.SerializerOptions.WriteIndented.ShouldBeTrue(); + } + + [Fact] + public async Task to_http_endpoint_without_cloud_events_keeps_default_options() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PublishAllMessages() + .ToHttpEndpoint("https://binary.com/api"); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + var transport = runtime.Options.Transports.GetOrCreate(); + + var endpoint = transport.EndpointFor("https://binary.com/api"); + // Default options should have CamelCase naming + endpoint.SerializerOptions.PropertyNamingPolicy.ShouldBe(JsonNamingPolicy.CamelCase); + endpoint.SerializerOptions.WriteIndented.ShouldBeFalse(); + } + + [Fact] + public async Task to_http_endpoint_returns_subscriber_configuration() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + var config = opts.PublishAllMessages() + .ToHttpEndpoint("https://test.com/api"); + + config.ShouldBeOfType(); + }) + .StartAsync(); + } + + [Fact] + public async Task can_chain_subscriber_configuration_methods() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PublishAllMessages() + .ToHttpEndpoint("https://test.com/api") + .SendInline() + .CustomizeOutgoing(e => e.CorrelationId = "test-correlation"); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + var transport = runtime.Options.Transports.GetOrCreate(); + + var endpoint = transport.EndpointFor("https://test.com/api"); + endpoint.Mode.ShouldBe(Wolverine.Configuration.EndpointMode.Inline); + } + + [Fact] + public void can_configure_multiple_http_endpoints_with_different_settings() + { + var transport = new HttpTransport(); + + var endpoint1 = transport.EndpointFor("https://service1.com/api"); + endpoint1.SupportsNativeScheduledSend = false; + + var endpoint2 = transport.EndpointFor("https://service2.com/api"); + endpoint2.SupportsNativeScheduledSend = true; + + endpoint1.SupportsNativeScheduledSend.ShouldBeFalse(); + endpoint2.SupportsNativeScheduledSend.ShouldBeTrue(); + + // Verify endpoints are cached and different + transport.EndpointFor("https://service1.com/api").ShouldBeSameAs(endpoint1); + transport.EndpointFor("https://service2.com/api").ShouldBeSameAs(endpoint2); + } +} + +public record TestMessage1(string Value); +public record TestMessage2(string Value); + diff --git a/src/Http/Wolverine.Http.Tests/Transport/HttpTransportExecutorTests.cs b/src/Http/Wolverine.Http.Tests/Transport/HttpTransportExecutorTests.cs new file mode 100644 index 000000000..ab5f204c7 --- /dev/null +++ b/src/Http/Wolverine.Http.Tests/Transport/HttpTransportExecutorTests.cs @@ -0,0 +1,245 @@ +using System.Text; +using System.Text.Json; +using Alba; +using JasperFx.Core; +using JasperFx.Core.Reflection; +using Shouldly; +using Wolverine.Http.Transport; +using Wolverine.Runtime.Interop; +using Wolverine.Runtime.Serialization; +using Wolverine.Tracking; +using Wolverine.Util; +using Xunit; + +namespace Wolverine.Http.Tests.Transport; + +public class HttpTransportExecutorTests : IntegrationContext +{ + public HttpTransportExecutorTests(AppFixture fixture) : base(fixture) + { + } + + [Fact] + public async Task execute_batch_returns_415_for_wrong_content_type() + { + var result = await Scenario(s => + { + s.Post.ByteArray(new byte[] { 1, 2, 3 }).ToUrl("/_wolverine/batch/test") + .ContentType("application/json"); + s.StatusCodeShouldBe(415); + }); + } + + [Fact] + public async Task execute_batch_returns_415_for_missing_content_type() + { + var result = await Scenario(s => + { + s.Post.ByteArray(new byte[] { 1, 2, 3 }).ToUrl("/_wolverine/batch/test"); + s.StatusCodeShouldBe(415); + }); + } + + [Fact] + public async Task execute_batch_returns_500_for_invalid_envelope_data() + { + var result = await Scenario(s => + { + s.Post.ByteArray(new byte[] { 0xFF, 0xFF, 0xFF }).ToUrl("/_wolverine/batch/test") + .ContentType(HttpTransport.EnvelopeBatchContentType); + s.StatusCodeShouldBe(500); + }); + } + + [Fact] + public async Task invoke_returns_415_for_wrong_content_type() + { + var result = await Scenario(s => + { + s.Post.ByteArray(new byte[] { 1, 2, 3 }).ToUrl("/_wolverine/invoke") + .ContentType("application/xml"); + s.StatusCodeShouldBe(415); + }); + } + + [Fact] + public async Task invoke_returns_415_for_missing_content_type() + { + var result = await Scenario(s => + { + s.Post.ByteArray(new byte[] { 1, 2, 3 }).ToUrl("/_wolverine/invoke"); + s.StatusCodeShouldBe(415); + }); + } + + [Fact] + public async Task invoke_accepts_binary_envelope_content_type() + { + var serializer = new SystemTextJsonSerializer(new JsonSerializerOptions()); + var envelope = new Envelope(new HttpMessage1("test")) + { + Serializer = serializer, + ContentType = serializer.ContentType + }; + + var data = EnvelopeSerializer.Serialize(envelope); + + var (tracked, result) = await TrackedHttpCall(s => + { + s.Post.ByteArray(data).ToUrl("/_wolverine/invoke") + .ContentType(HttpTransport.EnvelopeContentType); + s.StatusCodeShouldBeOk(); + }); + + tracked.Executed.SingleMessage().Name.ShouldBe("test"); + } + + [Fact] + public async Task invoke_accepts_cloud_events_content_type() + { + var options = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + var messageData = JsonSerializer.Serialize(new { name = "cloud-test" }, options); + var cloudEvent = new + { + specversion = "1.0", + type = typeof(HttpMessage1).ToMessageTypeName(), + source = "test-source", + id = Guid.NewGuid().ToString(), + datacontenttype = "application/json", + data = JsonSerializer.Deserialize(messageData) + }; + + var json = JsonSerializer.Serialize(cloudEvent, options); + + var (tracked, result) = await TrackedHttpCall(s => + { + s.Post.Text(json).ToUrl("/_wolverine/invoke") + .ContentType("application/cloudevents+json"); + s.StatusCodeShouldBeOk(); + }); + + tracked.Executed.SingleMessage().Name.ShouldBe("cloud-test"); + } + + [Fact] + public async Task invoke_returns_400_for_unknown_message_type() + { + var envelope = new Envelope + { + MessageType = "NonExistent.MessageType", + Data = new byte[] { 1, 2, 3 }, + ContentType = "application/json" + }; + + var data = EnvelopeSerializer.Serialize(envelope); + + var result = await Scenario(s => + { + s.Post.ByteArray(data).ToUrl("/_wolverine/invoke") + .ContentType(HttpTransport.EnvelopeContentType); + s.StatusCodeShouldBe(400); + }); + } + + [Fact] + public async Task invoke_with_response_returns_binary_envelope() + { + var serializer = Host.GetRuntime().Options.DefaultSerializer; + var envelope = new Envelope(new WolverineWebApi.CustomRequest("response-test")) + { + Serializer = serializer, + ReplyRequested = typeof(WolverineWebApi.CustomResponse).ToMessageTypeName(), + ContentType = "application/json" + }; + + var data = EnvelopeSerializer.Serialize(envelope); + + var (tracked, result) = await TrackedHttpCall(s => + { + s.Post.ByteArray(data).ToUrl("/_wolverine/invoke") + .ContentType(HttpTransport.EnvelopeContentType); + s.ContentTypeShouldBe(HttpTransport.EnvelopeContentType); + }); + + var responseData = await result.Context.Response.Body.ReadAllBytesAsync(); + var responseEnvelope = EnvelopeSerializer.Deserialize(responseData); + + responseEnvelope.Message = serializer.ReadFromData(typeof(WolverineWebApi.CustomResponse), responseEnvelope); + responseEnvelope.Message.ShouldBeOfType() + .Name.ShouldBe("response-test"); + } + + [Fact] + public async Task invoke_returns_400_for_invalid_reply_requested_type() + { + var serializer = Host.GetRuntime().Options.DefaultSerializer; + var envelope = new Envelope(new HttpMessage1("test")) + { + Serializer = serializer, + ReplyRequested = "NonExistent.ResponseType", + ContentType = "application/json" + }; + + var data = EnvelopeSerializer.Serialize(envelope); + + var result = await Scenario(s => + { + s.Post.ByteArray(data).ToUrl("/_wolverine/invoke") + .ContentType(HttpTransport.EnvelopeContentType); + s.StatusCodeShouldBe(400); + }); + } + + [Fact] + public async Task batch_with_multiple_queues_routes_to_correct_queue() + { + var serializer = new SystemTextJsonSerializer(new JsonSerializerOptions()); + var envelopes = new Envelope[] + { + new(new HttpMessage1("queue-test")) { Serializer = serializer } + }; + + var data = EnvelopeSerializer.Serialize(envelopes); + + var (tracked, result) = await TrackedHttpCall(s => + { + s.Post.ByteArray(data).ToUrl("/_wolverine/batch/custom-queue") + .ContentType(HttpTransport.EnvelopeBatchContentType); + s.StatusCodeShouldBeOk(); + }); + + tracked.Executed.SingleMessage().Name.ShouldBe("queue-test"); + } +} + +public record HttpMessage1(string Name); +public static class HttpMessage1Handler +{ + public static void Handle(HttpMessage1 message) + { + // Just receive it + } +} + +public record HttpMessage2(string Name); +public static class HttpMessage2Handler +{ + public static void Handle(HttpMessage2 message) + { + // Just receive it + } +} + +public record HttpMessage3(string Name); +public static class HttpMessage3Handler +{ + public static void Handle(HttpMessage3 message) + { + // Just receive it + } +} + diff --git a/src/Http/Wolverine.Http.Tests/Transport/MapWolverineHttpTransportEndpointsTests.cs b/src/Http/Wolverine.Http.Tests/Transport/MapWolverineHttpTransportEndpointsTests.cs new file mode 100644 index 000000000..5ab356822 --- /dev/null +++ b/src/Http/Wolverine.Http.Tests/Transport/MapWolverineHttpTransportEndpointsTests.cs @@ -0,0 +1,100 @@ +using System.Text.Json; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Routing; +using Microsoft.Extensions.DependencyInjection; +using Shouldly; +using Wolverine.Http.Transport; +using Xunit; + +namespace Wolverine.Http.Tests.Transport; + +public class MapWolverineHttpTransportEndpointsTests +{ + [Fact] + public void can_map_with_custom_prefix() + { + var builder = WebApplication.CreateBuilder(); + builder.Host.UseWolverine(); + builder.Services.AddWolverineHttp(); + + var app = builder.Build(); + var group = app.MapWolverineHttpTransportEndpoints("/custom-prefix"); + + group.ShouldNotBeNull(); + group.ShouldBeOfType(); + } + + [Fact] + public void uses_default_prefix_when_not_specified() + { + var builder = WebApplication.CreateBuilder(); + builder.Host.UseWolverine(); + builder.Services.AddWolverineHttp(); + + var app = builder.Build(); + var group = app.MapWolverineHttpTransportEndpoints(); + + group.ShouldNotBeNull(); + group.ShouldBeOfType(); + } + + [Fact] + public void can_provide_custom_json_serializer_options() + { + var customOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, + PropertyNameCaseInsensitive = true + }; + + var builder = WebApplication.CreateBuilder(); + builder.Host.UseWolverine(); + builder.Services.AddWolverineHttp(); + + var app = builder.Build(); + var group = app.MapWolverineHttpTransportEndpoints("/_wolverine", customOptions); + + // The custom options will be passed to the HttpTransportExecutor.InvokeAsync method + // and used when deserializing CloudEvents + group.ShouldNotBeNull(); + } + + [Fact] + public void maps_both_batch_and_invoke_endpoints() + { + var builder = WebApplication.CreateBuilder(); + builder.Host.UseWolverine(); + builder.Services.AddWolverineHttp(); + + var app = builder.Build(); + var group = app.MapWolverineHttpTransportEndpoints(); + + // Verify RouteGroupBuilder is returned + group.ShouldNotBeNull(); + group.ShouldBeOfType(); + } + + [Fact] + public void returns_route_group_builder_for_further_configuration() + { + var builder = WebApplication.CreateBuilder(); + builder.Host.UseWolverine(); + builder.Services.AddWolverineHttp(); + + var app = builder.Build(); + var group = app.MapWolverineHttpTransportEndpoints(); + + group.ShouldNotBeNull(); + group.ShouldBeOfType(); + } +} + +public record TestCommand(string Value); +public static class TestCommandHandler +{ + public static void Handle(TestCommand command) + { + // Just receive it + } +} + diff --git a/src/Http/Wolverine.Http/Transport/HttpEndpoint.cs b/src/Http/Wolverine.Http/Transport/HttpEndpoint.cs index 41e57f5e9..44e0cea7e 100644 --- a/src/Http/Wolverine.Http/Transport/HttpEndpoint.cs +++ b/src/Http/Wolverine.Http/Transport/HttpEndpoint.cs @@ -1,3 +1,4 @@ +using System.Text.Json; using Microsoft.Extensions.Logging; using Wolverine.Configuration; using Wolverine.Runtime; @@ -20,6 +21,11 @@ public override ValueTask BuildListenerAsync(IWolverineRuntime runtim return ValueTask.FromResult(new NulloListener(Uri)); } + public JsonSerializerOptions SerializerOptions { get; set; } = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + WriteIndented = false + }; protected override ISender CreateSender(IWolverineRuntime runtime) { return Mode == EndpointMode.Inline diff --git a/src/Http/Wolverine.Http/Transport/HttpTransport.cs b/src/Http/Wolverine.Http/Transport/HttpTransport.cs index 7598f8911..054a64bac 100644 --- a/src/Http/Wolverine.Http/Transport/HttpTransport.cs +++ b/src/Http/Wolverine.Http/Transport/HttpTransport.cs @@ -16,6 +16,8 @@ public HttpTransport() : base("https", "HTTP Transport") public const string EnvelopeContentType = "binary/wolverine-envelope"; public const string EnvelopeBatchContentType = "binary/wolverine-envelopes"; + public const string CloudEventsContentType = "application/cloudevents+json"; + public const string CloudEventsBatchContentType = "application/cloudevents-batch+json"; protected override IEnumerable endpoints() { diff --git a/src/Http/Wolverine.Http/Transport/HttpTransportExecutor.cs b/src/Http/Wolverine.Http/Transport/HttpTransportExecutor.cs index fb5944292..a2c74f51b 100644 --- a/src/Http/Wolverine.Http/Transport/HttpTransportExecutor.cs +++ b/src/Http/Wolverine.Http/Transport/HttpTransportExecutor.cs @@ -1,3 +1,4 @@ +using System.Text.Json; using JasperFx.Core; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http.HttpResults; @@ -5,6 +6,7 @@ using Wolverine.ErrorHandling; using Wolverine.Runtime; using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Interop; using Wolverine.Runtime.Serialization; using Wolverine.Runtime.WorkerQueues; using Wolverine.Transports; @@ -67,11 +69,12 @@ public async Task ExecuteBatchAsync(HttpContext httpContext) return Results.Ok(); } - public async Task InvokeAsync(HttpContext httpContext) + public async Task InvokeAsync(HttpContext httpContext, JsonSerializerOptions options) { if (httpContext.Request.Headers.TryGetValue("content-type", out var values)) { - if (values[0] != HttpTransport.EnvelopeContentType) + if (values[0] != HttpTransport.EnvelopeContentType && + values[0] != "application/cloudevents+json") { return Results.StatusCode(415); } @@ -80,9 +83,20 @@ public async Task InvokeAsync(HttpContext httpContext) { return Results.StatusCode(415); } - - var data = await httpContext.Request.Body.ReadAllBytesAsync(); - var envelope = EnvelopeSerializer.Deserialize(data); + + Envelope? envelope = null; + if (values[0] == HttpTransport.EnvelopeContentType) + { + var data = await httpContext.Request.Body.ReadAllBytesAsync(); envelope = EnvelopeSerializer.Deserialize(data); + } + else + { + var cloudEventMapper = new CloudEventsMapper(_runtime.Handlers, options); + var data = await httpContext.Request.Body.ReadAllTextAsync(); + envelope = new Envelope(); + cloudEventMapper.MapIncoming(envelope, data); + } + envelope.Destination = $"http://localhost{httpContext.Request.Path}".ToUri(); envelope.DoNotCascadeResponse = true; envelope.Serializer = _runtime.Options.FindSerializer(envelope.ContentType); diff --git a/src/Http/Wolverine.Http/Transport/HttpTransportExtensions.cs b/src/Http/Wolverine.Http/Transport/HttpTransportExtensions.cs index b95d5f09c..665e5ba76 100644 --- a/src/Http/Wolverine.Http/Transport/HttpTransportExtensions.cs +++ b/src/Http/Wolverine.Http/Transport/HttpTransportExtensions.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using System.Text.Json; using JasperFx.Core.Reflection; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; @@ -15,14 +16,14 @@ public static class HttpTransportExtensions /// /// /// - public static RouteGroupBuilder MapWolverineHttpTransportEndpoints(this IEndpointRouteBuilder endpoints, string groupUrlPrefix = "/_wolverine") + public static RouteGroupBuilder MapWolverineHttpTransportEndpoints(this IEndpointRouteBuilder endpoints, string groupUrlPrefix = "/_wolverine", JsonSerializerOptions? jsonOptions = null) { var group = endpoints.MapGroup(groupUrlPrefix); group.MapPost( "/batch/{queue}",(HttpContext c, HttpTransportExecutor executor) => executor.ExecuteBatchAsync(c)); - group.MapPost("/invoke", (HttpContext c, HttpTransportExecutor executor) => executor.InvokeAsync(c)); + group.MapPost("/invoke", (HttpContext c, HttpTransportExecutor executor) => executor.InvokeAsync(c, jsonOptions)); return group; } @@ -32,13 +33,26 @@ public static RouteGroupBuilder MapWolverineHttpTransportEndpoints(this IEndpoin /// /// /// + /// + /// + /// /// - public static HttpTransportSubscriberConfiguration ToHttpEndpoint(this IPublishToExpression publishing, string url, bool supportsNativeScheduledSend = false) + public static HttpTransportSubscriberConfiguration ToHttpEndpoint( + this IPublishToExpression publishing, + string url, + bool supportsNativeScheduledSend = false, + bool useCloudEvents = false, + JsonSerializerOptions options = null) { - var transports = publishing.As().Parent.Transports; + var transports = + publishing.As().Parent.Transports; var transport = transports.GetOrCreate(); var endpoint = transport.EndpointFor(url); + if (useCloudEvents) + { + endpoint.SerializerOptions = options; + } // This is necessary unfortunately to hook up the subscription rules publishing.To(endpoint.Uri); diff --git a/src/Http/Wolverine.Http/Transport/IWolverineHttpTransportClient.cs b/src/Http/Wolverine.Http/Transport/IWolverineHttpTransportClient.cs index 4285ad8a1..e87c5e70e 100644 --- a/src/Http/Wolverine.Http/Transport/IWolverineHttpTransportClient.cs +++ b/src/Http/Wolverine.Http/Transport/IWolverineHttpTransportClient.cs @@ -1,3 +1,4 @@ +using System.Text.Json; using Wolverine.Transports; namespace Wolverine.Http.Transport; @@ -5,5 +6,5 @@ namespace Wolverine.Http.Transport; public interface IWolverineHttpTransportClient { Task SendBatchAsync(string uri, OutgoingMessageBatch batch); - Task SendAsync(string uri, Envelope envelope); + Task SendAsync(string uri, Envelope envelope, JsonSerializerOptions serializerOptions); } \ No newline at end of file diff --git a/src/Http/Wolverine.Http/Transport/InlineHttpSender.cs b/src/Http/Wolverine.Http/Transport/InlineHttpSender.cs index 01bba343b..b36ec7aa0 100644 --- a/src/Http/Wolverine.Http/Transport/InlineHttpSender.cs +++ b/src/Http/Wolverine.Http/Transport/InlineHttpSender.cs @@ -14,7 +14,7 @@ public async ValueTask SendAsync(Envelope envelope) using var scope = services.CreateScope(); var client = scope.ServiceProvider.GetRequiredService() ?? throw new InvalidOperationException("IWolverineHttpTransportClient is not registered in the service container"); - await client.SendAsync(endpoint.OutboundUri, envelope); + await client.SendAsync(endpoint.OutboundUri, envelope, endpoint.SerializerOptions); } catch (Exception ex) { diff --git a/src/Http/Wolverine.Http/Transport/WolverineHttpTransportClient.cs b/src/Http/Wolverine.Http/Transport/WolverineHttpTransportClient.cs index b028ac474..ccf978736 100644 --- a/src/Http/Wolverine.Http/Transport/WolverineHttpTransportClient.cs +++ b/src/Http/Wolverine.Http/Transport/WolverineHttpTransportClient.cs @@ -1,4 +1,6 @@ using System.Net.Http.Headers; +using System.Text.Json; +using Wolverine.Runtime.Interop; using Wolverine.Runtime.Serialization; using Wolverine.Transports; @@ -14,11 +16,31 @@ public async Task SendBatchAsync(string uri, OutgoingMessageBatch batch) await client.PostAsync(client.BaseAddress, content); } - public async Task SendAsync(string uri, Envelope envelope) + public async Task SendAsync(string uri, Envelope envelope, JsonSerializerOptions options = null) { var client = clientFactory.CreateClient(uri); var content = new ByteArrayContent(EnvelopeSerializer.Serialize(envelope)); content.Headers.ContentType = new MediaTypeHeaderValue(HttpTransport.EnvelopeContentType); await client.PostAsync(client.BaseAddress, content); } +} + +public class WolverineHttpTransportClientCloudEvents(IHttpClientFactory clientFactory) : IWolverineHttpTransportClient +{ + public async Task SendBatchAsync(string uri, OutgoingMessageBatch batch) + { + var client = clientFactory.CreateClient(uri); + var content = new ByteArrayContent(EnvelopeSerializer.Serialize(batch.Messages)); + content.Headers.ContentType = new MediaTypeHeaderValue(HttpTransport.EnvelopeBatchContentType); + await client.PostAsync(client.BaseAddress, content); + } + + public async Task SendAsync(string uri, Envelope envelope, JsonSerializerOptions options = null) + { + var client = clientFactory.CreateClient(uri); + var ce = new CloudEventsEnvelope(envelope); + var content = new StringContent(JsonSerializer.Serialize(ce, options)); + content.Headers.ContentType = new MediaTypeHeaderValue(HttpTransport.CloudEventsContentType); + await client.PostAsync(client.BaseAddress, content); + } } \ No newline at end of file