Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/main/java/com/rabbitmq/jms/admin/DestinationsStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.rabbitmq.jms.admin;

import java.io.Serializable;

/**
* Strategy for customise destinations name.
*
* @since 3.4.0
*/
public interface DestinationsStrategy extends Serializable {

default String getDurableTopicExchangeName() {
return "jms.durable.topic";
}

default String getTempTopicExchangeName() {
return "jms.temp.topic";
}

default String getDurableQueueExchangeName() {
return "jms.durable.queues";
}

default String getTempQueueExchangeName() {
return "jms.temp.queues";
}

default String getConsumerQueueNamePrefix() {
return "jms-cons-";
}

default String getDurableTopicSelectorExchangePrefix() {
return "jms-dutop-slx-";
}

default String getNonDurableTopicSelectorExchangePrefix() {
return "jms-ndtop-slx-";
}

default String getTempQueuePrefix() {
return "jms-temp-queue-";
}

default String getTempTopicPrefix() {
return "jms-temp-queue-";
}
}
12 changes: 12 additions & 0 deletions src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,13 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
*/
private AuthenticationMechanism authenticationMechanism = AuthenticationMechanism.PLAIN;

/**
* The destinations name strategy to use.
*
* @since 3.4.0
*/
private DestinationsStrategy destinationsStrategy = new DestinationsStrategy() {};

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -351,6 +358,7 @@ protected Connection createConnection(String username, String password, Connecti
.setRequeueOnTimeout(this.requeueOnTimeout)
.setKeepTextMessageType(this.keepTextMessageType)
.setReplyToStrategy(replyToStrategy)
.setDestinationsStrategy(destinationsStrategy)
);
logger.debug("Connection {} created.", conn);
return conn;
Expand Down Expand Up @@ -1151,6 +1159,10 @@ public void setKeepTextMessageType(boolean keepTextMessageType) {
this.keepTextMessageType = keepTextMessageType;
}

public void setDestinationsStrategy(DestinationsStrategy destinationsStrategy) {
this.destinationsStrategy = destinationsStrategy;
}

@FunctionalInterface
private interface ConnectionCreator {
com.rabbitmq.client.Connection create(com.rabbitmq.client.ConnectionFactory cf) throws Exception;
Expand Down
22 changes: 9 additions & 13 deletions src/main/java/com/rabbitmq/jms/admin/RMQDestination.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,10 @@ public class RMQDestination implements Queue, Topic, Destination, Referenceable,

private static final String RABBITMQ_AMQ_TOPIC_EXCHANGE_NAME = "amq.topic";
private static final String RABBITMQ_AMQ_TOPIC_EXCHANGE_TYPE = "topic"; // standard topic exchange type in RabbitMQ
private static final String JMS_DURABLE_TOPIC_EXCHANGE_NAME = "jms.durable.topic"; // fixed topic exchange in RabbitMQ for jms traffic
private static final String JMS_TEMP_TOPIC_EXCHANGE_NAME = "jms.temp.topic"; // fixed topic exchange in RabbitMQ for jms traffic

private static final String RABBITMQ_UNNAMED_EXCHANGE = "";
private static final String RABBITMQ_AMQ_DIRECT_EXCHANGE_NAME = "amq.direct";
private static final String RABBITMQ_AMQ_DIRECT_EXCHANGE_TYPE = "direct"; // standard direct exchange type in RabbitMQ
private static final String JMS_DURABLE_QUEUE_EXCHANGE_NAME = "jms.durable.queues"; // fixed queue exchange in RabbitMQ for jms traffic
private static final String JMS_TEMP_QUEUE_EXCHANGE_NAME = "jms.temp.queues"; // fixed queue exchange in RabbitMQ for jms traffic

// Would like all these to be final, but we need to allow set them
private String destinationName;
Expand Down Expand Up @@ -67,8 +63,8 @@ public RMQDestination() {
* @param isQueue true if this represent a queue
* @param isTemporary true if this is a temporary destination
*/
public RMQDestination(String destName, boolean isQueue, boolean isTemporary) {
this(destName, isQueue, isTemporary, null);
public RMQDestination(String destName, boolean isQueue, boolean isTemporary, DestinationsStrategy destinationsStrategy) {
this(destName, isQueue, isTemporary, null, destinationsStrategy);
}

/**
Expand All @@ -78,15 +74,15 @@ public RMQDestination(String destName, boolean isQueue, boolean isTemporary) {
* @param isTemporary true if this is a temporary destination
* @param queueDeclareArguments arguments to use when declaring the AMQP queue
*/
public RMQDestination(String destName, boolean isQueue, boolean isTemporary, Map<String, Object> queueDeclareArguments) {
this(destName, false, queueOrTopicExchangeName(isQueue, isTemporary), destName, destName, isQueue, isTemporary, queueDeclareArguments);
public RMQDestination(String destName, boolean isQueue, boolean isTemporary, Map<String, Object> queueDeclareArguments, DestinationsStrategy destinationsStrategy) {
this(destName, false, queueOrTopicExchangeName(isQueue, isTemporary, destinationsStrategy), destName, destName, isQueue, isTemporary, queueDeclareArguments);
}

private static String queueOrTopicExchangeName(boolean isQueue, boolean isTemporary) {
if (isQueue & isTemporary) return JMS_TEMP_QUEUE_EXCHANGE_NAME;
else if (isQueue & !isTemporary) return JMS_DURABLE_QUEUE_EXCHANGE_NAME;
else if (!isQueue & isTemporary) return JMS_TEMP_TOPIC_EXCHANGE_NAME;
else /* if (!isQueue & !isTemporary) */ return JMS_DURABLE_TOPIC_EXCHANGE_NAME;
private static String queueOrTopicExchangeName(boolean isQueue, boolean isTemporary, DestinationsStrategy destinationsStrategy) {
if (isQueue & isTemporary) return destinationsStrategy.getTempQueueExchangeName();
else if (isQueue & !isTemporary) return destinationsStrategy.getDurableQueueExchangeName();
else if (!isQueue & isTemporary) return destinationsStrategy.getTempTopicExchangeName();
else /* if (!isQueue & !isTemporary) */ return destinationsStrategy.getDurableTopicExchangeName();
}

private static String queueOrTopicExchangeType(boolean isQueue) {
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/rabbitmq/jms/admin/RMQObjectFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public class RMQObjectFactory implements ObjectFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(RMQObjectFactory.class);

private final DestinationsStrategy destinationsStrategy = new DestinationsStrategy(){};

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -263,7 +265,7 @@ public Object createDestination(Reference ref, Hashtable<?, ?> environment, Name
String amqpQueueName = getStringProperty(ref, environment, "amqpQueueName", true, null);
return new RMQDestination(dname, amqpExchangeName, amqpRoutingKey, amqpQueueName);
} else {
return new RMQDestination(dname, !topic, false, queueDeclareArguments);
return new RMQDestination(dname, !topic, false, queueDeclareArguments, destinationsStrategy);
}
}

Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.jms.admin.DestinationsStrategy;
import com.rabbitmq.jms.util.WhiteListObjectInputStream;

import jakarta.jms.Message;
Expand Down Expand Up @@ -118,6 +119,13 @@ public class ConnectionParams {
*/
private ReplyToStrategy replyToStrategy = DefaultReplyToStrategy.INSTANCE;

/**
* The destinations name strategy to use.
*
* @since 3.4.0
*/
private DestinationsStrategy destinationsStrategy = new DestinationsStrategy() {};

public Connection getRabbitConnection() {
return rabbitConnection;
}
Expand Down Expand Up @@ -261,4 +269,13 @@ public ConnectionParams setReplyToStrategy(final ReplyToStrategy replyToStrategy
public ReplyToStrategy getReplyToStrategy() {
return replyToStrategy;
}

public ConnectionParams setDestinationsStrategy(DestinationsStrategy destinationsStrategy) {
this.destinationsStrategy = destinationsStrategy;
return this;
}

public DestinationsStrategy getDestinationsStrategy() {
return destinationsStrategy;
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/RMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import com.rabbitmq.jms.admin.DestinationsStrategy;
import jakarta.jms.*;
import jakarta.jms.IllegalStateException;

Expand Down Expand Up @@ -162,6 +163,13 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
*/
private final ReplyToStrategy replyToStrategy;

/**
* The destinations name strategy to use.
*
* @since 3.4.0
*/
private final DestinationsStrategy destinationsStrategy;

/**
* Creates an RMQConnection object.
* @param connectionParams parameters for this connection
Expand Down Expand Up @@ -190,6 +198,7 @@ public RMQConnection(ConnectionParams connectionParams) {
this.keepTextMessageType = connectionParams.isKeepTextMessageType();
this.delayedMessageService = new DelayedMessageService();
this.replyToStrategy = connectionParams.getReplyToStrategy();
this.destinationsStrategy = connectionParams.getDestinationsStrategy();
}

/**
Expand Down Expand Up @@ -247,6 +256,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
.setKeepTextMessageType(this.keepTextMessageType)
.setDelayedMessageService(this.delayedMessageService)
.setReplyToStrategy(this.replyToStrategy)
.setDestinationsStrategy(this.destinationsStrategy)
);
this.sessions.add(session);
return session;
Expand Down
24 changes: 16 additions & 8 deletions src/main/java/com/rabbitmq/jms/client/RMQSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Copyright (c) 2013-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
package com.rabbitmq.jms.client;

import com.rabbitmq.jms.admin.DestinationsStrategy;
import com.rabbitmq.jms.client.Subscription.Context;
import com.rabbitmq.jms.client.Subscription.PostAction;
import java.io.IOException;
Expand Down Expand Up @@ -226,6 +227,12 @@ public class RMQSession implements Session, QueueSession, TopicSession {
*/
private final ReplyToStrategy replyToStrategy;

/**
* The destinations name strategy to use.
*
* @since 3.4.0
*/
private final DestinationsStrategy destinationsStrategy;

static boolean validateSessionMode(int sessionMode) {
return sessionMode >= 0 && sessionMode <= CLIENT_INDIVIDUAL_ACKNOWLEDGE;
Expand Down Expand Up @@ -273,6 +280,7 @@ public RMQSession(SessionParams sessionParams) throws JMSException {

this.replyToStrategy = sessionParams.getReplyToStrategy() == null ?
DefaultReplyToStrategy.INSTANCE : sessionParams.getReplyToStrategy();
this.destinationsStrategy = sessionParams.getDestinationsStrategy();

if (transacted) {
this.acknowledgeMode = Session.SESSION_TRANSACTED;
Expand Down Expand Up @@ -803,8 +811,8 @@ private RMQMessageConsumer createConsumerInternal(RMQDestination dest, String uu
return consumer;
}

private static String generateJmsConsumerQueueName() {
return Util.generateUUID("jms-cons-");
private String generateJmsConsumerQueueName() {
return Util.generateUUID(this.destinationsStrategy.getConsumerQueueNamePrefix());
}

/**
Expand All @@ -823,15 +831,15 @@ String getSelectionExchange(boolean durableSubscriber) throws IOException {

private String getDurableTopicSelectorExchange() throws IOException {
if (this.durableTopicSelectorExchange==null) {
this.durableTopicSelectorExchange = Util.generateUUID("jms-dutop-slx-");
this.durableTopicSelectorExchange = Util.generateUUID(this.destinationsStrategy.getDurableTopicSelectorExchangePrefix());
}
this.channel.exchangeDeclare(this.durableTopicSelectorExchange, JMS_TOPIC_SELECTOR_EXCHANGE_TYPE, true, true, RJMS_SELECTOR_EXCHANGE_ARGS);
return this.durableTopicSelectorExchange;
}

private String getNonDurableTopicSelectorExchange() throws IOException {
if (this.nonDurableTopicSelectorExchange==null) {
this.nonDurableTopicSelectorExchange = Util.generateUUID("jms-ndtop-slx-");
this.nonDurableTopicSelectorExchange = Util.generateUUID(this.destinationsStrategy.getNonDurableTopicSelectorExchangePrefix());
}
this.channel.exchangeDeclare(this.nonDurableTopicSelectorExchange, JMS_TOPIC_SELECTOR_EXCHANGE_TYPE, false, true, RJMS_SELECTOR_EXCHANGE_ARGS);
return this.nonDurableTopicSelectorExchange;
Expand Down Expand Up @@ -891,7 +899,7 @@ public MessageConsumer createConsumer(Destination destination, String messageSel
@Override
public Queue createQueue(String queueName) throws JMSException {
illegalStateExceptionIfClosed();
RMQDestination dest = new RMQDestination(queueName, true, false);
RMQDestination dest = new RMQDestination(queueName, true, false, this.destinationsStrategy);
declareRMQQueue(dest, null, false, true);
return dest;
}
Expand Down Expand Up @@ -992,7 +1000,7 @@ void declareRMQQueue(RMQDestination dest, String queueNameOverride, boolean dura
@Override
public Topic createTopic(String topicName) throws JMSException {
illegalStateExceptionIfClosed();
RMQDestination dest = new RMQDestination(topicName, false, false);
RMQDestination dest = new RMQDestination(topicName, false, false, this.destinationsStrategy);
declareTopic(dest);
return dest;
}
Expand Down Expand Up @@ -1124,7 +1132,7 @@ void closeBrowsingChannel(Channel chan) {
@Override
public TemporaryQueue createTemporaryQueue() throws JMSException {
illegalStateExceptionIfClosed();
return new RMQDestination(Util.generateUUID("jms-temp-queue-"), true, true);
return new RMQDestination(Util.generateUUID(this.destinationsStrategy.getTempQueuePrefix()), true, true, this.destinationsStrategy);
}

/**
Expand All @@ -1133,7 +1141,7 @@ public TemporaryQueue createTemporaryQueue() throws JMSException {
@Override
public TemporaryTopic createTemporaryTopic() throws JMSException {
illegalStateExceptionIfClosed();
return new RMQDestination(Util.generateUUID("jms-temp-topic-"), false, true);
return new RMQDestination(Util.generateUUID(this.destinationsStrategy.getTempTopicPrefix()), false, true, this.destinationsStrategy);
}

/**
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/SessionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.rabbitmq.jms.client;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.jms.admin.DestinationsStrategy;
import com.rabbitmq.jms.util.WhiteListObjectInputStream;

import jakarta.jms.Message;
Expand Down Expand Up @@ -115,6 +116,13 @@ public class SessionParams {
*/
private ReplyToStrategy replyToStrategy = DefaultReplyToStrategy.INSTANCE;

/**
* The destinations name strategy to use.
*
* @since 3.4.0
*/
private DestinationsStrategy destinationsStrategy = new DestinationsStrategy() {};

public RMQConnection getConnection() {
return connection;
}
Expand Down Expand Up @@ -267,4 +275,13 @@ public SessionParams setReplyToStrategy(final ReplyToStrategy replyToStrategy) {
public ReplyToStrategy getReplyToStrategy() {
return replyToStrategy;
}

public SessionParams setDestinationsStrategy(DestinationsStrategy destinationsStrategy) {
this.destinationsStrategy = destinationsStrategy;
return this;
}

public DestinationsStrategy getDestinationsStrategy() {
return destinationsStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.rabbitmq.integration.tests;

import com.rabbitmq.TestUtils.SkipIfDelayedMessageExchangePluginNotActivated;
import com.rabbitmq.jms.admin.DestinationsStrategy;
import com.rabbitmq.jms.admin.RMQDestination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
Expand All @@ -28,7 +29,7 @@ public class DelayedAMQPQueueMessageIT extends AbstractAmqpITQueue {

@BeforeEach
void init() {
destination = new RMQDestination(queueName, true, false);
destination = new RMQDestination(queueName, true, false, new DestinationsStrategy() {});
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.TestUtils;
import com.rabbitmq.jms.admin.DestinationsStrategy;
import com.rabbitmq.jms.admin.RMQDestination;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void testRecoverTextMessageSyncRedeliveryCountShouldBeSetWithQuorumQueue(
String queueName = TestUtils.queueName(info);
Queue queue =
new RMQDestination(
queueName, true, false, Collections.singletonMap("x-queue-type", "quorum"));
queueName, true, false, Collections.singletonMap("x-queue-type", "quorum"), new DestinationsStrategy() {});
try {
QueueSender queueSender = queueSession.createSender(queue);
queueSender.setDeliveryMode(DeliveryMode.PERSISTENT);
Expand Down
Loading