diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 515c0de899d..3a5d85c96eb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -85,6 +85,7 @@ public Socket getSocket() { return sock; } + LearnerSender sender = null; protected InputArchive leaderIs; protected OutputArchive leaderOs; /** the protocol version of the leader */ @@ -103,9 +104,12 @@ public Socket getSocket() { private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); + public static final String LEARNER_ASYNC_SENDING = "learner.asyncSending"; + private static boolean asyncSending = Boolean.getBoolean(LEARNER_ASYNC_SENDING); static { LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs); LOG.info("TCP NoDelay set to: {}", nodelay); + LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending); } final ConcurrentHashMap pendingRevalidations = new ConcurrentHashMap(); @@ -114,6 +118,15 @@ public int getPendingRevalidationsCount() { return pendingRevalidations.size(); } + // for testing + protected static void setAsyncSending(boolean newMode) { + asyncSending = newMode; + LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending); + + } + protected static boolean getAsyncSending() { + return asyncSending; + } /** * validate a session for a client * @@ -142,13 +155,27 @@ void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOExcep } /** - * write a packet to the leader + * write a packet to the leader. + * + * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time. + * When packets are sent synchronously, writing is done within a synchronization block. + * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe. + * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only. + * So we have only one thread writing to leaderOs at a time in either case. * * @param pp * the proposal packet to be sent to the leader * @throws IOException */ void writePacket(QuorumPacket pp, boolean flush) throws IOException { + if (asyncSending) { + sender.queuePacket(pp); + } else { + writePacketNow(pp, flush); + } + } + + void writePacketNow(QuorumPacket pp, boolean flush) throws IOException { synchronized (leaderOs) { if (pp != null) { messageTracker.trackSent(pp.getType()); @@ -160,6 +187,14 @@ void writePacket(QuorumPacket pp, boolean flush) throws IOException { } } + /** + * Start thread that will forward any packet in the queue to the leader + */ + protected void startSendingThread() { + sender = new LearnerSender(this); + sender.start(); + } + /** * read a packet from the leader * @@ -323,6 +358,9 @@ protected void connectToLeader(InetSocketAddress addr, String hostname) throws I leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); + if (asyncSending) { + startSendingThread(); + } } private Socket createSocket() throws X509Exception, IOException { @@ -682,8 +720,9 @@ protected void ping(QuorumPacket qp) throws IOException { dos.writeLong(entry.getKey()); dos.writeInt(entry.getValue()); } - qp.setData(bos.toByteArray()); - writePacket(qp, true); + + QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo()); + writePacket(pingReply, true); } /** @@ -693,6 +732,11 @@ public void shutdown() { self.setZooKeeperServer(null); self.closeAllConnections(); self.adminServer.setZooKeeperServer(null); + + if (sender != null) { + sender.shutdown(); + } + closeSocket(); // shutdown previous zookeeper if (zk != null) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java new file mode 100644 index 00000000000..cbb7d699717 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.zookeeper.server.ZooKeeperCriticalThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LearnerSender extends ZooKeeperCriticalThread { + private static final Logger LOG = LoggerFactory.getLogger(LearnerSender.class); + + private final LinkedBlockingQueue queuedPackets = new LinkedBlockingQueue<>(); + private final QuorumPacket proposalOfDeath = new QuorumPacket(); + + Learner learner; + + public LearnerSender(Learner learner) { + super("LearnerSender:" + learner.zk.getServerId(), learner.zk.getZooKeeperServerListener()); + this.learner = learner; + } + + @Override + public void run() { + while (true) { + try { + QuorumPacket p = queuedPackets.poll(); + if (p == null) { + learner.bufferedOutput.flush(); + p = queuedPackets.take(); + } + + if (p == proposalOfDeath) { + // Packet of death! + break; + } + + learner.messageTracker.trackSent(p.getType()); + learner.leaderOs.writeRecord(p, "packet"); + } catch (IOException e) { + handleException(this.getName(), e); + break; + } catch (InterruptedException e) { + handleException(this.getName(), e); + break; + } + } + + LOG.info("LearnerSender exited"); + } + + public void queuePacket(QuorumPacket pp) throws IOException { + if (pp == null) { + learner.bufferedOutput.flush(); + } else { + queuedPackets.add(pp); + } + } + + public void shutdown() { + LOG.info("Shutting down LearnerSender"); + queuedPackets.clear(); + queuedPackets.add(proposalOfDeath); + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java index c6de02f80ed..659ba31f8a2 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java @@ -20,6 +20,8 @@ import static org.hamcrest.core.Is.is; import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.apache.zookeeper.CreateMode; @@ -31,14 +33,46 @@ import org.apache.zookeeper.test.ClientBase; import org.hamcrest.Matcher; import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class LearnerMetricsTest extends QuorumPeerTestBase { private static final int TIMEOUT_SECONDS = 30; private static final int SERVER_COUNT = 4; // 1 observer, 3 participants private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT]; private ZooKeeper zk_client; + private boolean asyncSending; + private static boolean bakAsyncSending; + + public LearnerMetricsTest(boolean asyncSending) { + this.asyncSending = asyncSending; + } + + @Parameterized.Parameters + public static Collection sendingModes() { + return Arrays.asList(new Object[][]{{true}, {false}}); + } + + @Before + public void setAsyncSendingFlag() { + Learner.setAsyncSending(asyncSending); + } + + @BeforeClass + public static void saveAsyncSendingFlag() { + bakAsyncSending = Learner.getAsyncSending(); + } + + @AfterClass + public static void resetAsyncSendingFlag() { + Learner.setAsyncSending(bakAsyncSending); + } @Test public void testLearnerMetricsTest() throws Exception { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java index bff6cbf0e45..ee51baf955e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java @@ -25,6 +25,8 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; import java.util.Map; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.PortAssignment; @@ -38,23 +40,51 @@ import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase { - protected static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class); + private static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class); private static int SERVER_COUNT = 3; private MainThread[] mt; + private static boolean bakAsyncSending; + + private boolean asyncSending; + + public ReconfigDuringLeaderSyncTest(boolean asyncSending) { + this.asyncSending = asyncSending; + } + + @Parameterized.Parameters + public static Collection sendingModes() { + return Arrays.asList(new Object[][]{{true}, {false}}); + } @Before public void setup() { System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/); + Learner.setAsyncSending(asyncSending); QuorumPeerConfig.setReconfigEnabled(true); } + @BeforeClass + public static void saveAsyncSendingFlag() { + bakAsyncSending = Learner.getAsyncSending(); + } + + @AfterClass + public static void resetAsyncSendingFlag() { + Learner.setAsyncSending(bakAsyncSending); + } + /** *
      * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.