diff --git a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java index 31799312bb8..1735dfcf098 100644 --- a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java +++ b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java @@ -39,10 +39,14 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; public class TestNettyServer { + private static final Logger LOG = LoggerFactory.getLogger(TestNettyServer.class.getName()); + static final int CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec protected static Server server; protected static Transceiver transceiver; @@ -83,7 +87,7 @@ public static void initializeConnections(Consumer initializer) th public static void initializeConnections(Consumer serverInitializer, Consumer transceiverInitializer) throws Exception { - System.out.println("starting server..."); + LOG.info("starting server..."); channelInitializer = transceiverInitializer; mailService = new MailImpl(); Responder responder = new SpecificResponder(Mail.class, mailService); @@ -91,7 +95,7 @@ public static void initializeConnections(Consumer serverInitializ server.start(); int serverPort = server.getPort(); - System.out.println("server port : " + serverPort); + LOG.info("server port : {}", serverPort); transceiver = new NettyTransceiver(new InetSocketAddress(serverPort), CONNECT_TIMEOUT_MILLIS, transceiverInitializer, null); @@ -144,23 +148,54 @@ void mixtureOfRequests() throws Exception { @Test void connectionsCount() throws Exception { + // It happens on a regular basis that the server still has a connection + // that is in the process of being terminated (previous test?). + // We wait for that to happen because otherwise this test will fail. + assertNumberOfConnectionsOnServer(1, 1000); + Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(server.getPort()), CONNECT_TIMEOUT_MILLIS, channelInitializer); Mail proxy2 = SpecificRequestor.getClient(Mail.class, transceiver2); proxy.fireandforget(createMessage()); proxy2.fireandforget(createMessage()); - assertEquals(2, ((NettyServer) server).getNumActiveConnections()); + assertNumberOfConnectionsOnServer(2, 0); transceiver2.close(); // Check the active connections with some retries as closing at the client // side might not take effect on the server side immediately + assertNumberOfConnectionsOnServer(1, 5000); + } + + /** + * Assert for the number of server connections. This does repeated checks (with + * timeout) if it not matches at first because closing at the client side might + * not take effect on the server side immediately. + * + * @param wantedNumberOfConnections How many do we want to have + * @param maxWaitMs Within how much time (0= immediately) + */ + private static void assertNumberOfConnectionsOnServer(int wantedNumberOfConnections, long maxWaitMs) + throws InterruptedException { int numActiveConnections = ((NettyServer) server).getNumActiveConnections(); - for (int i = 0; i < 50 && numActiveConnections == 2; ++i) { - System.out.println("Server still has 2 active connections; retrying..."); - Thread.sleep(100); - numActiveConnections = ((NettyServer) server).getNumActiveConnections(); + if (numActiveConnections == wantedNumberOfConnections) { + return; // We're good. + } + long startMs = System.currentTimeMillis(); + long waited = 0; + if (maxWaitMs > 0) { + boolean timeOut = false; + while (numActiveConnections != wantedNumberOfConnections && !timeOut) { + LOG.info("Server still has {} active connections (want {}, waiting for {}ms); retrying...", + numActiveConnections, wantedNumberOfConnections, waited); + Thread.sleep(100); + numActiveConnections = ((NettyServer) server).getNumActiveConnections(); + waited = System.currentTimeMillis() - startMs; + timeOut = waited > maxWaitMs; + } + LOG.info("Server has {} active connections", numActiveConnections); } - assertEquals(1, numActiveConnections); + assertEquals(wantedNumberOfConnections, numActiveConnections, + "Not the expected number of connections after a wait of " + waited + " ms"); } private Message createMessage() {