diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs index edf42994a..279b2d226 100644 --- a/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs @@ -238,23 +238,12 @@ public async Task customize_mappers() #region sample_custom_pubsub_mapper -public class CustomPubsubMapper : EnvelopeMapper, IPubsubEnvelopeMapper +public class CustomPubsubMapper : EnvelopeMapper, IPubsubEnvelopeMapper { public CustomPubsubMapper(PubsubEndpoint endpoint) : base(endpoint) { } - public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming) - { - envelope.AckId = incoming.AckId; - - // You will have to help Wolverine out by either telling Wolverine - // what the message type is, or by reading the actual message object, - // or by telling Wolverine separately what the default message type - // is for a listening endpoint - envelope.MessageType = typeof(Message1).ToMessageTypeName(); - } - public void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message) { message.Data = ByteString.CopyFrom(outgoing.Data); @@ -265,19 +254,19 @@ protected override void writeOutgoingHeader(PubsubMessage outgoing, string key, outgoing.Attributes[key] = value; } - protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope) + protected override void writeIncomingHeaders(PubsubMessage incoming, Envelope envelope) { - if (incoming.Message.Attributes is null) + if (incoming.Attributes is null) { return; } - foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value; + foreach (var pair in incoming.Attributes) envelope.Headers[pair.Key] = pair.Value; } - protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value) + protected override bool tryReadIncomingHeader(PubsubMessage incoming, string key, out string? value) { - if (incoming.Message.Attributes.TryGetValue(key, out var header)) + if (incoming.Attributes.TryGetValue(key, out var header)) { value = header; diff --git a/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs b/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs index a4b555f30..dc0cf4200 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs @@ -6,8 +6,7 @@ namespace Wolverine.Pubsub; /// /// Pluggable strategy for reading and writing data to Google Cloud Platform Pub/Sub /// -public interface IPubsubEnvelopeMapper : IEnvelopeMapper +public interface IPubsubEnvelopeMapper : IEnvelopeMapper { - void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming); void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message); } \ No newline at end of file diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs index 11beed826..1b1a9fcaf 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs @@ -1,7 +1,4 @@ -using Google.Api.Gax.Grpc; using Google.Cloud.PubSub.V1; -using JasperFx.Blocks; -using Microsoft.Extensions.Logging; using Wolverine.Runtime; using Wolverine.Transports; @@ -20,50 +17,35 @@ IWolverineRuntime runtime public override async Task StartAsync() { - if (_transport.SubscriberApiClient is null) + await listenForMessagesAsync(async () => { - throw new WolverinePubsubTransportNotConnectedException(); - } - - using var streamingPull = - _transport.SubscriberApiClient.StreamingPull(CallSettings.FromCancellationToken(_cancellation.Token)); - - await streamingPull.WriteAsync(new StreamingPullRequest - { - SubscriptionAsSubscriptionName = _endpoint.Server.Subscription.Name, - StreamAckDeadlineSeconds = 20, - MaxOutstandingMessages = _endpoint.Client.MaxOutstandingMessages, - MaxOutstandingBytes = _endpoint.Client.MaxOutstandingByteCount - }); - - await using var stream = streamingPull.GetResponseStream(); - - _acknowledge = new RetryBlock((ackIds, _) => streamingPull.WriteAsync(new StreamingPullRequest - { - AckIds = { ackIds } - }), _logger, _runtime.Cancellation); - - try - { - await listenForMessagesAsync(async () => + var subscriptionName = _endpoint.Server.Subscription.Name; + await using SubscriberClient subscriber = await new SubscriberClientBuilder { - while (await stream.MoveNextAsync(_cancellation.Token)) + SubscriptionName = subscriptionName, + EmulatorDetection = _transport.EmulatorDetection, + Settings = new() { - await handleMessagesAsync(stream.Current.ReceivedMessages); + // https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriberClient.Settings#Google_Cloud_PubSub_V1_SubscriberClient_Settings_FlowControlSettings + // Remarks: Flow control uses these settings for two purposes: fetching messages to process, and processing them. + // In terms of fetching messages, a single SubscriberClient creates multiple instances of SubscriberServiceApiClient, and each will observe the flow control settings independently + FlowControlSettings = new(_endpoint.Client.MaxOutstandingMessages, _endpoint.Client.MaxOutstandingByteCount), } - }); - } - finally - { + }.BuildAsync(); + var ctRegistration = _cancellation.Token.Register(() => subscriber.StopAsync(CancellationToken.None)); try { - await streamingPull.WriteCompleteAsync(); + await subscriber.StartAsync(async (PubsubMessage message, CancellationToken cancel) => + { + var success = await handleMessageAsync(message); + return success ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack; + }); } - catch (Exception ex) + finally { - _logger.LogError(ex, "{Uri}: Error while completing the Google Cloud Platform Pub/Sub streaming pull.", - _endpoint.Uri); + ctRegistration.Unregister(); + await subscriber.StopAsync(CancellationToken.None); } - } + }); } } \ No newline at end of file diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs index 0f5ecd0a1..11ce11bb3 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs @@ -1,3 +1,4 @@ +using Google.Cloud.PubSub.V1; using Wolverine.Runtime; using Wolverine.Transports; @@ -12,25 +13,33 @@ public InlinePubsubListener( IWolverineRuntime runtime ) : base(endpoint, transport, receiver, runtime) { - + } public override async Task StartAsync() { await listenForMessagesAsync(async () => { - if (_transport.SubscriberApiClient is null) + var subscriptionName = _endpoint.Server.Subscription.Name; + await using SubscriberClient subscriber = await new SubscriberClientBuilder + { + SubscriptionName = subscriptionName, + EmulatorDetection = _transport.EmulatorDetection, + }.BuildAsync(); + var ctRegistration = _cancellation.Token.Register(() => subscriber.StopAsync(CancellationToken.None)); + try { - throw new WolverinePubsubTransportNotConnectedException(); + await subscriber.StartAsync(async (PubsubMessage message, CancellationToken cancel) => + { + var success = await handleMessageAsync(message); + return success ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack; + }); + } + finally + { + ctRegistration.Unregister(); + await subscriber.StopAsync(CancellationToken.None); } - - var response = await _transport.SubscriberApiClient.PullAsync( - _endpoint.Server.Subscription.Name, - 1, - _cancellation.Token - ); - - await handleMessagesAsync(response.ReceivedMessages); }); } } \ No newline at end of file diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs index 8bdd70cc5..d23a9bf5a 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs @@ -6,7 +6,7 @@ namespace Wolverine.Pubsub.Internal; -public class PubsubEnvelopeMapper : EnvelopeMapper, IPubsubEnvelopeMapper +public class PubsubEnvelopeMapper : EnvelopeMapper, IPubsubEnvelopeMapper { public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint) { @@ -14,12 +14,12 @@ public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint) e => e.Data!, (e, m) => { - if (m.Message.Data.IsEmpty) + if (m.Data.IsEmpty) { return; } - e.Data = m.Message.Data.ToByteArray(); + e.Data = m.Data.ToByteArray(); }, (e, m) => { @@ -31,19 +31,12 @@ public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint) m.Data = ByteString.CopyFrom(e.Data); } ); - + MapPropertyToHeader(x => x.GroupId, "group-id"); MapPropertyToHeader(x => x.DeduplicationId, "deduplication-id"); MapPropertyToHeader(x => x.PartitionKey, "partition-key"); } - public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming) - { - envelope.AckId = incoming.AckId; - - base.MapIncomingToEnvelope(envelope, incoming); - } - public void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message) { message.Data = ByteString.CopyFrom(outgoing.Data); @@ -56,19 +49,19 @@ protected override void writeOutgoingHeader(PubsubMessage outgoing, string key, outgoing.Attributes[key] = value; } - protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope) + protected override void writeIncomingHeaders(PubsubMessage incoming, Envelope envelope) { - if (incoming.Message.Attributes is null) + if (incoming.Attributes is null) { return; } - foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value; + foreach (var pair in incoming.Attributes) envelope.Headers[pair.Key] = pair.Value; } - protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value) + protected override bool tryReadIncomingHeader(PubsubMessage incoming, string key, out string? value) { - if (incoming.Message.Attributes.TryGetValue(key, out var header)) + if (incoming.Attributes.TryGetValue(key, out var header)) { value = header; diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs index 6b90ee4e2..3ce12f643 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs @@ -13,7 +13,6 @@ namespace Wolverine.Pubsub.Internal; public abstract class PubsubListener : IListener, ISupportDeadLetterQueue { protected readonly CancellationTokenSource _cancellation = new(); - protected readonly RetryBlock _complete; protected readonly RetryBlock _deadLetter; protected readonly PubsubEndpoint? _deadLetterTopic; protected readonly PubsubEndpoint _endpoint; @@ -23,7 +22,6 @@ public abstract class PubsubListener : IListener, ISupportDeadLetterQueue protected readonly IWolverineRuntime _runtime; protected readonly PubsubTransport _transport; - protected RetryBlock _acknowledge; protected Task _task; private readonly IPubsubEnvelopeMapper _mapper; @@ -54,70 +52,20 @@ IWolverineRuntime runtime NativeDeadLetterQueueEnabled = true; } - _acknowledge = new RetryBlock(async (ackIds, _) => - { - if (transport.SubscriberApiClient is null) - { - throw new WolverinePubsubTransportNotConnectedException(); - } - - if (ackIds.Any()) - { - await transport.SubscriberApiClient.AcknowledgeAsync( - _endpoint.Server.Subscription.Name, - ackIds - ); - } - }, _logger, runtime.Cancellation); - _deadLetter = new RetryBlock(async (e, _) => { if (_deadLetterTopic is null) { return; } - - if (e is PubsubEnvelope pubsubEnvelope) - { - await _acknowledge.PostAsync([pubsubEnvelope.AckId]); - } - await _deadLetterTopic.SendMessageAsync(e, _logger); }, _logger, runtime.Cancellation); _requeue = new RetryBlock(async (e, _) => { - if (e is PubsubEnvelope pubsubEnvelope) - { - await _acknowledge.PostAsync([pubsubEnvelope.AckId]); - } - await _endpoint.SendMessageAsync(e, _logger); }, _logger, runtime.Cancellation); - _complete = new RetryBlock(async (envelopes, _) => - { - var pubsubEnvelopes = envelopes.OfType().ToArray(); - - if (!pubsubEnvelopes.Any()) - { - return; - } - - if (transport.SubscriberApiClient is null) - { - throw new WolverinePubsubTransportNotConnectedException(); - } - - var ackIds = pubsubEnvelopes - .Select(e => e.AckId) - .Where(x => !string.IsNullOrEmpty(x)) - .Distinct() - .ToArray(); - - await _acknowledge.PostAsync(ackIds); - }, _logger, _cancellation.Token); - _task = StartAsync(); } @@ -125,9 +73,9 @@ await transport.SubscriberApiClient.AcknowledgeAsync( public IHandlerPipeline? Pipeline => _receiver.Pipeline; - public async ValueTask CompleteAsync(Envelope envelope) + public ValueTask CompleteAsync(Envelope envelope) { - await _complete.PostAsync([envelope]); + return ValueTask.CompletedTask; } public async ValueTask DeferAsync(Envelope envelope) @@ -153,7 +101,6 @@ public ValueTask DisposeAsync() { _cancellation.Cancel(); _task.SafeDispose(); - _complete.SafeDispose(); _requeue.SafeDispose(); _deadLetter.SafeDispose(); @@ -232,61 +179,36 @@ protected async Task listenForMessagesAsync(Func listenAsync) } } - protected async Task handleMessagesAsync(RepeatedField messages) + protected async Task handleMessageAsync(PubsubMessage message) { - var envelopes = new List(messages.Count); - foreach (var message in messages) + if (message.Attributes.Keys.Contains("batched")) { - if (message.Message.Attributes.Keys.Contains("batched")) - { - var batched = EnvelopeSerializer.ReadMany(message.Message.Data.ToByteArray()); - - if (batched.Any()) - { - await _receiver.ReceivedAsync(this, batched); - } - - await _acknowledge.PostAsync([message.AckId]); + var batched = EnvelopeSerializer.ReadMany(message.Data.ToByteArray()); - continue; + if (batched.Any()) + { + await _receiver.ReceivedAsync(this, batched); } - try - { - var envelope = new PubsubEnvelope(); + return true; + } - _mapper.MapIncomingToEnvelope(envelope, message); + try + { + var envelope = new Envelope(); - if (envelope.IsPing()) - { - try - { - await _complete.PostAsync([envelope]); - } - catch (Exception ex) - { - _logger.LogError(ex, - "{Uri}: Error while acknowledging Google Cloud Platform Pub/Sub ping message \"{AckId}\".", - _endpoint.Uri, message.AckId); - } - - continue; - } + _mapper.MapIncomingToEnvelope(envelope, message); - envelopes.Add(envelope); - } - catch (Exception ex) - { - _logger.LogError(ex, "{Uri}: Error while mapping Google Cloud Platform Pub/Sub message {AckId}.", - _endpoint.Uri, message.AckId); - } - } + await _receiver.ReceivedAsync(this, [envelope]); - if (envelopes.Any()) + return true; + } + catch (Exception ex) { - await _receiver.ReceivedAsync(this, envelopes.ToArray()); - await _complete.PostAsync(envelopes.ToArray()); + _logger.LogError(ex, "{Uri}: Error while mapping Google Cloud Platform Pub/Sub message {MessageId}.", _endpoint.Uri, message.MessageId); + return false; } } + } \ No newline at end of file diff --git a/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs b/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs index 07ba3645d..8dce60045 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs @@ -281,7 +281,7 @@ public override async ValueTask InitializeAsync(ILogger logger) public override ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) { EnvelopeMapper ??= BuildMapper(runtime); - + if (Mode == EndpointMode.Inline) { return ValueTask.FromResult(new InlinePubsubListener( @@ -303,7 +303,7 @@ public override ValueTask BuildListenerAsync(IWolverineRuntime runtim public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISender? deadLetterSender) { EnvelopeMapper ??= BuildMapper(runtime); - + if (DeadLetterName.IsNotEmpty()) { var initialized = _transport.Topics.Contains(DeadLetterName); @@ -344,6 +344,7 @@ internal async Task SendMessageAsync(Envelope envelope, ILogger logger) var message = new PubsubMessage(); var orderBy = Server.Topic.OrderBy(envelope); + EnvelopeMapper ??= new PubsubEnvelopeMapper(this); EnvelopeMapper.MapEnvelopeToOutgoing(envelope, message); message.OrderingKey = envelope.GroupId ?? orderBy ?? message.OrderingKey; @@ -381,7 +382,7 @@ internal void ConfigureDeadLetter(Action configure) protected override ISender CreateSender(IWolverineRuntime runtime) { EnvelopeMapper ??= BuildMapper(runtime); - + if (_transport.PublisherApiClient is null) { throw new WolverinePubsubTransportNotConnectedException(); diff --git a/src/Transports/GCP/Wolverine.Pubsub/PubsubEnvelope.cs b/src/Transports/GCP/Wolverine.Pubsub/PubsubEnvelope.cs deleted file mode 100644 index 6ccc354e2..000000000 --- a/src/Transports/GCP/Wolverine.Pubsub/PubsubEnvelope.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Wolverine.Pubsub; - -public class PubsubEnvelope : Envelope -{ - public string AckId = string.Empty; -} \ No newline at end of file