Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void warns_when_a_readaggregate_handler_is_promoted_by_the_aggregatehandl
// GH-2922 guardrail: bootstrapping should have logged a warning for AggregateHandler, which uses
// [ReadAggregate] and is auto-promoted into the aggregate workflow purely by its name.
_context.Warnings.ShouldContain(w =>
w.Contains(typeof(AggregateHandler).FullName!) && w.Contains("AggregateHandler"));
w.Contains(typeof(SomethingAggregateHandler).FullName!) && w.Contains("AggregateHandler"));
}
}

Expand Down Expand Up @@ -142,7 +142,7 @@ public async Task InitializeAsync()
opts.Policies.AutoApplyTransactions();

opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(AggregateHandler))
.IncludeType(typeof(SomethingAggregateHandler))
.IncludeType(typeof(PublishReader))
.IncludeType(typeof(ScheduleReader))
.IncludeType(typeof(SomeOtherHandler));
Expand Down Expand Up @@ -216,7 +216,7 @@ public record SomethingWasScheduled(Guid Id);

// Name ends with "AggregateHandler" -> auto-promoted into the Marten aggregate event-sourcing
// workflow, so the return value is appended to the LetterAggregate stream as an event.
public static class AggregateHandler
public static class SomethingAggregateHandler
{
public static SomethingWasScheduled Handle(
PublishSomethingUsingAggregate command,
Expand Down
15 changes: 2 additions & 13 deletions src/Persistence/MartenTests/Distribution/with_ancillary_stores.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ protected async Task<IHost> startHostAsync()

opts.UseMessagePackSerialization();

opts.UseSharedMemoryQueueing();

opts.Services.AddMarten(m =>
{
m.DisableNpgsqlLogging = true;
Expand Down Expand Up @@ -123,17 +121,8 @@ public async Task can_do_the_full_marten_reset_all_data_call()

await theOriginalHost.ResetAllMartenDataAsync<ITripStore>();
}

// Skipped pending #2965: the SharedMemory transport does not serialize
// envelopes on cross-host send, so AssignAgent system commands arrive at
// other nodes with empty Data/MessageType and the receive pipeline can't
// deserialize them — agents get stopped on the leader but never re-start
// on the destination node, causing this distribution test to time out.
// All 6 event-subscription agents enumerate and start correctly on a
// single host; the failure is purely in cross-node redistribution.
// Marten CI has been red here through V6.2.0 and V6.2.1; unblocking the
// V6.2.2 release with this skip while #2965 is resolved separately.
[Fact(Skip = "Pending fix for SharedMemory transport cross-node AssignAgent serialization — see #2965")]

[Fact]
public async Task spread_out_over_multiple_hosts()
{
await theOriginalHost.WaitUntilAssumesLeadershipAsync(5.Seconds());
Expand Down
5 changes: 5 additions & 0 deletions src/Wolverine/Envelope.Internals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -534,4 +534,9 @@ internal void Reset()
WireTap = null;
Store = null;
}

internal bool IsEmpty()
{
return (_message == null && (_data == null || (MessageType.IsEmpty()))) ;
}
}
10 changes: 6 additions & 4 deletions src/Wolverine/Envelope.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using JasperFx.Core;
using System.Diagnostics;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using JasperFx.MultiTenancy;
using MassTransit;
Expand Down Expand Up @@ -42,6 +43,7 @@ public Envelope(object message, IEnumerable<Envelope> batch)

public Envelope()
{
Debug.WriteLine("Being created");
}

public Envelope(object message)
Expand Down Expand Up @@ -149,7 +151,7 @@ public TimeSpan? ScheduleDelay
{
return _data;
}
AssertMessage();
assertMessage();

if(Serializer is IAsyncMessageSerializer asyncMessaeSerializer)
{
Expand Down Expand Up @@ -179,7 +181,7 @@ public byte[]? Data
return _data;
}

AssertMessage();
assertMessage();

if (Serializer == null)
{
Expand Down Expand Up @@ -207,7 +209,7 @@ public byte[]? Data
set => _data = value;
}

private void AssertMessage()
private void assertMessage()
{
if (_message == null)
{
Expand Down