Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions src/Testing/CoreTests/Persistence/ClaimCheck/local_queue_round_trip.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
using System.Text;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Persistence;
using Wolverine.Persistence.ClaimCheck.Internal;
using Wolverine.Tracking;
using Xunit;

namespace CoreTests.Persistence.ClaimCheck;

/// <summary>
/// Local-queue counterpart to <see cref="end_to_end_round_trip"/>. The cross-transport
/// suite only ever exercises the TCP path, which forces a serialize -> deserialize round
/// trip on the receiver. A message whose handler lives in the same process routes to an
/// in-process local queue instead, and a *durable* local queue serializes the envelope on
/// store (DurableLocalQueue.writeMessageData -> _serializer.Write) which fires the
/// claim-check off-load + Clear side-effect against the live in-memory message. This test
/// pins two things at once:
/// 1. the handler sees the fully re-hydrated payload (the local-queue re-hydration bug), and
/// 2. the off-loaded payload is NOT present in the serialized envelope body — i.e. the fix
/// that restores the live message after serialization does NOT smuggle the blob back into
/// the bytes on the bus and quietly defeat claim-check.
/// </summary>
public class local_queue_round_trip : IAsyncLifetime
{
private string _claimCheckDirectory = null!;

public Task InitializeAsync()
{
CapturedMessages.Reset();
_claimCheckDirectory = Path.Combine(Path.GetTempPath(),
"wolverine-claim-check-local-tests-" + Guid.NewGuid().ToString("N"));
return Task.CompletedTask;
}

public Task DisposeAsync()
{
try
{
if (Directory.Exists(_claimCheckDirectory))
{
Directory.Delete(_claimCheckDirectory, recursive: true);
}
}
catch
{
// ignore cleanup failures
}

return Task.CompletedTask;
}

private async Task<IHost> StartHostAsync()
{
return await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
opts.UseClaimCheck(c => c.UseFileSystem(_claimCheckDirectory));

// Durable local queues serialize the envelope on store, which is the path that
// triggers the claim-check off-load + Clear side-effect on the in-memory message.
// A buffered (in-memory) local queue never serializes on the local hand-off, so the
// off-load never fires there and the bug does not reproduce.
opts.Policies.UseDurableLocalQueues();
}).StartAsync();
}

[Fact]
public async Task durable_local_queue_round_trips_a_string_blob_property()
{
// Distinctive sentinel so the body-content assertion below is unambiguous, and long
// enough that there is no inline/off-load threshold ambiguity.
var marker = "LOCAL-CLAIMCHECK-BODY-" + Guid.NewGuid().ToString("N");
var body = string.Join("\n", Enumerable.Repeat(marker, 200));

using var host = await StartHostAsync();

// SendMessageAndWaitAsync (not InvokeMessageAndWaitAsync) is deliberate: invoke executes
// the handler inline in the caller's context and never routes through the local queue's
// store-and-forward path, so it would not exercise the durable Write/Clear side-effect.
var session = await host.TrackActivity()
.SendMessageAndWaitAsync(new BlobStringMessage("note", body));

// 1. The handler must see the re-hydrated body. Before the fix this is null: the durable
// queue's Write nulls the property on the same in-memory message that the receiver
// re-enqueues, and the receive path skips deserialization because envelope.Message
// is not null.
var received = CapturedMessages.LastOf<BlobStringMessage>();
received.ShouldNotBeNull();
received.Title.ShouldBe("note");
received.Body.ShouldBe(body);

// 2. Claim-check must still be in force on the bus: the off-loaded payload must be
// replaced by a header token and must NOT appear in the serialized envelope body.
// This is the guard against the restore-on-send fix accidentally leaking the blob
// back into the bytes that get persisted/transmitted.
var sent = session.Sent.SingleEnvelope<BlobStringMessage>();
sent.Headers.Keys.ShouldContain(ClaimCheckHeaders.Prefix + nameof(BlobStringMessage.Body));

var serializedBody = sent.Data;
serializedBody.ShouldNotBeNull();
Encoding.UTF8.GetString(serializedBody!).ShouldNotContain(marker);
}

[Fact]
public async Task durable_local_queue_round_trips_a_byte_array_blob_property()
{
// A recognizable byte pattern that is vanishingly unlikely to appear in the JSON
// envelope scaffolding by chance, so "payload absent from body" is a real assertion.
var payload = Enumerable.Range(0, 2048).Select(i => (byte)(0xA0 | (i & 0x0F))).ToArray();

using var host = await StartHostAsync();

var session = await host.TrackActivity()
.SendMessageAndWaitAsync(new BlobByteArrayMessage("doc.pdf", payload));

var received = CapturedMessages.LastOf<BlobByteArrayMessage>();
received.ShouldNotBeNull();
received.Name.ShouldBe("doc.pdf");
received.Payload.ShouldNotBeNull();
received.Payload!.ShouldBe(payload);

var sent = session.Sent.SingleEnvelope<BlobByteArrayMessage>();
sent.Headers.Keys.ShouldContain(ClaimCheckHeaders.Prefix + nameof(BlobByteArrayMessage.Payload));

var serializedBody = sent.Data;
serializedBody.ShouldNotBeNull();
ContainsSubsequence(serializedBody!, payload).ShouldBeFalse(
"the off-loaded blob payload must not be present in the serialized envelope body");
}

// Naive substring search over bytes: enough to prove the contiguous payload is absent.
private static bool ContainsSubsequence(byte[] haystack, byte[] needle)
{
if (needle.Length == 0 || haystack.Length < needle.Length)
{
return false;
}

for (var i = 0; i <= haystack.Length - needle.Length; i++)
{
var match = true;
for (var j = 0; j < needle.Length; j++)
{
if (haystack[i + j] != needle[j])
{
match = false;
break;
}
}

if (match)
{
return true;
}
}

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,31 @@ public ClaimCheckMessageSerializer(IMessageSerializer inner, IClaimCheckStore st
public byte[] Write(Envelope envelope)
{
var message = envelope.Message;
List<OffloadedBlob>? offloaded = null;
if (message is not null)
{
var info = BlobTypeInfo.For(message.GetType());
if (info.HasBlobs)
{
#pragma warning disable VSTHRD002 // Documented blocking call, see class remarks
StoreBlobsAsync(envelope, message, info).GetAwaiter().GetResult();
offloaded = StoreBlobsAsync(envelope, message, info).GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
}
}

return _inner.Write(envelope);
try
{
return _inner.Write(envelope);
}
finally
{
RestoreBlobs(message, offloaded);
}
Comment thread
prom3theu5 marked this conversation as resolved.
}

public byte[] WriteMessage(object message)
{
List<OffloadedBlob>? offloaded = null;
if (message is not null)
{
var info = BlobTypeInfo.For(message.GetType());
Expand All @@ -61,32 +70,47 @@ public byte[] WriteMessage(object message)
// but nullify the properties so the inner serializer doesn't pull
// bytes through the wire under both paths.
#pragma warning disable VSTHRD002
StoreBlobsAsync(envelope: null, message, info).GetAwaiter().GetResult();
offloaded = StoreBlobsAsync(envelope: null, message, info).GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
}
}

return _inner.WriteMessage(message!);
try
{
return _inner.WriteMessage(message!);
}
finally
{
RestoreBlobs(message, offloaded);
}
}

public async ValueTask<byte[]> WriteAsync(Envelope envelope)
{
var message = envelope.Message;
List<OffloadedBlob>? offloaded = null;
if (message is not null)
{
var info = BlobTypeInfo.For(message.GetType());
if (info.HasBlobs)
{
await StoreBlobsAsync(envelope, message, info).ConfigureAwait(false);
offloaded = await StoreBlobsAsync(envelope, message, info).ConfigureAwait(false);
}
}

if (_innerAsync is not null)
try
{
return await _innerAsync.WriteAsync(envelope).ConfigureAwait(false);
}
if (_innerAsync is not null)
{
return await _innerAsync.WriteAsync(envelope).ConfigureAwait(false);
}

return _inner.Write(envelope);
return _inner.Write(envelope);
}
finally
{
RestoreBlobs(message, offloaded);
}
}

public object ReadFromData(Type messageType, Envelope envelope)
Expand Down Expand Up @@ -136,8 +160,18 @@ public object ReadFromData(byte[] data)
return message;
}

private async Task StoreBlobsAsync(Envelope? envelope, object message, BlobTypeInfo info)
/// <summary>
/// Off-load each blob property to the store and stamp the claim-check header. The property
/// is nulled on the message only so the inner serializer does not pull the payload bytes into
/// the persisted/wire body; the original payload is returned so the caller can restore the
/// live message after serialization. This mirrors <c>EncryptingMessageSerializer</c>, which
/// never leaves the in-memory message mutated — a local (in-process) hand-off reuses the same
/// object and skips deserialization, so a leaked Clear would reach the handler as a null
/// property (GH claim-check local-queue re-hydration bug).
/// </summary>
private async Task<List<OffloadedBlob>?> StoreBlobsAsync(Envelope? envelope, object message, BlobTypeInfo info)
{
List<OffloadedBlob>? offloaded = null;
foreach (var accessor in info.Properties)
{
var bytes = accessor.ReadPayload(message);
Expand All @@ -152,9 +186,33 @@ private async Task StoreBlobsAsync(Envelope? envelope, object message, BlobTypeI
envelope.Headers[accessor.HeaderName] = token.Serialize();
}
accessor.Clear(message);
(offloaded ??= new List<OffloadedBlob>()).Add(new OffloadedBlob(accessor, bytes));
}

return offloaded;
}

/// <summary>
/// Re-apply the off-loaded payloads to the live in-memory message after the inner serializer
/// has produced the body. <see cref="BlobPropertyAccessor.ApplyLoaded"/> is the same routine
/// the receive path uses to re-hydrate from the store, so a restored Stream property comes back
/// as a fresh readable stream rather than the consumed original.
/// </summary>
private static void RestoreBlobs(object? message, List<OffloadedBlob>? offloaded)
{
if (message is null || offloaded is null)
{
return;
}

foreach (var blob in offloaded)
{
blob.Accessor.ApplyLoaded(message, blob.Payload);
}
}

private readonly record struct OffloadedBlob(BlobPropertyAccessor Accessor, ReadOnlyMemory<byte> Payload);

private async Task LoadBlobsAsync(Envelope envelope, object message, BlobTypeInfo info)
{
foreach (var accessor in info.Properties)
Expand Down
Loading