Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ public async Task customize_mappers()

#region sample_custom_pubsub_mapper

public class CustomPubsubMapper : EnvelopeMapper<ReceivedMessage, PubsubMessage>, IPubsubEnvelopeMapper
public class CustomPubsubMapper : EnvelopeMapper<PubsubMessage, PubsubMessage>, IPubsubEnvelopeMapper
{
public CustomPubsubMapper(PubsubEndpoint endpoint) : base(endpoint)
{
}

public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming)
public void MapIncomingToEnvelope(PubsubEnvelope envelope, PubsubMessage incoming)
{
envelope.AckId = incoming.AckId;
envelope.AckId = incoming.MessageId;

// You will have to help Wolverine out by either telling Wolverine
// what the message type is, or by reading the actual message object,
Expand All @@ -264,20 +264,21 @@ protected override void writeOutgoingHeader(PubsubMessage outgoing, string key,
{
outgoing.Attributes[key] = value;
}


protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope)
protected override void writeIncomingHeaders(PubsubMessage incoming, Envelope envelope)
{
if (incoming.Message.Attributes is null)
if (incoming.Attributes is null)
{
return;
}

foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value;
foreach (var pair in incoming.Attributes) envelope.Headers[pair.Key] = pair.Value;
}

protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value)
protected override bool tryReadIncomingHeader(PubsubMessage incoming, string key, out string? value)
{
if (incoming.Message.Attributes.TryGetValue(key, out var header))
if (incoming.Attributes.TryGetValue(key, out var header))
{
value = header;

Expand Down
6 changes: 3 additions & 3 deletions src/Transports/GCP/Wolverine.Pubsub/IPubsubEnvelopeMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ namespace Wolverine.Pubsub;
/// <summary>
/// Pluggable strategy for reading and writing data to Google Cloud Platform Pub/Sub
/// </summary>
public interface IPubsubEnvelopeMapper : IEnvelopeMapper<ReceivedMessage, PubsubMessage>
public interface IPubsubEnvelopeMapper : IEnvelopeMapper<PubsubMessage, PubsubMessage>
{
void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming);
void MapIncomingToEnvelope(PubsubEnvelope envelope, PubsubMessage incoming);
void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Google.Api.Gax.Grpc;
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using JasperFx.Blocks;
using Microsoft.Extensions.Logging;
using Wolverine.Runtime;
using Wolverine.Transports;
Expand All @@ -25,39 +24,47 @@ public override async Task StartAsync()
throw new WolverinePubsubTransportNotConnectedException();
}

using var streamingPull =
_transport.SubscriberApiClient.StreamingPull(CallSettings.FromCancellationToken(_cancellation.Token));

await streamingPull.WriteAsync(new StreamingPullRequest
// Create a high-level SubscriberClient for receiving messages which may
// use multiple underlying streaming pull connections.
var subscriberClientBuilder = new SubscriberClientBuilder()
{
SubscriptionAsSubscriptionName = _endpoint.Server.Subscription.Name,
StreamAckDeadlineSeconds = 20,
MaxOutstandingMessages = _endpoint.Client.MaxOutstandingMessages,
MaxOutstandingBytes = _endpoint.Client.MaxOutstandingByteCount
});
EmulatorDetection = _transport.EmulatorDetection,
SubscriptionName = _endpoint.Server.Subscription.Name,
Settings = new SubscriberClient.Settings
{
AckDeadline = TimeSpan.FromSeconds(20),
MaxTotalAckExtension = TimeSpan.FromMinutes(10),
FlowControlSettings = new FlowControlSettings(
_endpoint.Client.MaxOutstandingMessages,
_endpoint.Client.MaxOutstandingByteCount
)
}
};

await using var stream = streamingPull.GetResponseStream();

_acknowledge = new RetryBlock<string[]>((ackIds, _) => streamingPull.WriteAsync(new StreamingPullRequest
{
AckIds = { ackIds }
}), _logger, _runtime.Cancellation);
await using var subscriberClient = await subscriberClientBuilder.BuildAsync();

try
{
await listenForMessagesAsync(async () =>
// Start the subscriber and capture the lifetime task
var subscriberLifetime = subscriberClient.StartAsync(async (msg, ct) =>
{
while (await stream.MoveNextAsync(_cancellation.Token))
{
await handleMessagesAsync(stream.Current.ReceivedMessages);
}
await handleMessagesAsync(msg);
return SubscriberClient.Reply.Ack;
Comment on lines +49 to +52

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acking should technically happen automatically here in this lambda on the return of SubscriberClient.Reply.Ack so it seems like this removes some of the code that was previously managing the Ack at a lower level, but not clear if that code I removed is "structurally required" by Wolverine.

});

// Wait for whatever condition you have for running (your helper can return the lifetime task)
await listenForMessagesAsync(() => subscriberLifetime);

// When listenForMessagesAsync returns, request a graceful stop
}
finally
{
try
{
await streamingPull.WriteCompleteAsync();
await subscriberClient.StopAsync(TimeSpan.FromSeconds(15));
// Ensure the StartAsync task has completed and observe any exceptions
// (if subscriberLifetime had an exception it will be rethrown here)
// Note: if you need the variable here, capture it in an outer scope.
}
catch (Exception ex)
{
Expand All @@ -66,4 +73,4 @@ await listenForMessagesAsync(async () =>
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public InlinePubsubListener(
IWolverineRuntime runtime
) : base(endpoint, transport, receiver, runtime)
{

}

public override async Task StartAsync()
Expand All @@ -30,7 +30,7 @@ await listenForMessagesAsync(async () =>
_cancellation.Token
);

await handleMessagesAsync(response.ReceivedMessages);
await handleMessagesAsync(response.ReceivedMessages[0].Message);
});
}
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using ImTools;
using JasperFx.Core;
using Wolverine.Transports;

namespace Wolverine.Pubsub.Internal;

public class PubsubEnvelopeMapper : EnvelopeMapper<ReceivedMessage, PubsubMessage>, IPubsubEnvelopeMapper
public class PubsubEnvelopeMapper : EnvelopeMapper<PubsubMessage, PubsubMessage>, IPubsubEnvelopeMapper
{
public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint)
{
MapProperty(
e => e.Data!,
(e, m) =>
{
if (m.Message.Data.IsEmpty)
if (m.Data.IsEmpty)
{
return;
}

e.Data = m.Message.Data.ToByteArray();
e.Data = m.Data.ToByteArray();
},
(e, m) =>
{
Expand All @@ -31,15 +30,15 @@ public PubsubEnvelopeMapper(PubsubEndpoint endpoint) : base(endpoint)
m.Data = ByteString.CopyFrom(e.Data);
}
);

MapPropertyToHeader(x => x.GroupId, "group-id");
MapPropertyToHeader(x => x.DeduplicationId, "deduplication-id");
MapPropertyToHeader(x => x.PartitionKey, "partition-key");
}

public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming)
public void MapIncomingToEnvelope(PubsubEnvelope envelope, PubsubMessage incoming)
{
envelope.AckId = incoming.AckId;
envelope.AckId = incoming.MessageId;

base.MapIncomingToEnvelope(envelope, incoming);
}
Expand All @@ -56,19 +55,19 @@ protected override void writeOutgoingHeader(PubsubMessage outgoing, string key,
outgoing.Attributes[key] = value;
}

protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope)
protected override void writeIncomingHeaders(PubsubMessage incoming, Envelope envelope)
{
if (incoming.Message.Attributes is null)
if (incoming.Attributes is null)
{
return;
}

foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value;
foreach (var pair in incoming.Attributes) envelope.Headers[pair.Key] = pair.Value;
}

protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value)
protected override bool tryReadIncomingHeader(PubsubMessage incoming, string key, out string? value)
{
if (incoming.Message.Attributes.TryGetValue(key, out var header))
if (incoming.Attributes.TryGetValue(key, out var header))
{
value = header;

Expand All @@ -79,4 +78,4 @@ protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string k

return false;
}
}
}
Loading
Loading