diff --git a/modules/solace/src/main/java/org/testcontainers/solace/Service.java b/modules/solace/src/main/java/org/testcontainers/solace/Service.java index 6ec7de44d08..9c9342ef789 100644 --- a/modules/solace/src/main/java/org/testcontainers/solace/Service.java +++ b/modules/solace/src/main/java/org/testcontainers/solace/Service.java @@ -4,10 +4,25 @@ * Services that are supported by Testcontainers implementation */ public enum Service { + /** + * Advanced Message Queuing Protocol + */ AMQP("amqp", 5672, "amqp", false), + /** + * Message Queuing Telemetry Transport + */ MQTT("mqtt", 1883, "tcp", false), + /** + * Representational State Transfer + */ REST("rest", 9000, "http", false), + /** + * Solace Message Format + */ SMF("smf", 55555, "tcp", true), + /** + * Solace Message Format with SSL + */ SMF_SSL("smf", 55443, "tcps", true); private final String name; diff --git a/modules/solace/src/main/java/org/testcontainers/solace/SolaceContainer.java b/modules/solace/src/main/java/org/testcontainers/solace/SolaceContainer.java index b2a00b32f31..00db0606536 100644 --- a/modules/solace/src/main/java/org/testcontainers/solace/SolaceContainer.java +++ b/modules/solace/src/main/java/org/testcontainers/solace/SolaceContainer.java @@ -65,11 +65,19 @@ public SolaceContainer(String dockerImageName) { this(DockerImageName.parse(dockerImageName)); } + /** + * Create a new solace container with the specified docker image. + * + * @param dockerImageName the image name that should be used. + */ public SolaceContainer(DockerImageName dockerImageName) { super(dockerImageName); dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); withCreateContainerCmdModifier(cmd -> { - cmd.getHostConfig().withShmSize(SHM_SIZE).withUlimits(new Ulimit[] { new Ulimit("nofile", 2448L, 6592L) }); + cmd + .getHostConfig() + .withShmSize(SHM_SIZE) + .withUlimits(new Ulimit[] { new Ulimit("nofile", 2448L, 1048576L) }); }); this.waitStrategy = Wait.forLogMessage(SOLACE_READY_MESSAGE, 1).withStartupTimeout(Duration.ofSeconds(60)); withExposedPorts(8080); @@ -103,6 +111,17 @@ private Transferable createConfigurationScript() { updateConfigScript(scriptBuilder, "create message-vpn " + vpn); updateConfigScript(scriptBuilder, "no shutdown"); updateConfigScript(scriptBuilder, "exit"); + updateConfigScript(scriptBuilder, "client-profile default message-vpn " + vpn); + updateConfigScript(scriptBuilder, "message-spool"); + updateConfigScript(scriptBuilder, "allow-guaranteed-message-send"); + updateConfigScript(scriptBuilder, "allow-guaranteed-message-receive"); + updateConfigScript(scriptBuilder, "allow-guaranteed-endpoint-create"); + updateConfigScript(scriptBuilder, "allow-guaranteed-endpoint-create-durability all"); + updateConfigScript(scriptBuilder, "exit"); + updateConfigScript(scriptBuilder, "exit"); + updateConfigScript(scriptBuilder, "message-spool message-vpn " + vpn); + updateConfigScript(scriptBuilder, "max-spool-usage 60000"); + updateConfigScript(scriptBuilder, "exit"); } // Configure username and password diff --git a/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerAMQPTest.java b/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerAMQPTest.java index 6c68d27d358..f7266fe8212 100644 --- a/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerAMQPTest.java +++ b/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerAMQPTest.java @@ -31,7 +31,7 @@ public class SolaceContainerAMQPTest { @Test public void testSolaceContainer() throws JMSException { try ( - SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2") + SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0") .withTopic(TOPIC_NAME, Service.AMQP) .withVpn("amqp-vpn") ) { diff --git a/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerMQTTTest.java b/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerMQTTTest.java index ab7d5ed56bb..07c08d32416 100644 --- a/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerMQTTTest.java +++ b/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerMQTTTest.java @@ -27,7 +27,7 @@ public class SolaceContainerMQTTTest { @Test public void testSolaceContainer() { try ( - SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2") + SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0") .withTopic(TOPIC_NAME, Service.MQTT) .withVpn("mqtt-vpn") ) { diff --git a/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerRESTTest.java b/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerRESTTest.java index f2722b986ca..7f54cec3f74 100644 --- a/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerRESTTest.java +++ b/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerRESTTest.java @@ -28,7 +28,7 @@ public class SolaceContainerRESTTest { @Test public void testSolaceContainer() throws IOException { try ( - SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2") + SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0") .withTopic(TOPIC_NAME, Service.REST) .withVpn("rest-vpn") ) { diff --git a/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerSMFTest.java b/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerSMFTest.java index d7ccd94998e..d7e0e8ea981 100644 --- a/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerSMFTest.java +++ b/modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerSMFTest.java @@ -1,11 +1,14 @@ package org.testcontainers.solace; import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.ConsumerFlowProperties; +import com.solacesystems.jcsmp.EndpointProperties; import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler; +import com.solacesystems.jcsmp.Queue; import com.solacesystems.jcsmp.TextMessage; import com.solacesystems.jcsmp.Topic; import com.solacesystems.jcsmp.XMLMessageConsumer; @@ -30,40 +33,75 @@ public class SolaceContainerSMFTest { private static final Topic TOPIC = JCSMPFactory.onlyInstance().createTopic("Topic/ActualTopic"); + private static final Queue QUEUE = JCSMPFactory.onlyInstance().createQueue("Queue"); + @Test public void testSolaceContainerWithSimpleAuthentication() { try ( // solaceContainerSetup { - SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2") + SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0") .withCredentials("user", "pass") - .withTopic("Topic/ActualTopic", Service.SMF) + .withTopic(TOPIC.getName(), Service.SMF) .withVpn("test_vpn") // } ) { solaceContainer.start(); JCSMPSession session = createSessionWithBasicAuth(solaceContainer); assertThat(session).isNotNull(); - assertThat(consumeMessageFromSolace(session)).isEqualTo(MESSAGE); + consumeMessageFromTopics(session); session.closeSession(); } } + @Test + public void testSolaceContainerWithCreateFlow() { + try ( + SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0") + .withCredentials("user", "pass") + .withTopic(TOPIC.getName(), Service.SMF) + .withVpn("test_vpn") + ) { + solaceContainer.start(); + JCSMPSession session = createSessionWithBasicAuth(solaceContainer); + assertThat(session).isNotNull(); + testCreateFlow(session); + session.closeSession(); + } + } + + private static void testCreateFlow(JCSMPSession session) { + try { + EndpointProperties endpointProperties = new EndpointProperties(); + endpointProperties.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE); + endpointProperties.setQuota(1000); + session.provision(QUEUE, endpointProperties, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS); + session.addSubscription(QUEUE, TOPIC, JCSMPSession.WAIT_FOR_CONFIRM); + ConsumerFlowProperties flowProperties = new ConsumerFlowProperties().setEndpoint(QUEUE); + TestConsumer listener = new TestConsumer(); + session.createFlow(listener, flowProperties).start(); + publishMessageToSolaceTopic(session); + listener.waitForMessage(); + } catch (Exception e) { + throw new RuntimeException("Cannot process message using solace topic/queue: " + e.getMessage(), e); + } + } + @Test public void testSolaceContainerWithCertificates() { try ( // solaceContainerUsageSSL { - SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.6") + SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0") .withClientCert( MountableFile.forClasspathResource("solace.pem"), MountableFile.forClasspathResource("rootCA.crt") ) - .withTopic("Topic/ActualTopic", Service.SMF_SSL) + .withTopic(TOPIC.getName(), Service.SMF_SSL) // } ) { solaceContainer.start(); JCSMPSession session = createSessionWithCertificates(solaceContainer); assertThat(session).isNotNull(); - assertThat(consumeMessageFromSolace(session)).isEqualTo(MESSAGE); + consumeMessageFromTopics(session); session.closeSession(); } } @@ -112,7 +150,7 @@ private static JCSMPSession createSession(JCSMPProperties properties) { } } - private void publishMessageToSolace(JCSMPSession session) throws JCSMPException { + private static void publishMessageToSolaceTopic(JCSMPSession session) throws JCSMPException { XMLMessageProducer producer = session.getMessageProducer( new JCSMPStreamingPublishCorrelatingEventHandler() { @Override @@ -131,37 +169,49 @@ public void handleErrorEx(Object o, JCSMPException e, long l) { producer.send(msg, TOPIC); } - private String consumeMessageFromSolace(JCSMPSession session) { - CountDownLatch latch = new CountDownLatch(1); + private static void consumeMessageFromTopics(JCSMPSession session) { try { - String[] result = new String[1]; - XMLMessageConsumer cons = session.getMessageConsumer( - new XMLMessageListener() { - @Override - public void onReceive(BytesXMLMessage msg) { - if (msg instanceof TextMessage) { - TextMessage textMessage = (TextMessage) msg; - String message = textMessage.getText(); - result[0] = message; - LOGGER.info("TextMessage received: " + message); - } - latch.countDown(); - } - - @Override - public void onException(JCSMPException e) { - LOGGER.error("Exception received: " + e.getMessage()); - latch.countDown(); - } - } - ); + TestConsumer listener = new TestConsumer(); + XMLMessageConsumer cons = session.getMessageConsumer(listener); session.addSubscription(TOPIC); cons.start(); - publishMessageToSolace(session); - assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue(); - return result[0]; + publishMessageToSolaceTopic(session); + listener.waitForMessage(); } catch (Exception e) { - throw new RuntimeException("Cannot receive message from solace", e); + throw new RuntimeException("Cannot process message using solace: " + e.getMessage(), e); + } + } + + static class TestConsumer implements XMLMessageListener { + + private final CountDownLatch latch = new CountDownLatch(1); + + private String result; + + @Override + public void onReceive(BytesXMLMessage msg) { + if (msg instanceof TextMessage) { + TextMessage textMessage = (TextMessage) msg; + String message = textMessage.getText(); + result = message; + LOGGER.info("Message received: " + message); + } + latch.countDown(); + } + + @Override + public void onException(JCSMPException e) { + LOGGER.error("Exception received: " + e.getMessage()); + latch.countDown(); + } + + private void waitForMessage() { + try { + assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue(); + assertThat(result).isEqualTo(MESSAGE); + } catch (Exception e) { + throw new RuntimeException("Cannot receive message from solace: " + e.getMessage(), e); + } } } }