diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs index edf42994a..9409c05b6 100644 --- a/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs @@ -238,15 +238,15 @@ public async Task customize_mappers() #region sample_custom_pubsub_mapper -public class CustomPubsubMapper : EnvelopeMapper, IPubsubEnvelopeMapper +public class CustomPubsubMapper : EnvelopeMapper, IPubsubEnvelopeMapper { public CustomPubsubMapper(PubsubEndpoint endpoint) : base(endpoint) { } - public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming) + public void MapIncomingToEnvelope(PubsubEnvelope envelope, PubsubMessage incoming) { - envelope.AckId = incoming.AckId; + envelope.AckId = incoming.MessageId; // You will have to help Wolverine out by either telling Wolverine // what the message type is, or by reading the actual message object, @@ -264,20 +264,21 @@ protected override void writeOutgoingHeader(PubsubMessage outgoing, string key, { outgoing.Attributes[key] = value; } + - protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope) + protected override void writeIncomingHeaders(PubsubMessage incoming, Envelope envelope) { - if (incoming.Message.Attributes is null) + if (incoming.Attributes is null) { return; } - foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value; + foreach (var pair in incoming.Attributes) envelope.Headers[pair.Key] = pair.Value; } - protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value) + protected override bool tryReadIncomingHeader(PubsubMessage incoming, string key, out string? value) { - if (incoming.Message.Attributes.TryGetValue(key, out var header)) + if (incoming.Attributes.TryGetValue(key, out var header)) { value = header; diff --git a/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs b/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs index a4b555f30..ad3cbf767 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs @@ -6,8 +6,8 @@ namespace Wolverine.Pubsub; /// /// Pluggable strategy for reading and writing data to Google Cloud Platform Pub/Sub /// -public interface IPubsubEnvelopeMapper : IEnvelopeMapper +public interface IPubsubEnvelopeMapper : IEnvelopeMapper { - void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming); + void MapIncomingToEnvelope(PubsubEnvelope envelope, PubsubMessage incoming); void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message); -} \ No newline at end of file +} diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs index 11beed826..2f1e12a51 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/BatchedPubsubListener.cs @@ -1,6 +1,5 @@ -using Google.Api.Gax.Grpc; +using Google.Api.Gax; using Google.Cloud.PubSub.V1; -using JasperFx.Blocks; using Microsoft.Extensions.Logging; using Wolverine.Runtime; using Wolverine.Transports; @@ -25,39 +24,47 @@ public override async Task StartAsync() throw new WolverinePubsubTransportNotConnectedException(); } - using var streamingPull = - _transport.SubscriberApiClient.StreamingPull(CallSettings.FromCancellationToken(_cancellation.Token)); - - await streamingPull.WriteAsync(new StreamingPullRequest + // Create a high-level SubscriberClient for receiving messages which may + // use multiple underlying streaming pull connections. + var subscriberClientBuilder = new SubscriberClientBuilder() { - SubscriptionAsSubscriptionName = _endpoint.Server.Subscription.Name, - StreamAckDeadlineSeconds = 20, - MaxOutstandingMessages = _endpoint.Client.MaxOutstandingMessages, - MaxOutstandingBytes = _endpoint.Client.MaxOutstandingByteCount - }); + EmulatorDetection = _transport.EmulatorDetection, + SubscriptionName = _endpoint.Server.Subscription.Name, + Settings = new SubscriberClient.Settings + { + AckDeadline = TimeSpan.FromSeconds(20), + MaxTotalAckExtension = TimeSpan.FromMinutes(10), + FlowControlSettings = new FlowControlSettings( + _endpoint.Client.MaxOutstandingMessages, + _endpoint.Client.MaxOutstandingByteCount + ) + } + }; - await using var stream = streamingPull.GetResponseStream(); - - _acknowledge = new RetryBlock((ackIds, _) => streamingPull.WriteAsync(new StreamingPullRequest - { - AckIds = { ackIds } - }), _logger, _runtime.Cancellation); + await using var subscriberClient = await subscriberClientBuilder.BuildAsync(); try { - await listenForMessagesAsync(async () => + // Start the subscriber and capture the lifetime task + var subscriberLifetime = subscriberClient.StartAsync(async (msg, ct) => { - while (await stream.MoveNextAsync(_cancellation.Token)) - { - await handleMessagesAsync(stream.Current.ReceivedMessages); - } + await handleMessagesAsync(msg); + return SubscriberClient.Reply.Ack; }); + + // Wait for whatever condition you have for running (your helper can return the lifetime task) + await listenForMessagesAsync(() => subscriberLifetime); + + // When listenForMessagesAsync returns, request a graceful stop } finally { try { - await streamingPull.WriteCompleteAsync(); + await subscriberClient.StopAsync(TimeSpan.FromSeconds(15)); + // Ensure the StartAsync task has completed and observe any exceptions + // (if subscriberLifetime had an exception it will be rethrown here) + // Note: if you need the variable here, capture it in an outer scope. } catch (Exception ex) { @@ -66,4 +73,4 @@ await listenForMessagesAsync(async () => } } } -} \ No newline at end of file +} diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs index 0f5ecd0a1..47f987db3 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/InlinePubsubListener.cs @@ -12,7 +12,7 @@ public InlinePubsubListener( IWolverineRuntime runtime ) : base(endpoint, transport, receiver, runtime) { - + } public override async Task StartAsync() @@ -30,7 +30,7 @@ await listenForMessagesAsync(async () => _cancellation.Token ); - await handleMessagesAsync(response.ReceivedMessages); + await handleMessagesAsync(response.ReceivedMessages[0].Message); }); } -} \ No newline at end of file +} diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs index 8bdd70cc5..16aeb3ebc 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubEnvelopeMapper.cs @@ -1,12 +1,11 @@ using Google.Cloud.PubSub.V1; using Google.Protobuf; using ImTools; -using JasperFx.Core; using Wolverine.Transports; namespace Wolverine.Pubsub.Internal; -public class PubsubEnvelopeMapper : EnvelopeMapper, IPubsubEnvelopeMapper +public class PubsubEnvelopeMapper : EnvelopeMapper, IPubsubEnvelopeMapper { public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint) { @@ -14,12 +13,12 @@ public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint) e => e.Data!, (e, m) => { - if (m.Message.Data.IsEmpty) + if (m.Data.IsEmpty) { return; } - e.Data = m.Message.Data.ToByteArray(); + e.Data = m.Data.ToByteArray(); }, (e, m) => { @@ -31,15 +30,15 @@ public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint) m.Data = ByteString.CopyFrom(e.Data); } ); - + MapPropertyToHeader(x => x.GroupId, "group-id"); MapPropertyToHeader(x => x.DeduplicationId, "deduplication-id"); MapPropertyToHeader(x => x.PartitionKey, "partition-key"); } - public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming) + public void MapIncomingToEnvelope(PubsubEnvelope envelope, PubsubMessage incoming) { - envelope.AckId = incoming.AckId; + envelope.AckId = incoming.MessageId; base.MapIncomingToEnvelope(envelope, incoming); } @@ -56,19 +55,19 @@ protected override void writeOutgoingHeader(PubsubMessage outgoing, string key, outgoing.Attributes[key] = value; } - protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope) + protected override void writeIncomingHeaders(PubsubMessage incoming, Envelope envelope) { - if (incoming.Message.Attributes is null) + if (incoming.Attributes is null) { return; } - foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value; + foreach (var pair in incoming.Attributes) envelope.Headers[pair.Key] = pair.Value; } - protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value) + protected override bool tryReadIncomingHeader(PubsubMessage incoming, string key, out string? value) { - if (incoming.Message.Attributes.TryGetValue(key, out var header)) + if (incoming.Attributes.TryGetValue(key, out var header)) { value = header; @@ -79,4 +78,4 @@ protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string k return false; } -} \ No newline at end of file +} diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs index 6b90ee4e2..4fba318b6 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubListener.cs @@ -1,5 +1,4 @@ using Google.Cloud.PubSub.V1; -using Google.Protobuf.Collections; using Grpc.Core; using JasperFx.Blocks; using JasperFx.Core; @@ -53,72 +52,6 @@ IWolverineRuntime runtime NativeDeadLetterQueueEnabled = true; } - - _acknowledge = new RetryBlock(async (ackIds, _) => - { - if (transport.SubscriberApiClient is null) - { - throw new WolverinePubsubTransportNotConnectedException(); - } - - if (ackIds.Any()) - { - await transport.SubscriberApiClient.AcknowledgeAsync( - _endpoint.Server.Subscription.Name, - ackIds - ); - } - }, _logger, runtime.Cancellation); - - _deadLetter = new RetryBlock(async (e, _) => - { - if (_deadLetterTopic is null) - { - return; - } - - if (e is PubsubEnvelope pubsubEnvelope) - { - await _acknowledge.PostAsync([pubsubEnvelope.AckId]); - } - - await _deadLetterTopic.SendMessageAsync(e, _logger); - }, _logger, runtime.Cancellation); - - _requeue = new RetryBlock(async (e, _) => - { - if (e is PubsubEnvelope pubsubEnvelope) - { - await _acknowledge.PostAsync([pubsubEnvelope.AckId]); - } - - await _endpoint.SendMessageAsync(e, _logger); - }, _logger, runtime.Cancellation); - - _complete = new RetryBlock(async (envelopes, _) => - { - var pubsubEnvelopes = envelopes.OfType().ToArray(); - - if (!pubsubEnvelopes.Any()) - { - return; - } - - if (transport.SubscriberApiClient is null) - { - throw new WolverinePubsubTransportNotConnectedException(); - } - - var ackIds = pubsubEnvelopes - .Select(e => e.AckId) - .Where(x => !string.IsNullOrEmpty(x)) - .Distinct() - .ToArray(); - - await _acknowledge.PostAsync(ackIds); - }, _logger, _cancellation.Token); - - _task = StartAsync(); } public Uri Address => _endpoint.Uri; @@ -232,61 +165,55 @@ protected async Task listenForMessagesAsync(Func listenAsync) } } - protected async Task handleMessagesAsync(RepeatedField messages) + protected async Task handleMessagesAsync(PubsubMessage message) { - var envelopes = new List(messages.Count); + PubsubEnvelope? envelope = null; - foreach (var message in messages) + if (message.Attributes.ContainsKey("batched")) { - if (message.Message.Attributes.Keys.Contains("batched")) + var batched = EnvelopeSerializer.ReadMany(message.Data.ToByteArray()); + + if (batched.Any()) { - var batched = EnvelopeSerializer.ReadMany(message.Message.Data.ToByteArray()); + await _receiver.ReceivedAsync(this, batched); + } - if (batched.Any()) - { - await _receiver.ReceivedAsync(this, batched); - } + return; + } - await _acknowledge.PostAsync([message.AckId]); + try + { + envelope = new PubsubEnvelope(); - continue; - } + _mapper.MapIncomingToEnvelope(envelope, message); - try + if (envelope.IsPing()) { - var envelope = new PubsubEnvelope(); - - _mapper.MapIncomingToEnvelope(envelope, message); - - if (envelope.IsPing()) + try { - try - { - await _complete.PostAsync([envelope]); - } - catch (Exception ex) - { - _logger.LogError(ex, - "{Uri}: Error while acknowledging Google Cloud Platform Pub/Sub ping message \"{AckId}\".", - _endpoint.Uri, message.AckId); - } - - continue; + await _complete.PostAsync([envelope]); + } + catch (Exception ex) + { + _logger.LogError(ex, + "{Uri}: Error while acknowledging Google Cloud Platform Pub/Sub ping message \"{MessageId}\".", + _endpoint.Uri, message.MessageId); } - envelopes.Add(envelope); - } - catch (Exception ex) - { - _logger.LogError(ex, "{Uri}: Error while mapping Google Cloud Platform Pub/Sub message {AckId}.", - _endpoint.Uri, message.AckId); + return; } } + catch (Exception ex) + { + _logger.LogError(ex, "{Uri}: Error while mapping Google Cloud Platform Pub/Sub message {MessageId}.", + _endpoint.Uri, message.MessageId); + } + - if (envelopes.Any()) + if (envelope != null) { - await _receiver.ReceivedAsync(this, envelopes.ToArray()); - await _complete.PostAsync(envelopes.ToArray()); + await _receiver.ReceivedAsync(this, [envelope]); + await _complete.PostAsync([envelope]); } } -} \ No newline at end of file +} diff --git a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubSenderProtocol.cs b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubSenderProtocol.cs index a120603f5..7fea52dc1 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubSenderProtocol.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/Internal/PubsubSenderProtocol.cs @@ -48,4 +48,4 @@ await _client.PublishAsync(new PublishRequest await callback.MarkProcessingFailureAsync(batch, ex); } } -} \ No newline at end of file +} diff --git a/src/Transports/GCP/Wolverine.Pubsub/PubsubTopicListenerConfiguration.cs b/src/Transports/GCP/Wolverine.Pubsub/PubsubTopicListenerConfiguration.cs index d9a4ac173..25fdfb7fa 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/PubsubTopicListenerConfiguration.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/PubsubTopicListenerConfiguration.cs @@ -104,4 +104,4 @@ public PubsubTopicListenerConfiguration ConfigureDeadLettering( return this; } -} \ No newline at end of file +} diff --git a/src/Transports/GCP/Wolverine.Pubsub/PubsubTransport.cs b/src/Transports/GCP/Wolverine.Pubsub/PubsubTransport.cs index bb0ee7967..27d473401 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/PubsubTransport.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/PubsubTransport.cs @@ -22,7 +22,14 @@ public class PubsubTransport : BrokerTransport, IAsyncDisposable public EmulatorDetection EmulatorDetection = EmulatorDetection.None; public string ProjectId = string.Empty; + /// + /// This is the low level RPC client for managing publishing. + /// internal PublisherServiceApiClient? PublisherApiClient; + + /// + /// This is the low level RPC client for managing subscriptions + /// internal SubscriberServiceApiClient? SubscriberApiClient; /// @@ -67,6 +74,7 @@ public override async ValueTask ConnectAsync(IWolverineRuntime runtime) } AssignedNodeNumber = runtime.DurabilitySettings.AssignedNodeNumber; + PublisherApiClient = await pubBuilder.BuildAsync(); SubscriberApiClient = await subBuilder.BuildAsync(); } @@ -141,4 +149,4 @@ protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime) Topics[responseName] = responseTopic; } -} \ No newline at end of file +}