Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,12 @@ property, when available, is noted below.
effect due to TLS handshake timeout when there are too many in-flight TLS
handshakes. Set it to something like 250 is good enough to avoid herd effect.

* *learner.asyncSending*
(Java system property: **zookeeper.learner.asyncSending**)
(Java system property: **learner.asyncSending**)(Added for backward compatibility)
**New in 3.7.0:**
The sending and receiving packets in Learner were done synchronously in a critical section. An untimely network issue could cause the followers to hang (see [ZOOKEEPER-3575](https://issues.apache.org/jira/browse/ZOOKEEPER-3575) and [ZOOKEEPER-4074](https://issues.apache.org/jira/browse/ZOOKEEPER-4074)). The new design moves sending packets in Learner to a separate thread and sends the packets asynchronously. The new design is enabled with this parameter (learner.asyncSending).
The default is false.
<a name="sc_clusterOptions"></a>

#### Cluster Options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.server.util.MessageTracker;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
Expand Down Expand Up @@ -95,6 +96,7 @@ public Socket getSocket() {
return sock;
}

LearnerSender sender = null;
protected InputArchive leaderIs;
protected OutputArchive leaderOs;
/** the protocol version of the leader */
Expand All @@ -113,9 +115,13 @@ public Socket getSocket() {

private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");

public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
private static boolean asyncSending =
Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING));
static {
LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
LOG.info("TCP NoDelay set to: {}", nodelay);
LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
}

final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
Expand All @@ -124,6 +130,15 @@ public int getPendingRevalidationsCount() {
return pendingRevalidations.size();
}

// for testing
protected static void setAsyncSending(boolean newMode) {
asyncSending = newMode;
LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);

}
protected static boolean getAsyncSending() {
return asyncSending;
}
/**
* validate a session for a client
*
Expand Down Expand Up @@ -152,13 +167,27 @@ void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOExcep
}

/**
* write a packet to the leader
* write a packet to the leader.
*
* This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time.
* When packets are sent synchronously, writing is done within a synchronization block.
* When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe.
* Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only.
* So we have only one thread writing to leaderOs at a time in either case.
*
* @param pp
* the proposal packet to be sent to the leader
* @throws IOException
*/
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
if (asyncSending) {
sender.queuePacket(pp);
} else {
writePacketNow(pp, flush);
}
}

void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
synchronized (leaderOs) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THREAD_SAFETY_VIOLATION: Read/Write race. Non-private method Learner.writePacketNow(...) reads without synchronization from this.leaderOs. Potentially races with write in method Learner.connectToLeader(...).
Reporting because this access may occur on a background thread.
(at-me in a reply with help or ignore)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If learner.asyncSending is not set, all QuorumPacket will be sent in this writePacketNow method.
If learner.asyncSending is set, all QuorumPacket will be sent in the LearnerSender thread, which doesn't need the synchronization.
Therefore, there wouldn't be potential race.

if (pp != null) {
messageTracker.trackSent(pp.getType());
Expand All @@ -170,6 +199,14 @@ void writePacket(QuorumPacket pp, boolean flush) throws IOException {
}
}

/**
* Start thread that will forward any packet in the queue to the leader
*/
protected void startSendingThread() {
sender = new LearnerSender(this);
sender.start();
}

/**
* read a packet from the leader
*
Expand Down Expand Up @@ -305,6 +342,9 @@ protected void connectToLeader(MultipleAddresses multiAddr, String hostname) thr
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
if (asyncSending) {
startSendingThread();
}
}

class LeaderConnector implements Runnable {
Expand Down Expand Up @@ -788,8 +828,9 @@ protected void ping(QuorumPacket qp) throws IOException {
dos.writeLong(entry.getKey());
dos.writeInt(entry.getValue());
}
qp.setData(bos.toByteArray());
writePacket(qp, true);

QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
writePacket(pingReply, true);
}

/**
Expand All @@ -799,6 +840,11 @@ public void shutdown() {
self.setZooKeeperServer(null);
self.closeAllConnections();
self.adminServer.setZooKeeperServer(null);

if (sender != null) {
sender.shutdown();
}

closeSocket();
// shutdown previous zookeeper
if (zk != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.quorum;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.zookeeper.server.ZooKeeperCriticalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LearnerSender extends ZooKeeperCriticalThread {
private static final Logger LOG = LoggerFactory.getLogger(LearnerSender.class);

private final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
private final QuorumPacket proposalOfDeath = new QuorumPacket();

Learner learner;

public LearnerSender(Learner learner) {
super("LearnerSender:" + learner.zk.getServerId(), learner.zk.getZooKeeperServerListener());
this.learner = learner;
}

@Override
public void run() {
while (true) {
try {
QuorumPacket p = queuedPackets.poll();
if (p == null) {
learner.bufferedOutput.flush();
p = queuedPackets.take();
}

if (p == proposalOfDeath) {
// Packet of death!
break;
}

learner.messageTracker.trackSent(p.getType());
learner.leaderOs.writeRecord(p, "packet");
} catch (IOException e) {
handleException(this.getName(), e);
break;
} catch (InterruptedException e) {
handleException(this.getName(), e);
break;
}
}

LOG.info("LearnerSender exited");
}

public void queuePacket(QuorumPacket pp) throws IOException {
if (pp == null) {
learner.bufferedOutput.flush();
} else {
queuedPackets.add(pp);
}
}

public void shutdown() {
LOG.info("Shutting down LearnerSender");
queuedPackets.clear();
queuedPackets.add(proposalOfDeath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,29 @@ public static String[] getHostAndPort(String s) throws ConfigException {
}
}

/**
* Some old configuration properties are not configurable in zookeeper configuration file
* zoo.cfg. To make these properties configurable in zoo.cfg old properties are prepended
* with zookeeper. For example prop.x.y.z changed to zookeeper.prop.x.y.z. But for backward
* compatibility both prop.x.y.z and zookeeper.prop.x.y.z should be supported.
* This method first gets value from new property, if first property is not configured
* then gets value from old property
*
* @param newPropertyKey new property key which starts with zookeeper.
* @return either new or old system property value. Null if none of the properties are set.
*/
public static String getPropertyBackwardCompatibleWay(String newPropertyKey) {
String newKeyValue = System.getProperty(newPropertyKey);
if (newKeyValue != null) {
return newKeyValue.trim();
}
String oldPropertyKey = newPropertyKey.replace("zookeeper.", "");
String oldKeyValue = System.getProperty(oldPropertyKey);

if (oldKeyValue != null) {
return oldKeyValue.trim();
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.hamcrest.core.Is.is;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
Expand All @@ -31,14 +33,46 @@
import org.apache.zookeeper.test.ClientBase;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class LearnerMetricsTest extends QuorumPeerTestBase {

private static final int TIMEOUT_SECONDS = 30;
private static final int SERVER_COUNT = 4; // 1 observer, 3 participants
private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
private ZooKeeper zk_client;
private boolean asyncSending;
private static boolean bakAsyncSending;

public LearnerMetricsTest(boolean asyncSending) {
this.asyncSending = asyncSending;
}

@Parameterized.Parameters
public static Collection sendingModes() {
return Arrays.asList(new Object[][]{{true}, {false}});
}

@Before
public void setAsyncSendingFlag() {
Learner.setAsyncSending(asyncSending);
}

@BeforeClass
public static void saveAsyncSendingFlag() {
bakAsyncSending = Learner.getAsyncSending();
}

@AfterClass
public static void resetAsyncSendingFlag() {
Learner.setAsyncSending(bakAsyncSending);
}

@Test
public void testLearnerMetricsTest() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
Expand All @@ -38,23 +40,51 @@
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {

protected static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
private static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
private static int SERVER_COUNT = 3;
private MainThread[] mt;
private static boolean bakAsyncSending;

private boolean asyncSending;

public ReconfigDuringLeaderSyncTest(boolean asyncSending) {
this.asyncSending = asyncSending;
}

@Parameterized.Parameters
public static Collection sendingModes() {
return Arrays.asList(new Object[][]{{true}, {false}});
}

@Before
public void setup() {
System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
Learner.setAsyncSending(asyncSending);
QuorumPeerConfig.setReconfigEnabled(true);
}

@BeforeClass
public static void saveAsyncSendingFlag() {
bakAsyncSending = Learner.getAsyncSending();
}

@AfterClass
public static void resetAsyncSendingFlag() {
Learner.setAsyncSending(bakAsyncSending);
}

/**
* <pre>
* Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.
Expand Down
Loading