Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 115 additions & 6 deletions src/Persistence/Wolverine.Marten/TestingExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
using JasperFx.Core;
using JasperFx.Core.Reflection;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events;
using Marten.Events.Daemon.Coordination;
using Microsoft.Extensions.DependencyInjection;
using Wolverine.Runtime;
using Wolverine.Tracking;

namespace Wolverine.Marten;
Expand Down Expand Up @@ -46,16 +52,55 @@ public static TrackedSessionConfiguration PauseThenCatchUpOnMartenDaemonActivity
{
configuration.BeforeExecution(async (runtime, cancellation) =>
{
var envelope = new Envelope
{
MessageType = "Pause:Marten:Daemons"
};

runtime.MessageTracking.ExecutionStarted(envelope);

await runtime.Services.PauseAllDaemonsAsync();

runtime.MessageTracking.ExecutionFinished(envelope);
});

return configuration.AddStage(async (runtime, _, cancellation) =>
{
var exceptions = await runtime.Services.ForceAllMartenDaemonActivityToCatchUpAsync(cancellation, mode);
foreach (var e in exceptions)
var envelope = new Envelope
{
MessageType = "CatchUp:Marten:DaemonActivity"
};

runtime.MessageTracking.ExecutionStarted(envelope);

// TODO -- be nice if this was in Marten itself
var coordinator = runtime.Services.GetRequiredService<IProjectionCoordinator>();
var daemons = await coordinator.AllDaemonsAsync().ConfigureAwait(false);
var subscriptions = new List<IDisposable>();
var observer = new TrackedSessionShardWatcher(runtime);
foreach (var daemon in daemons)
{
var subscription = daemon.Tracker.Subscribe(observer);
subscriptions.Add(subscription);
}

try
{
var exceptions = await runtime.Services.ForceAllMartenDaemonActivityToCatchUpAsync(cancellation, mode);
foreach (var exception in exceptions)
{
runtime.MessageTracking.LogException(exception);
}
}
finally
{
runtime.MessageTracking.LogException(e);
foreach (var subscription in subscriptions)
{
subscription.SafeDispose();
}
}

runtime.MessageTracking.ExecutionFinished(envelope);
});
}

Expand All @@ -73,16 +118,55 @@ public static TrackedSessionConfiguration PauseThenCatchUpOnMartenDaemonActivity
{
configuration.BeforeExecution(async (runtime, cancellation) =>
{
var envelope = new Envelope
{
MessageType = "Pause:Marten:Daemons:" + typeof(T).NameInCode()
};

runtime.MessageTracking.ExecutionStarted(envelope);

await runtime.Services.PauseAllDaemonsAsync<T>();

runtime.MessageTracking.ExecutionFinished(envelope);
});

return configuration.AddStage(async (runtime, _, cancellation) =>
{
var exceptions = await runtime.Services.ForceAllMartenDaemonActivityToCatchUpAsync<T>(cancellation, mode);
foreach (var exception in exceptions)
var envelope = new Envelope
{
MessageType = "CatchUp:Marten:DaemonActivity:" + typeof(T).FullNameInCode()
};

runtime.MessageTracking.ExecutionStarted(envelope);

// TODO -- be nice if this was in Marten itself
var coordinator = runtime.Services.GetRequiredService<IProjectionCoordinator<T>>();
var daemons = await coordinator.AllDaemonsAsync().ConfigureAwait(false);
var subscriptions = new List<IDisposable>();
var observer = new TrackedSessionShardWatcher(runtime);
foreach (var daemon in daemons)
{
var subscription = daemon.Tracker.Subscribe(observer);
subscriptions.Add(subscription);
}

try
{
var exceptions = await runtime.Services.ForceAllMartenDaemonActivityToCatchUpAsync<T>(cancellation, mode);
foreach (var exception in exceptions)
{
runtime.MessageTracking.LogException(exception);
}
}
finally
{
runtime.MessageTracking.LogException(exception);
foreach (var subscription in subscriptions)
{
subscription.SafeDispose();
}
}

runtime.MessageTracking.ExecutionFinished(envelope);
});
}

Expand Down Expand Up @@ -132,4 +216,29 @@ public static TrackedSessionConfiguration WaitForNonStaleDaemonDataAfterExecutio
}
});
}
}

internal class TrackedSessionShardWatcher : IObserver<ShardState>
{
private readonly IWolverineRuntime _runtime;

public TrackedSessionShardWatcher(IWolverineRuntime runtime)
{
_runtime = runtime;
}

public void OnCompleted()
{

}

public void OnError(Exception error)
{

}

public void OnNext(ShardState value)
{
_runtime.MessageTracking.LogStatus(value.ToString());
}
}
7 changes: 7 additions & 0 deletions src/Wolverine/Logging/IMessageLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ namespace Wolverine.Logging;
/// </summary>
public interface IMessageTracker
{
/// <summary>
/// Strictly for automated testing support. Simply track a message that will be displayed
/// in the execution order in any test failures from timeouts with tracked sessions
/// </summary>
/// <param name="message"></param>
void LogStatus(string message);

/// <summary>
/// Catch all hook for any exceptions encountered by the messaging
/// </summary>
Expand Down
7 changes: 6 additions & 1 deletion src/Wolverine/Runtime/WolverineRuntime.DirectMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ public void LogException(Exception ex, object? correlationId = null, string mess
{
_runtime.LogException(ex, correlationId, message);
}


public void LogStatus(string message)
{
_runtime.ActiveSession?.LogStatus(message);
}

public ILogger Logger { get; }

public void Sent(Envelope envelope)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public void Received(Envelope envelope)
{
_runtime.Received(envelope);
}

public void LogStatus(string message)
{
_runtime.ActiveSession?.LogStatus(message);
}

public void ExecutionStarted(Envelope envelope)
{
Expand Down
5 changes: 5 additions & 0 deletions src/Wolverine/Runtime/WolverineRuntime.Tracking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,9 @@ public void LogException(Exception ex, object? correlationId = null,
ActiveSession?.LogException(ex, _serviceName);
Logger.LogError(ex, message);
}

public void LogStatus(string message)
{
ActiveSession?.LogStatus(message);
}
}
12 changes: 6 additions & 6 deletions src/Wolverine/Tracking/EnvelopeRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ namespace Wolverine.Tracking;

public class EnvelopeRecord
{
public EnvelopeRecord(MessageEventType eventType, Envelope envelope, long sessionTime, Exception? exception)
public EnvelopeRecord(MessageEventType eventType, Envelope? envelope, long sessionTime, Exception? exception)
{
Envelope = envelope;
SessionTime = sessionTime;
Exception = exception;
MessageEventType = eventType;
AttemptNumber = envelope.Attempts;
AttemptNumber = envelope?.Attempts ?? 0;

WasScheduled = envelope.IsScheduledForLater(DateTimeOffset.UtcNow);
WasScheduled = envelope?.IsScheduledForLater(DateTimeOffset.UtcNow) ?? false;

var activity = Activity.Current;
if (activity != null)
Expand All @@ -24,10 +24,10 @@ public EnvelopeRecord(MessageEventType eventType, Envelope envelope, long sessio
ActivityId = activity.Id;
}
}

public bool WasScheduled { get; set; }

public object? Message => Envelope.Message;
public object? Message => Envelope?.Message;

/// <summary>
/// If available, the open telemetry activity id when
Expand All @@ -37,7 +37,7 @@ public EnvelopeRecord(MessageEventType eventType, Envelope envelope, long sessio
public string? ParentId { get; init; }
public string? RootId { get; init; }

public Envelope Envelope { get; private set; }
public Envelope? Envelope { get; private set; }

/// <summary>
/// A timestamp of the milliseconds since the tracked session was started before this event
Expand Down
3 changes: 2 additions & 1 deletion src/Wolverine/Tracking/MessageEventType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public enum MessageEventType
MovedToErrorQueue,
Requeued,
Scheduled,
Discarded
Discarded,
Status
}
#endregion
10 changes: 9 additions & 1 deletion src/Wolverine/Tracking/TrackedSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal partial class TrackedSession : ITrackedSession
private readonly IList<ITrackedCondition> _conditions = new List<ITrackedCondition>();

private Cache<Guid, EnvelopeHistory> _envelopes = new(id => new EnvelopeHistory(id));
private readonly List<EnvelopeRecord> _statuses = new();

private readonly IList<Exception> _exceptions = new List<Exception>();

Expand Down Expand Up @@ -142,13 +143,14 @@ public EnvelopeRecord[] FindEnvelopesWithMessageType<T>()

public EnvelopeRecord[] AllRecordsInOrder()
{
return _envelopes.SelectMany(x => x.Records).OrderBy(x => x.SessionTime).ToArray();
return _envelopes.SelectMany(x => x.Records).Concat(_statuses).OrderBy(x => x.SessionTime).ToArray();
}

public EnvelopeRecord[] AllRecordsInOrder(MessageEventType eventType)
{
return _envelopes
.SelectMany(x => x.Records)
.Concat(_statuses)
.Where(x => x.MessageEventType == eventType)
.OrderBy(x => x.SessionTime)
.ToArray();
Expand Down Expand Up @@ -483,6 +485,12 @@ public void IgnoreMessageTypes(Func<Type, bool> filter)
{
_ignoreMessageRules.Add(filter);
}

public void LogStatus(string message)
{
var record = new EnvelopeRecord(MessageEventType.Status, null, _stopwatch.ElapsedMilliseconds, null);
_statuses.Add(record);
}
}

internal class HostWrapper : IHost
Expand Down
Loading