diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/MulticastRule.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/MulticastRule.java
index 1c2cc674903..b85897b0935 100644
--- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/MulticastRule.java
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/MulticastRule.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2020 the original author or authors.
+ * Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,7 +29,6 @@
import org.springframework.integration.ip.util.SocketTestUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
-import org.springframework.util.SocketUtils;
/**
* @author Artem Bilan
@@ -76,7 +75,7 @@ private NetworkInterface checkMulticast() throws Exception {
}
try {
MulticastSocket socket = new MulticastSocket();
- socket.joinGroup(new InetSocketAddress(this.group, SocketUtils.findAvailableUdpPort()), nic);
+ socket.joinGroup(new InetSocketAddress(this.group, 0), nic);
socket.close();
}
catch (Exception e) {
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/UdpChannelAdapterTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/UdpChannelAdapterTests.java
index b67427f294b..c7b8bc244c8 100644
--- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/UdpChannelAdapterTests.java
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/UdpChannelAdapterTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2021 the original author or authors.
+ * Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -50,7 +50,6 @@
import org.springframework.integration.test.rule.Log4j2LevelAdjuster;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
-import org.springframework.util.SocketUtils;
/**
*
@@ -284,8 +283,7 @@ public void testMulticastReceiver() throws Exception {
}
}
- DatagramSocket datagramSocket =
- new DatagramSocket(SocketUtils.findAvailableUdpPort(), inetAddress);
+ DatagramSocket datagramSocket = new DatagramSocket(0, inetAddress);
datagramSocket.send(packet);
datagramSocket.close();
diff --git a/spring-integration-stomp/src/test/java/org/springframework/integration/stomp/client/StompServerIntegrationTests.java b/spring-integration-stomp/src/test/java/org/springframework/integration/stomp/client/StompServerIntegrationTests.java
index c169671bf73..24a29a1152d 100644
--- a/spring-integration-stomp/src/test/java/org/springframework/integration/stomp/client/StompServerIntegrationTests.java
+++ b/spring-integration-stomp/src/test/java/org/springframework/integration/stomp/client/StompServerIntegrationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2021 the original author or authors.
+ * Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.jupiter.api.AfterAll;
@@ -57,7 +58,6 @@
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.util.SocketUtils;
/**
* @author Artem Bilan
@@ -73,7 +73,6 @@ public class StompServerIntegrationTests {
@BeforeAll
public static void setup() throws Exception {
- int port = SocketUtils.findAvailableTcpPort(61613);
ConfigurationImpl configuration =
new ConfigurationImpl()
.setName("embedded-server")
@@ -81,14 +80,14 @@ public static void setup() throws Exception {
.setSecurityEnabled(false)
.setJMXManagementEnabled(false)
.setJournalDatasync(false)
- .addAcceptorConfiguration("stomp", "tcp://127.0.0.1:" + port)
+ .addAcceptorConfiguration("stomp", "tcp://127.0.0.1:" + TransportConstants.DEFAULT_STOMP_PORT)
.addAddressesSetting("#",
new AddressSettings()
.setDeadLetterAddress(SimpleString.toSimpleString("dla"))
.setExpiryAddress(SimpleString.toSimpleString("expiry")));
broker.setConfiguration(configuration).start();
- stompClient = new ReactorNettyTcpStompClient("127.0.0.1", port);
+ stompClient = new ReactorNettyTcpStompClient("127.0.0.1", TransportConstants.DEFAULT_STOMP_PORT);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
diff --git a/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/config/SyslogReceivingChannelAdapterParserTests-context.xml b/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/config/SyslogReceivingChannelAdapterParserTests-context.xml
index 61846f157a4..34139d17d91 100644
--- a/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/config/SyslogReceivingChannelAdapterParserTests-context.xml
+++ b/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/config/SyslogReceivingChannelAdapterParserTests-context.xml
@@ -9,9 +9,7 @@
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
-
-
-
+
@@ -42,7 +40,7 @@
-
+
diff --git a/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/config/SyslogReceivingChannelAdapterParserTests.java b/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/config/SyslogReceivingChannelAdapterParserTests.java
index 9909a0729f8..e48b46b6e73 100644
--- a/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/config/SyslogReceivingChannelAdapterParserTests.java
+++ b/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/config/SyslogReceivingChannelAdapterParserTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,17 +17,18 @@
package org.springframework.integration.syslog.config;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import java.lang.reflect.Method;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.nio.charset.StandardCharsets;
import javax.net.SocketFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,19 +43,21 @@
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import org.springframework.util.ReflectionUtils;
/**
* @author Gary Russell
+ * @author Artem Bilan
+ *
* @since 3.0
*
*/
-@ContextConfiguration
-@RunWith(SpringJUnit4ClassRunner.class)
+@SpringJUnitConfig
public class SyslogReceivingChannelAdapterParserTests {
- @Autowired @Qualifier("foo.adapter")
+ @Autowired
+ @Qualifier("foo.adapter")
private UdpSyslogReceivingChannelAdapter adapter1;
@Autowired
@@ -63,7 +66,8 @@ public class SyslogReceivingChannelAdapterParserTests {
@Autowired
private PollableChannel foo;
- @Autowired @Qualifier("explicitUdp.adapter")
+ @Autowired
+ @Qualifier("explicitUdp.adapter")
private UdpSyslogReceivingChannelAdapter explicitUdpAdapter;
@Autowired
@@ -81,7 +85,8 @@ public class SyslogReceivingChannelAdapterParserTests {
@Autowired
private RFC5424MessageConverter rfc5424;
- @Autowired @Qualifier("bar.adapter")
+ @Autowired
+ @Qualifier("bar.adapter")
private TcpSyslogReceivingChannelAdapter adapter2;
@Autowired
@@ -95,8 +100,10 @@ public class SyslogReceivingChannelAdapterParserTests {
@Test
public void testSimplestUdp() throws Exception {
- int port = TestUtils.getPropertyValue(adapter1, "udpAdapter.port", Integer.class);
- byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE".getBytes("UTF-8");
+ Method getPort = ReflectionUtils.findMethod(UdpSyslogReceivingChannelAdapter.class, "getPort");
+ ReflectionUtils.makeAccessible(getPort);
+ int port = (int) ReflectionUtils.invokeMethod(getPort, this.adapter1);
+ byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE".getBytes(StandardCharsets.UTF_8);
DatagramPacket packet = new DatagramPacket(buf, buf.length, new InetSocketAddress("localhost", port));
DatagramSocket socket = new DatagramSocket();
Thread.sleep(1000);
@@ -108,13 +115,13 @@ public void testSimplestUdp() throws Exception {
}
@Test
- public void testExplicitChannelUdp() throws Exception {
+ public void testExplicitChannelUdp() {
assertThat(TestUtils.getPropertyValue(foobar, "udpAdapter.port")).isEqualTo(1514);
assertThat(TestUtils.getPropertyValue(foobar, "outputChannel")).isSameAs(foo);
}
@Test
- public void testExplicitUdp() throws Exception {
+ public void testExplicitUdp() {
assertThat(TestUtils.getPropertyValue(explicitUdpAdapter, "outputChannel")).isSameAs(explicitUdp);
}
@@ -133,9 +140,9 @@ public void testFullBoatUdp() {
public void testSimplestTcp() throws Exception {
AbstractServerConnectionFactory connectionFactory = TestUtils.getPropertyValue(adapter2, "connectionFactory",
AbstractServerConnectionFactory.class);
- int port = connectionFactory.getPort();
waitListening(connectionFactory, 10000L);
- byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE\n".getBytes("UTF-8");
+ byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE\n".getBytes(StandardCharsets.UTF_8);
+ int port = connectionFactory.getPort();
Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
Thread.sleep(1000);
socket.getOutputStream().write(buf);
@@ -159,57 +166,40 @@ public void testFullBoatTcp() {
@Test
public void testPortOnUdpChild() {
- try {
- new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-fail1-context.xml", this.getClass())
- .close();
- fail("Expected exception");
- }
- catch (BeanDefinitionParsingException e) {
- assertThat(e.getMessage().startsWith(
- "Configuration problem: When child element 'udp-attributes' is present, 'port' must be defined " +
- "there"))
- .isTrue();
- }
+ assertThatExceptionOfType(BeanDefinitionParsingException.class)
+ .isThrownBy(() ->
+ new ClassPathXmlApplicationContext(getClass().getSimpleName() + "-fail1-context.xml",
+ getClass()))
+ .withMessageStartingWith(
+ "Configuration problem: " +
+ "When child element 'udp-attributes' is present, 'port' must be defined there");
}
@Test
public void testPortWithTCPFactory() {
- try {
- new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-fail2-context.xml", this.getClass())
- .close();
- fail("Expected exception");
- }
- catch (BeanCreationException e) {
- assertThat(e.getCause().getMessage()).isEqualTo("Cannot specify both 'port' and 'connectionFactory'");
- }
+ assertThatExceptionOfType(BeanCreationException.class)
+ .isThrownBy(() ->
+ new ClassPathXmlApplicationContext(getClass().getSimpleName() + "-fail2-context.xml",
+ getClass()))
+ .withStackTraceContaining("Cannot specify both 'port' and 'connectionFactory'");
}
@Test
public void testUdpChildWithTcp() {
- try {
- new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-fail3-context.xml", this.getClass())
- .close();
- fail("Expected exception");
- }
- catch (BeanCreationException e) {
- e.printStackTrace();
-
- assertThat(e.getCause().getMessage())
- .isEqualTo("Cannot specify 'udp-attributes' when the protocol is 'tcp'");
- }
+ assertThatExceptionOfType(BeanCreationException.class)
+ .isThrownBy(() ->
+ new ClassPathXmlApplicationContext(getClass().getSimpleName() + "-fail3-context.xml",
+ getClass()))
+ .withStackTraceContaining("Cannot specify 'udp-attributes' when the protocol is 'tcp'");
}
@Test
public void testUDPWithTCPFactory() {
- try {
- new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-fail4-context.xml", this.getClass())
- .close();
- fail("Expected exception");
- }
- catch (BeanCreationException e) {
- assertThat(e.getCause().getMessage())
- .isEqualTo("Cannot specify 'connection-factory' unless the protocol is 'tcp'");
- }
+ assertThatExceptionOfType(BeanCreationException.class)
+ .isThrownBy(() ->
+ new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-fail4-context.xml",
+ getClass()))
+ .withStackTraceContaining("Cannot specify 'connection-factory' unless the protocol is 'tcp'");
}
public static class PassThruConverter implements MessageConverter {
@@ -229,7 +219,7 @@ public Message> fromSyslog(Message> syslog) {
* @throws IllegalStateException
*/
private void waitListening(AbstractServerConnectionFactory serverConnectionFactory, Long delay)
- throws IllegalStateException {
+ throws IllegalStateException {
if (delay == null) {
delay = 100L;
}
diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java
index f38f48eeb5c..9b5956fca0d 100644
--- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java
+++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2021 the original author or authors.
+ * Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -212,7 +212,7 @@ public int getBackendPort() {
/**
* Return the address an {@code inproc} control socket is bound or null if this proxy has not been started yet.
- * @return the the address for control socket or null
+ * @return the address for control socket or null
*/
@Nullable
public String getControlAddress() {
@@ -222,7 +222,7 @@ public String getControlAddress() {
/**
* Return the address an {@code inproc} capture socket is bound or null if this proxy has not been started yet
* or {@link #captureAddress} is false.
- * @return the the address for capture socket or null
+ * @return the address for capture socket or null
*/
@Nullable
public String getCaptureAddress() {
diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java
index 7dace41972a..834f1e08646 100644
--- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java
+++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 the original author or authors.
+ * Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
package org.springframework.integration.zeromq.dsl;
+import java.util.function.Supplier;
+
import org.zeromq.SocketType;
import org.zeromq.ZContext;
@@ -53,6 +55,18 @@ public static ZeroMqChannelSpec pubSubZeroMqChannel(ZContext context) {
* @return the spec.
*/
public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, String connectUrl) {
+ return outboundChannelAdapter(context, () -> connectUrl);
+ }
+
+ /**
+ * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}
+ * and connection URL supplier.
+ * @param context the {@link ZContext} to use.
+ * @param connectUrl the supplier for URL to connect a ZeroMq socket to.
+ * @return the spec.
+ * @since 5.5.9
+ */
+ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, Supplier connectUrl) {
return new ZeroMqMessageHandlerSpec(context, connectUrl);
}
@@ -70,6 +84,21 @@ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context,
return new ZeroMqMessageHandlerSpec(context, connectUrl, socketType);
}
+ /**
+ * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext},
+ * connection URL supplier and {@link SocketType}.
+ * @param context the {@link ZContext} to use.
+ * @param connectUrl the supplier for URL to connect a ZeroMq socket to.
+ * @param socketType the {@link SocketType} for ZeroMq socket.
+ * @return the spec.
+ * @since 5.5.9
+ */
+ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, Supplier connectUrl,
+ SocketType socketType) {
+
+ return new ZeroMqMessageHandlerSpec(context, connectUrl, socketType);
+ }
+
/**
* Create an instance of {@link ZeroMqMessageProducerSpec} for the provided {@link ZContext}.
* @param context the {@link ZContext} to use.
diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java
index 0bcc294c392..57506db768b 100644
--- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java
+++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 the original author or authors.
+ * Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
@@ -47,9 +48,20 @@ public class ZeroMqMessageHandlerSpec
* @param connectUrl the URL to connect the socket to.
*/
protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl) {
+ this(context, () -> connectUrl);
+ }
+
+ /**
+ * Create an instance based on the provided {@link ZContext} and connection string supplier.
+ * @param context the {@link ZContext} to use for creating sockets.
+ * @param connectUrl the supplier for URL to connect the socket to.
+ * @since 5.5.9
+ */
+ protected ZeroMqMessageHandlerSpec(ZContext context, Supplier connectUrl) {
super(new ZeroMqMessageHandler(context, connectUrl));
}
+
/**
* Create an instance based on the provided {@link ZContext}, connection string and {@link SocketType}.
* @param context the {@link ZContext} to use for creating sockets.
@@ -58,6 +70,17 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl) {
* only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported.
*/
protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl, SocketType socketType) {
+ this(context, () -> connectUrl, socketType);
+ }
+
+ /**
+ * Create an instance based on the provided {@link ZContext}, connection string supplier and {@link SocketType}.
+ * @param context the {@link ZContext} to use for creating sockets.
+ * @param connectUrl the supplier for URL to connect the socket to.
+ * @param socketType the {@link SocketType} to use;
+ * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported.
+ */
+ protected ZeroMqMessageHandlerSpec(ZContext context, Supplier connectUrl, SocketType socketType) {
super(new ZeroMqMessageHandler(context, connectUrl, socketType));
}
diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java
index 5a9e38389ce..61ed1658757 100644
--- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java
+++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2021 the original author or authors.
+ * Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,9 @@
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
@@ -36,10 +38,12 @@
import org.springframework.integration.mapping.ConvertingBytesMessageMapper;
import org.springframework.integration.mapping.OutboundMessageMapper;
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
+import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
+import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@@ -61,11 +65,14 @@
*
* @since 5.4
*/
-public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler {
+public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler
+ implements ManageableLifecycle {
private static final List VALID_SOCKET_TYPES =
Arrays.asList(SocketType.PAIR, SocketType.PUSH, SocketType.PUB);
+ private final AtomicBoolean running = new AtomicBoolean();
+
private final Scheduler publisherScheduler = Schedulers.newSingle("zeroMqMessageHandlerScheduler");
private final Mono socketMono;
@@ -80,6 +87,8 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler {
private volatile boolean initialized;
+ private volatile Disposable socketMonoSubscriber;
+
/**
* Create an instance based on the provided {@link ZContext} and connection string.
* @param context the {@link ZContext} to use for creating sockets.
@@ -89,6 +98,16 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl) {
this(context, connectUrl, SocketType.PAIR);
}
+ /**
+ * Create an instance based on the provided {@link ZContext} and connection string supplier.
+ * @param context the {@link ZContext} to use for creating sockets.
+ * @param connectUrl the supplier for URL to connect the socket to.
+ * @since 5.5.9
+ */
+ public ZeroMqMessageHandler(ZContext context, Supplier connectUrl) {
+ this(context, connectUrl, SocketType.PAIR);
+ }
+
/**
* Create an instance based on the provided {@link ZContext}, connection string and {@link SocketType}.
* @param context the {@link ZContext} to use for creating sockets.
@@ -97,15 +116,29 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl) {
* only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported.
*/
public ZeroMqMessageHandler(ZContext context, String connectUrl, SocketType socketType) {
- Assert.notNull(context, "'context' must not be null");
+ this(context, () -> connectUrl, socketType);
Assert.hasText(connectUrl, "'connectUrl' must not be empty");
+ }
+
+
+ /**
+ * Create an instance based on the provided {@link ZContext}, connection string supplier and {@link SocketType}.
+ * @param context the {@link ZContext} to use for creating sockets.
+ * @param connectUrl the supplier for URL to connect the socket to.
+ * @param socketType the {@link SocketType} to use;
+ * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported.
+ * @since 5.5.9
+ */
+ public ZeroMqMessageHandler(ZContext context, Supplier connectUrl, SocketType socketType) {
+ Assert.notNull(context, "'context' must not be null");
+ Assert.notNull(connectUrl, "'connectUrl' must not be null");
Assert.state(VALID_SOCKET_TYPES.contains(socketType),
() -> "'socketType' can only be one of the: " + VALID_SOCKET_TYPES);
this.socketMono =
Mono.just(context.createSocket(socketType))
.publishOn(this.publisherScheduler)
.doOnNext((socket) -> this.socketConfigurer.accept(socket))
- .doOnNext((socket) -> socket.connect(connectUrl))
+ .doOnNext((socket) -> socket.connect(connectUrl.get()))
.cache()
.publishOn(this.publisherScheduler);
}
@@ -176,10 +209,28 @@ protected void onInit() {
messageConverter.afterPropertiesSet();
this.messageMapper = new ConvertingBytesMessageMapper(messageConverter);
}
- this.socketMono.subscribe();
this.initialized = true;
}
+ @Override
+ public void start() {
+ if (!this.running.getAndSet(true)) {
+ this.socketMonoSubscriber = this.socketMono.subscribe();
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (this.running.getAndSet(false)) {
+ this.socketMonoSubscriber.dispose();
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.running.get();
+ }
+
@Override
protected Mono handleMessageInternal(Message> message) {
Assert.state(this.initialized, "the message handler is not initialized yet or already destroyed");
@@ -209,6 +260,7 @@ public void destroy() {
this.initialized = false;
super.destroy();
this.socketMono.doOnNext(ZMQ.Socket::close).block();
+ this.socketMonoSubscriber.dispose();
this.publisherScheduler.dispose();
}
diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/dsl/ZeroMqDslTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/dsl/ZeroMqDslTests.java
index ff65b8fc398..d268e3d1940 100644
--- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/dsl/ZeroMqDslTests.java
+++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/dsl/ZeroMqDslTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 the original author or authors.
+ * Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,7 +44,6 @@
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
-import org.springframework.util.SocketUtils;
/**
* @author Artem Bilan
@@ -55,8 +54,6 @@
@DirtiesContext
public class ZeroMqDslTests {
- private static final int PROXY_PUB_PORT = SocketUtils.findAvailableTcpPort();
-
@Autowired
ZContext context;
@@ -82,10 +79,10 @@ void testZeroMqDslIntegration() throws InterruptedException {
for (int i = 0; i < 2; i++) {
IntegrationFlow consumerFlow =
IntegrationFlows.from(
- ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
- .connectUrl("tcp://localhost:" + this.subPubZeroMqProxy.getBackendPort())
- .topics("someTopic")
- .consumeDelay(Duration.ofMillis(100)))
+ ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
+ .connectUrl("tcp://localhost:" + this.subPubZeroMqProxy.getBackendPort())
+ .topics("someTopic")
+ .consumeDelay(Duration.ofMillis(100)))
.channel(ZeroMq.zeroMqChannel(this.context).zeroMqProxy(this.pullPushZeroMqProxy))
.transform(Transformers.objectToString())
.handle(results::offer)
@@ -130,9 +127,7 @@ ZContext context() {
@Bean
ZeroMqProxy subPubZeroMqProxy() {
- ZeroMqProxy zeroMqProxy = new ZeroMqProxy(context(), ZeroMqProxy.Type.SUB_PUB);
- zeroMqProxy.setFrontendPort(PROXY_PUB_PORT);
- return zeroMqProxy;
+ return new ZeroMqProxy(context(), ZeroMqProxy.Type.SUB_PUB);
}
@Bean
@@ -141,10 +136,14 @@ ZeroMqProxy pullPushZeroMqProxy() {
}
@Bean
- IntegrationFlow publishToZeroMqPubSubFlow() {
+ IntegrationFlow publishToZeroMqPubSubFlow(ZeroMqProxy subPubZeroMqProxy) {
return flow ->
- flow.handle(ZeroMq.outboundChannelAdapter(context(), "tcp://localhost:" + PROXY_PUB_PORT,
- SocketType.PUB)
+ flow.handle(ZeroMq.outboundChannelAdapter(context(),
+ () -> {
+ await().until(() -> subPubZeroMqProxy.getFrontendPort() > 0);
+ return "tcp://localhost:" + subPubZeroMqProxy.getFrontendPort();
+ },
+ SocketType.PUB)
.topic("someTopic"));
}
diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java
index 47f18a1d939..e0fdec0f1c2 100644
--- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java
+++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2021 the original author or authors.
+ * Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -71,6 +71,7 @@ void testMessageHandlerForPair() {
Mono socketMono = TestUtils.getPropertyValue(messageHandler, "socketMono", Mono.class);
ZMQ.Socket socketInUse = socketMono.block(Duration.ofSeconds(10));
assertThat(socketInUse.getZapDomain()).isEqualTo("global");
+ messageHandler.start();
Message> testMessage = new GenericMessage<>("test");
messageHandler.handleMessage(testMessage).subscribe();
@@ -99,6 +100,7 @@ void testMessageHandlerForPubSub() {
new FunctionExpression>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
messageHandler.afterPropertiesSet();
+ messageHandler.start();
Message> testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build();
@@ -137,6 +139,7 @@ void testMessageHandlerForPushPullOverProxy() {
messageHandler.setBeanFactory(mock(BeanFactory.class));
messageHandler.setMessageConverter(new ByteArrayMessageConverter());
messageHandler.afterPropertiesSet();
+ messageHandler.start();
Message> testMessage = new GenericMessage<>("test".getBytes());
messageHandler.handleMessage(testMessage).subscribe();
diff --git a/src/reference/asciidoc/testing.adoc b/src/reference/asciidoc/testing.adoc
index ff8ec73163c..88f6accbdeb 100644
--- a/src/reference/asciidoc/testing.adoc
+++ b/src/reference/asciidoc/testing.adoc
@@ -69,56 +69,6 @@ The `createTestApplicationContext()` factory method produces a `TestApplicationC
See the https://docs.spring.io/spring-integration/api/org/springframework/integration/test/util/TestUtils.html[Javadoc] of other `TestUtils` methods for more information about this class.
-==== Using the `SocketUtils` Class
-
-The https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/util/SocketUtils.html[`SocketUtils` class] provides several methods that select one or more random ports for exposing server-side components without conflicts, as the following example shows:
-
-====
-[source,xml]
-----
-
-
-
-
-
-
-
-----
-====
-
-The following example shows how the preceding configuration is used from the unit test:
-
-====
-[source,java]
-----
-@Autowired @Qualifier("syslog.adapter")
-private UdpSyslogReceivingChannelAdapter adapter;
-
-@Autowired
-private PollableChannel sysLogs;
-
-@Test
-public void testSimplestUdp() throws Exception {
- int port = TestUtils.getPropertyValue(adapter1, "udpAdapter.port", Integer.class);
- byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE".getBytes("UTF-8");
- DatagramPacket packet = new DatagramPacket(buf, buf.length,
- new InetSocketAddress("localhost", port));
- DatagramSocket socket = new DatagramSocket();
- socket.send(packet);
- socket.close();
- Message> message = foo.receive(10000);
- assertNotNull(message);
-}
-----
-====
-
-NOTE: This technique is not foolproof.
-Some other process could be allocated the "`free`" port before your test opens it.
-It is generally more preferable to use server port `0`, let the operating system select the port for you, and then discover the selected port in your test.
-We have converted most framework tests to use this preferred technique.
-
==== Using `OnlyOnceTrigger`
https://docs.spring.io/spring-integration/api/org/springframework/integration/test/util/OnlyOnceTrigger.html[`OnlyOnceTrigger`] is useful for polling endpoints when you need to produce only one test message and verify the behavior without impacting other period messages.