-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP][AMQ-8324][Jakarta Messaging 3.1] Asynchronous send with CompletionListener #1364
base: main
Are you sure you want to change the base?
Conversation
…stener. Added unit tests to cover behaviour required by specs.
a06cef3
to
dbb78fb
Compare
I don't think the changes here are going to be nearly as simple as you think, it's been a while since I've looked at this and have to review the spec again but I've talked to @tabish121 about there are a lot of things you have to take into consideration and it's not just as simple as being able to copy and paste the existing async code. Speaking of which, we should be trying to re-use code where possible and not simply copying and pasting. |
It looks like @tabish121 already commented on another PR about the issues with things like ordering that need to be taken into consideration here: #1045 (comment) Furthermore, @gemmellr commented here #1303 (review) So all of this was already discussed and I don't see how this PR addresses any of those concerns |
It does not. |
I need to look more into it later when I have more time but I started looking at the spec details and also the previous comments of concerns to try and figure out what the issues are. There's a few things but it looked like one of the biggest concerns was whether or not the ordering requirements are being met. The PR mentions that ordering is handled by the broker so I was trying to figure out if it's true for all the cases we need to care about. Specifically, things like persistent vs non-persistent messages being intermixed and also anonymous producers across destinations, etc. Looking at the spec in detail, I think the current async behavior might be good enough but not sure yet. It says the async behavior must follow the sync and is described at: https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#order-of-message-sends. Specifically I noticed it says:
All of this needs to be looked at closely and well tested of course. There's a lot of other nuances that need to be considered like session close: https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback |
Thanks for the feedback @cshannon, @tabish121 and @mattrpav . So I didn't exactly copied and pasted the AsyncCallback code, I did a few manual testing to make sure the ordering is honor. I added one testcase in the unit test that sends 100 message asynchronously, and make sure the CompletionListener is executed at the correct order. I will add a few more about intermixing between sync and async send. One challenge I had with unit testing is it doesn't simulate delayed in Ack. I built a hacked broker to simulate ack delayed for certain messages (if the message content contains a secret command) and it did honor the ordering. I will make sure I add more unit testcases, perform more manual tests and come up with edge cases, put it in a doc and present it here. From what I observed in my testing, the broker will not send out acknowledge of the send request out of order. It can dispatch to the consumer in different order due to message priority, but for same destination, the order of message ack is honored. I will go back and poke around it even more to see if I miss other edge cases and push the number of messages sent to see if I can break it. That being said, I didn't implement the logic of making sure the order is honored on the client side. If we feel that we need to enforce it on the client side (as a safeguard) then I can see what I can do on the client code. Regarding session close: yes 100%. That's why I would love feedback on that on what testcases I miss. Especially I needed to add a concurrent data structure to block until all async sends have completed (maybe there is one that does what I want supported natively, I have looked into phaser). That one I can add unit tests coverage more easily. |
@kenliao94 - In talking with @tabish121 there is one more wrinkle that I think also makes this more challenging if we are sticking strictly to the spec regarding the completion listener: https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#use-of-the-completionlistener-by-the-jakarta-messaging-provider
The issue here is that if a producer is sending messages to different destinations and/or with different modes (persistent/non persistent) then while the broker will handle the ordering correctly for each destination, it's possible and likely that the client will receive the responses out of order and end up completing the callbacks out of order. This is because the ActiveMQ client tracks the async requests in a map and just completes them as it receives them. In order to honor the spec for this section you would have to modify things to hold responses and not invoke those callbacks immediately if there were previous outstanding calls that had not been completed which is of course different than what the current behavior of the client is. There is a good example of the complexity in handling this in the Qpid JMS JmsSession class where they track all their async request completions in a queue so that things are processed in order. I suppose there could be a discussion on whether this requirement really needs to be met or not for the client but if the intention is to implement the spec then it should probably be followed 100%. The other issue of course is that this behavior is a bit different than the existing async behavior that the client supports so then we'd have to decide if we also need to preserve the old way for the existing API. |
As a follow up to my comment about following the spec 100% and whether we really need to meet the requirement etc... User's have been using the existing async client behavior for a very long time so I think you could also make an argument to actually just make the new Jakarta 3.1 completion listener behavior to be in line with the old behavior even even if it doesn't match the spec 100% as it preserves existing behavior and obviously is significantly less work to implement. On one hand, i think the goal should be to be 100% compliance but on the other hand it's also not like the broker has ever been fully compliant and certainly isn't compliant today with not implementing much of the spec. And there are times where I don't know how much it really matters if the spec is fully met. Anyways, I'm not sure what we should do as I'm sure there are uses cases and client code that would now break if the code expects an exact match to the spec and it deviates. Thinking about it, I don't really know how much it matters or doesn't matter that clause 7.3.8 is honored as it depends on what a client is trying to do, but if it wasn't honored we'd certainly need to disclose that somewhere for awareness. |
I agree with Chris. I think it makes more sense to close to the ActiveMQ async behavior (that exists for a while) more than trying to implement the spec at 100%. It would not surprise users imho ;) |
Well, I'm not really advocating one way or the other at the moment, and I still think there's a very good argument to be made that the work needs to be done to implement the spec correctly. I'm just saying we could have the conversation about it. Normally I would say if we are implementing the JMS spec then obviously it should be done 100%, this is just a little interesting because there's an existing async API that users have been using that is similar but slightly different. It's hard to say how surprised users will be as it depends on if they are a new user or legacy user probably. I would think that if a user is migrating from the existing async custom API to the Jakarta API they may be fine with the change and be expecting it to be the same. But there are going to be other users who are not migrating or read the spec and expect a different behavior and that the callbacks are completed in order so I could see someone relying on that (maybe the callback increments something, who knows) that could now break if they complete out of order. |
My point is that async exists for a while in ActiveMQ. So, it makes sense to use this for spec impl. That's especially true for async send. For message listener/callback, I think it depends if the user is an existing ActiveMQ user or a new one. I would advocate that as a first shot, it makes sense to start from our ActiveMQ experience. |
I took a look at the existing Async API and it's not actually documented anywhere that I can see of how it is supposed to work. So I actually don't think it would be a problem to just have the old API follow the same behavior as the spec, the order of completion callback isn't defined anywhere and there is no spec for the AMQ api so if we made it process in order it would be fine. I think it's always going to be better to implement the spec 100% if possible. My primary hesitation was about unexpected behavior changes from the existing async API but since it isn't documented (as I mentioned) I don't think it's a problem if we complete the callbacks in order. So I am leaning towards the idea that we should try and figure out if we can add a way to actually do that so we are spec compliant. Right now the callbacks/requests are only tracked in a map in the response correlator, I think we'd need to add something extra to track the order of submission as well and make sure to only execute the callback on a completed request once there are no previous requests. We'd need to make sure we didn't have too much queued up at once so that means timeouts or even blocking the send if there is outstanding stuff. Only simple way I think to meet the requirement is to only send one message at a time and wait for the response before sending the next one. This would make the behavior similar to a normal sync send but it would still complete async and is spec compliant. The only requirement is the callback is not executed in the same thread but you could in theory just only allow one to be submitted at a time (wait for each send to complete before sending the next one). I don't know that it's a good idea to simply use the existing async code and then say we will fix it later as that just establishes newly expected behavior that will either change which could cause users problems or more likely it will just be ignored and never fixed for real like ti should be. |
Hi @cshannon and @jbonofre the initial goal of this particular PR is to implement the behavior as close to the spec as possible. That being said, I am not against using the existing AsyncCallback behavior as CompletionListener because it has been proven effective. There are 9 items in the spec, so are we saying that only 7.3.3 message orders is not met? The first iteration of this PR is to just wrap AsyncCallback into CompletionListener and have the CompletionListener implemented the same way as the original AsyncCallback. But as @tabish121 pointed out, that version didn't implement the spec, and from the mailing list discussion https://lists.apache.org/thread/y2pz4y1p9c5l1cdm6lyd5spc810mk6kq we want to keep both behavior in ActiveMQ 6. Hence I created this PR that referenced the current async design but added additional logic (such as wrapping user completion listener to perform check and add a blocking call to close, commit and rollback) I will do more testing and add more logic to try and meet 7.3.3 and 7.3.8. But would like to get clarity on whether this is the only one not meeting the spec or there are more. Thanks. |
@kenliao94 - I think 7.3.3 is already handled by the broker as it should enforce the order across each destination and between mode (persistent/non persistent) but we can test. My concern the last few comments has to do with 7.3.8 which refers to the order that the completion listeners complete and that is the thing that would not be handled right now. The listeners would complete as the requests to the broker complete, so there is no guarantee the client will complete them in the order as 7.3.8 requires across different destinations. For example, let's say a producer sends 5 messages to 5 different destinations async (anonymous producer). I believe the current behavior of the client is that the completion callback would be executed for each of the 5 sent messages whenever they are received back from the broker. Because you are publishing to 5 different destinations, and because the broker does not enforce any kind of order across destinations, it's possible the producer will receive back the acks for the sent messages in a different order then they were sent. So this could lead to something like the callback for the 3rd published message being executed before the first one. We would need some way for the client to keep track of the order and hold off completing. The caveat is I think this is the current behavior. I would need to double check it but I don't think there's anything preventing the broker from responding back in a different order if the messages are published to different destinations. Regarding 7.3.5 it sounds like @mattrpav asked a question about it so I'm not sure if it can be skipped. Regarding 7.3.6 I missed this when originally reviewing the PR and I read into the spec more and your explanation and I'm not sure I agree. The spec says that you "may" throw an exception so that part is optional but I would argue it's preferred. What I'm reading is that you still would need to prevent the fields/properties from being accessed even if you didn't. I do not agree with your assessment that it is somehow messy code or the applications job. The simplest thing to do is the thing you didn't do, and that is just have a flag that is set that sets the mode and if the setter is allowed to be accessed and if it isn't because the competition is pending you throw an exception. Here is where Qpid JMS enforces this: https://github.com/apache/qpid-jms/blob/2d799c153a6b51f07a4c9ccec4a8751598183733/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java#L461-L481 Qpid JMS is a pretty good open source reference point in general to refer to. While it's AMQP, it's a good starting point for tips on how to handle the spec. |
One idea I had as another option for 7.3.8 is to only process one message a time. Well technically, this was @tabish121 idea. The spec says that the callback must be processed in a new thread for the async send but it doesn't say that multiple concurrent sends must be allowed. So the sending thread could block and wait for the send to finish and execute the callback before continuing. This of course is similar behavior to the normal sync send that just sends a message at a time and blocks. This is the simplest way to meet the spec (turning it into sync essentially) but the downside is not allowing the throughput of multiple sends at once. We could do something like make the mode configurable with legacy vs compliant mode. If it's legacy it could fall back to the existing async behavior that doesn't meet the spec, and if it's compliant it goes into the mode where it just sends one a time. A future update could be done to make the "compliant" mode better and allow it to handle multiple sends at one by tracking the order. Again, potential option, I'm not sure this is a good idea either but another idea I had. |
@cshannon makes sense. Thank you for your through review! Regarding 7.3.6 and 7.3.9: I will bring back the flag idea I mentioned in the description. During my exploration, I used the exact approach as Qpid JMS, add a setter method and an attribute to ActiveMQMessage. If that attribute is true. It will throw exception. As stated in the Regarding 7.3.3 and 7.3.8: I think I see your points now. Basically ordering within same destination and same delivery mode is already honored by the broker. So we are on the same page that 7.3.3 is already met as is. However, when it comes to 7.3.8, across destination from the same producer, the message order is not honored and I think you are right with the example you provided. When I looked at it initially, it is a map and there's no mechanism in the transport layer that make sure it is honored but I forgot about that case. I will push a new revision ASAP. Regarding legacy vs compliant mode, in this PR, it is differentiated by what API the client application uses. If the application uses the Jakarta Messaging API, async send with CompletionListener, it will get the behavior as described in the spec, I.E compliant mode. If the client application decides to cast the |
And follow up on your idea of one async send at a time. It is a viable option. We can go with it for now and not worry about the performance aspect. Since optimizing the throughput won't change behavior so it is not a breaking change. We can always improve the performance through another PR if that makes this PR smaller to review. |
+1 on spec compliance I have come to learn (the hard way) that if there is a feature in ActiveMQ, someone is using it in production. |
@kenliao94 - Yeah I think it's probably simplest to just do the one message at a time thing (don't even have an option to turn it off). The new method should be spec compliant so if we just go with blocking until the callback completes should work. The main thing is to make sure the completion callback still runs in a new thread (so use an executor, etc). This is still not going to be slow, it will be as fast as the normal sync API which is still quite fast, it just limits the concurrent sends until we can handle ordering. It ends up just being similar to sync but allows using the callback to handle which can be nice for some use cases. As you said, we can just keep the existing API with current async behavior so users can retain that if they want multiple concurrent sends and a future update could handle ordering if we want to support multiple concurrent sends for the new API. |
75b141a
to
a5d8ef1
Compare
Hi @cshannon @mattrpav @jbonofre I have published a new commit that implemented 7.3.6 and 7.3.9 - restriction on the Message object. It is pretty much what we discussed. Set a flag in the object to make it not accessible. The other thing I added was enforcing Will work on 7.3.8 in the new few days. Thank you! |
…er close is handled correctly according to 7.3.4
I just pushed a commit that uses a single-threaded Executor to perform the async send, and have that background thread wait until the current async send is finished. So application sender thread is not blocked (meet threading restriction) and async send is done one at a time. This will preserve ordering across the session "A session will only invoke one CompletionListener callback method at a time. For a given MessageProducer or JMSContext, callbacks (both onCompletion and onException) will be performed in the same order as the corresponding calls to the asynchronous send method." TODO: Need to add a few more tests. |
I actually don't think this is the correct design. I think we want to block the send call for now if we are just going to handle one at a time and essentially treat it like sync. As far as I know there is nothing that says the sender can't block and wait for the other thread to complete and it's much, much simpler. You could simply submit a task to execute the callback and wait for the future before continuing. Besides that, I don't see a benefit of the design to try and have a background thread that waits because:
|
@kenliao94 as I don't want you get frustrating about a lot of back and forth on this PR, I would propose to take a little step back. Imho, we don't have yet a consensus on the design. Maybe we can experiment the design proposal process we discussed on the mailing list: I propose you start a design document, associated with this PR. |
@jbonofre - That's a great idea. As seen from the back and forth, the spec here is a bit tricky so I think it would be good to start a design doc or something to specify what the actual plan is for the changes. It's a bit hard to get onto the same page just from PR discussions and it can lead to miscommunication or different ideas for design. The main thing with this is we can all agree before code is changed so you don't have to throw away code or waste time etc. Generally speaking the design here has gotten way too complex I think for a first version of this feature so i think we need to work on clarifying what the plan is. So we should probably:
|
Hi @cshannon and @jbonofre , I agree, that is a great idea. And thanks for the review, it is very helpful for newcomers like me to understand the nuances of the codebase. I will start a design doc and adopt the process proposed and get a consensus on the design. I will mark this PR as [WIP], close this one and open a new one once we get consensus on the design doc. |
What is this PR about?
Implemented Jakarta Messaging 3.1 spec, section 7.3 https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#asynchronous-send
Jira ticket: https://issues.apache.org/jira/browse/AMQ-8324
Here an overview of how each requirement is satisfied:
7.3.1 Quality of service
This CR implements the feature in both Classic and Simplified API. It references implementation of synchronous send and asynchronous send. It is confirmed that the CompletionListener is triggered only when broker server returns acknowledgement.
7.3.2 Exceptions
I trigger CompletionListener#onException if the onComplete throws an exception.
7.3.3 Message order
This basically is "enforced" by the broker server. The requirement of 7.3.3 states that async messages sent to a particular destination should honor the order when it comes to execution of associated CompletionListeners. In that case, the broker server will send acknowledge the same order as the send sequence, hence the execution of CompletionListener will follow the same order. Will write a testcase for it.
7.3.4 Close, commit or rollback
The wait until all incompleted async send is Implemented by a CountdownLock (I implemented it, it is similar to a CountDownLatch but it can be incremented). Before the session is dispose, it will block until CountdownLock is at zero. (If it is zero, then it is not blocked).
I also used a ThreadLocal object to set a "marker". So if the close, rollback, commit methods are called on the session or producer in the same thread of onComplete execution, it will throw an exception.
7.3.5 Restrictions on usage in Jakarta EE
I don't think we need to do this. I haven't found an example of it in the codebase. Also from the spec, that is recommended but not mandatory. Asked on the dev mailing list we can skip this requirement: https://lists.apache.org/thread/r4fmmjcbgbgm0gw5mo4k3m4h3jzj8x8q
7.3.6 Message headers
Setting a flag in
ActiveMQMessage
to make it throw exception when properties or headers are accessed.7.3.7 Restrictions on threading
The execution of CompletionListener is in the transport layer (in another thread). The application thread can continue to use the session after performing an asynchronous send.
7.3.8 Use of the CompletionListener by the Jakarta Messaging provider
The execution of CompletionListener is in the transport layer (in another thread)
7.3.9 Restrictions on the use of the Message object
See explanation at 7.3.6.
How do I test this change