
Switch Pub/Sub implementation to use high level API #1700

Closed
cchen-motion wants to merge 2 commits into JasperFx:main from CharlieDigital:main

Conversation


@cchen-motion cchen-motion commented Sep 16, 2025

Purpose

This PR aims to replace the low-level Pub/Sub API usage via SubscriberServiceApiClient with the higher-level SubscriberClient, which behind the scenes manages instances of SubscriberServiceApiClient.

See also: #1648

Background

I noticed that when connecting to upstream Pub/Sub, the connection would fail around the 90-100 second mark. This is because the low-level API is intended to be managed by the higher-level API, per Jon Skeet at Google: googleapis/google-cloud-dotnet#15057 (comment)

From the docs: https://cloud.google.com/pubsub/docs/pull#streamingpull-api

The StreamingPull API keeps an open connection. The Pub/Sub servers recurrently close the connection after a time period to avoid a long-running sticky connection. The client library automatically reopens a StreamingPull connection.

Messages are sent to the connection when they are available. The StreamingPull API thus minimizes latency and maximizes throughput for messages.

A minimal reproduction of this failure is here: https://github.com/CharlieDigital/dn-pubsub-error-repro but it requires testing against an upstream Pub/Sub connection to observe the failure.
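For context, a minimal sketch of consuming with the high-level client looks roughly like the following (assuming the Google.Cloud.PubSub.V1 package; the project and subscription names here are placeholders, not from the PR):

```csharp
using Google.Cloud.PubSub.V1;

var subscriptionName = SubscriptionName.FromProjectSubscription(
    "my-project", "my-subscription"); // placeholder names

// The high-level client manages the underlying StreamingPull connections,
// including reopening them after the periodic server-side disconnects.
SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);

// StartAsync runs until StopAsync is called; the handler returns Ack or Nack
// per message instead of managing ack IDs directly.
Task lifetime = subscriber.StartAsync((PubsubMessage msg, CancellationToken ct) =>
{
    Console.WriteLine(msg.Data.ToStringUtf8());
    return Task.FromResult(SubscriberClient.Reply.Ack);
});

// Later, on shutdown:
await subscriber.StopAsync(CancellationToken.None);
await lifetime;
```

The point of the PR is that the reconnect handling shown in the comment above StartAsync is what the low-level SubscriberServiceApiClient usage had to implement by hand.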

@jeremydmiller
Member

@cchen-motion

[image]

@CharlieDigital

Let me spend a bit more time with this to do some testing, but please review and let me know if I've made some incorrect assumptions about how Wolverine internals work.


NativeDeadLetterQueueEnabled = true;
}


Removed this code block because dead lettering is handled by the SubscriberClient internally, as is acking.

Comment on lines +172 to +182
if (message.Attributes.ContainsKey("batched"))
{
    var batched = EnvelopeSerializer.ReadMany(message.Data.ToByteArray());

    if (batched.Any())
    {
        await _receiver.ReceivedAsync(this, batched);
    }

    return;
}


Not certain if this batching piece is still relevant with the higher-level API.

Comment on lines -73 to -86
_deadLetter = new RetryBlock<Envelope>(async (e, _) =>
{
if (_deadLetterTopic is null)
{
return;
}

if (e is PubsubEnvelope pubsubEnvelope)
{
await _acknowledge.PostAsync([pubsubEnvelope.AckId]);
}

await _deadLetterTopic.SendMessageAsync(e, _logger);
}, _logger, runtime.Cancellation);


Possibly the new DLQ setup has to be written somewhere here?

Comment on lines +49 to +52
var subscriberLifetime = subscriberClient.StartAsync(async (msg, ct) =>
{
    await handleMessagesAsync(msg);
    return SubscriberClient.Reply.Ack;


Acking should technically happen automatically here in this lambda on the return of SubscriberClient.Reply.Ack, so this removes some of the code that was previously managing the ack at a lower level. It's not clear to me, though, whether the code I removed is "structurally required" by Wolverine.
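To make the failure path concrete, one hypothetical sketch (not the PR's code) is to map handler exceptions to a Nack, reusing the handleMessagesAsync name from the diff above; Nack triggers redelivery, and dead lettering then depends on the subscription's server-side dead-letter policy:

```csharp
// Hypothetical sketch: route handler failures to Nack so Pub/Sub redelivers
// (and eventually dead-letters, if the subscription has a dead-letter policy
// configured on the server side). Assumes the surrounding subscriberClient,
// handleMessagesAsync, and _logger fields from the diff context.
var subscriberLifetime = subscriberClient.StartAsync(async (msg, ct) =>
{
    try
    {
        await handleMessagesAsync(msg); // the existing Wolverine handling path
        return SubscriberClient.Reply.Ack;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to handle Pub/Sub message {Id}", msg.MessageId);
        return SubscriberClient.Reply.Nack;
    }
});
```

Whether Wolverine wants the ack decision made here, or deferred into its own retry/DLQ machinery, is exactly the open design question.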

@CharlieDigital

CharlieDigital commented Sep 18, 2025

Current state of tests:

[image]

Are there any tips to get the tests to run faster? It's currently taking a really, really long time to run the suite just for GCP.

@jeremydmiller
Member

@cchen-motion Hate to admit that I paid more attention to this today because someone was ripping Wolverine because of the GCP transport today. Do you want me to take it the rest of the way, or just get into the questions?

Some of the test slowness is some compliance tests that wait for draining, and I've never taken any time to look at why that's slow

@CharlieDigital

CharlieDigital commented Sep 29, 2025

I'm rethinking the move to the high-level library, as it may not mesh well with the overall architecture/design goals of Wolverine. Unfortunately, it was not easy to deduce early on the right way this should work.

It may be that the correct move here is to change the originally implemented error handling logic, which would fail after a certain number of reconnects. The key point is that these transient disconnects will not occur when testing against the emulator, since its behavior diverges from a real upstream Pub/Sub connection in GCP.

I looked at the Brighter implementation of their Pub/Sub transport and it's quite good, actually.

https://github.com/BrighterCommand/Brighter/pull/3645/files#diff-b803fcb7f17ed9235f1e5cb1fcd2f5d3b2838429d4368ae4c57ce4436577f03f

The author implemented it using idiomatic parts of the platform (e.g., clever use of Firestore atomic writes to perform distributed locking, and Firestore/Spanner as an outbox).

I think the right course of action is:

  • Determine if the high level client API fits into Wolverine's architecture/design. Mainly the high level API automatically handles ACKs and connection pooling. Does this fit into how Wolverine is designed to pull and consume messages?
  • If the high level client API does not fit into the intended design, then the question is how to make the intentional server disconnect handling correct in the Wolverine client implementation

The high level client API is quite stable and offers very high throughput in some testing I've done over the last week for different purposes. I just don't know if it fits well into your design goals.

I have two sandboxes here I was using for testing that you may find helpful:

I would also encourage you to give this thread a look: dotnet/aspnetcore#53219 (comment). Not sure about your thoughts here, but I think it would be a huge boon for the community and library owners like you if Microsoft separated out the front-side scaffolding for Orleans into a set of abstractions. Then, for example, if someone implements a NATS messaging transport, it would "just work" for Wolverine assuming Wolverine adopted the abstractions.

@jeremydmiller
Member

@cchen-motion I honestly don't know. This is one of the very few parts of Wolverine where I wasn't terribly involved.

YOU are the guy who was bashing Wolverine on that thread?

"I think it would be a huge boon for the community and library owners like you if Microsoft separated out the front-side scaffolding for Orleans into a set of abstractions."

I don't want anything at all to do with that. I don't think that effort would provide any real value and probably end up being a huge waste of time for people like me, but I get why that might sound attractive.

At this point, I would prefer that you just move on, go use something else (Brighter is fine, if much more ceremony than Wolverine), and stay off of both the Wolverine GitHub and Discord. I don't know how much of a priority I feel the GCP transport is or even the Kafka usage you were after.

@jeremydmiller
Member

Hey all, I'll take this on soon in an independent effort.

@CharlieDigital

@jeremydmiller I'm surprised that you would view that as "bashing"; in its current state, the Pub/Sub implementation will only work with the emulator because it does not correctly handle the server disconnect signals. I'm merely sharing my experience about the state of messaging on .NET. It seems that individual projects not having to re-invent the wheel when it comes to "read message here, write message there" would be a big boon for platform authors.

I'm not trying to bash or pick on any particular team; I'm merely here exploring the options available as objectively as I can.

I'm very surprised that you see any malicious intent or snideness in any of the feedback I've provided.

@jeremydmiller
Member

@cchen-motion Hey man, you're being nothing but overhead and a distraction. If you're unhappy w/ Wolverine's implementation, and you're not able to finish this PR, I'd ask that you go somewhere else. I'm irritated at the interactions with you on the Kafka usage as well and I think you're going to be difficult to interact with. I am still asking you to just go use something else.

I'm very sure that when I have some bandwidth, I can quickly address the GCP transport issues. To be honest with you though, there's very little demand for it and there are more pressing matters.
