Skip to content

Commit 5f6ebde

Browse files
committed
Fix session closing sequence
The refactoring in #606 broke the closing sequence for transacted sessions, as a closed flag was set to true on closing and a method called in the closing checks the flag. This commit adds a closing flag to make the closing idempotent and sets the closed flag to true when the session is closed. References #606 (cherry picked from commit 3a227a8)
1 parent b6b8eea commit 5f6ebde

File tree

1 file changed

+28
-23
lines changed

1 file changed

+28
-23
lines changed

src/main/java/com/rabbitmq/jms/client/RMQSession.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ public class RMQSession implements Session, QueueSession, TopicSession {
138138

139139
/** Set to true if close() has been called and completed */
140140
private final AtomicBoolean closed = new AtomicBoolean(false);
141+
private final AtomicBoolean closing = new AtomicBoolean(false);
141142

142143
/** The message listener for this session. */
143144
private volatile MessageListener messageListener;
@@ -575,33 +576,37 @@ public void close() throws JMSException {
575576
}
576577

577578
void internalClose() throws JMSException {
578-
if (this.closed.compareAndSet(false, true)) {
579-
logger.trace("close session {}", this);
580-
// close consumers first (to prevent requeues being consumed)
581-
closeAllConsumers();
582-
583-
// rollback anything not committed already
584-
if (this.getTransactedNoException()) {
585-
// don't nack messages on close
586-
this.clearUncommittedTags();
587-
this.rollback();
588-
}
579+
if (this.closing.compareAndSet(false, true)) {
580+
try {
581+
logger.trace("close session {}", this);
582+
// close consumers first (to prevent requeues being consumed)
583+
closeAllConsumers();
589584

590-
//clear up potential executor
591-
this.deliveryExecutor.close();
585+
// rollback anything not committed already
586+
if (this.getTransactedNoException()) {
587+
// don't nack messages on close
588+
this.clearUncommittedTags();
589+
this.rollback();
590+
}
592591

593-
//close all producers created by this session
594-
for (RMQMessageProducer producer : this.producers) {
595-
producer.internalClose();
596-
}
597-
this.producers.clear();
592+
//clear up potential executor
593+
this.deliveryExecutor.close();
598594

599-
//now commit anything done during close
600-
if (this.getTransactedNoException()) {
601-
this.commit();
602-
}
595+
//close all producers created by this session
596+
for (RMQMessageProducer producer : this.producers) {
597+
producer.internalClose();
598+
}
599+
this.producers.clear();
600+
601+
//now commit anything done during close
602+
if (this.getTransactedNoException()) {
603+
this.commit();
604+
}
603605

604-
this.closeRabbitChannels();
606+
this.closeRabbitChannels();
607+
} finally {
608+
this.closed.set(true);
609+
}
605610
}
606611
}
607612

0 commit comments

Comments
 (0)