diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Scheduled.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Scheduled.cs index 1e12797d3..8867801fd 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Scheduled.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Scheduled.cs @@ -1,4 +1,5 @@ -using Weasel.Core; +using Microsoft.Extensions.Logging; +using Weasel.Core; using Wolverine.Transports; namespace Wolverine.RDBMS; @@ -9,6 +10,7 @@ public abstract partial class MessageDatabase public Task ScheduleExecutionAsync(Envelope envelope) { + Logger.LogDebug("Persisting envelope {EnvelopeId} ({MessageType}) as Scheduled in database inbox at {Destination}", envelope.Id, envelope.MessageType, envelope.Destination); return CreateCommand( $"update {SchemaName}.{DatabaseConstants.IncomingTable} set execution_time = @time, status = \'{EnvelopeStatus.Scheduled}\', attempts = @attempts, owner_id = {TransportConstants.AnyNode} where id = @id and {DatabaseConstants.ReceivedAt} = @uri;") .With("time", envelope.ScheduledTime!.Value) @@ -20,6 +22,7 @@ public Task ScheduleExecutionAsync(Envelope envelope) public Task RescheduleExistingEnvelopeForRetryAsync(Envelope envelope) { + Logger.LogDebug("Rescheduling envelope {EnvelopeId} ({MessageType}) for retry in database inbox at {Destination}", envelope.Id, envelope.MessageType, envelope.Destination); envelope.Status = EnvelopeStatus.Scheduled; envelope.OwnerId = TransportConstants.AnyNode; diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index 254841797..a6d70dd7b 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -169,13 +169,19 @@ public async Task FlushOutgoingMessagesAsync() { if (envelope.Sender!.SupportsNativeScheduledSend) { + Runtime.Logger.LogDebug("Sending scheduled envelope {EnvelopeId} ({MessageType}) via native scheduled send to {Destination}", envelope.Id, envelope.MessageType, envelope.Destination); await sendEnvelopeAsync(envelope); } else { + Runtime.Logger.LogDebug("Scheduling envelope {EnvelopeId} ({MessageType}) for in-memory execution (non-durable, no native scheduling) to {Destination}", envelope.Id, envelope.MessageType, envelope.Destination); Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope); } } + else + { + Runtime.Logger.LogDebug("Envelope {EnvelopeId} ({MessageType}) is scheduled with durable sender to {Destination}, relying on durable inbox scheduling", envelope.Id, envelope.MessageType, envelope.Destination); + } // If NullMessageStore, then we're calling a different Send method that is marking the message if (Runtime.Storage is not NullMessageStore) @@ -291,10 +297,12 @@ public async Task ReScheduleAsync(DateTimeOffset scheduledTime) Envelope.ScheduledTime = scheduledTime; if (tryGetRescheduler(_channel, Envelope) is ISupportNativeScheduling c) { + Runtime.Logger.LogDebug("Rescheduling envelope {EnvelopeId} ({MessageType}) via native scheduling to {ScheduledTime}", Envelope.Id, Envelope.MessageType, scheduledTime); await c.MoveToScheduledUntilAsync(Envelope, Envelope.ScheduledTime.Value); } else { + Runtime.Logger.LogDebug("Rescheduling envelope {EnvelopeId} ({MessageType}) via durable inbox to {ScheduledTime}", Envelope.Id, Envelope.MessageType, scheduledTime); await Storage.Inbox.RescheduleExistingEnvelopeForRetryAsync(Envelope); } } @@ -673,6 +681,7 @@ private async Task flushScheduledMessagesAsync() { foreach (var envelope in Scheduled) { + Runtime.Logger.LogDebug("Flushing scheduled envelope {EnvelopeId} ({MessageType}) to in-memory execution (NullMessageStore)", envelope.Id, envelope.MessageType); Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope); } } @@ -680,6 +689,7 @@ private async Task flushScheduledMessagesAsync() { foreach (var envelope in Scheduled) { + Runtime.Logger.LogDebug("Flushing scheduled envelope {EnvelopeId} ({MessageType}) to durable inbox for retry scheduling", envelope.Id, envelope.MessageType); await Storage.Inbox.RescheduleExistingEnvelopeForRetryAsync(envelope); } } diff --git a/src/Wolverine/Runtime/Routing/MessageRoute.cs b/src/Wolverine/Runtime/Routing/MessageRoute.cs index 16c6707b3..601855552 100644 --- a/src/Wolverine/Runtime/Routing/MessageRoute.cs +++ b/src/Wolverine/Runtime/Routing/MessageRoute.cs @@ -2,6 +2,7 @@ using ImTools; using JasperFx.Core; using JasperFx.Core.Reflection; +using Microsoft.Extensions.Logging; using Wolverine.Attributes; using Wolverine.Configuration; using Wolverine.Runtime.Partitioning; @@ -120,11 +121,17 @@ public Envelope CreateForSending(object message, DeliveryOptions? options, ISend { envelope.Status = EnvelopeStatus.Scheduled; envelope.OwnerId = TransportConstants.AnyNode; + runtime.Logger.LogDebug("Envelope {EnvelopeId} ({MessageType}) marked as Scheduled for local execution at {Destination}", envelope.Id, envelope.MessageType, envelope.Destination); } else if (!Sender.SupportsNativeScheduledSend) { + runtime.Logger.LogDebug("Envelope {EnvelopeId} ({MessageType}) wrapped for durable scheduled send to {Destination} (transport does not support native scheduling)", envelope.Id, envelope.MessageType, envelope.Destination); return envelope.ForScheduledSend(localDurableQueue); } + else + { + runtime.Logger.LogDebug("Envelope {EnvelopeId} ({MessageType}) scheduled via native transport scheduling to {Destination}", envelope.Id, envelope.MessageType, envelope.Destination); + } } else { diff --git a/src/Wolverine/Runtime/Scheduled/InMemoryScheduledJobProcessor.cs b/src/Wolverine/Runtime/Scheduled/InMemoryScheduledJobProcessor.cs index 40f5bf485..1dc045f60 100644 --- a/src/Wolverine/Runtime/Scheduled/InMemoryScheduledJobProcessor.cs +++ b/src/Wolverine/Runtime/Scheduled/InMemoryScheduledJobProcessor.cs @@ -1,4 +1,6 @@ using JasperFx.Core; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Wolverine.Runtime.WorkerQueues; namespace Wolverine.Runtime.Scheduled; @@ -8,14 +10,17 @@ public class InMemoryScheduledJobProcessor : IScheduledJobProcessor private readonly Cache _outstandingJobs = new(); private readonly ILocalQueue _queue; + private readonly ILogger _logger; - public InMemoryScheduledJobProcessor(ILocalQueue queue) + public InMemoryScheduledJobProcessor(ILocalQueue queue, ILogger? logger = null) { _queue = queue; + _logger = logger ?? NullLogger.Instance; } public void Enqueue(DateTimeOffset executionTime, Envelope envelope) { + _logger.LogDebug("Enqueuing envelope {EnvelopeId} ({MessageType}) for in-memory scheduled execution at {ExecutionTime}", envelope.Id, envelope.MessageType, executionTime); _outstandingJobs[envelope.Id] = new InMemoryScheduledJob(this, envelope, executionTime); } @@ -90,10 +95,12 @@ public InMemoryScheduledJob(InMemoryScheduledJobProcessor parent, Envelope envel var delayTime = ExecutionTime.Subtract(DateTimeOffset.UtcNow); if (delayTime <= TimeSpan.Zero) { + _parent._logger.LogDebug("Scheduled envelope {EnvelopeId} ({MessageType}) firing immediately (execution time already passed)", envelope.Id, envelope.MessageType); _task = Task.Run(() => publish()); } else { + _parent._logger.LogDebug("Scheduled envelope {EnvelopeId} ({MessageType}) will fire after delay of {DelayTime}", envelope.Id, envelope.MessageType, delayTime); _task = Task.Delay(delayTime, _cancellation.Token).ContinueWith(_ => publish(), TaskScheduler.Default); } @@ -137,6 +144,7 @@ public ScheduledJob ToReport() public void Enqueue() { + _parent._logger.LogDebug("In-memory scheduled job firing, enqueuing envelope {EnvelopeId} ({MessageType}) to local queue", Envelope.Id, Envelope.MessageType); _parent._queue.Enqueue(Envelope); Cancel(); } diff --git a/src/Wolverine/Runtime/Scheduled/ScheduledSendEnvelopeHandler.cs b/src/Wolverine/Runtime/Scheduled/ScheduledSendEnvelopeHandler.cs index b13402304..0b2bc878f 100644 --- a/src/Wolverine/Runtime/Scheduled/ScheduledSendEnvelopeHandler.cs +++ b/src/Wolverine/Runtime/Scheduled/ScheduledSendEnvelopeHandler.cs @@ -1,3 +1,4 @@ +using Microsoft.Extensions.Logging; using Wolverine.Runtime.Handlers; namespace Wolverine.Runtime.Scheduled; @@ -17,6 +18,9 @@ public override Task HandleAsync(MessageContext context, CancellationToken cance } var scheduled = (Envelope)context.Envelope!.Message!; + + context.Runtime.Logger.LogDebug("Forwarding previously scheduled envelope {EnvelopeId} ({MessageType}) for execution to {Destination}", scheduled.Id, scheduled.MessageType, scheduled.Destination); + scheduled.Source = context.Runtime.Options.ServiceName; scheduled.ScheduledTime = null; scheduled.Status = EnvelopeStatus.Outgoing; diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index c5cd5450c..35e87b2cb 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -236,7 +236,7 @@ public async Task StopAsync(CancellationToken cancellationToken) private void startInMemoryScheduledJobs() { ScheduledJobs = - new InMemoryScheduledJobProcessor((ILocalQueue)Endpoints.AgentForLocalQueue(TransportConstants.Replies)); + new InMemoryScheduledJobProcessor((ILocalQueue)Endpoints.AgentForLocalQueue(TransportConstants.Replies), Logger); // Bit of a hack, but it's necessary. Came up in compliance tests if (Storage is NullMessageStore p) diff --git a/src/Wolverine/Runtime/WolverineRuntime.cs b/src/Wolverine/Runtime/WolverineRuntime.cs index 37a25b3a3..d73bfeebf 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.cs @@ -197,6 +197,7 @@ public void ScheduleLocalExecutionInMemory(DateTimeOffset executionTime, Envelop throw new InvalidOperationException( $"This action is invalid when {nameof(WolverineOptions)}.{nameof(WolverineOptions.Durability)}.{nameof(DurabilitySettings.Mode)} = {Options.Durability.Mode}"); + Logger.LogDebug("Scheduling envelope {EnvelopeId} ({MessageType}) for in-memory execution at {ExecutionTime}", envelope.Id, envelope.MessageType, executionTime); MessageTracking.Sent(envelope); ScheduledJobs.Enqueue(executionTime, envelope); } diff --git a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs index a4eaa0dd4..6112976de 100644 --- a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs @@ -35,7 +35,7 @@ public BufferedReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPi _settings = runtime.DurabilitySettings; Pipeline = pipeline; - _scheduler = new InMemoryScheduledJobProcessor(this); + _scheduler = new InMemoryScheduledJobProcessor(this, _logger); _deferBlock = new RetryBlock((env, _) => env.Listener!.DeferAsync(env).AsTask(), runtime.Logger, runtime.Cancellation); @@ -226,6 +226,7 @@ public Task MoveToErrorsAsync(Envelope envelope, Exception exception) Task ISupportNativeScheduling.MoveToScheduledUntilAsync(Envelope envelope, DateTimeOffset time) { + _logger.LogDebug("Moving envelope {EnvelopeId} ({MessageType}) to scheduled status in buffered receiver until {ScheduledTime}", envelope.Id, envelope.MessageType, time); envelope.ScheduledTime = time; ScheduleExecution(envelope); @@ -240,6 +241,7 @@ public void ScheduleExecution(Envelope envelope) $"There is no {nameof(Envelope.ScheduledTime)} value"); } + _logger.LogDebug("Scheduling envelope {EnvelopeId} ({MessageType}) execution via in-memory scheduler at {ExecutionTime}", envelope.Id, envelope.MessageType, envelope.ScheduledTime.Value); _scheduler.Enqueue(envelope.ScheduledTime.Value, envelope); } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 07a23641d..419c261d0 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -317,6 +317,7 @@ public Task MoveToErrorsAsync(Envelope envelope, Exception exception) public Task MoveToScheduledUntilAsync(Envelope envelope, DateTimeOffset time) { + _logger.LogDebug("Moving envelope {EnvelopeId} ({MessageType}) to scheduled status until {ScheduledTime} in durable receiver", envelope.Id, envelope.MessageType, time); envelope.OwnerId = TransportConstants.AnyNode; envelope.ScheduledTime = time; envelope.Status = EnvelopeStatus.Scheduled;