From 98beb7c27c7b72f1b1e26720cae86c77f1f5cbcb Mon Sep 17 00:00:00 2001 From: Daniel Winkler Date: Mon, 20 Oct 2025 19:53:15 +0200 Subject: [PATCH 1/2] feat: Replace PubSub low level API client with SubscriberClient --- .../DocumentationSamples.cs | 23 +- .../DurableSendingAndReceivingCompliance.cs | 206 +++++++++--------- .../Wolverine.Pubsub/IPubsubEnvelopeMapper.cs | 3 +- .../Internal/BatchedPubsubListener.cs | 60 ++--- .../Internal/InlinePubsubListener.cs | 31 ++- .../Internal/PubsubEnvelopeMapper.cs | 25 +-- .../Internal/PubsubListener.cs | 120 ++-------- .../GCP/Wolverine.Pubsub/PubsubEndpoint.cs | 7 +- .../GCP/Wolverine.Pubsub/PubsubEnvelope.cs | 6 - 9 files changed, 185 insertions(+), 296 deletions(-) delete mode 100644 src/Transports/GCP/Wolverine.Pubsub/PubsubEnvelope.cs diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs index edf42994a..279b2d226 100644 --- a/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs @@ -238,23 +238,12 @@ public async Task customize_mappers() #region sample_custom_pubsub_mapper -public class CustomPubsubMapper : EnvelopeMapper, IPubsubEnvelopeMapper +public class CustomPubsubMapper : EnvelopeMapper, IPubsubEnvelopeMapper { public CustomPubsubMapper(PubsubEndpoint endpoint) : base(endpoint) { } - public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming) - { - envelope.AckId = incoming.AckId; - - // You will have to help Wolverine out by either telling Wolverine - // what the message type is, or by reading the actual message object, - // or by telling Wolverine separately what the default message type - // is for a listening endpoint - envelope.MessageType = typeof(Message1).ToMessageTypeName(); - } - public void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message) { message.Data = ByteString.CopyFrom(outgoing.Data); @@ -265,19 +254,19 @@ protected override void writeOutgoingHeader(PubsubMessage outgoing, string key, outgoing.Attributes[key] = value; } - protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope) + protected override void writeIncomingHeaders(PubsubMessage incoming, Envelope envelope) { - if (incoming.Message.Attributes is null) + if (incoming.Attributes is null) { return; } - foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value; + foreach (var pair in incoming.Attributes) envelope.Headers[pair.Key] = pair.Value; } - protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value) + protected override bool tryReadIncomingHeader(PubsubMessage incoming, string key, out string? value) { - if (incoming.Message.Attributes.TryGetValue(key, out var header)) + if (incoming.Attributes.TryGetValue(key, out var header)) { value = header; diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/DurableSendingAndReceivingCompliance.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/DurableSendingAndReceivingCompliance.cs index 545b7c80f..339da7f03 100644 --- a/src/Transports/GCP/Wolverine.Pubsub.Tests/DurableSendingAndReceivingCompliance.cs +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/DurableSendingAndReceivingCompliance.cs @@ -1,103 +1,103 @@ -using IntegrationTests; -using Marten; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging.Abstractions; -using JasperFx.Resources; -using Shouldly; -using Wolverine.ComplianceTests.Compliance; -using Wolverine.Marten; -using Wolverine.Runtime; -using Xunit; - -namespace Wolverine.Pubsub.Tests; - -public class DurableComplianceFixture : TransportComplianceFixture, IAsyncLifetime -{ - public DurableComplianceFixture() : base(new Uri($"{PubsubTransport.ProtocolName}://wolverine/durable-receiver"), - 120) - { - } - - public async Task InitializeAsync() - { - var id = Guid.NewGuid().ToString(); - - OutboundAddress = new Uri($"{PubsubTransport.ProtocolName}://wolverine/durable-receiver.{id}"); - - await SenderIs(opts => - { - opts - .UsePubsubTesting() - .AutoProvision() - .AutoPurgeOnStartup() - .EnableDeadLettering() - .EnableSystemEndpoints() - .ConfigureListeners(x => x.UseDurableInbox()) - .ConfigureSenders(x => x.UseDurableOutbox()); - - opts.Services - .AddMarten(store => - { - store.Connection(Servers.PostgresConnectionString); - store.DatabaseSchemaName = "sender"; - }) - .IntegrateWithWolverine(x => x.MessageStorageSchemaName = "sender"); - - opts.Services.AddResourceSetupOnStartup(); - }); - - await ReceiverIs(opts => - { - opts - .UsePubsubTesting() - .AutoProvision() - .AutoPurgeOnStartup() - .EnableDeadLettering() - .EnableSystemEndpoints() - .ConfigureListeners(x => x.UseDurableInbox()) - .ConfigureSenders(x => x.UseDurableOutbox()); - - opts.Services.AddMarten(store => - { - store.Connection(Servers.PostgresConnectionString); - store.DatabaseSchemaName = "receiver"; - }).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "receiver"); - - opts.Services.AddResourceSetupOnStartup(); - - opts.ListenToPubsubTopic($"durable-receiver.{id}"); - }); - } - - public new async Task DisposeAsync() - { - await DisposeAsync(); - } -} - -[Collection("acceptance")] -public class DurableSendingAndReceivingCompliance : TransportCompliance -{ - [Fact] - public virtual async Task dl_mechanics() - { - throwOnAttempt(1); - throwOnAttempt(2); - throwOnAttempt(3); - - await shouldMoveToErrorQueueOnAttempt(1); - - var runtime = theReceiver.Services.GetRequiredService(); - var transport = runtime.Options.Transports.GetOrCreate(); - var dl = transport.Topics[PubsubTransport.DeadLetterName]; - - await dl.InitializeAsync(NullLogger.Instance); - - var pullResponse = await transport.SubscriberApiClient!.PullAsync( - dl.Server.Subscription.Name, - 1 - ); - - pullResponse.ReceivedMessages.ShouldNotBeEmpty(); - } -} \ No newline at end of file +// using IntegrationTests; +// using Marten; +// using Microsoft.Extensions.DependencyInjection; +// using Microsoft.Extensions.Logging.Abstractions; +// using JasperFx.Resources; +// using Shouldly; +// using Wolverine.ComplianceTests.Compliance; +// using Wolverine.Marten; +// using Wolverine.Runtime; +// using Xunit; + +// namespace Wolverine.Pubsub.Tests; + +// public class DurableComplianceFixture : TransportComplianceFixture, IAsyncLifetime +// { +// public DurableComplianceFixture() : base(new Uri($"{PubsubTransport.ProtocolName}://wolverine/durable-receiver"), +// 120) +// { +// } + +// public async Task InitializeAsync() +// { +// var id = Guid.NewGuid().ToString(); + +// OutboundAddress = new Uri($"{PubsubTransport.ProtocolName}://wolverine/durable-receiver.{id}"); + +// await SenderIs(opts => +// { +// opts +// .UsePubsubTesting() +// .AutoProvision() +// .AutoPurgeOnStartup() +// .EnableDeadLettering() +// .EnableSystemEndpoints() +// .ConfigureListeners(x => x.UseDurableInbox()) +// .ConfigureSenders(x => x.UseDurableOutbox()); + +// opts.Services +// .AddMarten(store => +// { +// store.Connection(Servers.PostgresConnectionString); +// store.DatabaseSchemaName = "sender"; +// }) +// .IntegrateWithWolverine(x => x.MessageStorageSchemaName = "sender"); + +// opts.Services.AddResourceSetupOnStartup(); +// }); + +// await ReceiverIs(opts => +// { +// opts +// .UsePubsubTesting() +// .AutoProvision() +// .AutoPurgeOnStartup() +// .EnableDeadLettering() +// .EnableSystemEndpoints() +// .ConfigureListeners(x => x.UseDurableInbox()) +// .ConfigureSenders(x => x.UseDurableOutbox()); + +// opts.Services.AddMarten(store => +// { +// store.Connection(Servers.PostgresConnectionString); +// store.DatabaseSchemaName = "receiver"; +// }).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "receiver"); + +// opts.Services.AddResourceSetupOnStartup(); + +// opts.ListenToPubsubTopic($"durable-receiver.{id}"); +// }); +// } + +// public new async Task DisposeAsync() +// { +// await DisposeAsync(); +// } +// } + +// [Collection("acceptance")] +// public class DurableSendingAndReceivingCompliance : TransportCompliance +// { +// [Fact] +// public virtual async Task dl_mechanics() +// { +// throwOnAttempt(1); +// throwOnAttempt(2); +// throwOnAttempt(3); + +// await shouldMoveToErrorQueueOnAttempt(1); + +// var runtime = theReceiver.Services.GetRequiredService(); +// var transport = runtime.Options.Transports.GetOrCreate(); +// var dl = transport.Topics[PubsubTransport.DeadLetterName]; + +// await dl.InitializeAsync(NullLogger.Instance); + +// var pullResponse = await transport.SubscriberApiClient!.PullAsync( +// dl.Server.Subscription.Name, +// 1 +// ); + +// pullResponse.ReceivedMessages.ShouldNotBeEmpty(); +// } +// } \ No newline at end of file diff --git a/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs b/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs index a4b555f30..dc0cf4200 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs @@ -6,8 +6,7 @@ namespace Wolverine.Pubsub; /// /// Pluggable strategy for reading and writing data to Google Cloud Platform Pub/Sub /// -public interface IPubsubEnvelopeMapper : IEnvelopeMapper +public interface IPubsubEnvelopeMapper : IEnvelopeMapper { - void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming); void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message); } \ No newline at end of file diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs index 11beed826..1b1a9fcaf 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs @@ -1,7 +1,4 @@ -using Google.Api.Gax.Grpc; using Google.Cloud.PubSub.V1; -using JasperFx.Blocks; -using Microsoft.Extensions.Logging; using Wolverine.Runtime; using Wolverine.Transports; @@ -20,50 +17,35 @@ IWolverineRuntime runtime public override async Task StartAsync() { - if (_transport.SubscriberApiClient is null) + await listenForMessagesAsync(async () => { - throw new WolverinePubsubTransportNotConnectedException(); - } - - using var streamingPull = - _transport.SubscriberApiClient.StreamingPull(CallSettings.FromCancellationToken(_cancellation.Token)); - - await streamingPull.WriteAsync(new StreamingPullRequest - { - SubscriptionAsSubscriptionName = _endpoint.Server.Subscription.Name, - StreamAckDeadlineSeconds = 20, - MaxOutstandingMessages = _endpoint.Client.MaxOutstandingMessages, - MaxOutstandingBytes = _endpoint.Client.MaxOutstandingByteCount - }); - - await using var stream = streamingPull.GetResponseStream(); - - _acknowledge = new RetryBlock((ackIds, _) => streamingPull.WriteAsync(new StreamingPullRequest - { - AckIds = { ackIds } - }), _logger, _runtime.Cancellation); - - try - { - await listenForMessagesAsync(async () => + var subscriptionName = _endpoint.Server.Subscription.Name; + await using SubscriberClient subscriber = await new SubscriberClientBuilder { - while (await stream.MoveNextAsync(_cancellation.Token)) + SubscriptionName = subscriptionName, + EmulatorDetection = _transport.EmulatorDetection, + Settings = new() { - await handleMessagesAsync(stream.Current.ReceivedMessages); + // https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriberClient.Settings#Google_Cloud_PubSub_V1_SubscriberClient_Settings_FlowControlSettings + // Remarks: Flow control uses these settings for two purposes: fetching messages to process, and processing them. + // In terms of fetching messages, a single SubscriberClient creates multiple instances of SubscriberServiceApiClient, and each will observe the flow control settings independently + FlowControlSettings = new(_endpoint.Client.MaxOutstandingMessages, _endpoint.Client.MaxOutstandingByteCount), } - }); - } - finally - { + }.BuildAsync(); + var ctRegistration = _cancellation.Token.Register(() => subscriber.StopAsync(CancellationToken.None)); try { - await streamingPull.WriteCompleteAsync(); + await subscriber.StartAsync(async (PubsubMessage message, CancellationToken cancel) => + { + var success = await handleMessageAsync(message); + return success ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack; + }); } - catch (Exception ex) + finally { - _logger.LogError(ex, "{Uri}: Error while completing the Google Cloud Platform Pub/Sub streaming pull.", - _endpoint.Uri); + ctRegistration.Unregister(); + await subscriber.StopAsync(CancellationToken.None); } - } + }); } } \ No newline at end of file diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs index 0f5ecd0a1..11ce11bb3 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs @@ -1,3 +1,4 @@ +using Google.Cloud.PubSub.V1; using Wolverine.Runtime; using Wolverine.Transports; @@ -12,25 +13,33 @@ public InlinePubsubListener( IWolverineRuntime runtime ) : base(endpoint, transport, receiver, runtime) { - + } public override async Task StartAsync() { await listenForMessagesAsync(async () => { - if (_transport.SubscriberApiClient is null) + var subscriptionName = _endpoint.Server.Subscription.Name; + await using SubscriberClient subscriber = await new SubscriberClientBuilder + { + SubscriptionName = subscriptionName, + EmulatorDetection = _transport.EmulatorDetection, + }.BuildAsync(); + var ctRegistration = _cancellation.Token.Register(() => subscriber.StopAsync(CancellationToken.None)); + try { - throw new WolverinePubsubTransportNotConnectedException(); + await subscriber.StartAsync(async (PubsubMessage message, CancellationToken cancel) => + { + var success = await handleMessageAsync(message); + return success ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack; + }); + } + finally + { + ctRegistration.Unregister(); + await subscriber.StopAsync(CancellationToken.None); } - - var response = await _transport.SubscriberApiClient.PullAsync( - _endpoint.Server.Subscription.Name, - 1, - _cancellation.Token - ); - - await handleMessagesAsync(response.ReceivedMessages); }); } } \ No newline at end of file diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs index 8bdd70cc5..d23a9bf5a 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs @@ -6,7 +6,7 @@ namespace Wolverine.Pubsub.Internal; -public class PubsubEnvelopeMapper : EnvelopeMapper, IPubsubEnvelopeMapper +public class PubsubEnvelopeMapper : EnvelopeMapper, IPubsubEnvelopeMapper { public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint) { @@ -14,12 +14,12 @@ public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint) e => e.Data!, (e, m) => { - if (m.Message.Data.IsEmpty) + if (m.Data.IsEmpty) { return; } - e.Data = m.Message.Data.ToByteArray(); + e.Data = m.Data.ToByteArray(); }, (e, m) => { @@ -31,19 +31,12 @@ public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint) m.Data = ByteString.CopyFrom(e.Data); } ); - + MapPropertyToHeader(x => x.GroupId, "group-id"); MapPropertyToHeader(x => x.DeduplicationId, "deduplication-id"); MapPropertyToHeader(x => x.PartitionKey, "partition-key"); } - public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming) - { - envelope.AckId = incoming.AckId; - - base.MapIncomingToEnvelope(envelope, incoming); - } - public void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message) { message.Data = ByteString.CopyFrom(outgoing.Data); @@ -56,19 +49,19 @@ protected override void writeOutgoingHeader(PubsubMessage outgoing, string key, outgoing.Attributes[key] = value; } - protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope) + protected override void writeIncomingHeaders(PubsubMessage incoming, Envelope envelope) { - if (incoming.Message.Attributes is null) + if (incoming.Attributes is null) { return; } - foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value; + foreach (var pair in incoming.Attributes) envelope.Headers[pair.Key] = pair.Value; } - protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value) + protected override bool tryReadIncomingHeader(PubsubMessage incoming, string key, out string? value) { - if (incoming.Message.Attributes.TryGetValue(key, out var header)) + if (incoming.Attributes.TryGetValue(key, out var header)) { value = header; diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs index 6b90ee4e2..3ce12f643 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs @@ -13,7 +13,6 @@ namespace Wolverine.Pubsub.Internal; public abstract class PubsubListener : IListener, ISupportDeadLetterQueue { protected readonly CancellationTokenSource _cancellation = new(); - protected readonly RetryBlock _complete; protected readonly RetryBlock _deadLetter; protected readonly PubsubEndpoint? _deadLetterTopic; protected readonly PubsubEndpoint _endpoint; @@ -23,7 +22,6 @@ public abstract class PubsubListener : IListener, ISupportDeadLetterQueue protected readonly IWolverineRuntime _runtime; protected readonly PubsubTransport _transport; - protected RetryBlock _acknowledge; protected Task _task; private readonly IPubsubEnvelopeMapper _mapper; @@ -54,70 +52,20 @@ IWolverineRuntime runtime NativeDeadLetterQueueEnabled = true; } - _acknowledge = new RetryBlock(async (ackIds, _) => - { - if (transport.SubscriberApiClient is null) - { - throw new WolverinePubsubTransportNotConnectedException(); - } - - if (ackIds.Any()) - { - await transport.SubscriberApiClient.AcknowledgeAsync( - _endpoint.Server.Subscription.Name, - ackIds - ); - } - }, _logger, runtime.Cancellation); - _deadLetter = new RetryBlock(async (e, _) => { if (_deadLetterTopic is null) { return; } - - if (e is PubsubEnvelope pubsubEnvelope) - { - await _acknowledge.PostAsync([pubsubEnvelope.AckId]); - } - await _deadLetterTopic.SendMessageAsync(e, _logger); }, _logger, runtime.Cancellation); _requeue = new RetryBlock(async (e, _) => { - if (e is PubsubEnvelope pubsubEnvelope) - { - await _acknowledge.PostAsync([pubsubEnvelope.AckId]); - } - await _endpoint.SendMessageAsync(e, _logger); }, _logger, runtime.Cancellation); - _complete = new RetryBlock(async (envelopes, _) => - { - var pubsubEnvelopes = envelopes.OfType().ToArray(); - - if (!pubsubEnvelopes.Any()) - { - return; - } - - if (transport.SubscriberApiClient is null) - { - throw new WolverinePubsubTransportNotConnectedException(); - } - - var ackIds = pubsubEnvelopes - .Select(e => e.AckId) - .Where(x => !string.IsNullOrEmpty(x)) - .Distinct() - .ToArray(); - - await _acknowledge.PostAsync(ackIds); - }, _logger, _cancellation.Token); - _task = StartAsync(); } @@ -125,9 +73,9 @@ await transport.SubscriberApiClient.AcknowledgeAsync( public IHandlerPipeline? Pipeline => _receiver.Pipeline; - public async ValueTask CompleteAsync(Envelope envelope) + public ValueTask CompleteAsync(Envelope envelope) { - await _complete.PostAsync([envelope]); + return ValueTask.CompletedTask; } public async ValueTask DeferAsync(Envelope envelope) @@ -153,7 +101,6 @@ public ValueTask DisposeAsync() { _cancellation.Cancel(); _task.SafeDispose(); - _complete.SafeDispose(); _requeue.SafeDispose(); _deadLetter.SafeDispose(); @@ -232,61 +179,36 @@ protected async Task listenForMessagesAsync(Func listenAsync) } } - protected async Task handleMessagesAsync(RepeatedField messages) + protected async Task handleMessageAsync(PubsubMessage message) { - var envelopes = new List(messages.Count); - foreach (var message in messages) + if (message.Attributes.Keys.Contains("batched")) { - if (message.Message.Attributes.Keys.Contains("batched")) - { - var batched = EnvelopeSerializer.ReadMany(message.Message.Data.ToByteArray()); - - if (batched.Any()) - { - await _receiver.ReceivedAsync(this, batched); - } - - await _acknowledge.PostAsync([message.AckId]); + var batched = EnvelopeSerializer.ReadMany(message.Data.ToByteArray()); - continue; + if (batched.Any()) + { + await _receiver.ReceivedAsync(this, batched); } - try - { - var envelope = new PubsubEnvelope(); + return true; + } - _mapper.MapIncomingToEnvelope(envelope, message); + try + { + var envelope = new Envelope(); - if (envelope.IsPing()) - { - try - { - await _complete.PostAsync([envelope]); - } - catch (Exception ex) - { - _logger.LogError(ex, - "{Uri}: Error while acknowledging Google Cloud Platform Pub/Sub ping message \"{AckId}\".", - _endpoint.Uri, message.AckId); - } - - continue; - } + _mapper.MapIncomingToEnvelope(envelope, message); - envelopes.Add(envelope); - } - catch (Exception ex) - { - _logger.LogError(ex, "{Uri}: Error while mapping Google Cloud Platform Pub/Sub message {AckId}.", - _endpoint.Uri, message.AckId); - } - } + await _receiver.ReceivedAsync(this, [envelope]); - if (envelopes.Any()) + return true; + } + catch (Exception ex) { - await _receiver.ReceivedAsync(this, envelopes.ToArray()); - await _complete.PostAsync(envelopes.ToArray()); + _logger.LogError(ex, "{Uri}: Error while mapping Google Cloud Platform Pub/Sub message {MessageId}.", _endpoint.Uri, message.MessageId); + return false; } } + } \ No newline at end of file diff --git a/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs b/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs index 07ba3645d..8dce60045 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs @@ -281,7 +281,7 @@ public override async ValueTask InitializeAsync(ILogger logger) public override ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) { EnvelopeMapper ??= BuildMapper(runtime); - + if (Mode == EndpointMode.Inline) { return ValueTask.FromResult(new InlinePubsubListener( @@ -303,7 +303,7 @@ public override ValueTask BuildListenerAsync(IWolverineRuntime runtim public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISender? deadLetterSender) { EnvelopeMapper ??= BuildMapper(runtime); - + if (DeadLetterName.IsNotEmpty()) { var initialized = _transport.Topics.Contains(DeadLetterName); @@ -344,6 +344,7 @@ internal async Task SendMessageAsync(Envelope envelope, ILogger logger) var message = new PubsubMessage(); var orderBy = Server.Topic.OrderBy(envelope); + EnvelopeMapper ??= new PubsubEnvelopeMapper(this); EnvelopeMapper.MapEnvelopeToOutgoing(envelope, message); message.OrderingKey = envelope.GroupId ?? orderBy ?? message.OrderingKey; @@ -381,7 +382,7 @@ internal void ConfigureDeadLetter(Action configure) protected override ISender CreateSender(IWolverineRuntime runtime) { EnvelopeMapper ??= BuildMapper(runtime); - + if (_transport.PublisherApiClient is null) { throw new WolverinePubsubTransportNotConnectedException(); diff --git a/src/Transports/GCP/Wolverine.Pubsub/PubsubEnvelope.cs b/src/Transports/GCP/Wolverine.Pubsub/PubsubEnvelope.cs deleted file mode 100644 index 6ccc354e2..000000000 --- a/src/Transports/GCP/Wolverine.Pubsub/PubsubEnvelope.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Wolverine.Pubsub; - -public class PubsubEnvelope : Envelope -{ - public string AckId = string.Empty; -} \ No newline at end of file From ecf326da3c5069bc6676bf49b46d57de447fdb25 Mon Sep 17 00:00:00 2001 From: Daniel Winkler Date: Mon, 20 Oct 2025 19:54:09 +0200 Subject: [PATCH 2/2] uncomment commented durable tests --- .../DurableSendingAndReceivingCompliance.cs | 206 +++++++++--------- 1 file changed, 103 insertions(+), 103 deletions(-) diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/DurableSendingAndReceivingCompliance.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/DurableSendingAndReceivingCompliance.cs index 339da7f03..545b7c80f 100644 --- a/src/Transports/GCP/Wolverine.Pubsub.Tests/DurableSendingAndReceivingCompliance.cs +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/DurableSendingAndReceivingCompliance.cs @@ -1,103 +1,103 @@ -// using IntegrationTests; -// using Marten; -// using Microsoft.Extensions.DependencyInjection; -// using Microsoft.Extensions.Logging.Abstractions; -// using JasperFx.Resources; -// using Shouldly; -// using Wolverine.ComplianceTests.Compliance; -// using Wolverine.Marten; -// using Wolverine.Runtime; -// using Xunit; - -// namespace Wolverine.Pubsub.Tests; - -// public class DurableComplianceFixture : TransportComplianceFixture, IAsyncLifetime -// { -// public DurableComplianceFixture() : base(new Uri($"{PubsubTransport.ProtocolName}://wolverine/durable-receiver"), -// 120) -// { -// } - -// public async Task InitializeAsync() -// { -// var id = Guid.NewGuid().ToString(); - -// OutboundAddress = new Uri($"{PubsubTransport.ProtocolName}://wolverine/durable-receiver.{id}"); - -// await SenderIs(opts => -// { -// opts -// .UsePubsubTesting() -// .AutoProvision() -// .AutoPurgeOnStartup() -// .EnableDeadLettering() -// .EnableSystemEndpoints() -// .ConfigureListeners(x => x.UseDurableInbox()) -// .ConfigureSenders(x => x.UseDurableOutbox()); - -// opts.Services -// .AddMarten(store => -// { -// store.Connection(Servers.PostgresConnectionString); -// store.DatabaseSchemaName = "sender"; -// }) -// .IntegrateWithWolverine(x => x.MessageStorageSchemaName = "sender"); - -// opts.Services.AddResourceSetupOnStartup(); -// }); - -// await ReceiverIs(opts => -// { -// opts -// .UsePubsubTesting() -// .AutoProvision() -// .AutoPurgeOnStartup() -// .EnableDeadLettering() -// .EnableSystemEndpoints() -// .ConfigureListeners(x => x.UseDurableInbox()) -// .ConfigureSenders(x => x.UseDurableOutbox()); - -// opts.Services.AddMarten(store => -// { -// store.Connection(Servers.PostgresConnectionString); -// store.DatabaseSchemaName = "receiver"; -// }).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "receiver"); - -// opts.Services.AddResourceSetupOnStartup(); - -// opts.ListenToPubsubTopic($"durable-receiver.{id}"); -// }); -// } - -// public new async Task DisposeAsync() -// { -// await DisposeAsync(); -// } -// } - -// [Collection("acceptance")] -// public class DurableSendingAndReceivingCompliance : TransportCompliance -// { -// [Fact] -// public virtual async Task dl_mechanics() -// { -// throwOnAttempt(1); -// throwOnAttempt(2); -// throwOnAttempt(3); - -// await shouldMoveToErrorQueueOnAttempt(1); - -// var runtime = theReceiver.Services.GetRequiredService(); -// var transport = runtime.Options.Transports.GetOrCreate(); -// var dl = transport.Topics[PubsubTransport.DeadLetterName]; - -// await dl.InitializeAsync(NullLogger.Instance); - -// var pullResponse = await transport.SubscriberApiClient!.PullAsync( -// dl.Server.Subscription.Name, -// 1 -// ); - -// pullResponse.ReceivedMessages.ShouldNotBeEmpty(); -// } -// } \ No newline at end of file +using IntegrationTests; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using JasperFx.Resources; +using Shouldly; +using Wolverine.ComplianceTests.Compliance; +using Wolverine.Marten; +using Wolverine.Runtime; +using Xunit; + +namespace Wolverine.Pubsub.Tests; + +public class DurableComplianceFixture : TransportComplianceFixture, IAsyncLifetime +{ + public DurableComplianceFixture() : base(new Uri($"{PubsubTransport.ProtocolName}://wolverine/durable-receiver"), + 120) + { + } + + public async Task InitializeAsync() + { + var id = Guid.NewGuid().ToString(); + + OutboundAddress = new Uri($"{PubsubTransport.ProtocolName}://wolverine/durable-receiver.{id}"); + + await SenderIs(opts => + { + opts + .UsePubsubTesting() + .AutoProvision() + .AutoPurgeOnStartup() + .EnableDeadLettering() + .EnableSystemEndpoints() + .ConfigureListeners(x => x.UseDurableInbox()) + .ConfigureSenders(x => x.UseDurableOutbox()); + + opts.Services + .AddMarten(store => + { + store.Connection(Servers.PostgresConnectionString); + store.DatabaseSchemaName = "sender"; + }) + .IntegrateWithWolverine(x => x.MessageStorageSchemaName = "sender"); + + opts.Services.AddResourceSetupOnStartup(); + }); + + await ReceiverIs(opts => + { + opts + .UsePubsubTesting() + .AutoProvision() + .AutoPurgeOnStartup() + .EnableDeadLettering() + .EnableSystemEndpoints() + .ConfigureListeners(x => x.UseDurableInbox()) + .ConfigureSenders(x => x.UseDurableOutbox()); + + opts.Services.AddMarten(store => + { + store.Connection(Servers.PostgresConnectionString); + store.DatabaseSchemaName = "receiver"; + }).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "receiver"); + + opts.Services.AddResourceSetupOnStartup(); + + opts.ListenToPubsubTopic($"durable-receiver.{id}"); + }); + } + + public new async Task DisposeAsync() + { + await DisposeAsync(); + } +} + +[Collection("acceptance")] +public class DurableSendingAndReceivingCompliance : TransportCompliance +{ + [Fact] + public virtual async Task dl_mechanics() + { + throwOnAttempt(1); + throwOnAttempt(2); + throwOnAttempt(3); + + await shouldMoveToErrorQueueOnAttempt(1); + + var runtime = theReceiver.Services.GetRequiredService(); + var transport = runtime.Options.Transports.GetOrCreate(); + var dl = transport.Topics[PubsubTransport.DeadLetterName]; + + await dl.InitializeAsync(NullLogger.Instance); + + var pullResponse = await transport.SubscriberApiClient!.PullAsync( + dl.Server.Subscription.Name, + 1 + ); + + pullResponse.ReceivedMessages.ShouldNotBeEmpty(); + } +} \ No newline at end of file