Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<ImplicitUsings>true</ImplicitUsings>
<Nullable>enable</Nullable>
<Version>6.4.0</Version>
<Version>6.4.1</Version>
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
17 changes: 17 additions & 0 deletions src/Testing/CoreTests/Runtime/MoveToErrorQueueTester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,21 @@ public async Task logging_calls()
theRuntime.MessageTracking.Received().MessageFailed(theEnvelope, theException);
theRuntime.MessageTracking.Received().MovedToErrorQueue(theEnvelope, theException);
}

[Fact]
public async Task does_not_NRE_when_envelope_has_no_destination()
{
// GH-3013: a malformed system envelope (no Destination) must not NRE on the
// `lifecycle.Envelope!.Destination!.Scheme` read before EnableAutomaticFailureAcks
// even gets a chance to short-circuit. Enabling acks reproduces the pre-fix bug.
theRuntime.Options.EnableAutomaticFailureAcks = true;
theEnvelope.Destination = null;

// A throw here fails the test — pre-fix this NRE'd at the `scheme` variable init.
await theContinuation.ExecuteAsync(theLifecycle, theRuntime, DateTimeOffset.Now, null);

// No scheme → no failure ack attempted, and the move-to-DLQ still happens.
await theLifecycle.DidNotReceive().SendFailureAcknowledgementAsync(Arg.Any<string>());
await theLifecycle.Received().MoveToDeadLetterQueueAsync(theException);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System.Collections.Concurrent;
using System.Reflection;
using JasperFx.Blocks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Shouldly;
using Wolverine;
using Wolverine.Runtime;
using Wolverine.Runtime.WorkerQueues;
using Wolverine.Transports.Local;
using Xunit;

namespace CoreTests.Runtime.WorkerQueues;

public class buffered_receiver_null_listener_guard_3013
{
[Fact]
public async Task listenerless_envelope_does_not_NRE_in_defer_or_complete_blocks()
{
// GH-3013: the empty all-zero startup / agent-handshake envelope reaches the buffered
// receiver's defer/complete retry blocks with Listener == null. Pre-fix the unguarded
// env.Listener! deref NRE'd inside the retry loop; RetryBlock catches + logs it (and retries),
// so the failure surfaces only as logged errors that aggregate into tracked-session failures
// across the consumer's whole integration sweep — not as a thrown exception here.
var captor = new CapturingLoggerProvider();

using var host = await Host.CreateDefaultBuilder()
.ConfigureLogging(logging => logging.AddProvider(captor))
.UseWolverine(opts => { opts.LocalQueue("buffered-3013"); })
.StartAsync();

var runtime = host.Services.GetRequiredService<IWolverineRuntime>();
var endpoint = (LocalQueue?)runtime.Endpoints.EndpointByName("buffered-3013")
?? throw new InvalidOperationException("buffered-3013 not found");
var receiver = (BufferedReceiver)endpoint.Agent!;

captor.Errors.Clear(); // ignore anything logged during bootstrap

var deferBlock = retryBlock(receiver, "_deferBlock");
var completeBlock = retryBlock(receiver, "_completeBlock");

// The malformed system envelope: no Listener, no Destination.
var empty = new Envelope();

await deferBlock.PostAsync(empty);
await completeBlock.PostAsync(empty);
await deferBlock.DrainAsync();
await completeBlock.DrainAsync();

captor.Errors.ShouldNotContain(e => e is NullReferenceException);
}

private static RetryBlock<Envelope> retryBlock(BufferedReceiver receiver, string field)
{
var info = typeof(BufferedReceiver).GetField(field, BindingFlags.NonPublic | BindingFlags.Instance)!;
return (RetryBlock<Envelope>)info.GetValue(receiver)!;
}
}

/// <summary>
/// Minimal <see cref="ILoggerProvider"/> that records the exceptions passed to any log call,
/// so a test can assert no <see cref="NullReferenceException"/> was logged (the GH-3013 symptom,
/// which <c>RetryBlock</c> swallows and logs rather than re-throwing).
/// </summary>
public sealed class CapturingLoggerProvider : ILoggerProvider
{
public ConcurrentBag<Exception> Errors { get; } = new();

public ILogger CreateLogger(string categoryName) => new CapturingLogger(Errors);

public void Dispose()
{
}

private sealed class CapturingLogger(ConcurrentBag<Exception> errors) : ILogger
{
public IDisposable BeginScope<TState>(TState state) where TState : notnull => NullScope.Instance;

public bool IsEnabled(LogLevel logLevel) => true;

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception,
Func<TState, Exception?, string> formatter)
{
if (exception != null)
{
errors.Add(exception);
}
}

private sealed class NullScope : IDisposable
{
public static readonly NullScope Instance = new();
public void Dispose() { }
}
}
}
7 changes: 5 additions & 2 deletions src/Wolverine/ErrorHandling/MoveToErrorQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle,
DateTimeOffset now, Activity? activity)
{
// TODO -- at some point, we need a more systematic way of doing this
var scheme = lifecycle.Envelope!.Destination!.Scheme;
if (runtime.Options.EnableAutomaticFailureAcks && scheme != TransportConstants.Local && scheme != "external-table")
// Defensive: a malformed system envelope (no Destination) shouldn't NRE here before
// EnableAutomaticFailureAcks even gets a chance to short-circuit. The envelope itself is
// always present (the block below already relies on it); only Destination can be null. GH-3013.
var scheme = lifecycle.Envelope!.Destination?.Scheme;
if (scheme is not null && runtime.Options.EnableAutomaticFailureAcks && scheme != TransportConstants.Local && scheme != "external-table")
{
await lifecycle.SendFailureAcknowledgementAsync(
$"Moved message {lifecycle.Envelope!.Id} to the Error Queue.\n{Exception}");
Expand Down
9 changes: 7 additions & 2 deletions src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ public BufferedReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPi

_scheduler = new InMemoryScheduledJobProcessor(this, _logger);

_deferBlock = new RetryBlock<Envelope>((env, _) => env.Listener!.DeferAsync(env).AsTask(), runtime.Logger,
// Guard against a listener-less envelope (e.g. the empty all-zero system/agent-handshake
// envelope produced during startup) reaching either retry block — env.Listener is null in
// that case, and an unguarded deref NREs into the retry loop. GH-3013.
_deferBlock = new RetryBlock<Envelope>(
(env, _) => env.Listener is { } l ? l.DeferAsync(env).AsTask() : Task.CompletedTask, runtime.Logger,
runtime.Cancellation);
_completeBlock = new RetryBlock<Envelope>((env, _) => env.Listener!.CompleteAsync(env).AsTask(), runtime.Logger,
_completeBlock = new RetryBlock<Envelope>(
(env, _) => env.Listener is { } l ? l.CompleteAsync(env).AsTask() : Task.CompletedTask, runtime.Logger,
runtime.Cancellation);

_receivingBlock = endpoint.GroupShardingSlotNumber == null
Expand Down
Loading