Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/Persistence/Wolverine.RDBMS/MessageDatabase.Scheduled.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Weasel.Core;
using Microsoft.Extensions.Logging;
using Weasel.Core;
using Wolverine.Transports;

namespace Wolverine.RDBMS;
Expand All @@ -9,6 +10,7 @@ public abstract partial class MessageDatabase<T>

public Task ScheduleExecutionAsync(Envelope envelope)
{
Logger.LogDebug("Persisting envelope {EnvelopeId} ({MessageType}) as Scheduled in database inbox at {Destination}", envelope.Id, envelope.MessageType, envelope.Destination);
return CreateCommand(
$"update {SchemaName}.{DatabaseConstants.IncomingTable} set execution_time = @time, status = \'{EnvelopeStatus.Scheduled}\', attempts = @attempts, owner_id = {TransportConstants.AnyNode} where id = @id and {DatabaseConstants.ReceivedAt} = @uri;")
.With("time", envelope.ScheduledTime!.Value)
Expand All @@ -20,6 +22,7 @@ public Task ScheduleExecutionAsync(Envelope envelope)

public Task RescheduleExistingEnvelopeForRetryAsync(Envelope envelope)
{
Logger.LogDebug("Rescheduling envelope {EnvelopeId} ({MessageType}) for retry in database inbox at {Destination}", envelope.Id, envelope.MessageType, envelope.Destination);
envelope.Status = EnvelopeStatus.Scheduled;
envelope.OwnerId = TransportConstants.AnyNode;

Expand Down
10 changes: 10 additions & 0 deletions src/Wolverine/Runtime/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,19 @@ public async Task FlushOutgoingMessagesAsync()
{
if (envelope.Sender!.SupportsNativeScheduledSend)
{
Runtime.Logger.LogDebug("Sending scheduled envelope {EnvelopeId} ({MessageType}) via native scheduled send to {Destination}", envelope.Id, envelope.MessageType, envelope.Destination);
await sendEnvelopeAsync(envelope);
}
else
{
Runtime.Logger.LogDebug("Scheduling envelope {EnvelopeId} ({MessageType}) for in-memory execution (non-durable, no native scheduling) to {Destination}", envelope.Id, envelope.MessageType, envelope.Destination);
Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope);
}
}
else
{
Runtime.Logger.LogDebug("Envelope {EnvelopeId} ({MessageType}) is scheduled with durable sender to {Destination}, relying on durable inbox scheduling", envelope.Id, envelope.MessageType, envelope.Destination);
}

// If NullMessageStore, then we're calling a different Send method that is marking the message
if (Runtime.Storage is not NullMessageStore)
Expand Down Expand Up @@ -291,10 +297,12 @@ public async Task ReScheduleAsync(DateTimeOffset scheduledTime)
Envelope.ScheduledTime = scheduledTime;
if (tryGetRescheduler(_channel, Envelope) is ISupportNativeScheduling c)
{
Runtime.Logger.LogDebug("Rescheduling envelope {EnvelopeId} ({MessageType}) via native scheduling to {ScheduledTime}", Envelope.Id, Envelope.MessageType, scheduledTime);
await c.MoveToScheduledUntilAsync(Envelope, Envelope.ScheduledTime.Value);
}
else
{
Runtime.Logger.LogDebug("Rescheduling envelope {EnvelopeId} ({MessageType}) via durable inbox to {ScheduledTime}", Envelope.Id, Envelope.MessageType, scheduledTime);
await Storage.Inbox.RescheduleExistingEnvelopeForRetryAsync(Envelope);
}
}
Expand Down Expand Up @@ -673,13 +681,15 @@ private async Task flushScheduledMessagesAsync()
{
foreach (var envelope in Scheduled)
{
Runtime.Logger.LogDebug("Flushing scheduled envelope {EnvelopeId} ({MessageType}) to in-memory execution (NullMessageStore)", envelope.Id, envelope.MessageType);
Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope);
}
}
else
{
foreach (var envelope in Scheduled)
{
Runtime.Logger.LogDebug("Flushing scheduled envelope {EnvelopeId} ({MessageType}) to durable inbox for retry scheduling", envelope.Id, envelope.MessageType);
await Storage.Inbox.RescheduleExistingEnvelopeForRetryAsync(envelope);
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/Wolverine/Runtime/Routing/MessageRoute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using ImTools;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Logging;
using Wolverine.Attributes;
using Wolverine.Configuration;
using Wolverine.Runtime.Partitioning;
Expand Down Expand Up @@ -120,11 +121,17 @@ public Envelope CreateForSending(object message, DeliveryOptions? options, ISend
{
envelope.Status = EnvelopeStatus.Scheduled;
envelope.OwnerId = TransportConstants.AnyNode;
runtime.Logger.LogDebug("Envelope {EnvelopeId} ({MessageType}) marked as Scheduled for local execution at {Destination}", envelope.Id, envelope.MessageType, envelope.Destination);
}
else if (!Sender.SupportsNativeScheduledSend)
{
runtime.Logger.LogDebug("Envelope {EnvelopeId} ({MessageType}) wrapped for durable scheduled send to {Destination} (transport does not support native scheduling)", envelope.Id, envelope.MessageType, envelope.Destination);
return envelope.ForScheduledSend(localDurableQueue);
}
else
{
runtime.Logger.LogDebug("Envelope {EnvelopeId} ({MessageType}) scheduled via native transport scheduling to {Destination}", envelope.Id, envelope.MessageType, envelope.Destination);
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using JasperFx.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Wolverine.Runtime.WorkerQueues;

namespace Wolverine.Runtime.Scheduled;
Expand All @@ -8,14 +10,17 @@ public class InMemoryScheduledJobProcessor : IScheduledJobProcessor
private readonly Cache<Guid, InMemoryScheduledJob> _outstandingJobs = new();

private readonly ILocalQueue _queue;
private readonly ILogger _logger;

public InMemoryScheduledJobProcessor(ILocalQueue queue)
public InMemoryScheduledJobProcessor(ILocalQueue queue, ILogger? logger = null)
{
_queue = queue;
_logger = logger ?? NullLogger.Instance;
}

public void Enqueue(DateTimeOffset executionTime, Envelope envelope)
{
_logger.LogDebug("Enqueuing envelope {EnvelopeId} ({MessageType}) for in-memory scheduled execution at {ExecutionTime}", envelope.Id, envelope.MessageType, executionTime);
_outstandingJobs[envelope.Id] = new InMemoryScheduledJob(this, envelope, executionTime);
}

Expand Down Expand Up @@ -90,10 +95,12 @@ public InMemoryScheduledJob(InMemoryScheduledJobProcessor parent, Envelope envel
var delayTime = ExecutionTime.Subtract(DateTimeOffset.UtcNow);
if (delayTime <= TimeSpan.Zero)
{
_parent._logger.LogDebug("Scheduled envelope {EnvelopeId} ({MessageType}) firing immediately (execution time already passed)", envelope.Id, envelope.MessageType);
_task = Task.Run(() => publish());
}
else
{
_parent._logger.LogDebug("Scheduled envelope {EnvelopeId} ({MessageType}) will fire after delay of {DelayTime}", envelope.Id, envelope.MessageType, delayTime);
_task = Task.Delay(delayTime, _cancellation.Token).ContinueWith(_ => publish(), TaskScheduler.Default);
}

Expand Down Expand Up @@ -137,6 +144,7 @@ public ScheduledJob ToReport()

public void Enqueue()
{
_parent._logger.LogDebug("In-memory scheduled job firing, enqueuing envelope {EnvelopeId} ({MessageType}) to local queue", Envelope.Id, Envelope.MessageType);
_parent._queue.Enqueue(Envelope);
Cancel();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Microsoft.Extensions.Logging;
using Wolverine.Runtime.Handlers;

namespace Wolverine.Runtime.Scheduled;
Expand All @@ -17,6 +18,9 @@ public override Task HandleAsync(MessageContext context, CancellationToken cance
}

var scheduled = (Envelope)context.Envelope!.Message!;

context.Runtime.Logger.LogDebug("Forwarding previously scheduled envelope {EnvelopeId} ({MessageType}) for execution to {Destination}", scheduled.Id, scheduled.MessageType, scheduled.Destination);

scheduled.Source = context.Runtime.Options.ServiceName;
scheduled.ScheduledTime = null;
scheduled.Status = EnvelopeStatus.Outgoing;
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Runtime/WolverineRuntime.HostService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
private void startInMemoryScheduledJobs()
{
ScheduledJobs =
new InMemoryScheduledJobProcessor((ILocalQueue)Endpoints.AgentForLocalQueue(TransportConstants.Replies));
new InMemoryScheduledJobProcessor((ILocalQueue)Endpoints.AgentForLocalQueue(TransportConstants.Replies), Logger);

// Bit of a hack, but it's necessary. Came up in compliance tests
if (Storage is NullMessageStore p)
Expand Down
1 change: 1 addition & 0 deletions src/Wolverine/Runtime/WolverineRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public void ScheduleLocalExecutionInMemory(DateTimeOffset executionTime, Envelop
throw new InvalidOperationException(
$"This action is invalid when {nameof(WolverineOptions)}.{nameof(WolverineOptions.Durability)}.{nameof(DurabilitySettings.Mode)} = {Options.Durability.Mode}");

Logger.LogDebug("Scheduling envelope {EnvelopeId} ({MessageType}) for in-memory execution at {ExecutionTime}", envelope.Id, envelope.MessageType, executionTime);
MessageTracking.Sent(envelope);
ScheduledJobs.Enqueue(executionTime, envelope);
}
Expand Down
4 changes: 3 additions & 1 deletion src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
_settings = runtime.DurabilitySettings;
Pipeline = pipeline;

_scheduler = new InMemoryScheduledJobProcessor(this);
_scheduler = new InMemoryScheduledJobProcessor(this, _logger);

_deferBlock = new RetryBlock<Envelope>((env, _) => env.Listener!.DeferAsync(env).AsTask(), runtime.Logger,
runtime.Cancellation);
Expand Down Expand Up @@ -108,7 +108,7 @@
}
}

public IHandlerPipeline? Pipeline { get; }

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

public Uri Uri { get; }

Expand Down Expand Up @@ -226,6 +226,7 @@

Task ISupportNativeScheduling.MoveToScheduledUntilAsync(Envelope envelope, DateTimeOffset time)
{
_logger.LogDebug("Moving envelope {EnvelopeId} ({MessageType}) to scheduled status in buffered receiver until {ScheduledTime}", envelope.Id, envelope.MessageType, time);
envelope.ScheduledTime = time;
ScheduleExecution(envelope);

Expand All @@ -240,6 +241,7 @@
$"There is no {nameof(Envelope.ScheduledTime)} value");
}

_logger.LogDebug("Scheduling envelope {EnvelopeId} ({MessageType}) execution via in-memory scheduler at {ExecutionTime}", envelope.Id, envelope.MessageType, envelope.ScheduledTime.Value);
_scheduler.Enqueue(envelope.ScheduledTime.Value, envelope);
}
}
1 change: 1 addition & 0 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@
await EnqueueAsync(envelope);
}

public IHandlerPipeline? Pipeline { get; }

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

public Uri Uri { get; set; }

Expand Down Expand Up @@ -317,6 +317,7 @@

public Task MoveToScheduledUntilAsync(Envelope envelope, DateTimeOffset time)
{
_logger.LogDebug("Moving envelope {EnvelopeId} ({MessageType}) to scheduled status until {ScheduledTime} in durable receiver", envelope.Id, envelope.MessageType, time);
envelope.OwnerId = TransportConstants.AnyNode;
envelope.ScheduledTime = time;
envelope.Status = EnvelopeStatus.Scheduled;
Expand Down
Loading