Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions docs/tutorials/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,46 @@ The idempotency check and the process of marking an incoming envelope are themse
to avoid Wolverine from making unnecessary database calls. ~~~~
:::

## Idempotency on Non Transactional Handlers

::: tip
Idempotency checks are automatic for any message handler that uses any kind of
transactional middleware.
:::

::: warning
This functionality does require some kind of message persistence to be configured for your application as it utilizes
Wolverine's inbox functionality
:::

Every usage you've seen so far has featured utilizing Wolverine's transactional middleware support on handlers that
use [EF Core](/guide/durability/efcore/transactional-middleware) or [Marten](/guide/durability/marten/transactional-middleware).

But of course, you may have message handlers that don't need to touch your underlying storage at all. For example, a message
handler might do nothing but call an external web service. You may want to make this message handler be idempotent to protect
against duplicated calls to that web service. You're in luck, because Wolverine exposes this policy to do exactly that:

snippet: sample_using_AutoApplyIdempotencyOnNonTransactionalHandlers

Specifically, see the call to `WolverineOptions.Policies.AutoApplyIdempotencyOnNonTransactionalHandlers()` above. What that
is doing is:

1. Inserting a call to assert that the current message doesn't already exist in your applications default envelope storage by
the Wolverine message id. If the message is already marked as `Handled` in the inbox, Wolverine will reject and discard the current
message processing
2. Assuming the message is all new, Wolverine will try to persist the `Handled` state in the default inbox storage. In the case
of failures to the database storage (stuff happens), Wolverine will attempt to retry out of band, but allow the message processing
to go through otherwise without triggering error policies so the message is not retried

::: tip
While we're talking about call outs to external web services, the Wolverine team recommends isolating the call to that web
service in its own handler with isolated error handling and maybe even a circuit breaker for outages of that service. Or at
least making that your default practice.
:::

You can also opt into this behavior on a message type by message type basis by decorating the
message handler type or handler method with the Wolverine `[Idempotent]` attribute.

## Handled Message Retention

The way that the idempotency checks work is to keep track of messages that have already been processed in the persisted
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using IntegrationTests;
using JasperFx;
using JasperFx.CodeGeneration.Frames;
using JasperFx.Resources;
using Marten;
using Microsoft.Data.SqlClient;
Expand All @@ -15,6 +16,7 @@
using Wolverine.EntityFrameworkCore;
using Wolverine.Marten;
using Wolverine.Persistence;
using Wolverine.Runtime;
using Wolverine.SqlServer;
using Wolverine.Tracking;

Expand Down Expand Up @@ -151,9 +153,60 @@ public async Task happy_and_sad_path_with_message_and_destination_tracking(Idemp

tracked2.Discarded.SingleEnvelope<MaybeIdempotent>().ShouldNotBeNull();
}

[Fact]
public async Task apply_idempotency_to_non_transactional_handler()
{
#region sample_using_AutoApplyIdempotencyOnNonTransactionalHandlers

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddDbContextWithWolverineIntegration<CleanDbContext>(x =>
x.UseSqlServer(Servers.SqlServerConnectionString));

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);

opts.Policies.AutoApplyTransactions(IdempotencyStyle.Eager);

opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "idempotency");
opts.UseEntityFrameworkCoreTransactions();

// THIS RIGHT HERE
opts.Policies.AutoApplyIdempotencyOnNonTransactionalHandlers();
}).StartAsync();

#endregion

var chain = host.GetRuntime().Handlers.ChainFor<MaybeIdempotentNotTransactional>();
chain.IsTransactional.ShouldBeFalse();
chain.Middleware.OfType<MethodCall>().Any(x => x.Method.Name == nameof(MessageContext.AssertEagerIdempotencyAsync)).ShouldBeTrue();
chain.Postprocessors.OfType<MethodCall>().Any(x => x.Method.Name == nameof(MessageContext.PersistHandledAsync)).ShouldBeTrue();

var messageId = Guid.NewGuid();
var tracked1 = await host.SendMessageAndWaitAsync(new MaybeIdempotentNotTransactional(messageId));

// First time through should be perfectly fine
var sentMessage = tracked1.Executed.SingleEnvelope<MaybeIdempotentNotTransactional>();

var runtime = host.GetRuntime();
var circuit = runtime.Endpoints.FindListenerCircuit(sentMessage.Destination);

var tracked2 = await host.TrackActivity()
.DoNotAssertOnExceptionsDetected()
.ExecuteAndWaitAsync(c =>
{
sentMessage.WasPersistedInInbox = false;
sentMessage.Attempts = 0;
return circuit.EnqueueDirectlyAsync([sentMessage]);
});

tracked2.Discarded.SingleEnvelope<MaybeIdempotentNotTransactional>().ShouldNotBeNull();
}
}

public record MaybeIdempotent(Guid Id);
public record MaybeIdempotentNotTransactional(Guid Id);

public static class MaybeIdempotentHandler
{
Expand All @@ -166,4 +219,9 @@ public static void Handle(MaybeIdempotent message, CleanDbContext dbContext)
{
// Nothing
}

public static void Handle(MaybeIdempotentNotTransactional message)
{
// Nothing
}
}
13 changes: 13 additions & 0 deletions src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,19 @@ public override DbCommandBuilder ToCommandBuilder()
return new DbCommandBuilder(new NpgsqlCommand());
}

public override async Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation)
{
if (HasDisposed) return false;

await using var conn = await NpgsqlDataSource.OpenConnectionAsync(cancellation);
var count = await conn
.CreateCommand($"select count(id) from {SchemaName}.{DatabaseConstants.IncomingTable} where id = :id")
.With("id", envelope.Id)
.ExecuteScalarAsync(cancellation);

return ((long)count) > 0;
}

public override void WriteLoadScheduledEnvelopeSql(DbCommandBuilder builder, DateTimeOffset utcNow)
{
builder.Append(
Expand Down
3 changes: 3 additions & 0 deletions src/Persistence/Wolverine.RDBMS/MessageDatabase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Data;
using System.Data.Common;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using JasperFx.Descriptors;
using Microsoft.Extensions.Logging;
using Weasel.Core;
Expand Down Expand Up @@ -222,6 +223,8 @@ public async ValueTask DisposeAsync()

public abstract DbCommandBuilder ToCommandBuilder();

public abstract Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation);

public async Task ReleaseIncomingAsync(int ownerId, Uri receivedAt)
{
if (HasDisposed) return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ public async Task StoreIncomingAsync(IReadOnlyList<Envelope> envelopes)
await session.SaveChangesAsync();
}

public async Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation)
{
using var session = _store.OpenAsyncSession();
var identity = IdentityFor(envelope);
return (await session.LoadAsync<IncomingMessage>(identity) == null);
}

public Task RescheduleExistingEnvelopeForRetryAsync(Envelope envelope)
{
envelope.Status = EnvelopeStatus.Scheduled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,20 @@ public override DbCommandBuilder ToCommandBuilder()
return new DbCommandBuilder(new SqlCommand());
}

public override async Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation)
{
if (HasDisposed) return false;

await using var conn = CreateConnection();
await conn.OpenAsync(cancellation);
var count = await conn
.CreateCommand($"select count(id) from {SchemaName}.{DatabaseConstants.IncomingTable} where id = @id")
.With("id", envelope.Id)
.ExecuteScalarAsync(cancellation);

return ((int)count) > 0;
}

public override void WriteLoadScheduledEnvelopeSql(DbCommandBuilder builder, DateTimeOffset utcNow)
{
builder.Append( $"select TOP {Durability.RecoveryBatchSize} {DatabaseConstants.IncomingFields} from {SchemaName}.{DatabaseConstants.IncomingTable} where status = '{EnvelopeStatus.Scheduled}' and execution_time <= ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public async Task use_transactional_policies_to_eager()
await host.InvokeAsync(new TM4(Guid.NewGuid()));

var runtime = host.GetRuntime();

// Just seeing that this caught
runtime.Handlers.ChainFor<DoSomething>().IsTransactional.ShouldBeTrue();

runtime.Handlers.ChainFor<DoSomething>().Idempotency.ShouldBe(IdempotencyStyle.Eager);

// Override by transactional attribute!
Expand Down
7 changes: 7 additions & 0 deletions src/Testing/CoreTests/Runtime/Handlers/HandlerChainTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ public void the_default_log_level_is_information()
chain.SuccessLogLevel.ShouldBe(LogLevel.Information);
}

[Fact]
public void is_transactional_is_false_by_default()
{
var chain = HandlerChain.For<Target>(x => x.Go(null), null);
chain.IsTransactional.ShouldBeFalse();
}

[Fact]
public void default_idempotency_is_none()
{
Expand Down
14 changes: 14 additions & 0 deletions src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ public async Task store_a_single_incoming_envelope()

stored.SentAt.ShouldBe(envelope.SentAt);
}

[Fact]
public async Task incoming_exists()
{
var envelope = ObjectMother.Envelope();
envelope.Status = EnvelopeStatus.Incoming;
envelope.SentAt = ((DateTimeOffset)DateTime.Today).ToUniversalTime();

(await thePersistence.Inbox.ExistsAsync(envelope, CancellationToken.None)).ShouldBeFalse();

await thePersistence.Inbox.StoreIncomingAsync(envelope);

(await thePersistence.Inbox.ExistsAsync(envelope, CancellationToken.None)).ShouldBeTrue();
}

[Fact]
public async Task store_a_single_incoming_envelope_that_is_handled()
Expand Down
18 changes: 18 additions & 0 deletions src/Wolverine/Attributes/IdempotentAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using JasperFx.CodeGeneration;
using Wolverine.Runtime.Handlers;

namespace Wolverine.Attributes;

/// <summary>
/// Adds idempotency checks to this message handler
///
/// ONLY use this for message handlers that do not use transactional
/// middleware
/// </summary>
public class IdempotentAttribute : ModifyHandlerChainAttribute
{
public override void Modify(HandlerChain chain, GenerationRules rules)
{
chain.ApplyIdempotencyCheck();
}
}
2 changes: 2 additions & 0 deletions src/Wolverine/Attributes/TransactionalAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public override void Modify(IChain chain, GenerationRules rules, IServiceContain
chain.ApplyImpliedMiddlewareFromHandlers(rules);
var transactionFrameProvider = rules.As<GenerationRules>().GetPersistenceProviders(chain, container);
transactionFrameProvider.ApplyTransactionSupport(chain, container);

chain.IsTransactional = true;
}

public IdempotencyStyle? Idempotency { get; set; }
Expand Down
1 change: 1 addition & 0 deletions src/Wolverine/Configuration/Chain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public abstract class Chain<TChain, TModifyAttribute> : IChain

public abstract bool TryInferMessageIdentity(out PropertyInfo? property);

public bool IsTransactional { get; set; }
public abstract bool ShouldFlushOutgoingMessages();
public abstract bool RequiresOutbox();

Expand Down
5 changes: 5 additions & 0 deletions src/Wolverine/Configuration/IChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public interface IChain
/// </summary>
IReturnVariableActionSource ReturnVariableActionSource { get; set; }

/// <summary>
/// Does this chain have any transactional middleware attached to it?
/// </summary>
bool IsTransactional { get; set; }

/// <summary>
/// Used internally by Wolverine for "outbox" mechanics
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions src/Wolverine/IPolicies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ public interface IPolicies : IEnumerable<IWolverinePolicy>, IWithFailurePolicies
/// <param name="idempotency">Define a default IdempotencyStyle for message handlers executing in Buffered or Inline endpoints</param>
void AutoApplyTransactions(IdempotencyStyle idempotency);

/// <summary>
/// Apply eager message idempotency checks to any message handler chains that are not otherwise transactional.
/// Example is a handler that calls an external web service but does not make any changes to the current system's
/// databases or storage
/// </summary>
void AutoApplyIdempotencyOnNonTransactionalHandlers();

/// <summary>
/// Add Wolverine middleware to message handlers
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions src/Wolverine/Persistence/AutoApplyTransactions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public void Apply(IReadOnlyList<IChain> chains, GenerationRules rules, IServiceC
if (potentials.Length == 1)
{
potentials.Single().ApplyTransactionSupport(chain, container);
chain.IsTransactional = true;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,9 @@ public async Task ReleaseIncomingAsync(int ownerId, Uri receivedAt)
await database.Inbox.ReleaseIncomingAsync(ownerId, receivedAt);
}
}

public Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation)
{
return _inner.ExistsAsync(envelope, cancellation);
}
}
1 change: 1 addition & 0 deletions src/Wolverine/Persistence/Durability/IMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public interface IMessageInbox
Task StoreIncomingAsync(Envelope envelope);
Task StoreIncomingAsync(IReadOnlyList<Envelope> envelopes);

Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation);

Task MarkIncomingEnvelopeAsHandledAsync(Envelope envelope);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ async Task IMessageInbox.StoreIncomingAsync(IReadOnlyList<Envelope> envelopes)
}
}

public async Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation)
{
var database = await GetDatabaseAsync(envelope.TenantId);
return await database.Inbox.ExistsAsync(envelope, cancellation);
}

async Task IMessageInbox.RescheduleExistingEnvelopeForRetryAsync(Envelope envelope)
{
var database = await GetDatabaseAsync(envelope.TenantId);
Expand Down
4 changes: 4 additions & 0 deletions src/Wolverine/Persistence/Durability/NullMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ public class NullMessageStore : IMessageStore, IMessageInbox, IMessageOutbox, IM
{
internal IScheduledJobProcessor? ScheduledJobs { get; set; }

public Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation)
{
return Task.FromResult(false);
}

public MessageStoreRole Role => MessageStoreRole.Main;
public Uri Uri => new Uri($"{PersistenceConstants.AgentScheme}://null");
Expand Down
14 changes: 14 additions & 0 deletions src/Wolverine/Persistence/Durability/PersistHandled.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;

namespace Wolverine.Persistence.Durability;

public class PersistHandled(Envelope Handled) : IAgentCommand
{
public async Task<AgentCommands> ExecuteAsync(IWolverineRuntime runtime, CancellationToken cancellationToken)
{
await runtime.Storage.Inbox.StoreIncomingAsync(Handled);

return AgentCommands.Empty;
}
}
Loading
Loading