diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/NatsTransportIntegrationTests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/NatsTransportIntegrationTests.cs index a8dfb8690..02c1799ac 100644 --- a/src/Transports/NATS/Wolverine.Nats.Tests/NatsTransportIntegrationTests.cs +++ b/src/Transports/NATS/Wolverine.Nats.Tests/NatsTransportIntegrationTests.cs @@ -8,6 +8,7 @@ using Xunit; using Xunit.Abstractions; using FluentAssertions; +using NATS.Client.Core; namespace Wolverine.Nats.Tests; @@ -184,6 +185,42 @@ public void server_version_is_detected_and_scheduled_send_support_is_determined( } } + [Fact] + public async Task receive_message_without_type_header_using_default_incoming_message() + { + var natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222"; + if (!await IsNatsAvailable(natsUrl)) return; + + var subject = Guid.NewGuid().ToString(); + + using var receiver = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => logging.AddXunitLogging(_output)) + .UseWolverine(opts => + { + opts.ServiceName = "ReceiverWithDefault"; + opts.UseNats(natsUrl).AutoProvision(); + opts.ListenToNatsSubject(subject) + .DefaultIncomingMessage() + .BufferedInMemory(); + }) + .StartAsync(); + + await using var nats = new NatsConnection(new NatsOpts { Url = natsUrl }); + await nats.ConnectAsync(); + + var messageData = System.Text.Encoding.UTF8.GetBytes("{\"Text\":\"Hello without header\"}"); + + var tracked = await receiver.TrackActivity() + .Timeout(10.Seconds()) + .WaitForMessageToBeReceivedAt(receiver) + .ExecuteAndWaitAsync(c => + { + return nats.PublishAsync(subject, messageData).AsTask(); + }); + + tracked.Received.SingleMessage().Text.Should().Be("Hello without header"); + } + private async Task IsNatsAvailable(string natsUrl) { try @@ -213,3 +250,12 @@ public void Handle(TestMessage message) { } } + +public record DefaultTestMessage(string Text); + +public class DefaultTestMessageHandler +{ + public void Handle(DefaultTestMessage message) + { + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/CoreNatsSubscriber.cs b/src/Transports/NATS/Wolverine.Nats/Internal/CoreNatsSubscriber.cs index 1faa0d32e..ce8892e95 100644 --- a/src/Transports/NATS/Wolverine.Nats/Internal/CoreNatsSubscriber.cs +++ b/src/Transports/NATS/Wolverine.Nats/Internal/CoreNatsSubscriber.cs @@ -105,7 +105,7 @@ var msg in ((INatsSub)subscription).Msgs.ReadAllAsync(cancellation) // Skip messages without headers or without message-type header. // These are typically NATS protocol messages (JetStream acks, etc.) // that should not be processed by Wolverine. - if (msg.Headers == null || !msg.Headers.ContainsKey("message-type")) + if (_endpoint.MessageType == null && (msg.Headers == null || !msg.Headers.ContainsKey("message-type"))) { _logger.LogDebug( "Skipping NATS message without message-type header from subject {Subject}. DataLength={DataLength}, HasHeaders={HasHeaders}", diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamSubscriber.cs b/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamSubscriber.cs index da4778769..f931d8833 100644 --- a/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamSubscriber.cs +++ b/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamSubscriber.cs @@ -129,12 +129,17 @@ CancellationToken cancellation // Skip messages without headers or without message-type header. // These are typically NATS protocol messages that should not be processed by Wolverine. - if (msg.Headers == null || !msg.Headers.ContainsKey("message-type")) + if (_endpoint.MessageType == null && (msg.Headers == null || !msg.Headers.ContainsKey("message-type"))) { + _logger.LogDebug( + "Skipping NATS message without message-type header from subject {Subject}. DataLength={DataLength}, HasHeaders={HasHeaders}", + msg.Subject, + msg.Data.Length, + msg.Headers != null + ); await msg.AckAsync(cancellationToken: cancellation); continue; } - var envelope = new NatsEnvelope(null, msg); _mapper.MapIncomingToEnvelope(envelope, msg);