Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;
using NATS.Client.Core;

namespace Wolverine.Nats.Tests;

Expand Down Expand Up @@ -184,6 +185,42 @@ public void server_version_is_detected_and_scheduled_send_support_is_determined(
}
}

[Fact]
public async Task receive_message_without_type_header_using_default_incoming_message()
{
var natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222";
if (!await IsNatsAvailable(natsUrl)) return;

var subject = Guid.NewGuid().ToString();

using var receiver = await Host.CreateDefaultBuilder()
.ConfigureLogging(logging => logging.AddXunitLogging(_output))
.UseWolverine(opts =>
{
opts.ServiceName = "ReceiverWithDefault";
opts.UseNats(natsUrl).AutoProvision();
opts.ListenToNatsSubject(subject)
.DefaultIncomingMessage<DefaultTestMessage>()
.BufferedInMemory();
})
.StartAsync();

await using var nats = new NatsConnection(new NatsOpts { Url = natsUrl });
await nats.ConnectAsync();

var messageData = System.Text.Encoding.UTF8.GetBytes("{\"Text\":\"Hello without header\"}");

var tracked = await receiver.TrackActivity()
.Timeout(10.Seconds())
.WaitForMessageToBeReceivedAt<DefaultTestMessage>(receiver)
.ExecuteAndWaitAsync(c =>
{
return nats.PublishAsync(subject, messageData).AsTask();
});

tracked.Received.SingleMessage<DefaultTestMessage>().Text.Should().Be("Hello without header");
}

private async Task<bool> IsNatsAvailable(string natsUrl)
{
try
Expand Down Expand Up @@ -213,3 +250,12 @@ public void Handle(TestMessage message)
{
}
}

public record DefaultTestMessage(string Text);

public class DefaultTestMessageHandler
{
public void Handle(DefaultTestMessage message)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ var msg in ((INatsSub<byte[]>)subscription).Msgs.ReadAllAsync(cancellation)
// Skip messages without headers or without message-type header.
// These are typically NATS protocol messages (JetStream acks, etc.)
// that should not be processed by Wolverine.
if (msg.Headers == null || !msg.Headers.ContainsKey("message-type"))
if (_endpoint.MessageType == null && (msg.Headers == null || !msg.Headers.ContainsKey("message-type")))
{
_logger.LogDebug(
"Skipping NATS message without message-type header from subject {Subject}. DataLength={DataLength}, HasHeaders={HasHeaders}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,17 @@ CancellationToken cancellation

// Skip messages without headers or without message-type header.
// These are typically NATS protocol messages that should not be processed by Wolverine.
if (msg.Headers == null || !msg.Headers.ContainsKey("message-type"))
if (_endpoint.MessageType == null && (msg.Headers == null || !msg.Headers.ContainsKey("message-type")))
{
_logger.LogDebug(
"Skipping NATS message without message-type header from subject {Subject}. DataLength={DataLength}, HasHeaders={HasHeaders}",
msg.Subject,
msg.Data.Length,
msg.Headers != null
);
await msg.AckAsync(cancellationToken: cancellation);
continue;
}

var envelope = new NatsEnvelope(null, msg);
_mapper.MapIncomingToEnvelope(envelope, msg);

Expand Down
Loading