diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index d9e6ea5096b..2b78564901d 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1080,6 +1080,12 @@ property, when available, is noted below. effect due to TLS handshake timeout when there are too many in-flight TLS handshakes. Set it to something like 250 is good enough to avoid herd effect. +* *learner.asyncSending* + (Java system property: **zookeeper.learner.asyncSending**) + (Java system property: **learner.asyncSending**)(Added for backward compatibility) + **New in 3.7.0:** + The sending and receiving packets in Learner were done synchronously in a critical section. An untimely network issue could cause the followers to hang (see [ZOOKEEPER-3575](https://issues.apache.org/jira/browse/ZOOKEEPER-3575) and [ZOOKEEPER-4074](https://issues.apache.org/jira/browse/ZOOKEEPER-4074)). The new design moves sending packets in Learner to a separate thread and sends the packets asynchronously. The new design is enabled with this parameter (learner.asyncSending). + The default is false. #### Cluster Options diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index fa7cc597cdf..d281acf2c14 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -54,6 +54,7 @@ import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.ConfigUtils; import org.apache.zookeeper.server.util.MessageTracker; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.util.ZxidUtils; @@ -95,6 +96,7 @@ public Socket getSocket() { return sock; } + LearnerSender sender = null; protected InputArchive leaderIs; protected OutputArchive leaderOs; /** the protocol version of the leader */ @@ -113,9 +115,13 @@ public Socket getSocket() { private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); + public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending"; + private static boolean asyncSending = + Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING)); static { LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs); LOG.info("TCP NoDelay set to: {}", nodelay); + LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending); } final ConcurrentHashMap pendingRevalidations = new ConcurrentHashMap(); @@ -124,6 +130,15 @@ public int getPendingRevalidationsCount() { return pendingRevalidations.size(); } + // for testing + protected static void setAsyncSending(boolean newMode) { + asyncSending = newMode; + LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending); + + } + protected static boolean getAsyncSending() { + return asyncSending; + } /** * validate a session for a client * @@ -152,13 +167,27 @@ void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOExcep } /** - * write a packet to the leader + * write a packet to the leader. + * + * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time. + * When packets are sent synchronously, writing is done within a synchronization block. + * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe. + * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only. + * So we have only one thread writing to leaderOs at a time in either case. * * @param pp * the proposal packet to be sent to the leader * @throws IOException */ void writePacket(QuorumPacket pp, boolean flush) throws IOException { + if (asyncSending) { + sender.queuePacket(pp); + } else { + writePacketNow(pp, flush); + } + } + + void writePacketNow(QuorumPacket pp, boolean flush) throws IOException { synchronized (leaderOs) { if (pp != null) { messageTracker.trackSent(pp.getType()); @@ -170,6 +199,14 @@ void writePacket(QuorumPacket pp, boolean flush) throws IOException { } } + /** + * Start thread that will forward any packet in the queue to the leader + */ + protected void startSendingThread() { + sender = new LearnerSender(this); + sender.start(); + } + /** * read a packet from the leader * @@ -305,6 +342,9 @@ protected void connectToLeader(MultipleAddresses multiAddr, String hostname) thr leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); + if (asyncSending) { + startSendingThread(); + } } class LeaderConnector implements Runnable { @@ -788,8 +828,9 @@ protected void ping(QuorumPacket qp) throws IOException { dos.writeLong(entry.getKey()); dos.writeInt(entry.getValue()); } - qp.setData(bos.toByteArray()); - writePacket(qp, true); + + QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo()); + writePacket(pingReply, true); } /** @@ -799,6 +840,11 @@ public void shutdown() { self.setZooKeeperServer(null); self.closeAllConnections(); self.adminServer.setZooKeeperServer(null); + + if (sender != null) { + sender.shutdown(); + } + closeSocket(); // shutdown previous zookeeper if (zk != null) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java new file mode 100644 index 00000000000..cbb7d699717 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.zookeeper.server.ZooKeeperCriticalThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LearnerSender extends ZooKeeperCriticalThread { + private static final Logger LOG = LoggerFactory.getLogger(LearnerSender.class); + + private final LinkedBlockingQueue queuedPackets = new LinkedBlockingQueue<>(); + private final QuorumPacket proposalOfDeath = new QuorumPacket(); + + Learner learner; + + public LearnerSender(Learner learner) { + super("LearnerSender:" + learner.zk.getServerId(), learner.zk.getZooKeeperServerListener()); + this.learner = learner; + } + + @Override + public void run() { + while (true) { + try { + QuorumPacket p = queuedPackets.poll(); + if (p == null) { + learner.bufferedOutput.flush(); + p = queuedPackets.take(); + } + + if (p == proposalOfDeath) { + // Packet of death! + break; + } + + learner.messageTracker.trackSent(p.getType()); + learner.leaderOs.writeRecord(p, "packet"); + } catch (IOException e) { + handleException(this.getName(), e); + break; + } catch (InterruptedException e) { + handleException(this.getName(), e); + break; + } + } + + LOG.info("LearnerSender exited"); + } + + public void queuePacket(QuorumPacket pp) throws IOException { + if (pp == null) { + learner.bufferedOutput.flush(); + } else { + queuedPackets.add(pp); + } + } + + public void shutdown() { + LOG.info("Shutting down LearnerSender"); + queuedPackets.clear(); + queuedPackets.add(proposalOfDeath); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java index 508dc112565..d6f75729798 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java @@ -95,4 +95,29 @@ public static String[] getHostAndPort(String s) throws ConfigException { } } + /** + * Some old configuration properties are not configurable in zookeeper configuration file + * zoo.cfg. To make these properties configurable in zoo.cfg old properties are prepended + * with zookeeper. For example prop.x.y.z changed to zookeeper.prop.x.y.z. But for backward + * compatibility both prop.x.y.z and zookeeper.prop.x.y.z should be supported. + * This method first gets value from new property, if first property is not configured + * then gets value from old property + * + * @param newPropertyKey new property key which starts with zookeeper. + * @return either new or old system property value. Null if none of the properties are set. + */ + public static String getPropertyBackwardCompatibleWay(String newPropertyKey) { + String newKeyValue = System.getProperty(newPropertyKey); + if (newKeyValue != null) { + return newKeyValue.trim(); + } + String oldPropertyKey = newPropertyKey.replace("zookeeper.", ""); + String oldKeyValue = System.getProperty(oldPropertyKey); + + if (oldKeyValue != null) { + return oldKeyValue.trim(); + } + return null; + } + } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java index c6de02f80ed..659ba31f8a2 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java @@ -20,6 +20,8 @@ import static org.hamcrest.core.Is.is; import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.apache.zookeeper.CreateMode; @@ -31,14 +33,46 @@ import org.apache.zookeeper.test.ClientBase; import org.hamcrest.Matcher; import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class LearnerMetricsTest extends QuorumPeerTestBase { private static final int TIMEOUT_SECONDS = 30; private static final int SERVER_COUNT = 4; // 1 observer, 3 participants private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT]; private ZooKeeper zk_client; + private boolean asyncSending; + private static boolean bakAsyncSending; + + public LearnerMetricsTest(boolean asyncSending) { + this.asyncSending = asyncSending; + } + + @Parameterized.Parameters + public static Collection sendingModes() { + return Arrays.asList(new Object[][]{{true}, {false}}); + } + + @Before + public void setAsyncSendingFlag() { + Learner.setAsyncSending(asyncSending); + } + + @BeforeClass + public static void saveAsyncSendingFlag() { + bakAsyncSending = Learner.getAsyncSending(); + } + + @AfterClass + public static void resetAsyncSendingFlag() { + Learner.setAsyncSending(bakAsyncSending); + } @Test public void testLearnerMetricsTest() throws Exception { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java index bff6cbf0e45..ee51baf955e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java @@ -25,6 +25,8 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; import java.util.Map; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.PortAssignment; @@ -38,23 +40,51 @@ import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase { - protected static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class); + private static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class); private static int SERVER_COUNT = 3; private MainThread[] mt; + private static boolean bakAsyncSending; + + private boolean asyncSending; + + public ReconfigDuringLeaderSyncTest(boolean asyncSending) { + this.asyncSending = asyncSending; + } + + @Parameterized.Parameters + public static Collection sendingModes() { + return Arrays.asList(new Object[][]{{true}, {false}}); + } @Before public void setup() { System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/); + Learner.setAsyncSending(asyncSending); QuorumPeerConfig.setReconfigEnabled(true); } + @BeforeClass + public static void saveAsyncSendingFlag() { + bakAsyncSending = Learner.getAsyncSending(); + } + + @AfterClass + public static void resetAsyncSendingFlag() { + Learner.setAsyncSending(bakAsyncSending); + } + /** *
      * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java
index ba68b229ec0..d259a5d7cbf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server.util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.junit.Test;
 
@@ -69,4 +70,45 @@ public void testGetHostAndPortWithoutPort() throws ConfigException {
         assertEquals(nsa.length, 1);
     }
 
+    @Test
+    public void testGetPropertyBackwardCompatibleWay() throws ConfigException {
+        String newProp = "zookeeper.prop.x.y.z";
+        String oldProp = "prop.x.y.z";
+
+        // Null as both properties are not set
+        String result = ConfigUtils.getPropertyBackwardCompatibleWay(newProp);
+        assertNull(result);
+
+        // Return old property value when only old property is set
+        String oldPropValue = "oldPropertyValue";
+        System.setProperty(oldProp, oldPropValue);
+        result = ConfigUtils.getPropertyBackwardCompatibleWay(newProp);
+        assertEquals(oldPropValue, result);
+
+        // Return new property value when both properties are set
+        String newPropValue = "newPropertyValue";
+        System.setProperty(newProp, newPropValue);
+        result = ConfigUtils.getPropertyBackwardCompatibleWay(newProp);
+        assertEquals(newPropValue, result);
+
+        // cleanUp
+        clearProp(newProp, oldProp);
+
+        // Return trimmed value
+        System.setProperty(oldProp, oldPropValue + "  ");
+        result = ConfigUtils.getPropertyBackwardCompatibleWay(newProp);
+        assertEquals(oldPropValue, result);
+
+        System.setProperty(newProp, "  " + newPropValue);
+        result = ConfigUtils.getPropertyBackwardCompatibleWay(newProp);
+        assertEquals(newPropValue, result);
+
+        // cleanUp
+        clearProp(newProp, oldProp);
+    }
+
+    private void clearProp(String newProp, String oldProp) {
+        System.clearProperty(newProp);
+        System.clearProperty(oldProp);
+    }
 }