|
| 1 | +== Implementation Details |
| 2 | + |
| 3 | +This section provides additional implementation details for specific |
| 4 | +JMS API classes in the JMS Client. |
| 5 | + |
| 6 | +Deviations from the specification are implemented to support common |
| 7 | +acknowledgement behaviours. |
| 8 | + |
| 9 | +[#jms_topic_support] |
| 10 | +== JMS Topic Support |
| 11 | + |
| 12 | +JMS topics are implemented using an AMQP link:https://rabbitmq.com/tutorials/amqp-concepts.html#exchange-topic[topic exchange] |
| 13 | +and a dedicated AMQP queue for each JMS topic subscriber. The AMQP |
| 14 | +topic exchange is `jms.temp.topic` or `jms.durable.topic`, depending |
| 15 | +on whether the JMS topic is temporary or not, respectively. Let's |
| 16 | +take an example with a subscription to a durable `my.jms.topic` JMS topic: |
| 17 | + |
| 18 | +* a dedicated AMQP queue is created for this subscriber, its name |
| 19 | + will follow the pattern `+jms-cons-{UUID}+`. |
| 20 | +* the `+jms-cons-{UUID}+` AMQP queue is bound to the `jms.durable.topic` |
| 21 | + exchange with the `my.jms.topic` binding key. |
| 22 | +
|
| 23 | +If another subscriber subscribes to `my.jms.topic`, it will have |
| 24 | +its own AMQP queue and both subscribers will receive messages published |
| 25 | +to the `jms.durable.topic` exchange with the `my.jms.topic` routing key. |
| 26 | + |
| 27 | +The example above assumes no topic selector is used when declaring the |
| 28 | +subscribers. If a topic selector is in use, a `x-jms-topic`-typed exchange |
| 29 | +will sit between the `jms.durable.topic` topic exchange and the |
| 30 | +subscriber queue. So the topology is the following when subscribing to |
| 31 | +a durable `my.jms.topic` JMS topic with a selector: |
| 32 | + |
| 33 | +* a dedicated AMQP queue is created for this subscriber, its name |
| 34 | + will follow the pattern `+jms-cons-{UUID}+`. |
| 35 | +* a `x-jms-topic`-typed exchange is bound to the subscriber AMQP queue with |
| 36 | + the `my.jms.topic` binding key and some arguments related to the selector |
| 37 | + expressions. Note this exchange is scoped to the JMS session and not only |
| 38 | + to the subscriber. |
| 39 | +* the `x-jms-topic`-typed exchange is bound to the `jms.durable.topic` |
| 40 | + exchange with the `my.jms.topic` binding key. |
| 41 | +
|
| 42 | +Exchanges can be bound together thanks to a link:https://rabbitmq.com/e2e.html[RabbitMQ extension]. |
| 43 | +Note the <<enable_topic_selector, Topic Selector Plugin>> must be enabled for topic selectors |
| 44 | +to work. |
| 45 | + |
| 46 | +== QueueBrowser Support |
| 47 | + |
| 48 | +=== Overview of queue browsers |
| 49 | + |
| 50 | +The JMS API includes objects and methods to browse an existing queue |
| 51 | +destination, reading its messages _without_ removing them from the |
| 52 | +queue. Topic destinations cannot be browsed in this manner. |
| 53 | + |
| 54 | +A `QueueBrowser` can be created from a (queue) `Destination`, |
| 55 | +with or without a selector expression. The browser has a `getEnumeration()` |
| 56 | +method, which returns a Java `Enumeration` of ``Message``s copied from |
| 57 | +the queue. |
| 58 | + |
| 59 | +If no selector is supplied, then all messages in the queue appear |
| 60 | +in the `Enumeration`. If a selector is supplied, then only those |
| 61 | +messages that satisfy the selector appear. |
| 62 | + |
| 63 | +=== Implementation |
| 64 | + |
| 65 | +The destination queue is read when the `getEnumeration()` method is |
| 66 | +called. A _snapshot_ is taken of the messages in the queue; and the |
| 67 | +selector expression, if one is supplied, is used at this time to discard |
| 68 | +messages that do not match. |
| 69 | + |
| 70 | +The message copies may now be read using the `Enumeration` interface |
| 71 | +(`nextElement()` and `hasMoreElements()`). |
| 72 | + |
| 73 | +The selector expression and the destination queue of the `QueueBrowser` |
| 74 | +may not be adjusted after the `QueueBrowser` is created. |
| 75 | + |
| 76 | +An `Enumeration` cannot be "reset", but the `getEnumeration()` method |
| 77 | +may be re-issued, taking a _new_ snapshot from the queue each time. |
| 78 | + |
| 79 | +The contents of an `Enumeration` survive session and/or connection |
| 80 | +close, but a `QueueBrowser` may not be used after the session that |
| 81 | +created it has closed. `QueueBrowser.close()` has no effect. |
| 82 | + |
| 83 | +==== Which messages are included |
| 84 | + |
| 85 | +Messages that arrive, expire, are re-queued, or are removed after |
| 86 | +the `getEnumeration()` call have no effect on the contents of the |
| 87 | +`Enumeration` it produced. If the messages in the queue change |
| 88 | +_while the_ `Enumeration` _is being built_, they may or may not be |
| 89 | +included. In particular, if messages from the queue are simultaneously |
| 90 | +read by another client (or session), they may or may not appear in |
| 91 | +the `Enumeration`. |
| 92 | + |
| 93 | +Message copies do not "expire" from an `Enumeration`. |
| 94 | + |
| 95 | +==== Order of messages |
| 96 | + |
| 97 | +If other client sessions read from a queue that is being browsed, |
| 98 | +then it is possible that some messages may subsequently be received out |
| 99 | +of order. |
| 100 | + |
| 101 | +Message order will not be disturbed if no other client sessions read |
| 102 | +the queue at the same time. |
| 103 | + |
| 104 | +==== Memory usage |
| 105 | + |
| 106 | +When a message is read from the `Enumeration` (with `nextElement()`), |
| 107 | +then no reference to it is retained in the Java Client. This means the |
| 108 | +storage it occupies in the client is eligible for release |
| 109 | +(by garbage collection) if no other references are retained. |
| 110 | +Retaining an `Enumeration` will retain the storage for all message |
| 111 | +copies that remain in it. |
| 112 | + |
| 113 | +If the queue has many messages -- or the messages it contains are very |
| 114 | +large -- then a `getEnumeration()` method call may consume a large |
| 115 | +amount of memory in a very short time. This remains true even if only |
| 116 | +a few messages are selected. There is currently limited protection |
| 117 | +against `OutOfMemoryError` conditions that may arise because of this. |
| 118 | +See the next section. |
| 119 | + |
| 120 | +==== Setting a maximum number of messages to browse |
| 121 | + |
| 122 | +Each connection is created with a limit on the number of messages that |
| 123 | +are examined by a `QueueBrowser`. The limit is set on the |
| 124 | +`RMQConnectionFactory` by `RMQConnectionFactory.setQueueBrowserReadMax(int)` |
| 125 | +and is passed to each `Connection` subsequently created |
| 126 | +by `ConnectionFactory.createConnection()`. |
| 127 | + |
| 128 | +The limit is an integer that, if positive, stops the queue browser from |
| 129 | +reading more than this number of messages when building an enumeration. |
| 130 | +If it is zero or negative, it is interpreted as imposing no limit on |
| 131 | +the browser, and all of the messages on the queue are scanned. |
| 132 | + |
| 133 | +The default limit for a factory is determined by the |
| 134 | +`rabbit.jms.queueBrowserReadMax` system property, if set, and the value |
| 135 | +is specified as `0` if this property is not set or is not an integer. |
| 136 | + |
| 137 | +If a `RMQConnectionFactory` value is obtained from a JNDI provider, |
| 138 | +then the limit set when the factory object was created is preserved. |
| 139 | + |
| 140 | +==== Release Support |
| 141 | + |
| 142 | +Support for ``QueueBrowser``s is introduced in the JMS Client 1.2.0. |
| 143 | +Prior to that release, calling `Session.createBrowser(Queue queue[, String selector])` |
| 144 | +resulted in an `UnsupportedOperationException`. |
| 145 | + |
| 146 | +=== Group and individual acknowledgement |
| 147 | + |
| 148 | +Prior to version 1.2.0 of the JMS client, in client acknowledgement mode |
| 149 | +(`Session.CLIENT_ACKNOWLEDGE`), acknowledging any message from an open |
| 150 | +session would acknowledge _every_ unacknowledged message of that session, |
| 151 | +whether they were received before or after the message being acknowledged. |
| 152 | + |
| 153 | +Currently, the behaviour of `Session.CLIENT_ACKNOWLEDGE` mode is |
| 154 | +modified so that, when calling `msg.acknowledge()`, only the message |
| 155 | +`msg` _and all_ previously received _unacknowledged messages on that |
| 156 | +session_ are acknowledged. Messages received _after_ `msg` was received |
| 157 | +are not affected. This is a form of _group acknowledgement_, |
| 158 | +which differs slightly from the JMS specification but is likely to |
| 159 | +be more useful, and is compatible with the vast majority of uses of |
| 160 | +the existing acknowledge function. |
| 161 | + |
| 162 | +For even finer control, a new acknowledgement mode may be set when |
| 163 | +creating a session, called `RMQSession.CLIENT_INDIVIDUAL_ACKNOWLEDGE`. |
| 164 | + |
| 165 | +A session created with this acknowledgement mode will mean that messages |
| 166 | +received on that session will be acknowledged individually. That is, |
| 167 | +the call `msg.acknowledge()` will acknowledge only the message `msg` |
| 168 | +and not affect any other messages of that session. |
| 169 | + |
| 170 | +The acknowledgement mode `RMQSession.CLIENT_INDIVIDUAL_ACKNOWLEDGE` |
| 171 | +is equivalent to `Session.CLIENT_ACKNOWLEDGE` in all other respects. |
| 172 | +In particular the `getAcknowledgeMode()` method returns |
| 173 | +`Session.CLIENT_ACKNOWLEDGE` even if |
| 174 | +`RMQSession.CLIENT_INDIVIDUAL_ACKNOWLEDGE` has been set. |
| 175 | + |
| 176 | +== Arbitrary Message support |
| 177 | + |
| 178 | +Any instance of a class that implements the `javax.jms.Message` |
| 179 | +interface can be _sent_ by a JMS message producer. |
| 180 | + |
| 181 | +All properties of the message required by `send()` are correctly |
| 182 | +interpreted except that the `JMSReplyTo` header and objects |
| 183 | +(as property values or the body of an `ObjectMessage`) that |
| 184 | +cannot be deserialized are ignored. |
| 185 | + |
| 186 | +The implementation extracts the properties and body from the `Message` |
| 187 | +instance using interface methods and recreates it as a message of |
| 188 | +the right (`RMQMessage`) type (`BytesMessage`, `MapMessage`, `ObjectMessage`, |
| 189 | +`TextMessage`, or `StreamMessage`) before sending it. This means |
| 190 | +that there is some performance loss due to the copying; but in the |
| 191 | +normal case, when the message is an instance of |
| 192 | +`com.rabbitmq.jms.client.RMQMessage`, no copying is done. |
0 commit comments