diff --git a/src/Http/Wolverine.Http.Tests/Marten/reacting_to_read_aggregate.cs b/src/Http/Wolverine.Http.Tests/Marten/reacting_to_read_aggregate.cs index 838b74dc0..61134de4e 100644 --- a/src/Http/Wolverine.Http.Tests/Marten/reacting_to_read_aggregate.cs +++ b/src/Http/Wolverine.Http.Tests/Marten/reacting_to_read_aggregate.cs @@ -44,6 +44,7 @@ async Task IAsyncLifetime.DisposeAsync() if (theHost != null) { await theHost.StopAsync(); + theHost.Dispose(); } } diff --git a/src/Http/Wolverine.Http.Tests/Persistence/global_entity_defaults_http.cs b/src/Http/Wolverine.Http.Tests/Persistence/global_entity_defaults_http.cs index 8aa03501b..7d1655ad8 100644 --- a/src/Http/Wolverine.Http.Tests/Persistence/global_entity_defaults_http.cs +++ b/src/Http/Wolverine.Http.Tests/Persistence/global_entity_defaults_http.cs @@ -46,6 +46,7 @@ async Task IAsyncLifetime.DisposeAsync() if (theHost != null) { await theHost.StopAsync(); + theHost.Dispose(); } } diff --git a/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_entity_attributes.cs b/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_entity_attributes.cs index 9ea188c1a..a2d0f4ad9 100644 --- a/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_entity_attributes.cs +++ b/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_entity_attributes.cs @@ -52,6 +52,7 @@ async Task IAsyncLifetime.DisposeAsync() if (theHost != null) { await theHost.StopAsync(); + theHost.Dispose(); } } diff --git a/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_required_attribute.cs b/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_required_attribute.cs index 22403e866..bc14e3913 100644 --- a/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_required_attribute.cs +++ b/src/Http/Wolverine.Http.Tests/Persistence/reacting_to_required_attribute.cs @@ -46,7 +46,11 @@ public async Task InitializeAsync() async Task IAsyncLifetime.DisposeAsync() { - if (theHost != null) await theHost.StopAsync(); + if (theHost != null) + { + await theHost.StopAsync(); + theHost.Dispose(); + } } [Fact] diff --git a/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs b/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs index c461c9f96..a3c78d9ba 100644 --- a/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs +++ b/src/Persistence/EfCoreTests/Bug_252_codegen_issue.cs @@ -3,11 +3,13 @@ using JasperFx.Core; using JasperFx.Core.Reflection; using JasperFx.Resources; +using Microsoft.Data.SqlClient; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using SharedPersistenceModels.Items; using Shouldly; +using Weasel.SqlServer; using Wolverine; using Wolverine.Attributes; using Wolverine.EntityFrameworkCore; @@ -31,6 +33,11 @@ public Bug_252_codegen_issue(ITestOutputHelper output) [Fact] public async Task use_the_saga_type_to_determine_the_correct_DbContext_type() { + await using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync("mt_items"); + await conn.CloseAsync(); + using var host = await Host.CreateDefaultBuilder() .UseWolverine(opt => { @@ -60,6 +67,11 @@ public async Task use_the_saga_type_to_determine_the_correct_DbContext_type() [Fact] public async Task bug_256_message_bus_should_be_in_outbox_transaction() { + await using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync("mt_items"); + await conn.CloseAsync(); + using var host = await Host.CreateDefaultBuilder() .UseWolverine(opt => { diff --git a/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs b/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs index 2543da4eb..99b9da626 100644 --- a/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs +++ b/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs @@ -8,7 +8,6 @@ using JasperFx.Resources; using SharedPersistenceModels.Items; using Shouldly; -using Wolverine.ComplianceTests; using Weasel.Core; using Weasel.SqlServer; using Weasel.SqlServer.Tables; @@ -24,11 +23,21 @@ namespace EfCoreTests; -public class EFCorePersistenceContext : BaseContext +public class EFCorePersistenceContext : IAsyncLifetime { - public EFCorePersistenceContext() : base(true) + public IHost theHost { get; private set; } = null!; + + public async Task InitializeAsync() { - builder.ConfigureServices((c, services) => + // Drop the schema first so Weasel migrations can cleanly create/alter columns + // without conflicting with stale data from previous test runs + await using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync("mt_items"); + await conn.CloseAsync(); + + theHost = await Host.CreateDefaultBuilder() + .ConfigureServices((c, services) => { services.AddDbContext(x => x.UseSqlServer(Servers.SqlServerConnectionString)); services.AddDbContext(x => x.UseSqlServer(Servers.SqlServerConnectionString)); @@ -39,16 +48,21 @@ public EFCorePersistenceContext() : base(true) options.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString); options.Services.AddResourceSetupOnStartup(StartupAction.ResetState); options.UseEntityFrameworkCoreTransactions(); - + options.Policies.ConfigureConventionalLocalRouting() .CustomizeQueues((_, q) => q.UseDurableInbox()); - + options.Policies.AutoApplyTransactions(); - + options.UseEntityFrameworkCoreWolverineManagedMigrations(); - }); + }).StartAsync(); } + public async Task DisposeAsync() + { + await theHost.StopAsync(); + theHost.Dispose(); + } } [Collection("sqlserver")] diff --git a/src/Persistence/Oracle/OracleTests/OracleMessageStoreTests.cs b/src/Persistence/Oracle/OracleTests/OracleMessageStoreTests.cs index c77652acf..839a293b8 100644 --- a/src/Persistence/Oracle/OracleTests/OracleMessageStoreTests.cs +++ b/src/Persistence/Oracle/OracleTests/OracleMessageStoreTests.cs @@ -98,4 +98,5 @@ public async Task can_schedule_envelope() var counts = await thePersistence.Admin.FetchCountsAsync(); counts.Scheduled.ShouldBeGreaterThanOrEqualTo(1); } + } diff --git a/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.DeadLetters.cs b/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.DeadLetters.cs index 536c6d484..7d21a7145 100644 --- a/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.DeadLetters.cs +++ b/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.DeadLetters.cs @@ -5,6 +5,7 @@ using Wolverine.Persistence.Durability; using Wolverine.Persistence.Durability.DeadLetterManagement; using Wolverine.RDBMS; +using Wolverine.Runtime.Serialization; namespace Wolverine.Oracle; @@ -180,6 +181,37 @@ public async Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken t } } + public async Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + var deadLetter = await DeadLetterEnvelopeByIdAsync(envelopeId); + if (deadLetter == null) return; + + deadLetter.Envelope.Data = newBody; + var serialized = EnvelopeSerializer.Serialize(deadLetter.Envelope); + + var builder = ToOracleCommandBuilder(); + builder.Append( + $"UPDATE {SchemaName}.{DatabaseConstants.DeadLetterTable} SET {DatabaseConstants.Body} = "); + builder.AppendParameter(serialized); + builder.Append($", {DatabaseConstants.Replayable} = "); + builder.AppendParameter(1); // Oracle uses NUMBER(1) for bool + builder.Append($" WHERE {DatabaseConstants.Id} = "); + builder.AppendParameter(envelopeId); + + var cmd = builder.Compile(); + + await using var conn = await _dataSource.OpenConnectionAsync(token); + try + { + cmd.Connection = conn; + await cmd.ExecuteNonQueryAsync(token); + } + finally + { + await conn.CloseAsync(); + } + } + private void writeDeadLetterWhereClause(DeadLetterEnvelopeQuery query, Weasel.Oracle.CommandBuilder builder) { if (query.Range.From.HasValue) diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs index 4d0823a50..794187281 100644 --- a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs @@ -3,6 +3,7 @@ using Microsoft.Azure.Cosmos; using Wolverine.Persistence.Durability; using Wolverine.Persistence.Durability.DeadLetterManagement; +using Wolverine.Runtime.Serialization; namespace Wolverine.CosmosDb.Internals; @@ -301,6 +302,28 @@ private async Task> LoadDeadLettersByQuery(DeadLetterEnv return messages; } + public async Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + try + { + var response = await _container.ReadItemAsync(DlqId(envelopeId), + new PartitionKey(DocumentTypes.DeadLetterPartition), cancellationToken: token); + var message = response.Resource; + + // Deserialize the stored envelope, replace its Data with the new message body, re-serialize + var envelope = EnvelopeSerializer.Deserialize(message.Body); + envelope.Data = newBody; + message.Body = EnvelopeSerializer.Serialize(envelope); + message.Replayable = true; + await _container.ReplaceItemAsync(message, message.Id, + new PartitionKey(DocumentTypes.DeadLetterPartition), cancellationToken: token); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + private QueryDefinition RebuildQueryDef(string queryText, DeadLetterEnvelopeQuery query) { var queryDef = new QueryDefinition(queryText) diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs index 280e8e868..a2c106abe 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs @@ -4,6 +4,7 @@ using Weasel.Core; using Wolverine.Persistence.Durability.DeadLetterManagement; using Wolverine.RDBMS.Durability; +using Wolverine.Runtime.Serialization; namespace Wolverine.RDBMS; @@ -186,4 +187,26 @@ public Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) return executeCommandBatch(builder, token); } + public async Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + // Read the current serialized envelope, replace its Data, and re-serialize + var deadLetter = await DeadLetterEnvelopeByIdAsync(envelopeId); + if (deadLetter == null) return; + + deadLetter.Envelope.Data = newBody; + var serialized = EnvelopeSerializer.Serialize(deadLetter.Envelope); + + var builder = ToCommandBuilder(); + builder.Append( + $"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Body} = "); + builder.AppendParameter(serialized); + builder.Append($", {DatabaseConstants.Replayable} = "); + builder.AppendParameter(true); + builder.Append($" where {DatabaseConstants.Id} = "); + builder.AppendParameter(envelopeId); + builder.Append(';'); + + await executeCommandBatch(builder, token); + } + } \ No newline at end of file diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs index ad599b58a..d9bd672c0 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs @@ -5,6 +5,7 @@ using Raven.Client.Documents.Queries; using Wolverine.Persistence.Durability; using Wolverine.Persistence.Durability.DeadLetterManagement; +using Wolverine.Runtime.Serialization; namespace Wolverine.RavenDb.Internals; @@ -249,4 +250,18 @@ public async Task MarkDeadLetterEnvelopesAsReplayableAsync(Guid[] ids, string? t { await ReplayAsync(new DeadLetterEnvelopeQuery { MessageIds = ids }, CancellationToken.None); } + + public async Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + using var session = _store.OpenAsyncSession(); + var message = await session.LoadAsync(dlqId(envelopeId), token); + if (message is null) return; + + // Deserialize the stored envelope, replace its Data with the new message body, re-serialize + var envelope = EnvelopeSerializer.Deserialize(message.Body); + envelope.Data = newBody; + message.Body = EnvelopeSerializer.Serialize(envelope); + message.Replayable = true; + await session.SaveChangesAsync(token); + } } diff --git a/src/Testing/BackPressureTests/Harness.cs b/src/Testing/BackPressureTests/Harness.cs index e176cacd4..fb87bbb8f 100644 --- a/src/Testing/BackPressureTests/Harness.cs +++ b/src/Testing/BackPressureTests/Harness.cs @@ -99,7 +99,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } [Fact] diff --git a/src/Testing/Benchmarks/Driver.cs b/src/Testing/Benchmarks/Driver.cs index f929eabf8..1a0666027 100644 --- a/src/Testing/Benchmarks/Driver.cs +++ b/src/Testing/Benchmarks/Driver.cs @@ -77,6 +77,7 @@ public async Task Teardown() if (Host != null) { await Host.StopAsync(); + Host.Dispose(); Host = null; } } diff --git a/src/Testing/CoreTests/Acceptance/batch_processing.cs b/src/Testing/CoreTests/Acceptance/batch_processing.cs index 204e930a7..69feab7b7 100644 --- a/src/Testing/CoreTests/Acceptance/batch_processing.cs +++ b/src/Testing/CoreTests/Acceptance/batch_processing.cs @@ -50,6 +50,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await theHost.StopAsync(); + theHost.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Acceptance/multi_tenancy.cs b/src/Testing/CoreTests/Acceptance/multi_tenancy.cs index 7d644d7c6..cff08dbb1 100644 --- a/src/Testing/CoreTests/Acceptance/multi_tenancy.cs +++ b/src/Testing/CoreTests/Acceptance/multi_tenancy.cs @@ -29,6 +29,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Acceptance/remote_invocation.cs b/src/Testing/CoreTests/Acceptance/remote_invocation.cs index 33ee2f3eb..564940086 100644 --- a/src/Testing/CoreTests/Acceptance/remote_invocation.cs +++ b/src/Testing/CoreTests/Acceptance/remote_invocation.cs @@ -71,8 +71,11 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _receiver1.StopAsync(); + _receiver1.Dispose(); await _receiver2.StopAsync(); + _receiver2.Dispose(); await _sender.StopAsync(); + _sender.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Acceptance/using_with_keyed_services.cs b/src/Testing/CoreTests/Acceptance/using_with_keyed_services.cs index 50870e44f..503d0d4ad 100644 --- a/src/Testing/CoreTests/Acceptance/using_with_keyed_services.cs +++ b/src/Testing/CoreTests/Acceptance/using_with_keyed_services.cs @@ -27,9 +27,10 @@ public async Task InitializeAsync() }).StartAsync(); } - public Task DisposeAsync() + public async Task DisposeAsync() { - return _host.StopAsync(); + await _host.StopAsync(); + _host.Dispose(); } [Fact] @@ -82,9 +83,10 @@ public async Task InitializeAsync() }).StartAsync(); } - public Task DisposeAsync() + public async Task DisposeAsync() { - return _host.StopAsync(); + await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Bugs/Bug_2056_saga_code_generation.cs b/src/Testing/CoreTests/Bugs/Bug_2056_saga_code_generation.cs index 832ae7907..92234e7c0 100644 --- a/src/Testing/CoreTests/Bugs/Bug_2056_saga_code_generation.cs +++ b/src/Testing/CoreTests/Bugs/Bug_2056_saga_code_generation.cs @@ -30,6 +30,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Bugs/Bug_2073_completed_saga_still_persisted.cs b/src/Testing/CoreTests/Bugs/Bug_2073_completed_saga_still_persisted.cs index abfa32239..a091ffd2d 100644 --- a/src/Testing/CoreTests/Bugs/Bug_2073_completed_saga_still_persisted.cs +++ b/src/Testing/CoreTests/Bugs/Bug_2073_completed_saga_still_persisted.cs @@ -26,6 +26,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/ErrorHandling/custom_error_action_raises_new_message.cs b/src/Testing/CoreTests/ErrorHandling/custom_error_action_raises_new_message.cs index 57dad115d..3c21897fb 100644 --- a/src/Testing/CoreTests/ErrorHandling/custom_error_action_raises_new_message.cs +++ b/src/Testing/CoreTests/ErrorHandling/custom_error_action_raises_new_message.cs @@ -52,7 +52,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await theReceiver.StopAsync(); + theReceiver.Dispose(); await theSender.StopAsync(); + theSender.Dispose(); } [Fact] @@ -130,7 +132,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await theReceiver.StopAsync(); + theReceiver.Dispose(); await theSender.StopAsync(); + theSender.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Runtime/Stubs/using_stubs_end_to_end.cs b/src/Testing/CoreTests/Runtime/Stubs/using_stubs_end_to_end.cs index 2225003c4..d39e55f5e 100644 --- a/src/Testing/CoreTests/Runtime/Stubs/using_stubs_end_to_end.cs +++ b/src/Testing/CoreTests/Runtime/Stubs/using_stubs_end_to_end.cs @@ -37,7 +37,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await theSender.StopAsync(); + theSender.Dispose(); await theReceiver.StopAsync(); + theReceiver.Dispose(); } [Fact] diff --git a/src/Testing/CoreTests/Tracking/when_session_is_tracked_for_published_message_without_handler.cs b/src/Testing/CoreTests/Tracking/when_session_is_tracked_for_published_message_without_handler.cs index a5d4f4d26..f44a177cf 100644 --- a/src/Testing/CoreTests/Tracking/when_session_is_tracked_for_published_message_without_handler.cs +++ b/src/Testing/CoreTests/Tracking/when_session_is_tracked_for_published_message_without_handler.cs @@ -111,5 +111,9 @@ public async Task should_apply_equally_when_tracked_across_multiple_hosts() } - public async Task DisposeAsync() => await _host.StopAsync(); + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } } diff --git a/src/Testing/OpenTelemetry/TracingTests/HostsFixture.cs b/src/Testing/OpenTelemetry/TracingTests/HostsFixture.cs index 14ae4282b..0c3e904e9 100644 --- a/src/Testing/OpenTelemetry/TracingTests/HostsFixture.cs +++ b/src/Testing/OpenTelemetry/TracingTests/HostsFixture.cs @@ -78,8 +78,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { - // await WebApi.DisposeAsync(); - // await FirstSubscriber.StopAsync(); + await WebApi.DisposeAsync(); + await FirstSubscriber.StopAsync(); + FirstSubscriber.Dispose(); await SecondSubscriber.StopAsync(); SecondSubscriber.Dispose(); } diff --git a/src/Testing/SlowTests/invoke_async_with_delivery_options.cs b/src/Testing/SlowTests/invoke_async_with_delivery_options.cs index f03ad8e31..c13b51949 100644 --- a/src/Testing/SlowTests/invoke_async_with_delivery_options.cs +++ b/src/Testing/SlowTests/invoke_async_with_delivery_options.cs @@ -39,7 +39,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _publisher.StopAsync(); + _publisher.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } [Fact] diff --git a/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs b/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs index 8e8e5ba15..089335d75 100644 --- a/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs @@ -821,6 +821,24 @@ await thePersistence.Nodes.PersistAgentRestrictionsAsync( state.Restrictions.Current.OrderBy(x => x.AgentUri.ToString()).ShouldBe([restriction1, restriction2, restriction5, restriction6]); } + [Fact] + public async Task can_edit_and_replay_dead_letter_envelope() + { + var envelope = ObjectMother.Envelope(); + await thePersistence.Inbox.StoreIncomingAsync(envelope); + + var exception = new InvalidOperationException("Test error"); + await thePersistence.Inbox.MoveToDeadLetterStorageAsync(envelope, exception); + + var newBody = new byte[] { 1, 2, 3, 4, 5 }; + await thePersistence.DeadLetters.EditAndReplayAsync(envelope.Id, newBody, CancellationToken.None); + + var deadLetter = await thePersistence.DeadLetters.DeadLetterEnvelopeByIdAsync(envelope.Id); + deadLetter.ShouldNotBeNull(); + deadLetter.Envelope.Data.ShouldBe(newBody); + deadLetter.Replayable.ShouldBeTrue(); + } + [Fact] public async Task query_scheduled_messages() { diff --git a/src/Testing/Wolverine.ComplianceTests/StorageActionCompliance.cs b/src/Testing/Wolverine.ComplianceTests/StorageActionCompliance.cs index a905e5988..689498a48 100644 --- a/src/Testing/Wolverine.ComplianceTests/StorageActionCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/StorageActionCompliance.cs @@ -42,6 +42,7 @@ public async Task DisposeAsync() } await Host.StopAsync(); + Host.Dispose(); } diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue.cs index c383d5338..8abfa6f3a 100644 --- a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue.cs +++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue.cs @@ -48,6 +48,7 @@ public async Task DisposeAsync() } await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_in_aws.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_in_aws.cs index c0cb95c4a..57dfc34d3 100644 --- a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_in_aws.cs +++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_in_aws.cs @@ -49,6 +49,7 @@ public async Task DisposeAsync() } await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_cloud_events.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_cloud_events.cs index 4eea932bc..84cc5c52d 100644 --- a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_cloud_events.cs +++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_cloud_events.cs @@ -48,6 +48,7 @@ public async Task DisposeAsync() } await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_rawMessageDelivery.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_rawMessageDelivery.cs index eca012282..d0aa03d81 100644 --- a/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_rawMessageDelivery.cs +++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/send_to_topic_and_receive_in_queue_with_rawMessageDelivery.cs @@ -49,8 +49,9 @@ public async Task DisposeAsync() } await _host.StopAsync(); + _host.Dispose(); } - + [Fact] public async Task send_to_topic_and_receive_in_queue_raw_a_single_message_raw() { diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_buffered.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_buffered.cs index 2f0fd3af8..4d020d3f1 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_buffered.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_buffered.cs @@ -112,7 +112,9 @@ private async Task SendRawJsonMessage(Guid id, TimeSpan timeout) public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); await _sender.StopAsync(); + _sender.Dispose(); } } } diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_inline.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_inline.cs index 4b60a4696..5e268b511 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_inline.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/RawJson/receive_raw_json_as_inline.cs @@ -112,6 +112,8 @@ private async Task SendRawJsonMessage(Guid id, TimeSpan timeout) public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); await _sender.StopAsync(); + _sender.Dispose(); } } \ No newline at end of file diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive.cs index f7ccf9e47..027e2595d 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive.cs @@ -23,9 +23,10 @@ public async Task InitializeAsync() }).StartAsync(); } - public Task DisposeAsync() + public async Task DisposeAsync() { - return _host.StopAsync(); + await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive_with_CloudEvents.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive_with_CloudEvents.cs index 9911cbfea..b3def9e43 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive_with_CloudEvents.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive_with_CloudEvents.cs @@ -23,9 +23,10 @@ public async Task InitializeAsync() }).StartAsync(); } - public Task DisposeAsync() + public async Task DisposeAsync() { - return _host.StopAsync(); + await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs index 57846c3e1..1a4b7cb6f 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs @@ -38,7 +38,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync(); } diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end.cs index 9cf10c9c7..0aea79581 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end.cs @@ -63,6 +63,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync(); } diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end_with_CloudEvents.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end_with_CloudEvents.cs index 64409d045..731f57239 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end_with_CloudEvents.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end_with_CloudEvents.cs @@ -51,6 +51,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync(); } diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/multi_tenancy_through_separate_namespaces.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/multi_tenancy_through_separate_namespaces.cs index b17522b49..13a5f4e54 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/multi_tenancy_through_separate_namespaces.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/multi_tenancy_through_separate_namespaces.cs @@ -105,9 +105,13 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await Main.StopAsync(); + Main.Dispose(); await One.StopAsync(); + One.Dispose(); await Two.StopAsync(); + Two.Dispose(); await Three.StopAsync(); + Three.Dispose(); } } diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs index fee32dbf5..4454f59be 100644 --- a/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs @@ -55,7 +55,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } [Fact] diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/send_and_receive.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/send_and_receive.cs index ef7ba85ab..aeaf60cab 100644 --- a/src/Transports/GCP/Wolverine.Pubsub.Tests/send_and_receive.cs +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/send_and_receive.cs @@ -32,6 +32,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] @@ -138,6 +139,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs index 6686490ca..89b16061d 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs @@ -60,7 +60,9 @@ public async Task broadcast() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs index 54f1d5bdf..c7d77154a 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs @@ -99,7 +99,9 @@ public async Task route_by_derived_topics_2() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs index 0b4a43097..f219944e4 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs @@ -101,6 +101,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs index d7c141b1b..f9e738574 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs @@ -60,7 +60,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } [Fact] diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs index d653cd9d1..ec4f41578 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs @@ -70,6 +70,7 @@ public async Task cascaded_message_receives_partition_key_from_originating_group public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } } diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs index 2229a5197..d689bded1 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs @@ -93,6 +93,8 @@ public async Task do_not_go_into_infinite_loop_with_garbage_data() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/when_publishing_and_receiving_by_partition_key.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/when_publishing_and_receiving_by_partition_key.cs index 033b1d4e1..7e8d49c15 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/when_publishing_and_receiving_by_partition_key.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/when_publishing_and_receiving_by_partition_key.cs @@ -97,6 +97,8 @@ public async Task receive_message_with_group_id() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/ack_smoke_tests.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/ack_smoke_tests.cs index baa70e473..eecc74009 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/ack_smoke_tests.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/ack_smoke_tests.cs @@ -72,7 +72,9 @@ public async Task send_ack_message() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); await Broker.StopAsync(); await Broker.DisposeAsync(); } diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_async.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_async.cs index 600959680..f5b639b7a 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_async.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_async.cs @@ -67,7 +67,9 @@ public async Task DisposeAsync() await Broker.StopAsync(); await Broker.DisposeAsync(); await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_by_user_logic.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_by_user_logic.cs index 21c62fa41..34480bc96 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_by_user_logic.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_by_user_logic.cs @@ -89,6 +89,8 @@ public async Task DisposeAsync() await Broker.StopAsync(); await Broker.DisposeAsync(); await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } \ No newline at end of file diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_rules.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_rules.cs index e5c5a687d..fb6fcaf3b 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_rules.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/broadcast_to_topic_rules.cs @@ -91,7 +91,9 @@ public async Task DisposeAsync() await Broker.StopAsync(); await Broker.DisposeAsync(); await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_emqx_shared_group_topic.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_emqx_shared_group_topic.cs index 4335867a6..fc17b7207 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_emqx_shared_group_topic.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_emqx_shared_group_topic.cs @@ -72,6 +72,8 @@ public async Task DisposeAsync() await Broker.StopAsync(); await Broker.DisposeAsync(); await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } \ No newline at end of file diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs index 8fdfd29cc..de44547e9 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs @@ -70,6 +70,8 @@ public async Task DisposeAsync() await Broker.StopAsync(); await Broker.DisposeAsync(); await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } \ No newline at end of file diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/mosquitto_compliance.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/mosquitto_compliance.cs index 2a1307365..edfd35a64 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/mosquitto_compliance.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/mosquitto_compliance.cs @@ -110,7 +110,9 @@ public async Task send_to_shared_topic_and_receive_with_mosquitto() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } @@ -169,6 +171,8 @@ public async Task can_receive_with_shared_subscription_wildcard() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } } diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarListenerTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarListenerTests.cs index e183e9978..b08c7f0c4 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarListenerTests.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/PulsarListenerTests.cs @@ -25,6 +25,7 @@ public async Task UnsubscribeOnClose() await host.MessageBus().PublishAsync(new PulsarListenerTestMessage()); await host.StopAsync(); + host.Dispose(); var subscriptionExists = await SubscriptionExists(); @@ -47,6 +48,7 @@ public async Task KeepSubscriptionOnClose() await host.MessageBus()!.PublishAsync(new PulsarListenerTestMessage()); await host.StopAsync(); + host.Dispose(); var subscriptionExists = await SubscriptionExists(); diff --git a/src/Transports/RabbitMQ/ChaosTesting/ChaosDriver.cs b/src/Transports/RabbitMQ/ChaosTesting/ChaosDriver.cs index 07a26022c..1feb96d5f 100644 --- a/src/Transports/RabbitMQ/ChaosTesting/ChaosDriver.cs +++ b/src/Transports/RabbitMQ/ChaosTesting/ChaosDriver.cs @@ -279,6 +279,7 @@ public async Task StartReceiver(string name) public async Task StopReceiver(string name) { await _receivers[name].StopAsync(); + _receivers[name].Dispose(); _receivers.Remove(name); _output.WriteLine($"Stopped receiver {name}"); @@ -312,6 +313,7 @@ public async Task StartSender(string name) public async Task StopSender(string name) { await _senders[name].StopAsync(); + _senders[name].Dispose(); _senders.Remove(name); _output.WriteLine($"Stopped sender {name}"); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dynamic_object_creation_smoke_tests.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dynamic_object_creation_smoke_tests.cs index 5a904c29f..7206c1911 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dynamic_object_creation_smoke_tests.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dynamic_object_creation_smoke_tests.cs @@ -21,6 +21,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs index 6a5b81d2f..a11c1659a 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs @@ -77,7 +77,11 @@ public async Task InitializeAsync() public async Task DisposeAsync() { _hosts.Reverse(); - foreach (var host in _hosts) await host.StopAsync(); + foreach (var host in _hosts) + { + await host.StopAsync(); + host.Dispose(); + } } private async Task startHostAsync() @@ -111,6 +115,7 @@ private async Task shutdownHostAsync(IHost host) { host.GetRuntime().Agents.DisableHealthChecks(); await host.StopAsync(); + host.Dispose(); _hosts.Remove(host); } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs index 570ec5ca7..8a7607c4e 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs @@ -56,6 +56,7 @@ public async ValueTask DisposeAsync() foreach (var host in _hosts) { await host.StopAsync(); + host.Dispose(); } } @@ -104,8 +105,9 @@ await host.WaitUntilAssignmentsChangeTo(w => host4.GetRuntime().Endpoints.ActiveListeners().Where(x => x.Endpoint.Role == EndpointRole.Application).Any(x => x.Uri.Scheme == "rabbitmq").ShouldBeFalse(); await host.StopAsync(); + host.Dispose(); _hosts.Remove(host); - + await host2.WaitUntilAssumesLeadershipAsync(30.Seconds()); await host2.WaitUntilAssignmentsChangeTo(w => { diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs index 5fca425ca..65387ab82 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs @@ -70,7 +70,9 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _sender.StopAsync(); + _sender.Dispose(); await _receiver.StopAsync(); + _receiver.Dispose(); } [Fact] diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs index dea23b2d4..a4c428f4e 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs @@ -120,9 +120,13 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await Main.StopAsync(); + Main.Dispose(); await One.StopAsync(); + One.Dispose(); await Two.StopAsync(); + Two.Dispose(); await Three.StopAsync(); + Three.Dispose(); } private static async Task declareVirtualHost(string vhname) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/rate_limiting_end_to_end.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/rate_limiting_end_to_end.cs index 0f366cfb4..59f62a4e4 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/rate_limiting_end_to_end.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/rate_limiting_end_to_end.cs @@ -199,6 +199,8 @@ private static async Task safeStopAsync(IHost host) { } + host.Dispose(); + try { host.Dispose(); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_disabling.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_disabling.cs index 227f40d49..4836aeba5 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_disabling.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_disabling.cs @@ -28,6 +28,10 @@ public async Task InitializeAsync() }).StartAsync(); } - public Task DisposeAsync() => _host.StopAsync(); + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } } \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_mechanics.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_mechanics.cs index f78314c5a..2b18d4700 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_mechanics.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/response_queue_mechanics.cs @@ -35,6 +35,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs index e9b698a65..cb5948511 100644 --- a/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs +++ b/src/Transports/Redis/Wolverine.Redis.Tests/ResponseStreamMechanicsTests.cs @@ -43,6 +43,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] @@ -97,6 +98,7 @@ public async Task InitializeAsync() public async Task DisposeAsync() { await _host.StopAsync(); + _host.Dispose(); } [Fact] diff --git a/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs b/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs index 5adb274c1..6454a67d5 100644 --- a/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs +++ b/src/Transports/SignalR/Wolverine.SignalR.Tests/WebSocketTestContext.cs @@ -104,10 +104,12 @@ public async Task StartClientHost(string serviceName = "Client") public async Task DisposeAsync() { await theWebApp.StopAsync(); + await theWebApp.DisposeAsync(); foreach (var clientHost in _clientHosts) { await clientHost.StopAsync(); + clientHost.Dispose(); } } @@ -223,10 +225,12 @@ public async Task StartClientHost(string serviceName = "Client", string a public virtual async Task DisposeAsync() { await theWebApp.StopAsync(); + await theWebApp.DisposeAsync(); foreach (var clientHost in _clientHosts) { await clientHost.StopAsync(); + clientHost.Dispose(); } } diff --git a/src/Wolverine/Persistence/Durability/IDeadLetters.cs b/src/Wolverine/Persistence/Durability/IDeadLetters.cs index 245e604f2..ffa6ef828 100644 --- a/src/Wolverine/Persistence/Durability/IDeadLetters.cs +++ b/src/Wolverine/Persistence/Durability/IDeadLetters.cs @@ -71,4 +71,13 @@ Task> SummarizeAllAsync(string serviceName, /// /// Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token); + + /// + /// Updates the body of a dead letter envelope and marks it as replayable + /// + /// The envelope to edit + /// The new serialized message body + /// + /// + Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token); } \ No newline at end of file diff --git a/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs b/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs index f97433699..cfaaaebfd 100644 --- a/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs @@ -74,6 +74,11 @@ public Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) throw new NotSupportedException(); } + public Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + throw new NotSupportedException(); + } + public async Task DeadLetterEnvelopeByIdAsync(Guid id, string? tenantId = null) { if (tenantId is not null) diff --git a/src/Wolverine/Persistence/Durability/NullMessageStore.cs b/src/Wolverine/Persistence/Durability/NullMessageStore.cs index 153abf4f0..1dce8a885 100644 --- a/src/Wolverine/Persistence/Durability/NullMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/NullMessageStore.cs @@ -234,6 +234,11 @@ public Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) return Task.CompletedTask; } + public Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, CancellationToken token) + { + return Task.CompletedTask; + } + public Task RebuildAsync() { return Task.CompletedTask; diff --git a/src/Wolverine/Runtime/RemoteInvocation/ReplyListener.cs b/src/Wolverine/Runtime/RemoteInvocation/ReplyListener.cs index 13b0572b1..1cd5f073c 100644 --- a/src/Wolverine/Runtime/RemoteInvocation/ReplyListener.cs +++ b/src/Wolverine/Runtime/RemoteInvocation/ReplyListener.cs @@ -54,6 +54,13 @@ public void Dispose() private void onCancellation() { + // Unregister before setting the exception so the listener is removed + // before any continuation runs (TaskCreationOptions.RunContinuationsAsynchronously + // can cause the awaiter to resume before subsequent lines execute) + Parent.Unregister(this); + + Status = TaskStatus.Faulted; + if (typeof(T) == typeof(Acknowledgement)) { _completion?.TrySetException(new TimeoutException( @@ -64,9 +71,5 @@ private void onCancellation() _completion?.TrySetException(new TimeoutException( $"Timed out waiting for expected response {typeof(T).FullNameInCode()} for original message {RequestId} of type {RequestType} with a configured timeout of {_timeout.TotalMilliseconds} milliseconds")); } - - Parent.Unregister(this); - - Status = TaskStatus.Faulted; } } \ No newline at end of file