Skip to content

Commit d83aa20

Browse files
committed
Use concurrent utilities in RMQSession
Instead of synchronized blocks and Object monitors. (cherry picked from commit 67b80b2)
1 parent 32b423f commit d83aa20

File tree

1 file changed

+102
-102
lines changed

1 file changed

+102
-102
lines changed

src/main/java/com/rabbitmq/jms/client/RMQSession.java

Lines changed: 102 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import java.util.*;
1414
import java.util.concurrent.*;
1515
import java.util.concurrent.atomic.AtomicBoolean;
16+
import java.util.concurrent.locks.Lock;
17+
import java.util.concurrent.locks.ReentrantLock;
1618
import java.util.function.BiFunction;
1719

1820
import javax.jms.BytesMessage;
@@ -133,8 +135,10 @@ public class RMQSession implements Session, QueueSession, TopicSession {
133135

134136
/** The main RabbitMQ channel we use under the hood */
135137
private final Channel channel;
138+
136139
/** Set to true if close() has been called and completed */
137-
private volatile boolean closed = false;
140+
private final AtomicBoolean closed = new AtomicBoolean(false);
141+
138142
/** The message listener for this session. */
139143
private volatile MessageListener messageListener;
140144
/** A list of all the producers created by this session.
@@ -146,22 +150,18 @@ public class RMQSession implements Session, QueueSession, TopicSession {
146150
/** We keep an ordered set of the message tags (acknowledgement tags) for all messages received and unacknowledged.
147151
* Each message acknowledgement must ACK all (unacknowledged) messages received up to this point, and
148152
* we must never acknowledge a message more than once (nor acknowledge a message that doesn't exist). */
149-
private final SortedSet<Long> unackedMessageTags = Collections.synchronizedSortedSet(
150-
new TreeSet<>());
151-
152-
/* Holds the uncommited tags to commit a nack on rollback */
153-
private final List<Long> uncommittedMessageTags = new ArrayList<>(); // GuardedBy("commitLock");
153+
private final SortedSet<Long> unackedMessageTags =
154+
Collections.synchronizedSortedSet(new TreeSet<>()); // GuardedBy("unackedMessageTagsLock")
155+
private final Lock unackedMessageTagsLock = new ReentrantLock();
154156

155157
/** List of all our topic subscriptions so we can track them */
156-
private final Subscriptions subscriptions;
157-
158-
/** Lock for waiting for close */
159-
private final Object closeLock = new Object();
158+
private final Subscriptions subscriptions; // GuardedBy("subscriptionsLock")
159+
private final Lock subscriptionsLock = new ReentrantLock();
160160

161-
/** Lock and parms for commit and rollback blocking of other commands */
162-
private final Object commitLock = new Object();
163-
private static final long COMMIT_WAIT_MAX = 2000L; // 2 seconds
164-
private boolean committing = false; // GuardedBy("commitLock");
161+
/* Holds the uncommited tags to commit a nack on rollback */
162+
private final List<Long> uncommittedMessageTags = new ArrayList<>(); // GuardedBy("commitLock");
163+
/** Lock commit and rollback blocking of other commands */
164+
private final Lock commitLock = new ReentrantLock();
165165

166166
/** Client version obtained from compiled class. */
167167
private static final GenericVersion CLIENT_VERSION = new GenericVersion(RMQSession.class.getPackage().getImplementationVersion());
@@ -182,14 +182,13 @@ public class RMQSession implements Session, QueueSession, TopicSession {
182182
private static final Map<String, Object> RJMS_SELECTOR_EXCHANGE_ARGS
183183
= Collections.singletonMap(RJMS_VERSION_ARG, RJMS_CLIENT_VERSION);
184184

185-
186185
private static final String JMS_TOPIC_SELECTOR_EXCHANGE_TYPE = "x-jms-topic";
187186

188187
private final DeliveryExecutor deliveryExecutor;
189188

190189
/** The channels we use for browsing queues (there may be more than one in operation at a time) */
191-
private Set<Channel> browsingChannels = new HashSet<>(); // @GuardedBy(bcLock)
192-
private final Object bcLock = new Object();
190+
private final Set<Channel> browsingChannels = new HashSet<>(); // @GuardedBy(bcLock)
191+
private final Lock bcLock = new ReentrantLock();
193192

194193
/**
195194
* Classes in these packages can be transferred via ObjectMessage.
@@ -344,7 +343,7 @@ public BytesMessage createBytesMessage() throws JMSException {
344343
}
345344

346345
private void illegalStateExceptionIfClosed() throws IllegalStateException {
347-
if (this.closed) throw new IllegalStateException("Session is closed");
346+
if (this.closed.get()) throw new IllegalStateException("Session is closed");
348347
}
349348

350349
/**
@@ -470,25 +469,18 @@ int getAcknowledgeModeNoException() {
470469
}
471470

472471
private boolean enterCommittingBlock() {
473-
synchronized(this.commitLock){
474-
try {
475-
while(this.committing) {
476-
this.commitLock.wait(COMMIT_WAIT_MAX);
477-
}
478-
this.committing = true;
479-
return true;
480-
} catch(InterruptedException ie) {
481-
Thread.currentThread().interrupt();
482-
return false;
483-
}
472+
try {
473+
this.commitLock.lockInterruptibly();
474+
return true;
475+
} catch (InterruptedException ie) {
476+
this.commitLock.unlock();
477+
Thread.currentThread().interrupt();
478+
return false;
484479
}
485480
}
486481

487482
private void leaveCommittingBlock() {
488-
synchronized(this.commitLock){
489-
this.committing = false;
490-
this.commitLock.notifyAll();
491-
}
483+
this.commitLock.unlock();
492484
}
493485

494486
/**
@@ -583,40 +575,33 @@ public void close() throws JMSException {
583575
}
584576

585577
void internalClose() throws JMSException {
586-
if (this.closed) return;
587-
logger.trace("close session {}", this);
588-
589-
synchronized (this.closeLock) {
590-
try {
591-
// close consumers first (to prevent requeues being consumed)
592-
closeAllConsumers();
593-
594-
// rollback anything not committed already
595-
if (this.getTransactedNoException()) {
596-
// don't nack messages on close
597-
this.clearUncommittedTags();
598-
this.rollback();
599-
}
600-
601-
//clear up potential executor
602-
this.deliveryExecutor.close();
603-
604-
//close all producers created by this session
605-
for (RMQMessageProducer producer : this.producers) {
606-
producer.internalClose();
607-
}
608-
this.producers.clear();
578+
if (this.closed.compareAndSet(false, true)) {
579+
logger.trace("close session {}", this);
580+
// close consumers first (to prevent requeues being consumed)
581+
closeAllConsumers();
582+
583+
// rollback anything not committed already
584+
if (this.getTransactedNoException()) {
585+
// don't nack messages on close
586+
this.clearUncommittedTags();
587+
this.rollback();
588+
}
609589

610-
//now commit anything done during close
611-
if (this.getTransactedNoException()) {
612-
this.commit();
613-
}
590+
//clear up potential executor
591+
this.deliveryExecutor.close();
614592

615-
this.closeRabbitChannels();
593+
//close all producers created by this session
594+
for (RMQMessageProducer producer : this.producers) {
595+
producer.internalClose();
596+
}
597+
this.producers.clear();
616598

617-
} finally {
618-
this.closed = true;
599+
//now commit anything done during close
600+
if (this.getTransactedNoException()) {
601+
this.commit();
619602
}
603+
604+
this.closeRabbitChannels();
620605
}
621606
}
622607

@@ -670,7 +655,8 @@ public void recover() throws JMSException {
670655
if (getTransactedNoException()) {
671656
throw new javax.jms.IllegalStateException("Session is transacted.");
672657
} else {
673-
synchronized (this.unackedMessageTags) {
658+
try {
659+
this.unackedMessageTagsLock.lock();
674660
/* If we have messages to recover */
675661
if (!this.unackedMessageTags.isEmpty()) {
676662
try {
@@ -681,6 +667,8 @@ public void recover() throws JMSException {
681667
}
682668
this.unackedMessageTags.clear();
683669
}
670+
} finally {
671+
this.unackedMessageTagsLock.unlock();
684672
}
685673
}
686674
}
@@ -1074,13 +1062,14 @@ public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JM
10741062
*/
10751063
Channel getBrowsingChannel() throws JMSException {
10761064
try {
1077-
synchronized (this.bcLock) {
1078-
Channel chan = this.getConnection().createRabbitChannel(false); // not transactional
1079-
this.browsingChannels.add(chan);
1080-
return chan;
1081-
}
1065+
this.bcLock.lock();
1066+
Channel chan = this.getConnection().createRabbitChannel(false); // not transactional
1067+
this.browsingChannels.add(chan);
1068+
return chan;
10821069
} catch (Exception e) { // includes unchecked exceptions, e.g. ShutdownSignalException
10831070
throw new RMQJMSException("Cannot create browsing channel", e);
1071+
} finally {
1072+
this.bcLock.unlock();
10841073
}
10851074
}
10861075

@@ -1101,7 +1090,8 @@ private static Map<String, Object> merge(Map<String, Object> m1, Map<String, Obj
11011090
* Silently close and discard browsing channels, if any.
11021091
*/
11031092
private void clearBrowsingChannels() {
1104-
synchronized (this.bcLock) {
1093+
try {
1094+
this.bcLock.lock();
11051095
for (Channel chan : this.browsingChannels) {
11061096
try {
11071097
if (chan.isOpen())
@@ -1111,6 +1101,8 @@ private void clearBrowsingChannels() {
11111101
}
11121102
}
11131103
this.browsingChannels.clear();
1104+
} finally {
1105+
this.bcLock.unlock();
11141106
}
11151107
}
11161108

@@ -1119,15 +1111,16 @@ private void clearBrowsingChannels() {
11191111
*/
11201112
void closeBrowsingChannel(Channel chan) {
11211113
try {
1122-
synchronized (this.bcLock) {
1123-
if (this.browsingChannels.remove(chan)) {
1124-
if (chan.isOpen())
1125-
chan.close();
1114+
this.bcLock.lock();
1115+
if (this.browsingChannels.remove(chan)) {
1116+
if (chan.isOpen()) {
1117+
chan.close();
11261118
}
11271119
}
11281120
} catch (Exception e) {
1129-
// throw new RMQJMSException("Cannot close browsing channel", _);
11301121
// ignore errors in clearing up
1122+
} finally {
1123+
this.bcLock.unlock();
11311124
}
11321125
}
11331126

@@ -1303,8 +1296,11 @@ void resume() throws JMSException {
13031296

13041297
void unackedMessageReceived(long dTag) {
13051298
if (!getTransactedNoException()) {
1306-
synchronized (this.unackedMessageTags) {
1299+
try {
1300+
this.unackedMessageTagsLock.lock();
13071301
this.unackedMessageTags.add(dTag);
1302+
} finally {
1303+
this.unackedMessageTagsLock.unlock();
13081304
}
13091305
}
13101306
}
@@ -1344,32 +1340,33 @@ private void acknowledge(long messageTag) throws JMSException {
13441340
* The individualAck option is set by session mode (CLIENT_INDIVIDUAL_ACKNOWLEDGE) and overrides groupAck (default) and acknowledges at most a single message.
13451341
* </p>
13461342
*/
1347-
synchronized (this.unackedMessageTags) {
1348-
try {
1349-
if (individualAck) {
1350-
if (!this.unackedMessageTags.contains(messageTag)) return; // this message already acknowledged
1351-
/* ACK a single message */
1352-
this.getChannel().basicAck(messageTag, false); // we ack the single message with this tag
1353-
this.unackedMessageTags.remove(messageTag);
1354-
} else if (groupAck) {
1355-
/** The tags that precede the given one, and the given one, if unacknowledged */
1356-
SortedSet<Long> previousTags = this.unackedMessageTags.headSet(messageTag+1);
1357-
if (previousTags.isEmpty()) return; // no message to acknowledge
1358-
/* ack multiple message up until the existing tag */
1359-
this.getChannel().basicAck(previousTags.last(), // we ack the latest one (which might be this one, but might not be)
1360-
true); // and everything prior to that
1361-
// now remove all the tags <= messageTag
1362-
previousTags.clear();
1363-
} else {
1364-
// this block is no longer possible (groupAck == true) after RJMS 1.2.0
1365-
this.getChannel().basicAck(this.unackedMessageTags.last(), // we ack the highest tag
1366-
true); // and everything prior to that
1367-
this.unackedMessageTags.clear();
1368-
}
1369-
} catch (IOException x) {
1370-
this.logger.error("RabbitMQ exception on basicAck of message {}; on session '{}'", messageTag, this, x);
1371-
throw new RMQJMSException(x);
1343+
try {
1344+
this.unackedMessageTagsLock.lock();
1345+
if (individualAck) {
1346+
if (!this.unackedMessageTags.contains(messageTag)) return; // this message already acknowledged
1347+
/* ACK a single message */
1348+
this.getChannel().basicAck(messageTag, false); // we ack the single message with this tag
1349+
this.unackedMessageTags.remove(messageTag);
1350+
} else if (groupAck) {
1351+
/** The tags that precede the given one, and the given one, if unacknowledged */
1352+
SortedSet<Long> previousTags = this.unackedMessageTags.headSet(messageTag+1);
1353+
if (previousTags.isEmpty()) return; // no message to acknowledge
1354+
/* ack multiple message up until the existing tag */
1355+
this.getChannel().basicAck(previousTags.last(), // we ack the latest one (which might be this one, but might not be)
1356+
true); // and everything prior to that
1357+
// now remove all the tags <= messageTag
1358+
previousTags.clear();
1359+
} else {
1360+
// this block is no longer possible (groupAck == true) after RJMS 1.2.0
1361+
this.getChannel().basicAck(this.unackedMessageTags.last(), // we ack the highest tag
1362+
true); // and everything prior to that
1363+
this.unackedMessageTags.clear();
13721364
}
1365+
} catch (IOException x) {
1366+
this.logger.error("RabbitMQ exception on basicAck of message {}; on session '{}'", messageTag, this, x);
1367+
throw new RMQJMSException(x);
1368+
} finally {
1369+
this.unackedMessageTagsLock.unlock();
13731370
}
13741371
}
13751372
}
@@ -1444,7 +1441,8 @@ private MessageConsumer createTopicConsumer(Topic topic, String name, boolean du
14441441

14451442
RMQDestination topicDest = (RMQDestination) topic;
14461443
String queueName = durable ? name : generateJmsConsumerQueueName();
1447-
synchronized (this.subscriptions) {
1444+
try {
1445+
this.subscriptionsLock.lock();
14481446
Subscription subscription = this.subscriptions.register(name, queueName, durable, shared,
14491447
messageSelector, noLocal);
14501448
PostAction postAction = subscription.validateNewConsumer(topic, durable, shared,
@@ -1459,6 +1457,8 @@ private MessageConsumer createTopicConsumer(Topic topic, String name, boolean du
14591457
consumer.setNoLocal(noLocal);
14601458
subscription.add(consumer);
14611459
return consumer;
1460+
} finally {
1461+
this.subscriptionsLock.unlock();
14621462
}
14631463
}
14641464

0 commit comments

Comments
 (0)