Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,23 +238,12 @@ public async Task customize_mappers()

#region sample_custom_pubsub_mapper

public class CustomPubsubMapper : EnvelopeMapper<ReceivedMessage, PubsubMessage>, IPubsubEnvelopeMapper
public class CustomPubsubMapper : EnvelopeMapper<PubsubMessage, PubsubMessage>, IPubsubEnvelopeMapper
{
public CustomPubsubMapper(PubsubEndpoint endpoint) : base(endpoint)
{
}

public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming)
{
envelope.AckId = incoming.AckId;

// You will have to help Wolverine out by either telling Wolverine
// what the message type is, or by reading the actual message object,
// or by telling Wolverine separately what the default message type
// is for a listening endpoint
envelope.MessageType = typeof(Message1).ToMessageTypeName();
}

public void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message)
{
message.Data = ByteString.CopyFrom(outgoing.Data);
Expand All @@ -265,19 +254,19 @@ protected override void writeOutgoingHeader(PubsubMessage outgoing, string key,
outgoing.Attributes[key] = value;
}

protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope)
protected override void writeIncomingHeaders(PubsubMessage incoming, Envelope envelope)
{
if (incoming.Message.Attributes is null)
if (incoming.Attributes is null)
{
return;
}

foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value;
foreach (var pair in incoming.Attributes) envelope.Headers[pair.Key] = pair.Value;
}

protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value)
protected override bool tryReadIncomingHeader(PubsubMessage incoming, string key, out string? value)
{
if (incoming.Message.Attributes.TryGetValue(key, out var header))
if (incoming.Attributes.TryGetValue(key, out var header))
{
value = header;

Expand Down
3 changes: 1 addition & 2 deletions src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ namespace Wolverine.Pubsub;
/// <summary>
/// Pluggable strategy for reading and writing data to Google Cloud Platform Pub/Sub
/// </summary>
public interface IPubsubEnvelopeMapper : IEnvelopeMapper<ReceivedMessage, PubsubMessage>
public interface IPubsubEnvelopeMapper : IEnvelopeMapper<PubsubMessage, PubsubMessage>
{
void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming);
void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using JasperFx.Blocks;
using Microsoft.Extensions.Logging;
using Wolverine.Runtime;
using Wolverine.Transports;

Expand All @@ -20,50 +17,35 @@ IWolverineRuntime runtime

public override async Task StartAsync()
{
if (_transport.SubscriberApiClient is null)
await listenForMessagesAsync(async () =>
{
throw new WolverinePubsubTransportNotConnectedException();
}

using var streamingPull =
_transport.SubscriberApiClient.StreamingPull(CallSettings.FromCancellationToken(_cancellation.Token));

await streamingPull.WriteAsync(new StreamingPullRequest
{
SubscriptionAsSubscriptionName = _endpoint.Server.Subscription.Name,
StreamAckDeadlineSeconds = 20,
MaxOutstandingMessages = _endpoint.Client.MaxOutstandingMessages,
MaxOutstandingBytes = _endpoint.Client.MaxOutstandingByteCount
});

await using var stream = streamingPull.GetResponseStream();

_acknowledge = new RetryBlock<string[]>((ackIds, _) => streamingPull.WriteAsync(new StreamingPullRequest
{
AckIds = { ackIds }
}), _logger, _runtime.Cancellation);

try
{
await listenForMessagesAsync(async () =>
var subscriptionName = _endpoint.Server.Subscription.Name;
await using SubscriberClient subscriber = await new SubscriberClientBuilder
{
while (await stream.MoveNextAsync(_cancellation.Token))
SubscriptionName = subscriptionName,
EmulatorDetection = _transport.EmulatorDetection,
Settings = new()
{
await handleMessagesAsync(stream.Current.ReceivedMessages);
// https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriberClient.Settings#Google_Cloud_PubSub_V1_SubscriberClient_Settings_FlowControlSettings
// Remarks: Flow control uses these settings for two purposes: fetching messages to process, and processing them.
// In terms of fetching messages, a single SubscriberClient creates multiple instances of SubscriberServiceApiClient, and each will observe the flow control settings independently
FlowControlSettings = new(_endpoint.Client.MaxOutstandingMessages, _endpoint.Client.MaxOutstandingByteCount),
}
});
}
finally
{
}.BuildAsync();
var ctRegistration = _cancellation.Token.Register(() => subscriber.StopAsync(CancellationToken.None));
try
{
await streamingPull.WriteCompleteAsync();
await subscriber.StartAsync(async (PubsubMessage message, CancellationToken cancel) =>
{
var success = await handleMessageAsync(message);
return success ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack;
});
}
catch (Exception ex)
finally
{
_logger.LogError(ex, "{Uri}: Error while completing the Google Cloud Platform Pub/Sub streaming pull.",
_endpoint.Uri);
ctRegistration.Unregister();
await subscriber.StopAsync(CancellationToken.None);
}
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Google.Cloud.PubSub.V1;
using Wolverine.Runtime;
using Wolverine.Transports;

Expand All @@ -12,25 +13,33 @@ public InlinePubsubListener(
IWolverineRuntime runtime
) : base(endpoint, transport, receiver, runtime)
{

}

public override async Task StartAsync()
{
await listenForMessagesAsync(async () =>
{
if (_transport.SubscriberApiClient is null)
var subscriptionName = _endpoint.Server.Subscription.Name;
await using SubscriberClient subscriber = await new SubscriberClientBuilder
{
SubscriptionName = subscriptionName,
EmulatorDetection = _transport.EmulatorDetection,
}.BuildAsync();
var ctRegistration = _cancellation.Token.Register(() => subscriber.StopAsync(CancellationToken.None));
try
{
throw new WolverinePubsubTransportNotConnectedException();
await subscriber.StartAsync(async (PubsubMessage message, CancellationToken cancel) =>
{
var success = await handleMessageAsync(message);
return success ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack;
});
}
finally
{
ctRegistration.Unregister();
await subscriber.StopAsync(CancellationToken.None);
}

var response = await _transport.SubscriberApiClient.PullAsync(
_endpoint.Server.Subscription.Name,
1,
_cancellation.Token
);

await handleMessagesAsync(response.ReceivedMessages);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@

namespace Wolverine.Pubsub.Internal;

public class PubsubEnvelopeMapper : EnvelopeMapper<ReceivedMessage, PubsubMessage>, IPubsubEnvelopeMapper
public class PubsubEnvelopeMapper : EnvelopeMapper<PubsubMessage, PubsubMessage>, IPubsubEnvelopeMapper
{
public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint)
{
MapProperty(
e => e.Data!,
(e, m) =>
{
if (m.Message.Data.IsEmpty)
if (m.Data.IsEmpty)
{
return;
}

e.Data = m.Message.Data.ToByteArray();
e.Data = m.Data.ToByteArray();
},
(e, m) =>
{
Expand All @@ -31,19 +31,12 @@ public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint)
m.Data = ByteString.CopyFrom(e.Data);
}
);

MapPropertyToHeader(x => x.GroupId, "group-id");
MapPropertyToHeader(x => x.DeduplicationId, "deduplication-id");
MapPropertyToHeader(x => x.PartitionKey, "partition-key");
}

public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming)
{
envelope.AckId = incoming.AckId;

base.MapIncomingToEnvelope(envelope, incoming);
}

public void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message)
{
message.Data = ByteString.CopyFrom(outgoing.Data);
Expand All @@ -56,19 +49,19 @@ protected override void writeOutgoingHeader(PubsubMessage outgoing, string key,
outgoing.Attributes[key] = value;
}

protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope)
protected override void writeIncomingHeaders(PubsubMessage incoming, Envelope envelope)
{
if (incoming.Message.Attributes is null)
if (incoming.Attributes is null)
{
return;
}

foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value;
foreach (var pair in incoming.Attributes) envelope.Headers[pair.Key] = pair.Value;
}

protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value)
protected override bool tryReadIncomingHeader(PubsubMessage incoming, string key, out string? value)
{
if (incoming.Message.Attributes.TryGetValue(key, out var header))
if (incoming.Attributes.TryGetValue(key, out var header))
{
value = header;

Expand Down
Loading
Loading