Question/suggestion on how to get immediate feedback from a publish call #101
Comments
Potential solution for eclipse-paho#101 Signed-off-by: Sander van Harmelen <[email protected]>
Seems super trivial, but maybe I overlooked something... Again, curious what you think of it.
As I mentioned in PR #102, I'm leaning against this solution because it breaks consistency within the API (all the other functions return a token), it diverges from the other Paho libraries, and it requires two error checks for every publish.
But I also wonder if the idea doesn't move us away from this library's evolution toward the async/await paradigm. The functions are headed toward eventually becoming plain async fns. Wouldn't wrapping the Future in a Result break that pattern?
I get your point, but let's first see if we are aligned on the problem... Maybe you have a suggestion or insight on how to solve it differently, but I had issues with the following:
Currently we have a bunch of producers all sending their messages over a single MPSC channel. The consumer runs in its own Tokio task, iterates over all incoming messages, and then batches and publishes them. So when there is high traffic coming through the channel, I want to iterate quickly and not wait for the previous message to actually be sent (as it can sit in the queue for quite some time if the internet uplink is bad). So either I "trust" the C client logic to persist and eventually send the message (like I'm doing now), or I have to spawn a new dedicated Tokio task for each batch of messages I'm trying to send in order to prevent blocking the channel.

But, again, under high load and at a moment when the internet connection is poor or fails, this could mean we have a lot of tasks hanging around doing nothing other than waiting for the call to return. And once it returns we do nothing with the result, other than write a log message if the delivery failed. Code-wise it probably means that I have to wrap the client in an Arc and clone it for each task... Next to managing all the tasks, that feels much more complicated and fragile than the solution in this PR. Just hand the message over to the queue and trust the persistence and queue logic for the actual delivery...
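For illustration, the setup is roughly this shape (a minimal sketch with invented names, assuming a Tokio mpsc channel):

```rust
use paho_mqtt as mqtt;
use tokio::sync::mpsc;

// Single consumer task draining the channel. If it awaited every
// delivery, a bad uplink would stall the whole channel.
async fn run_consumer(cli: mqtt::AsyncClient, mut rx: mpsc::Receiver<mqtt::Message>) {
    while let Some(msg) = rx.recv().await {
        // Hand the message to the client and move on; the returned
        // token (the delivery result) is deliberately not awaited here.
        let _tok = cli.publish(msg);
    }
}
```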
Well, first, the client is just an Arc that can be cheaply cloned to share among publisher threads...
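i.e., something along these lines (a minimal sketch; error handling reduced to a log line):

```rust
use paho_mqtt as mqtt;

// Clone the client (cheap; it's an Arc internally) into a task, so
// awaiting the delivery doesn't block the channel consumer.
fn publish_in_task(cli: &mqtt::AsyncClient, msg: mqtt::Message) {
    let cli = cli.clone();
    tokio::spawn(async move {
        if let Err(e) = cli.publish(msg).await {
            eprintln!("delivery failed: {}", e);
        }
    });
}
```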
Good point 😏 Yet why should I have to continuously start and manage tasks, when I can also manage everything from a single long-running task instead? Additionally, it somehow feels like a second layer of code is trying to manage the same thing: delivering the message. The C code already has a ton of stuff for monitoring and managing the actual delivery, so it feels like I should be able to hand off my message and just trust and build on that existing logic. All in all I just don't see any benefits, only downsides: more tasks, more code paths, more things to manage and to test. So the fact alone that the client is cheap to clone is (IMO) not enough reason not to use an approach like the one suggested in this PR (or at least similar in how it can be used).
I guess, though, what I'm not seeing is the time delay you mention. On the failure of the C send function, the Token completes immediately with a failure. So on return of the call, I guess you're saying that the API doesn't present a non-blocking way to know that? It's a little unfortunate that the Future works this way, but in reality these aren't traditional Futures. The C lib is the real executor, and the task starts when you call the publish() function. When you "drive the future to completion" it really just returns immediately if the C call failed or already completed (as with QoS=0), or it blocks the calling task until the C lib completes on an ACK. Before they became Futures, the Tokens were just managed by a Mutex and Condition Variable, so you could check and re-check them as often as you liked without consuming them.
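In code, the point is roughly this (sketch):

```rust
use paho_mqtt as mqtt;

// If the C send call fails, the token is created already-complete, so
// this await returns the error right away; it only blocks waiting for
// an ACK when the C lib actually accepted the message.
async fn publish_checked(cli: &mqtt::AsyncClient, msg: mqtt::Message) {
    match cli.publish(msg).await {
        Ok(_) => println!("delivered"),
        Err(e) => eprintln!("failed immediately or during delivery: {}", e),
    }
}
```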
To your point, above, I believe you're correct that I'm not aligned on the problem yet. Apologies.
That's not true if the C send function failed with an error (non-zero) return code. When the token is created with the error code, it is marked "complete" on construction. So awaiting it would only take slightly longer (an uncontested Mutex lock) than checking the Result code directly.
Or, maybe, to put the question another way... If the API will evolve into a standard async/await and we expect something like:
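Presumably something like this (a sketch of the direction, not a committed signature; the free function stands in for the client method):

```rust
use paho_mqtt as mqtt;

// Hypothetical eventual shape of the call: one await point, one Result
// covering both the send attempt and the delivery acknowledgment.
async fn publish(_cli: &mqtt::AsyncClient, _msg: mqtt::Message) -> Result<(), mqtt::Error> {
    unimplemented!()
}
```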
How would we handle this issue?
Yeah I think so...
I know... I went through all the code (also the C code)
Yes, but I am "required" to call `.await` on the token to get any feedback at all.
I just want to know if it's properly queued. After that I'll leave it to the C client. But even for people who want to wait on the delivery, this two-step API seems pretty reasonable: get an immediate result for the queueing step, and then await the token for the actual delivery.
In that case I would almost add an additional fn:
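Something like this, perhaps (name and signature are just a sketch):

```rust
use paho_mqtt as mqtt;

// Sketch of the suggested addition: queue() reports synchronously
// whether the message was accepted into the client's queue; delivery
// is left entirely to the C lib's persistence/retry logic.
trait QueueExt {
    fn queue(&self, msg: mqtt::Message) -> Result<(), mqtt::Error>;
}
```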
EDIT: Where `publish` does both the queue and publish actions, of course! But I get your point...
The reason I'm even looking at this is that one of the errors (now fixed) in the C client caused some messages to not be handled properly; they got "lost", causing the task to never unblock. I then went into the code to better understand the logic, and to me the problem is that the queuing mechanism (incl. the persistence logic) sits between my publish call and the actual publish.
Well, yeah, I guess the internal implementation needs to work as the first step!!! Which brings up another point: the proposed solutions - returning a Result<> or an async queue() function - expose an internal implementation detail in the public API. And that detail will most definitely change, though perhaps off in the future.

This API will likely evolve into async functions before too long. That shouldn't change the applications using async/await, because the existing API is mostly just the de-sugared version of them, at least to the outside world. At some time in the more distant future, this library will likely become a 100% Rust async/await implementation, and it would be nice to do that without affecting the API too much.

I guess I'm trying to keep the bigger picture in mind: the things I mentioned, like consistency in the API itself (all the other functions return a token), some consistency between the Paho libraries, and worrying about making this particular use case easier while making many other use cases more difficult by requiring two error checks for every publish.
I'm not sure I agree with that statement. When creating a client you already have to configure whether and how many messages you want to hold in the queue (max_buffered_messages) and whether you want to persist those messages. So the notion of a queue is already exposed through the existing configuration options.
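For instance, with today's builder (a sketch; the values are arbitrary):

```rust
use paho_mqtt as mqtt;

// The queue is already visible in the client configuration.
fn make_client() -> Result<mqtt::AsyncClient, mqtt::Error> {
    let opts = mqtt::CreateOptionsBuilder::new()
        .server_uri("tcp://localhost:1883")
        .max_buffered_messages(1000)              // off-line queue capacity
        .persistence(mqtt::PersistenceType::File) // persist queued messages
        .finalize();
    mqtt::AsyncClient::new(opts)
}
```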
That would not be the case with a dedicated queue method, right? As a user you can then choose which method to use, and in both cases you get just one result. The existing publish method should not change, and people should use either one or the other to hand off a message. Thinking about it a bit more, I really think that an additional method to only queue a message would be really great. Optionally the queue method could take callbacks, which could then be used as custom delivery handlers. I really believe adding such a method adds value and improves the way this package can be used, without making it harder to update the publish method in the future (or any async/await logic in general). It just allows for another way to handle messages, which in some cases adds value (as it does in our setup).
Yes, but the intent there was for off-line buffering: meaning how many messages it should hold when the client is not connected. Perhaps a pure Rust async implementation would use separate tasks to serialize outbound packets and transmit them - with a queue between them. But again, that would be an internal implementation detail. Maybe it would serialize and write_all() in one task with no queue, unless off-line buffering. But, anyway, how would a queue() function work? Would it return a future that is already completed when returned? That's probably not right. Plus you would lose track of the message and never know if it completed transmission. That's sort of the purpose of QoS=0, although in this case the internal library would still be trying to deliver the message, even if you didn't know it. Maybe the queue() function wouldn't return a future at all?
Of course, if we do this, then someone will also want a send()-style function that completes when the message has actually been transmitted.
This is an interesting problem. I wonder what other asynchronous libraries do in this situation.
It would be great if we could just discuss this face to face, as it takes so much effort like this 😉 I would expect that the queue function doesn't return a future, just a result. What purpose would it have if the result also contained a future? In that case you should just use the existing publish function. Same for the send function you described: that is essentially what the publish function already gives you, right? So what is wrong with something like this:
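Roughly this (a sketch; `queue()` is hypothetical, `publish()` is the existing call):

```rust
use paho_mqtt as mqtt;

// Caller picks the semantics it needs.
async fn send(cli: &mqtt::AsyncClient, msg: mqtt::Message, fire_and_forget: bool)
    -> Result<(), mqtt::Error>
{
    if fire_and_forget {
        queue(cli, msg) // synchronous: was it accepted into the queue?
    } else {
        cli.publish(msg).await?; // existing behavior: await delivery
        Ok(())
    }
}

// Stand-in for the proposed method.
fn queue(_cli: &mqtt::AsyncClient, _msg: mqtt::Message) -> Result<(), mqtt::Error> {
    unimplemented!()
}
```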
Or maybe even something like this:
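(All names illustrative; the callbacks give you the delivery notification without having to await anything.)

```rust
use paho_mqtt as mqtt;

// Sketch: a synchronous Result for the queueing step, plus optional
// callbacks fired later by the delivery machinery.
trait QueueWithCallbacks {
    fn queue<S, F>(&self, msg: mqtt::Message, on_success: S, on_failure: F)
        -> Result<(), mqtt::Error>
    where
        S: FnOnce() + Send + 'static,
        F: FnOnce(mqtt::Error) + Send + 'static;
}
```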
That way you can still get notified when the message is actually delivered, and log something if you want. I don't see why offering two different ways to hand off a message would be an issue? It just enables more use cases IMO.
A Future in a Result is what you proposed in your PR #102! 😄 Checking the Result and then awaiting the Token is effectively the same as just awaiting the Token, as the Token completes immediately when the initial send fails.
Yes, but since you don't like the approach in PR #102, I am now suggesting another solution (adding a queue function). I don't understand how you now come up with this... That is totally not what I just suggested, right?
I posted the (simplified) question over on the Rust users forum.
Funny... The first two options mentioned there are actually my first (PR #102) and second (add a queue function with callbacks) suggestions 😂 It does surprise me how you responded to them, though. After all these discussions, I very much believe adding a queue function like I suggested (the one with the callbacks) is actually the best solution. It allows a sync response to a sync action, and still gives you a way to track the end result that doesn't require a lot of additional logic.
But... Happy to see several suggestions/options, as that most likely means we will eventually be able to add a solution for this one 😄
No, you don't need to check twice with that third option. That's why it's interesting. The Token itself still completes when the message is delivered - meaning there is no change there from the existing API. Nothing breaks:
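A sketch of that third option (the status-check method name is hypothetical):

```rust
use paho_mqtt as mqtt;

async fn publish_and_wait(cli: &mqtt::AsyncClient, msg: mqtt::Message)
    -> Result<(), mqtt::Error>
{
    let tok = cli.publish(msg);
    // New, hypothetical: a non-blocking, non-consuming status check that
    // reports whether the message was queued (or has already failed):
    // tok.queued()?;
    // Awaiting the token still means "delivered", exactly as today:
    tok.await?;
    Ok(())
}
```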
Same as before. The calls to publish() and the .await on the token don't change. It would complicate the Token implementation a bit, but it also provides a means to eventually cover the additional state (written) at some point in the future. (Believe me - someone will ask for that next.) And it provides an easy path for tracking the state of other operations like connect, disconnect, subscribe, etc.

Keep in mind that with 3yrs of this library and 8yrs of the similar C++ library API, and half a million combined downloads... you're the only person who has ever asked for this feature. 😃 And it was based on a bug that was fixed, and on some assumed time delays that don't exist. So I'm not sure how universally useful or popular this might be.
But, honestly, I think using callbacks is my least favorite of all. It would totally break the flow of an otherwise simple async client. For that matter, if async/await, or even Futures, had been around when I started the library, I probably never would have implemented callbacks - at least not for operation success/failure.
Damn. Actually, if we're just adding functions to the Tokens, we could add anything, right? It doesn't need to be something that has to return a future. The "queued" success/error will always have been completed or failed by the return of publish().
Ah I see... You added an example in your reply that indeed shows a solution that doesn't require calling the intermediate steps. Interesting...
How would it break the flow? It's an additional method that you can optionally use. But if you don't care, you can just ignore it and keep using the existing methods. In that case nothing will change for you, right?
Don't see how that is relevant...
The time delays I'm talking about are very much there and do exist. How is it not true that a message can sit in the C client's queue for a couple of minutes (maybe even longer)? That is the "delay" I'm talking about... Seems like "talking" through typing, with different native languages, is difficult... Who could have known 😂
I agree that this is indeed a super interesting direction to think about... But I think having it return a future does add more value, as then you can still await the end result as well. Also not sure what you mean by that last part...
I actually already thought about using something like that. But I understand that it will not happen in a simple way like suggested in #102, so I'll either keep carrying that solution in my own fork, or I'll try one of the other suggested approaches...
Yeah, I suppose I have a pre-existing idea of how the tokens should work, from using the libraries for so long. The C++ lib has blocking and timed wait calls on its tokens (wait(), wait_for(), etc.).
So you have a lot of options to check whether the result is there without committing to wait forever, or consuming the token. (And you can call them from different threads, repeatedly, without fear.) These options kinda disappeared from the Rust lib when the Tokens were converted to implement Futures.

For the non-Future status functions, I meant for this specific usage, where the Future might already be complete upon returning from publish(). And I thought callbacks for the queued state would be really weird, because they would only ever be called from inside the publish() call itself.
So, to come back around: this is what I had been thinking we needed from the beginning. The Token is the result. We just need a way to get the current status from it without committing to block for days at a time, and without consuming it if it's not complete. Maybe, to keep compatibility, it can be implemented in a way that leaves the existing Futures implementation intact.
Oh, and to keep the full history here, the two suggestions from the Rust Forum were: From Kornel:
From Alice:
I will reopen this to remind me to actually implement it for the next release! And we can discuss, because I may try a few of these different ideas to see what works best.
OK. I added a try_publish() call that reports the initial send result synchronously.

This is similar to what was proposed in PR #102, but it keeps the old publish() signature intact; if the initial send attempt fails, you get the error back immediately instead of through the token.

Now the question is whether, for consistency, there should be similar try_ variants for the other operations. Next I will attempt to implement a way to check a token's status without consuming it.
OK. The second feature is implemented as well.
OK, I pushed out both of these new features in a v0.9.1 release. To review, you can now get the result of the initial publish step either synchronously from the new try_ call, or by checking the returned token afterwards.
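Roughly, the two styles look like this (a sketch; `try_publish` matches the method name in the crate, while the token status check below is hypothetical):

```rust
use paho_mqtt as mqtt;

async fn examples(cli: &mqtt::AsyncClient, msg: mqtt::Message) -> Result<(), mqtt::Error> {
    // Style 1: synchronous feedback on the queueing step. Err here means
    // the message never made it into the queue; the returned token can
    // still be awaited for the delivery result.
    let tok = cli.try_publish(msg.clone())?;
    tok.await?;

    // Style 2: the original call; check the token without blocking
    // (hypothetical method name), or just await delivery as before.
    let tok = cli.publish(msg);
    // tok.try_wait()?;   // hypothetical non-blocking status check
    tok.await?;
    Ok(())
}
```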
I'm curious whether people would have a preference for one way or the other. But they each took so little code to implement that I have no problem keeping them both indefinitely.
Thanks @fpagliughi, really nice to have this in! I'll be off for another week, but will certainly test and play with all the new stuff soon after 👍🏻
Just as a heads up: some personal things came up, so I didn't get to work on this yet. But it's still high on my list, so I will pick it up shortly...
I had some time to play and test with this, and the new features added in v0.9.1 work like a charm 👍🏻 Thanks!
I have a question about the way the publish call works. If I understand the code correctly, the async publish method makes a call to publish the message and then returns a future that can be polled until the `on_failure` or `on_success` callback is called.

The problem with this is that it can take very long. Say that my connection is lost and I have configured the client to auto-reconnect and to persist messages. While the C call to publish a message will place the message on the queue and return immediately, the Rust call returns a future that I have to drive to completion in order to find out whether it failed or succeeded.
It would be nice if we could mimic the C behavior and give callers a choice about what to do once a message is added to the queue. In my case I'm good when I know the message is in the queue and will now be handled by the persistence logic of the C package for its delivery.
So I thought that maybe changing the method to something like this would be a nice improvement:
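Something along these lines (a sketch; this is essentially what PR #102 proposes):

```rust
use paho_mqtt as mqtt;

// The synchronous Result reports the queueing step; the token still
// reports the eventual delivery.
trait PublishWithResult {
    fn publish(&self, msg: mqtt::Message) -> Result<mqtt::DeliveryToken, mqtt::Error>;
}
```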
This way you can distinguish between the "internal" publish call (putting the message on the queue) and the "external" publish call (the actual sending of the message), and let the caller use whichever is applicable. One could check only the `Result`, or first check the `Result` and then `.await` the token to know whether the message was successfully sent.

Curious if you would be open to a PR implementing something like this?