diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index 129cf19beac..a3e742c9bdb 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1149,6 +1149,11 @@ property, when available, is noted below. **New in 3.6.2:** When enabled, a learner will close the quorum socket asynchronously. This is useful for TLS connections where closing a socket might take a long time, block the shutdown process, potentially delay a new leader election, and leave the quorum unavailabe. Closing the socket asynchronously avoids blocking the shutdown process despite the long socket closing time and a new leader election can be started while the socket being closed. The default is false. +* *learner.asyncSending* + (Java system property only: **learner.asyncSending**) + **New in 3.7.0:** + The sending and receiving of packets in Learner were done synchronously in a critical section. An untimely network issue could cause the followers to hang (see [ZOOKEEPER-3575](https://issues.apache.org/jira/browse/ZOOKEEPER-3575) and [ZOOKEEPER-4074](https://issues.apache.org/jira/browse/ZOOKEEPER-4074)). The new design moves sending packets in Learner to a separate thread and sends the packets asynchronously. The new design is enabled with this parameter (learner.asyncSending). The default setting is true. 
+ * *leader.closeSocketAsync* (Java system property only: **leader.closeSocketAsync**) **New in 3.6.2:** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index ef36b2f9000..8037fc6e87e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -119,7 +119,7 @@ public Socket getSocket() { private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); public static final String LEARNER_ASYNC_SENDING = "learner.asyncSending"; - private static boolean asyncSending = Boolean.getBoolean(LEARNER_ASYNC_SENDING); + private static boolean asyncSending = System.getProperty(LEARNER_ASYNC_SENDING, "true").equals("true"); public static final String LEARNER_CLOSE_SOCKET_ASYNC = "learner.closeSocketAsync"; public static final boolean closeSocketAsync = Boolean.getBoolean(LEARNER_CLOSE_SOCKET_ASYNC); @@ -208,7 +208,7 @@ void writePacketNow(QuorumPacket pp, boolean flush) throws IOException { /** * Start thread that will forward any packet in the queue to the leader */ - protected void startSendingThread() { + public void startSendingThread() { sender = new LearnerSender(this); sender.start(); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java index d876c167f94..a8118ace503 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java @@ -94,11 +94,11 @@ static class TestLearner extends Learner { private Socket socketToBeCreated = null; private Set unreachableAddresses = emptySet(); - private void setTimeMultiplier(long multiplier) { + protected void 
setTimeMultiplier(long multiplier) { timeMultiplier = multiplier; } - private void setPassConnectAttempt(int num) { + protected void setPassConnectAttempt(int num) { passSocketConnectOnAttempt = num; } @@ -106,15 +106,15 @@ protected long nanoTime() { return socketConnectAttempt * timeMultiplier; } - private int getSockConnectAttempt() { + protected int getSockConnectAttempt() { return socketConnectAttempt; } - private void setSocketToBeCreated(Socket socketToBeCreated) { + protected void setSocketToBeCreated(Socket socketToBeCreated) { this.socketToBeCreated = socketToBeCreated; } - private void setUnreachableAddresses(Set unreachableAddresses) { + protected void setUnreachableAddresses(Set unreachableAddresses) { this.unreachableAddresses = unreachableAddresses; } @@ -136,6 +136,18 @@ protected Socket createSocket() throws X509Exception, IOException { } } + static class TestLearnerWithZk extends TestLearner { + TestLearnerWithZk() throws IOException { + super(); + File tmpFile = File.createTempFile("test", ".dir", testData); + tmpFile.delete(); + FileTxnSnapLog ftsl = new FileTxnSnapLog(tmpFile, tmpFile); + self = new QuorumPeer(); + zk = new SimpleLearnerZooKeeperServer(ftsl, self); + ((SimpleLearnerZooKeeperServer) zk).learner = this; + } + } + @AfterEach public void cleanup() { System.clearProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED); @@ -216,7 +228,7 @@ public void shouldTryMultipleAddresses() throws Exception { @Test public void multipleAddressesSomeAreFailing() throws Exception { System.setProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, "true"); - TestLearner learner = new TestLearner(); + TestLearner learner = new TestLearnerWithZk(); learner.self = new QuorumPeer(); learner.self.setTickTime(2000); learner.self.setInitLimit(5); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java index 70f1844eac6..cff196894aa 
100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java @@ -112,6 +112,7 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable { fzks.startup(); fzks.setServerCnxnFactory(serverCnxnFactory); quorumPeer.follower = new MyFollower(quorumPeer, fzks); + quorumPeer.follower.startSendingThread(); LOG.info("Follower created"); // Simulate a socket channel between a client and a follower final SocketChannel socketChannel = createClientSocketChannel();