Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using NSubstitute;
using Wolverine.ComplianceTests;
using Wolverine.Runtime;
using Wolverine.Runtime.Serialization;
using Wolverine.Runtime.WorkerQueues;
using Wolverine.Transports;
using Wolverine.Transports.Stub;
using Xunit;

namespace CoreTests.Runtime.WorkerQueues;

public class when_durable_receiver_fails_to_unwrap_envelope_metadata : IAsyncLifetime
{
private readonly Envelope theEnvelope = ObjectMother.Envelope();
private readonly IListener theListener = Substitute.For<IListener>();
private readonly IHandlerPipeline thePipeline = Substitute.For<IHandlerPipeline>();
private readonly DurableReceiver theReceiver;
private readonly MockWolverineRuntime theRuntime;

public when_durable_receiver_fails_to_unwrap_envelope_metadata()
{
theRuntime = new MockWolverineRuntime();

var stubEndpoint = new StubEndpoint("one", new StubTransport());
theReceiver = new DurableReceiver(stubEndpoint, theRuntime, thePipeline);

// Use a serializer that implements IUnwrapsMetadataMessageSerializer so
// UnwrapEnvelopeIfNecessary will call Unwrap — then make it throw
var serializer = Substitute.For<IUnwrapsMetadataMessageSerializer>();
serializer.When(s => s.Unwrap(Arg.Any<Envelope>()))
.Throw(new Exception("Failed to unwrap metadata"));

theEnvelope.MessageType = null; // triggers the unwrap path
theEnvelope.Serializer = serializer;
}

public async Task InitializeAsync()
{
await theReceiver.ReceivedAsync(theListener, theEnvelope);
await theReceiver.DrainAsync();
}

public Task DisposeAsync() => Task.CompletedTask;

[Fact]
public async Task the_listener_was_completed_so_the_transport_does_not_redeliver_the_message()
{
await theListener.Received().CompleteAsync(theEnvelope);
}

[Fact]
public async Task the_envelope_was_not_processed()
{
await thePipeline.DidNotReceive().InvokeAsync(theEnvelope, theReceiver);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using NSubstitute;
using Wolverine.ComplianceTests;
using Wolverine.Runtime;
using Wolverine.Runtime.WorkerQueues;
using Wolverine.Transports;
using Wolverine.Transports.Stub;
using Xunit;

namespace CoreTests.Runtime.WorkerQueues;

public class when_durable_receiver_receives_envelope_with_missing_message_type : IAsyncLifetime
{
private readonly Envelope theEnvelope = ObjectMother.Envelope();
private readonly IListener theListener = Substitute.For<IListener>();
private readonly IHandlerPipeline thePipeline = Substitute.For<IHandlerPipeline>();
private readonly DurableReceiver theReceiver;
private readonly MockWolverineRuntime theRuntime;

public when_durable_receiver_receives_envelope_with_missing_message_type()
{
theRuntime = new MockWolverineRuntime();

var stubEndpoint = new StubEndpoint("one", new StubTransport());
theReceiver = new DurableReceiver(stubEndpoint, theRuntime, thePipeline);

theEnvelope.MessageType = null;
}

public async Task InitializeAsync()
{
await theReceiver.ReceivedAsync(theListener, theEnvelope);
await theReceiver.DrainAsync();
}

public Task DisposeAsync() => Task.CompletedTask;

[Fact]
public async Task the_listener_was_completed_so_the_transport_does_not_redeliver_the_message()
{
await theListener.Received().CompleteAsync(theEnvelope);
}

[Fact]
public async Task the_envelope_was_not_processed()
{
await thePipeline.DidNotReceive().InvokeAsync(theEnvelope, theReceiver);
}
}
2 changes: 2 additions & 0 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ await executeWithRetriesAsync(async () =>
envelope.MessageType ??= $"unknown/{e.GetType().Name}";
envelope.Failure = e;
await _moveToErrors.PostAsync(envelope);
await _completeBlock.PostAsync(envelope);
return;
}

Expand All @@ -442,6 +443,7 @@ await executeWithRetriesAsync(async () =>
{
_logger.LogInformation("Empty or missing message type name for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination);
await _moveToErrors.PostAsync(envelope);
await _completeBlock.PostAsync(envelope);
return;
}

Expand Down
Loading