From f488f3a68f923c474a55f77cd4871a3f46e605a9 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 9 Mar 2026 19:25:46 -0500 Subject: [PATCH 1/5] Add EditAndReplayAsync to IDeadLetters for editing DLQ message bodies before replay Supports updating the serialized message body inside a dead letter envelope and marking it as replayable in a single operation. Implemented across all persistence providers (RDBMS, RavenDb, CosmosDb) with no-op stubs for NullMessageStore and MultiTenantedMessageStore. Co-Authored-By: Claude Opus 4.6 --- .../CosmosDbMessageStore.DeadLetters.cs | 23 +++++++++++++++++++ .../MessageDatabase.DeadLetterAdminService.cs | 23 +++++++++++++++++++ .../RavenDbMessageStore.DeadLetters.cs | 15 ++++++++++++ .../Persistence/Durability/IDeadLetters.cs | 9 ++++++++ .../Durability/MultiTenantedMessageStore.cs | 5 ++++ .../Durability/NullMessageStore.cs | 5 ++++ 6 files changed, 80 insertions(+) diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs index 4d0823a50..794187281 100644 --- a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs @@ -3,6 +3,7 @@ using Microsoft.Azure.Cosmos; using Wolverine.Persistence.Durability; using Wolverine.Persistence.Durability.DeadLetterManagement; +using Wolverine.Runtime.Serialization; namespace Wolverine.CosmosDb.Internals; @@ -301,6 +302,28 @@ private async Task> LoadDeadLettersByQuery(DeadLetterEnv return messages; } + public async Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + try + { + var response = await _container.ReadItemAsync(DlqId(envelopeId), + new PartitionKey(DocumentTypes.DeadLetterPartition), cancellationToken: token); + var message = response.Resource; + + // Deserialize the stored envelope, replace its Data with the new message body, re-serialize + var envelope = EnvelopeSerializer.Deserialize(message.Body); + envelope.Data = newBody; + message.Body = EnvelopeSerializer.Serialize(envelope); + message.Replayable = true; + await _container.ReplaceItemAsync(message, message.Id, + new PartitionKey(DocumentTypes.DeadLetterPartition), cancellationToken: token); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + private QueryDefinition RebuildQueryDef(string queryText, DeadLetterEnvelopeQuery query) { var queryDef = new QueryDefinition(queryText) diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs index 280e8e868..a2c106abe 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs @@ -4,6 +4,7 @@ using Weasel.Core; using Wolverine.Persistence.Durability.DeadLetterManagement; using Wolverine.RDBMS.Durability; +using Wolverine.Runtime.Serialization; namespace Wolverine.RDBMS; @@ -186,4 +187,26 @@ public Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) return executeCommandBatch(builder, token); } + public async Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + // Read the current serialized envelope, replace its Data, and re-serialize + var deadLetter = await DeadLetterEnvelopeByIdAsync(envelopeId); + if (deadLetter == null) return; + + deadLetter.Envelope.Data = newBody; + var serialized = EnvelopeSerializer.Serialize(deadLetter.Envelope); + + var builder = ToCommandBuilder(); + builder.Append( + $"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Body} = "); + builder.AppendParameter(serialized); + builder.Append($", {DatabaseConstants.Replayable} = "); + builder.AppendParameter(true); + builder.Append($" where {DatabaseConstants.Id} = "); + builder.AppendParameter(envelopeId); + builder.Append(';'); + + await executeCommandBatch(builder, token); + } + } \ No newline at end of file diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs index ad599b58a..d9bd672c0 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs @@ -5,6 +5,7 @@ using Raven.Client.Documents.Queries; using Wolverine.Persistence.Durability; using Wolverine.Persistence.Durability.DeadLetterManagement; +using Wolverine.Runtime.Serialization; namespace Wolverine.RavenDb.Internals; @@ -249,4 +250,18 @@ public async Task MarkDeadLetterEnvelopesAsReplayableAsync(Guid[] ids, string? t { await ReplayAsync(new DeadLetterEnvelopeQuery { MessageIds = ids }, CancellationToken.None); } + + public async Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + using var session = _store.OpenAsyncSession(); + var message = await session.LoadAsync(dlqId(envelopeId), token); + if (message is null) return; + + // Deserialize the stored envelope, replace its Data with the new message body, re-serialize + var envelope = EnvelopeSerializer.Deserialize(message.Body); + envelope.Data = newBody; + message.Body = EnvelopeSerializer.Serialize(envelope); + message.Replayable = true; + await session.SaveChangesAsync(token); + } } diff --git a/src/Wolverine/Persistence/Durability/IDeadLetters.cs b/src/Wolverine/Persistence/Durability/IDeadLetters.cs index 245e604f2..ffa6ef828 100644 --- a/src/Wolverine/Persistence/Durability/IDeadLetters.cs +++ b/src/Wolverine/Persistence/Durability/IDeadLetters.cs @@ -71,4 +71,13 @@ Task> SummarizeAllAsync(string serviceName, /// /// Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token); + + /// + /// Updates the body of a dead letter envelope and marks it as replayable + /// + /// The envelope to edit + /// The new serialized message body + /// + /// + Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token); } \ No newline at end of file diff --git a/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs b/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs index f97433699..cfaaaebfd 100644 --- a/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs @@ -74,6 +74,11 @@ public Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) throw new NotSupportedException(); } + public Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + throw new NotSupportedException(); + } + public async Task DeadLetterEnvelopeByIdAsync(Guid id, string? tenantId = null) { if (tenantId is not null) diff --git a/src/Wolverine/Persistence/Durability/NullMessageStore.cs b/src/Wolverine/Persistence/Durability/NullMessageStore.cs index 153abf4f0..1dce8a885 100644 --- a/src/Wolverine/Persistence/Durability/NullMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/NullMessageStore.cs @@ -234,6 +234,11 @@ public Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) return Task.CompletedTask; } + public Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + return Task.CompletedTask; + } + public Task RebuildAsync() { return Task.CompletedTask; From c838aa488da919a15e4960243886853be9ed3611 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 9 Mar 2026 20:44:51 -0500 Subject: [PATCH 2/5] Dispose IHost instances after StopAsync in all test fixtures StopAsync only stops background services but does not dispose the DI container, so NpgsqlDataSource and other resources leak their connection pools. This fixes 58 test files across all test projects. Co-Authored-By: Claude Opus 4.6 --- .../Marten/reacting_to_read_aggregate.cs | 1 + .../Persistence/global_entity_defaults_http.cs | 1 + .../Persistence/reacting_to_entity_attributes.cs | 1 + .../Persistence/reacting_to_required_attribute.cs | 6 +++++- src/Testing/BackPressureTests/Harness.cs | 2 ++ src/Testing/Benchmarks/Driver.cs | 1 + src/Testing/CoreTests/Acceptance/batch_processing.cs | 1 + src/Testing/CoreTests/Acceptance/multi_tenancy.cs | 1 + src/Testing/CoreTests/Acceptance/remote_invocation.cs | 3 +++ .../CoreTests/Acceptance/using_with_keyed_services.cs | 10 ++++++---- .../CoreTests/Bugs/Bug_2056_saga_code_generation.cs | 1 + .../Bugs/Bug_2073_completed_saga_still_persisted.cs | 1 + .../custom_error_action_raises_new_message.cs | 4 ++++ .../CoreTests/Runtime/Stubs/using_stubs_end_to_end.cs | 2 ++ ...is_tracked_for_published_message_without_handler.cs | 6 +++++- src/Testing/OpenTelemetry/TracingTests/HostsFixture.cs | 5 +++-- .../SlowTests/invoke_async_with_delivery_options.cs | 2 ++ .../StorageActionCompliance.cs | 1 + .../send_to_topic_and_receive_in_queue.cs | 1 + .../send_to_topic_and_receive_in_queue_in_aws.cs | 1 + ..._to_topic_and_receive_in_queue_with_cloud_events.cs | 1 + ...pic_and_receive_in_queue_with_rawMessageDelivery.cs | 3 ++- .../RawJson/receive_raw_json_as_buffered.cs | 2 ++ .../RawJson/receive_raw_json_as_inline.cs | 2 ++ .../AWS/Wolverine.AmazonSqs.Tests/send_and_receive.cs | 5 +++-- .../send_and_receive_with_CloudEvents.cs | 5 +++-- .../end_to_end_with_conventional_routing.cs | 2 ++ .../Wolverine.AzureServiceBus.Tests/end_to_end.cs | 1 + .../end_to_end_with_CloudEvents.cs | 1 + .../multi_tenancy_through_separate_namespaces.cs | 4 ++++ .../end_to_end_with_conventional_routing.cs | 2 ++ .../GCP/Wolverine.Pubsub.Tests/send_and_receive.cs | 2 ++ .../Wolverine.Kafka.Tests/broadcast_to_topic_async.cs | 2 ++ .../Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs | 2 ++ .../configure_consumers_and_publishers.cs | 1 + .../end_to_end_with_CloudEvents.cs | 2 ++ .../propagate_group_id_to_partition_key.cs | 1 + .../publish_and_receive_raw_json.cs | 2 ++ .../when_publishing_and_receiving_by_partition_key.cs | 2 ++ .../MQTT/Wolverine.MQTT.Tests/ack_smoke_tests.cs | 2 ++ .../Wolverine.MQTT.Tests/broadcast_to_topic_async.cs | 2 ++ .../broadcast_to_topic_by_user_logic.cs | 2 ++ .../Wolverine.MQTT.Tests/broadcast_to_topic_rules.cs | 2 ++ .../listen_with_emqx_shared_group_topic.cs | 2 ++ .../listen_with_topic_wildcards.cs | 2 ++ .../MQTT/Wolverine.MQTT.Tests/mosquitto_compliance.cs | 4 ++++ .../Wolverine.Pulsar.Tests/PulsarListenerTests.cs | 2 ++ src/Transports/RabbitMQ/ChaosTesting/ChaosDriver.cs | 2 ++ .../dynamic_object_creation_smoke_tests.cs | 1 + .../Wolverine.RabbitMQ.Tests/exclusive_listeners.cs | 7 ++++++- .../Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs | 4 +++- .../moving_unknown_message_type_to_dlq.cs | 2 ++ .../multi_tenancy_through_virtual_hosts.cs | 4 ++++ .../rate_limiting_end_to_end.cs | 2 ++ .../response_queue_disabling.cs | 6 +++++- .../response_queue_mechanics.cs | 1 + .../ResponseStreamMechanicsTests.cs | 2 ++ .../Wolverine.SignalR.Tests/WebSocketTestContext.cs | 4 ++++ 58 files changed, 130 insertions(+), 16 deletions(-) diff --git a/src/Http/Wolverine.Http.Tests/Marten/reacting_to_read_aggregate.cs b/src/Http/Wolverine.Http.Tests/Marten/reacting_to_read_aggregate.cs index 838b74dc0..61134de4e 100644 --- a/src/Http/Wolverine.Http.Tests/Marten/reacting_to_read_aggregate.cs +++ b/src/Http/Wolverine.Http.Tests/Marten/reacting_to_read_aggregate.cs @@ -44,6 +44,7 @@ async Task IAsyncLifetime.DisposeAsync() if (theHost != null) { await theHost.StopAsync(); + theHost.Dispose(); } } diff --git a/src/Http/Wolverine.Http.Tests/Persistence/global_entity_defaults_http.cs b/src/Http/Wolverine.Http.Tests/Persistence/global_entity_defaults_http.cs index 8aa03501b..7d1655ad8 100644 --- a/src/Http/Wolverine.Http.Tests/Persistence/global_entity_defaults_http.cs +++ b/src/Http/Wolverine.Http.Tests/Persistence/global_entity_defaults_http.cs @@ -46,6 +46,7 @@ async Task IAsyncLifetime.DisposeAsync() if (theHost != null) { await theHost.StopAsync(); + theHost.Dispose(); } } diff --git a/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_entity_attributes.cs b/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_entity_attributes.cs index 9ea188c1a..a2d0f4ad9 100644 --- a/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_entity_attributes.cs +++ b/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_entity_attributes.cs @@ -52,6 +52,7 @@ async Task IAsyncLifetime.DisposeAsync() if (theHost != null) { await theHost.StopAsync(); + theHost.Dispose(); } } diff --git a/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_required_attribute.cs b/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_required_attribute.cs index 22403e866..bc14e3913 100644 --- a/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_required_attribute.cs +++ b/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_required_attribute.cs @@ -46,7 +46,11 @@ public async Task InitializeAsync() async Task IAsyncLifetime.DisposeAsync() { - if (theHost != null) await theHost.StopAsync(); + if (theHost != null) + { + await theHost.StopAsync(); + theHost.Dispose(); + } } [Fact] diff --git a/src/Testing/BackPressureTests/Harness.cs b/src/Testing/BackPressureTests/Harness.cs index e176cacd4..fb87bbb8f 100644 --- a/src/Testing/BackPressureTests/Harness.cs +++ b/src/Testing/BackPressureTests/Harness.cs @@ -99,7 +99,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } [Fact] diff --git a/src/Testing/Benchmarks/Driver.cs b/src/Testing/Benchmarks/Driver.cs index f929eabf8..1a0666027 100644 --- a/src/Testing/Benchmarks/Driver.cs +++ b/src/Testing/Benchmarks/Driver.cs @@ -77,6 +77,7 @@ public async Task Teardown() if (Host != null) { await Host.StopAsync(); + Host.Dispose(); Host = null; } } diff --git a/src/Testing/CoreTests/Acceptance/batch_processing.cs b/src/Testing/CoreTests/Acceptance/batch_processing.cs index 204e930a7..69feab7b7 100644 --- a/src/Testing/CoreTests/Acceptance/batch_processing.cs +++ b/src/Testing/CoreTests/Acceptance/batch_processing.cs @@ -50,6 +50,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await theHost.StopAsync(); + theHost.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Acceptance/multi_tenancy.cs b/src/Testing/CoreTests/Acceptance/multi_tenancy.cs index 7d644d7c6..cff08dbb1 100644 --- a/src/Testing/CoreTests/Acceptance/multi_tenancy.cs +++ b/src/Testing/CoreTests/Acceptance/multi_tenancy.cs @@ -29,6 +29,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Acceptance/remote_invocation.cs b/src/Testing/CoreTests/Acceptance/remote_invocation.cs index 33ee2f3eb..564940086 100644 --- a/src/Testing/CoreTests/Acceptance/remote_invocation.cs +++ b/src/Testing/CoreTests/Acceptance/remote_invocation.cs @@ -71,8 +71,11 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _receiver1.StopAsync(); + _receiver1.Dispose(); await _receiver2.StopAsync(); + _receiver2.Dispose(); await _sender.StopAsync(); + _sender.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Acceptance/using_with_keyed_services.cs b/src/Testing/CoreTests/Acceptance/using_with_keyed_services.cs index 50870e44f..503d0d4ad 100644 --- a/src/Testing/CoreTests/Acceptance/using_with_keyed_services.cs +++ b/src/Testing/CoreTests/Acceptance/using_with_keyed_services.cs @@ -27,9 +27,10 @@ public async Task InitializeAsync() }).StartAsync(); } - public Task DisposeAsync() + public async Task DisposeAsync() { - return _host.StopAsync(); + await _host.StopAsync(); + _host.Dispose(); } [Fact] @@ -82,9 +83,10 @@ public async Task InitializeAsync() }).StartAsync(); } - public Task DisposeAsync() + public async Task DisposeAsync() { - return _host.StopAsync(); + await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Bugs/Bug_2056_saga_code_generation.cs b/src/Testing/CoreTests/Bugs/Bug_2056_saga_code_generation.cs index 832ae7907..92234e7c0 100644 --- a/src/Testing/CoreTests/Bugs/Bug_2056_saga_code_generation.cs +++ b/src/Testing/CoreTests/Bugs/Bug_2056_saga_code_generation.cs @@ -30,6 +30,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Bugs/Bug_2073_completed_saga_still_persisted.cs b/src/Testing/CoreTests/Bugs/Bug_2073_completed_saga_still_persisted.cs index abfa32239..a091ffd2d 100644 --- a/src/Testing/CoreTests/Bugs/Bug_2073_completed_saga_still_persisted.cs +++ b/src/Testing/CoreTests/Bugs/Bug_2073_completed_saga_still_persisted.cs @@ -26,6 +26,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/ErrorHandling/custom_error_action_raises_new_message.cs b/src/Testing/CoreTests/ErrorHandling/custom_error_action_raises_new_message.cs index 57dad115d..3c21897fb 100644 --- a/src/Testing/CoreTests/ErrorHandling/custom_error_action_raises_new_message.cs +++ b/src/Testing/CoreTests/ErrorHandling/custom_error_action_raises_new_message.cs @@ -52,7 +52,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await theReceiver.StopAsync(); + theReceiver.Dispose(); await theSender.StopAsync(); + theSender.Dispose(); } [Fact] @@ -130,7 +132,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await theReceiver.StopAsync(); + theReceiver.Dispose(); await theSender.StopAsync(); + theSender.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Runtime/Stubs/using_stubs_end_to_end.cs b/src/Testing/CoreTests/Runtime/Stubs/using_stubs_end_to_end.cs index 2225003c4..d39e55f5e 100644 --- a/src/Testing/CoreTests/Runtime/Stubs/using_stubs_end_to_end.cs +++ b/src/Testing/CoreTests/Runtime/Stubs/using_stubs_end_to_end.cs @@ -37,7 +37,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await theSender.StopAsync(); + theSender.Dispose(); await theReceiver.StopAsync(); + theReceiver.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Tracking/when_session_is_tracked_for_published_message_without_handler.cs b/src/Testing/CoreTests/Tracking/when_session_is_tracked_for_published_message_without_handler.cs index a5d4f4d26..f44a177cf 100644 --- a/src/Testing/CoreTests/Tracking/when_session_is_tracked_for_published_message_without_handler.cs +++ b/src/Testing/CoreTests/Tracking/when_session_is_tracked_for_published_message_without_handler.cs @@ -111,5 +111,9 @@ public async Task should_apply_equally_when_tracked_across_multiple_hosts() } - public async Task DisposeAsync() => await _host.StopAsync(); + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } } diff --git a/src/Testing/OpenTelemetry/TracingTests/HostsFixture.cs b/src/Testing/OpenTelemetry/TracingTests/HostsFixture.cs index 14ae4282b..0c3e904e9 100644 --- a/src/Testing/OpenTelemetry/TracingTests/HostsFixture.cs +++ b/src/Testing/OpenTelemetry/TracingTests/HostsFixture.cs @@ -78,8 +78,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { - // await WebApi.DisposeAsync(); - // await FirstSubscriber.StopAsync(); + await WebApi.DisposeAsync(); + await FirstSubscriber.StopAsync(); + FirstSubscriber.Dispose(); await SecondSubscriber.StopAsync(); SecondSubscriber.Dispose(); } diff --git a/src/Testing/SlowTests/invoke_async_with_delivery_options.cs b/src/Testing/SlowTests/invoke_async_with_delivery_options.cs index f03ad8e31..c13b51949 100644 --- a/src/Testing/SlowTests/invoke_async_with_delivery_options.cs +++ b/src/Testing/SlowTests/invoke_async_with_delivery_options.cs @@ -39,7 +39,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _publisher.StopAsync(); + _publisher.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } [Fact] diff --git a/src/Testing/Wolverine.ComplianceTests/StorageActionCompliance.cs b/src/Testing/Wolverine.ComplianceTests/StorageActionCompliance.cs index a905e5988..689498a48 100644 --- a/src/Testing/Wolverine.ComplianceTests/StorageActionCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/StorageActionCompliance.cs @@ -42,6 +42,7 @@ public async Task DisposeAsync() } await Host.StopAsync(); + Host.Dispose(); } diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue.cs index c383d5338..8abfa6f3a 100644 --- a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue.cs +++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue.cs @@ -48,6 +48,7 @@ public async Task DisposeAsync() } await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_in_aws.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_in_aws.cs index c0cb95c4a..57dfc34d3 100644 --- a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_in_aws.cs +++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_in_aws.cs @@ -49,6 +49,7 @@ public async Task DisposeAsync() } await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_cloud_events.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_cloud_events.cs index 4eea932bc..84cc5c52d 100644 --- a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_cloud_events.cs +++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_cloud_events.cs @@ -48,6 +48,7 @@ public async Task DisposeAsync() } await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_rawMessageDelivery.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_rawMessageDelivery.cs index eca012282..d0aa03d81 100644 --- a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_rawMessageDelivery.cs +++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_rawMessageDelivery.cs @@ -49,8 +49,9 @@ public async Task DisposeAsync() } await _host.StopAsync(); + _host.Dispose(); } - + [Fact] public async Task send_to_topic_and_receive_in_queue_raw_a_single_message_raw() { diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_buffered.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_buffered.cs index 2f0fd3af8..4d020d3f1 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_buffered.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_buffered.cs @@ -112,7 +112,9 @@ private async Task SendRawJsonMessage(Guid id, TimeSpan timeout) public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); await _sender.StopAsync(); + _sender.Dispose(); } } } diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_inline.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_inline.cs index 4b60a4696..5e268b511 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_inline.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_inline.cs @@ -112,6 +112,8 @@ private async Task SendRawJsonMessage(Guid id, TimeSpan timeout) public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); await _sender.StopAsync(); + _sender.Dispose(); } } \ No newline at end of file diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive.cs index f7ccf9e47..027e2595d 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive.cs @@ -23,9 +23,10 @@ public async Task InitializeAsync() }).StartAsync(); } - public Task DisposeAsync() + public async Task DisposeAsync() { - return _host.StopAsync(); + await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive_with_CloudEvents.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive_with_CloudEvents.cs index 9911cbfea..b3def9e43 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive_with_CloudEvents.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive_with_CloudEvents.cs @@ -23,9 +23,10 @@ public async Task InitializeAsync() }).StartAsync(); } - public Task DisposeAsync() + public async Task DisposeAsync() { - return _host.StopAsync(); + await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs index 57846c3e1..1a4b7cb6f 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs @@ -38,7 +38,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync(); } diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end.cs index 9cf10c9c7..0aea79581 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end.cs @@ -63,6 +63,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync(); } diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end_with_CloudEvents.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end_with_CloudEvents.cs index 64409d045..731f57239 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end_with_CloudEvents.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end_with_CloudEvents.cs @@ -51,6 +51,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync(); } diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/multi_tenancy_through_separate_namespaces.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/multi_tenancy_through_separate_namespaces.cs index b17522b49..13a5f4e54 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/multi_tenancy_through_separate_namespaces.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/multi_tenancy_through_separate_namespaces.cs @@ -105,9 +105,13 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await Main.StopAsync(); + Main.Dispose(); await One.StopAsync(); + One.Dispose(); await Two.StopAsync(); + Two.Dispose(); await Three.StopAsync(); + Three.Dispose(); } } diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs index fee32dbf5..4454f59be 100644 --- a/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs @@ -55,7 +55,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } [Fact] diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/send_and_receive.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/send_and_receive.cs index ef7ba85ab..aeaf60cab 100644 --- a/src/Transports/GCP/Wolverine.Pubsub.Tests/send_and_receive.cs +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/send_and_receive.cs @@ -32,6 +32,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] @@ -138,6 +139,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs index 6686490ca..89b16061d 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs @@ -60,7 +60,9 @@ public async Task broadcast() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs index 54f1d5bdf..c7d77154a 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs @@ -99,7 +99,9 @@ public async Task route_by_derived_topics_2() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs index 0b4a43097..f219944e4 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs @@ -101,6 +101,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs index d7c141b1b..f9e738574 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs @@ -60,7 +60,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } [Fact] diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs index d653cd9d1..ec4f41578 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs @@ -70,6 +70,7 @@ public async Task cascaded_message_receives_partition_key_from_originating_group public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } } diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs index 2229a5197..d689bded1 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs @@ -93,6 +93,8 @@ public async Task do_not_go_into_infinite_loop_with_garbage_data() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/when_publishing_and_receiving_by_partition_key.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/when_publishing_and_receiving_by_partition_key.cs index 033b1d4e1..7e8d49c15 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/when_publishing_and_receiving_by_partition_key.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/when_publishing_and_receiving_by_partition_key.cs @@ -97,6 +97,8 @@ public async Task receive_message_with_group_id() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/ack_smoke_tests.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/ack_smoke_tests.cs index baa70e473..eecc74009 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/ack_smoke_tests.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/ack_smoke_tests.cs @@ -72,7 +72,9 @@ public async Task send_ack_message() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); await Broker.StopAsync(); await Broker.DisposeAsync(); } diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_async.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_async.cs index 600959680..f5b639b7a 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_async.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_async.cs @@ -67,7 +67,9 @@ public async Task DisposeAsync() await Broker.StopAsync(); await Broker.DisposeAsync(); await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_by_user_logic.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_by_user_logic.cs index 21c62fa41..34480bc96 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_by_user_logic.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_by_user_logic.cs @@ -89,6 +89,8 @@ public async Task DisposeAsync() await Broker.StopAsync(); await Broker.DisposeAsync(); await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } \ No newline at end of file diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_rules.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_rules.cs index e5c5a687d..fb6fcaf3b 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_rules.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_rules.cs @@ -91,7 +91,9 @@ public async Task DisposeAsync() await Broker.StopAsync(); await Broker.DisposeAsync(); await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_emqx_shared_group_topic.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_emqx_shared_group_topic.cs index 4335867a6..fc17b7207 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_emqx_shared_group_topic.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_emqx_shared_group_topic.cs @@ -72,6 +72,8 @@ public async Task DisposeAsync() await Broker.StopAsync(); await Broker.DisposeAsync(); await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } \ No newline at end of file diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs index 8fdfd29cc..de44547e9 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs @@ -70,6 +70,8 @@ public async Task DisposeAsync() await Broker.StopAsync(); await Broker.DisposeAsync(); await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } \ No newline at end of file diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/mosquitto_compliance.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/mosquitto_compliance.cs index 2a1307365..edfd35a64 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/mosquitto_compliance.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/mosquitto_compliance.cs @@ -110,7 +110,9 @@ public async Task send_to_shared_topic_and_receive_with_mosquitto() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } @@ -169,6 +171,8 @@ public async Task can_receive_with_shared_subscription_wildcard() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarListenerTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarListenerTests.cs index e183e9978..b08c7f0c4 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarListenerTests.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarListenerTests.cs @@ -25,6 +25,7 @@ public async Task UnsubscribeOnClose() await host.MessageBus().PublishAsync(new PulsarListenerTestMessage()); await host.StopAsync(); + host.Dispose(); var subscriptionExists = await SubscriptionExists(); @@ -47,6 +48,7 @@ public async Task KeepSubscriptionOnClose() await host.MessageBus()!.PublishAsync(new PulsarListenerTestMessage()); await host.StopAsync(); + host.Dispose(); var subscriptionExists = await SubscriptionExists(); diff --git a/src/Transports/RabbitMQ/ChaosTesting/ChaosDriver.cs b/src/Transports/RabbitMQ/ChaosTesting/ChaosDriver.cs index 07a26022c..1feb96d5f 100644 --- a/src/Transports/RabbitMQ/ChaosTesting/ChaosDriver.cs +++ b/src/Transports/RabbitMQ/ChaosTesting/ChaosDriver.cs @@ -279,6 +279,7 @@ public async Task StartReceiver(string name) public async Task StopReceiver(string name) { await _receivers[name].StopAsync(); + _receivers[name].Dispose(); _receivers.Remove(name); _output.WriteLine($"Stopped receiver {name}"); @@ -312,6 +313,7 @@ public async Task StartSender(string name) public async Task StopSender(string name) { await _senders[name].StopAsync(); + _senders[name].Dispose(); _senders.Remove(name); _output.WriteLine($"Stopped sender {name}"); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dynamic_object_creation_smoke_tests.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dynamic_object_creation_smoke_tests.cs index 5a904c29f..7206c1911 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dynamic_object_creation_smoke_tests.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dynamic_object_creation_smoke_tests.cs @@ -21,6 +21,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs index 6a5b81d2f..a11c1659a 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs @@ -77,7 +77,11 @@ public async Task InitializeAsync() public async Task DisposeAsync() { _hosts.Reverse(); - foreach (var host in _hosts) await host.StopAsync(); + foreach (var host in _hosts) + { + await host.StopAsync(); + host.Dispose(); + } } private async Task startHostAsync() @@ -111,6 +115,7 @@ private async Task shutdownHostAsync(IHost host) { host.GetRuntime().Agents.DisableHealthChecks(); await host.StopAsync(); + host.Dispose(); _hosts.Remove(host); } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs index 570ec5ca7..8a7607c4e 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs @@ -56,6 +56,7 @@ public async ValueTask DisposeAsync() foreach (var host in _hosts) { await host.StopAsync(); + host.Dispose(); } } @@ -104,8 +105,9 @@ await host.WaitUntilAssignmentsChangeTo(w => host4.GetRuntime().Endpoints.ActiveListeners().Where(x => x.Endpoint.Role == EndpointRole.Application).Any(x => x.Uri.Scheme == "rabbitmq").ShouldBeFalse(); await host.StopAsync(); + host.Dispose(); _hosts.Remove(host); - + await host2.WaitUntilAssumesLeadershipAsync(30.Seconds()); await host2.WaitUntilAssignmentsChangeTo(w => { diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs index 5fca425ca..65387ab82 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs @@ -70,7 +70,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } [Fact] diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs index dea23b2d4..a4c428f4e 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs @@ -120,9 +120,13 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await Main.StopAsync(); + Main.Dispose(); await One.StopAsync(); + One.Dispose(); await Two.StopAsync(); + Two.Dispose(); await Three.StopAsync(); + Three.Dispose(); } private static async Task declareVirtualHost(string vhname) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/rate_limiting_end_to_end.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/rate_limiting_end_to_end.cs index 0f366cfb4..59f62a4e4 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/rate_limiting_end_to_end.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/rate_limiting_end_to_end.cs @@ -199,6 +199,8 @@ private static async Task safeStopAsync(IHost host) { } + host.Dispose(); + try { host.Dispose(); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_disabling.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_disabling.cs index 227f40d49..4836aeba5 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_disabling.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_disabling.cs @@ -28,6 +28,10 @@ public async Task InitializeAsync() }).StartAsync(); } - public Task DisposeAsync() => _host.StopAsync(); + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } } \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_mechanics.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_mechanics.cs index f78314c5a..2b18d4700 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_mechanics.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_mechanics.cs @@ -35,6 +35,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs index e9b698a65..cb5948511 100644 --- a/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs +++ b/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs @@ -43,6 +43,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] @@ -97,6 +98,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs b/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs index 5adb274c1..53c2e3e7d 100644 --- a/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs +++ b/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs @@ -104,10 +104,12 @@ public async Task StartClientHost(string serviceName = "Client") public async Task DisposeAsync() { await theWebApp.StopAsync(); + theWebApp.Dispose(); foreach (var clientHost in _clientHosts) { await clientHost.StopAsync(); + clientHost.Dispose(); } } @@ -223,10 +225,12 @@ public async Task StartClientHost(string serviceName = "Client", string a public virtual async Task DisposeAsync() { await theWebApp.StopAsync(); + theWebApp.Dispose(); foreach (var clientHost in _clientHosts) { await clientHost.StopAsync(); + clientHost.Dispose(); } } From e166900ffe465cccd448abd8f519693910ef4c20 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 10 Mar 2026 11:29:35 -0500 Subject: [PATCH 3/5] Add Oracle EditAndReplayAsync, centralize compliance test, fix EFCore test schema setup Implement the missing IDeadLetters.EditAndReplayAsync method on the Oracle message store and add a centralized compliance test in MessageStoreCompliance so all persistence providers are covered. Fix EFCoreTests failures by dropping the mt_items schema before host startup, ensuring Weasel migrations can cleanly create/alter columns without conflicting with stale data from previous test runs. Co-Authored-By: Claude Opus 4.6 --- .../EfCoreTests/Bug_252_codegen_issue.cs | 12 +++++++ .../end_to_end_efcore_persistence.cs | 30 ++++++++++++----- .../OracleTests/OracleMessageStoreTests.cs | 1 + .../OracleMessageStore.DeadLetters.cs | 32 +++++++++++++++++++ .../MessageStoreCompliance.cs | 18 +++++++++++ 5 files changed, 85 insertions(+), 8 deletions(-) diff --git a/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs b/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs index c461c9f96..a3c78d9ba 100644 --- a/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs +++ b/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs @@ -3,11 +3,13 @@ using JasperFx.Core; using JasperFx.Core.Reflection; using JasperFx.Resources; +using Microsoft.Data.SqlClient; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using SharedPersistenceModels.Items; using Shouldly; +using Weasel.SqlServer; using Wolverine; using Wolverine.Attributes; using Wolverine.EntityFrameworkCore; @@ -31,6 +33,11 @@ public Bug_252_codegen_issue(ITestOutputHelper output) [Fact] public async Task use_the_saga_type_to_determine_the_correct_DbContext_type() { + await using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync("mt_items"); + await conn.CloseAsync(); + using var host = await Host.CreateDefaultBuilder() .UseWolverine(opt => { @@ -60,6 +67,11 @@ public async Task use_the_saga_type_to_determine_the_correct_DbContext_type() [Fact] public async Task bug_256_message_bus_should_be_in_outbox_transaction() { + await using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync("mt_items"); + await conn.CloseAsync(); + using var host = await Host.CreateDefaultBuilder() .UseWolverine(opt => { diff --git a/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs b/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs index 2543da4eb..99b9da626 100644 --- a/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs +++ b/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs @@ -8,7 +8,6 @@ using JasperFx.Resources; using SharedPersistenceModels.Items; using Shouldly; -using Wolverine.ComplianceTests; using Weasel.Core; using Weasel.SqlServer; using Weasel.SqlServer.Tables; @@ -24,11 +23,21 @@ namespace EfCoreTests; -public class EFCorePersistenceContext : BaseContext +public class EFCorePersistenceContext : IAsyncLifetime { - public EFCorePersistenceContext() : base(true) + public IHost theHost { get; private set; } = null!; + + public async Task InitializeAsync() { - builder.ConfigureServices((c, services) => + // Drop the schema first so Weasel migrations can cleanly create/alter columns + // without conflicting with stale data from previous test runs + await using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync("mt_items"); + await conn.CloseAsync(); + + theHost = await Host.CreateDefaultBuilder() + .ConfigureServices((c, services) => { services.AddDbContext(x => x.UseSqlServer(Servers.SqlServerConnectionString)); services.AddDbContext(x => x.UseSqlServer(Servers.SqlServerConnectionString)); @@ -39,16 +48,21 @@ public EFCorePersistenceContext() : base(true) options.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString); options.Services.AddResourceSetupOnStartup(StartupAction.ResetState); options.UseEntityFrameworkCoreTransactions(); - + options.Policies.ConfigureConventionalLocalRouting() .CustomizeQueues((_, q) => q.UseDurableInbox()); - + options.Policies.AutoApplyTransactions(); - + options.UseEntityFrameworkCoreWolverineManagedMigrations(); - }); + }).StartAsync(); } + public async Task DisposeAsync() + { + await theHost.StopAsync(); + theHost.Dispose(); + } } [Collection("sqlserver")] diff --git a/src/Persistence/Oracle/OracleTests/OracleMessageStoreTests.cs b/src/Persistence/Oracle/OracleTests/OracleMessageStoreTests.cs index c77652acf..839a293b8 100644 --- a/src/Persistence/Oracle/OracleTests/OracleMessageStoreTests.cs +++ b/src/Persistence/Oracle/OracleTests/OracleMessageStoreTests.cs @@ -98,4 +98,5 @@ public async Task can_schedule_envelope() var counts = await thePersistence.Admin.FetchCountsAsync(); counts.Scheduled.ShouldBeGreaterThanOrEqualTo(1); } + } diff --git a/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.DeadLetters.cs b/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.DeadLetters.cs index 536c6d484..7d21a7145 100644 --- a/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.DeadLetters.cs +++ b/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.DeadLetters.cs @@ -5,6 +5,7 @@ using Wolverine.Persistence.Durability; using Wolverine.Persistence.Durability.DeadLetterManagement; using Wolverine.RDBMS; +using Wolverine.Runtime.Serialization; namespace Wolverine.Oracle; @@ -180,6 +181,37 @@ public async Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken t } } + public async Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + var deadLetter = await DeadLetterEnvelopeByIdAsync(envelopeId); + if (deadLetter == null) return; + + deadLetter.Envelope.Data = newBody; + var serialized = EnvelopeSerializer.Serialize(deadLetter.Envelope); + + var builder = ToOracleCommandBuilder(); + builder.Append( + $"UPDATE {SchemaName}.{DatabaseConstants.DeadLetterTable} SET {DatabaseConstants.Body} = "); + builder.AppendParameter(serialized); + builder.Append($", {DatabaseConstants.Replayable} = "); + builder.AppendParameter(1); // Oracle uses NUMBER(1) for bool + builder.Append($" WHERE {DatabaseConstants.Id} = "); + builder.AppendParameter(envelopeId); + + var cmd = builder.Compile(); + + await using var conn = await _dataSource.OpenConnectionAsync(token); + try + { + cmd.Connection = conn; + await cmd.ExecuteNonQueryAsync(token); + } + finally + { + await conn.CloseAsync(); + } + } + private void writeDeadLetterWhereClause(DeadLetterEnvelopeQuery query, Weasel.Oracle.CommandBuilder builder) { if (query.Range.From.HasValue) diff --git a/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs b/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs index 8e8e5ba15..089335d75 100644 --- a/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs @@ -821,6 +821,24 @@ await thePersistence.Nodes.PersistAgentRestrictionsAsync( state.Restrictions.Current.OrderBy(x => x.AgentUri.ToString()).ShouldBe([restriction1, restriction2, restriction5, restriction6]); } + [Fact] + public async Task can_edit_and_replay_dead_letter_envelope() + { + var envelope = ObjectMother.Envelope(); + await thePersistence.Inbox.StoreIncomingAsync(envelope); + + var exception = new InvalidOperationException("Test error"); + await thePersistence.Inbox.MoveToDeadLetterStorageAsync(envelope, exception); + + var newBody = new byte[] { 1, 2, 3, 4, 5 }; + await thePersistence.DeadLetters.EditAndReplayAsync(envelope.Id, newBody, CancellationToken.None); + + var deadLetter = await thePersistence.DeadLetters.DeadLetterEnvelopeByIdAsync(envelope.Id); + deadLetter.ShouldNotBeNull(); + deadLetter.Envelope.Data.ShouldBe(newBody); + deadLetter.Replayable.ShouldBeTrue(); + } + [Fact] public async Task query_scheduled_messages() { From d6129c240f87317ac13fce0d9ca32d189ffa09c7 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 10 Mar 2026 17:14:40 -0500 Subject: [PATCH 4/5] Fix race condition in ReplyListener timeout cancellation Unregister the listener and set status before calling TrySetException, so that with RunContinuationsAsynchronously the listener is cleaned up before any awaiter continuation resumes. Co-Authored-By: Claude Opus 4.6 --- .../Runtime/RemoteInvocation/ReplyListener.cs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Wolverine/Runtime/RemoteInvocation/ReplyListener.cs b/src/Wolverine/Runtime/RemoteInvocation/ReplyListener.cs index 13b0572b1..1cd5f073c 100644 --- a/src/Wolverine/Runtime/RemoteInvocation/ReplyListener.cs +++ b/src/Wolverine/Runtime/RemoteInvocation/ReplyListener.cs @@ -54,6 +54,13 @@ public void Dispose() private void onCancellation() { + // Unregister before setting the exception so the listener is removed + // before any continuation runs (TaskCreationOptions.RunContinuationsAsynchronously + // can cause the awaiter to resume before subsequent lines execute) + Parent.Unregister(this); + + Status = TaskStatus.Faulted; + if (typeof(T) == typeof(Acknowledgement)) { _completion?.TrySetException(new TimeoutException( @@ -64,9 +71,5 @@ private void onCancellation() _completion?.TrySetException(new TimeoutException( $"Timed out waiting for expected response {typeof(T).FullNameInCode()} for original message {RequestId} of type {RequestType} with a configured timeout of {_timeout.TotalMilliseconds} milliseconds")); } - - Parent.Unregister(this); - - Status = TaskStatus.Faulted; } } \ No newline at end of file From fd62e4c78453607d26673b06e38bf65ff53db264 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 10 Mar 2026 17:30:15 -0500 Subject: [PATCH 5/5] Fix WebApplication disposal in SignalR test contexts WebApplication implements IAsyncDisposable, not IDisposable in .NET 9. Use DisposeAsync() instead of Dispose() to fix compilation errors. Co-Authored-By: Claude Opus 4.6 --- .../SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs b/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs index 53c2e3e7d..6454a67d5 100644 --- a/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs +++ b/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs @@ -104,7 +104,7 @@ public async Task StartClientHost(string serviceName = "Client") public async Task DisposeAsync() { await theWebApp.StopAsync(); - theWebApp.Dispose(); + await theWebApp.DisposeAsync(); foreach (var clientHost in _clientHosts) { @@ -225,7 +225,7 @@ public async Task StartClientHost(string serviceName = "Client", string a public virtual async Task DisposeAsync() { await theWebApp.StopAsync(); - theWebApp.Dispose(); + await theWebApp.DisposeAsync(); foreach (var clientHost in _clientHosts) {