Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public Socket getSocket() {
return sock;
}

LearnerSender sender = null;
protected InputArchive leaderIs;
protected OutputArchive leaderOs;
/** the protocol version of the leader */
Expand All @@ -103,9 +104,12 @@ public Socket getSocket() {

private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");

public static final String LEARNER_ASYNC_SENDING = "learner.asyncSending";
private static boolean asyncSending = Boolean.getBoolean(LEARNER_ASYNC_SENDING);
static {
LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
LOG.info("TCP NoDelay set to: {}", nodelay);
LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
}

final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
Expand All @@ -114,6 +118,15 @@ public int getPendingRevalidationsCount() {
return pendingRevalidations.size();
}

// for testing
protected static void setAsyncSending(boolean newMode) {
asyncSending = newMode;
LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);

}
protected static boolean getAsyncSending() {
return asyncSending;
}
/**
* validate a session for a client
*
Expand Down Expand Up @@ -149,6 +162,14 @@ void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOExcep
* @throws IOException
*/
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
if (asyncSending) {
sender.queuePacket(pp);
} else {
writePacketNow(pp, flush);
}
}

void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
synchronized (leaderOs) {
if (pp != null) {
messageTracker.trackSent(pp.getType());
Expand All @@ -160,6 +181,14 @@ void writePacket(QuorumPacket pp, boolean flush) throws IOException {
}
}

/**
* Start thread that will forward any packet in the queue to the leader
*/
protected void startSendingThread() {
sender = new LearnerSender(this);
sender.start();
}

/**
* read a packet from the leader
*
Expand Down Expand Up @@ -323,6 +352,9 @@ protected void connectToLeader(InetSocketAddress addr, String hostname) throws I
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
if (asyncSending) {
startSendingThread();
}
}

private Socket createSocket() throws X509Exception, IOException {
Expand Down Expand Up @@ -682,8 +714,9 @@ protected void ping(QuorumPacket qp) throws IOException {
dos.writeLong(entry.getKey());
dos.writeInt(entry.getValue());
}
qp.setData(bos.toByteArray());
writePacket(qp, true);

QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
writePacket(pingReply, true);
}

/**
Expand All @@ -693,6 +726,11 @@ public void shutdown() {
self.setZooKeeperServer(null);
self.closeAllConnections();
self.adminServer.setZooKeeperServer(null);

if (sender != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: diff shows a missing space

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

sender.shutdown();
}

closeSocket();
// shutdown previous zookeeper
if (zk != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.quorum;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.zookeeper.server.ZooKeeperCriticalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LearnerSender extends ZooKeeperCriticalThread {
private static final Logger LOG = LoggerFactory.getLogger(LearnerSender.class);

private final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
private final QuorumPacket proposalOfDeath = new QuorumPacket();

Learner learner;

public LearnerSender(Learner learner) {
super("LearnerSender:" + learner.zk.getServerId(), learner.zk.getZooKeeperServerListener());
this.learner = learner;
}

@Override
public void run() {
while (true) {
try {
QuorumPacket p = queuedPackets.poll();
if (p == null) {
learner.bufferedOutput.flush();
p = queuedPackets.take();
}

if (p == proposalOfDeath) {
// Packet of death!
break;
}

learner.messageTracker.trackSent(p.getType());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here https://github.com/apache/zookeeper/pull/1116/files#diff-c8b414c1ca2084ecb9fe32a0a3832d44R170
we accessing learner internal variables inside a synchronized block.
it looks like an inconsistent synchronization

maybe it depends on the fact that this feature is enabled or not, but I am not sure we are 100% safe
can you please motivate or at least leave some comment for future inspection and understanding of the flow ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without LearnerSender, there might be more than one thread writing to leaderOs so synchronization is required.

With LearnerSender, there still might be more than one thread calling writePacket but they are writing to a BlockingQueue so it's fine. Reading from the queue and writing to leaderOs is one thread (the learner sender thread), so we don't need synchronization. It's like we synchronize on the blockingQueue implicitly.

This is tricky. I may have missed some thing. Let me know and I'll fix the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a minimal comment to explain this point?
I agree with your explanation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me, it might be better to not visit the internal variable directly, but using things like get to make the code easier to manage and reason about though.

learner.leaderOs.writeRecord(p, "packet");
} catch (IOException e) {
handleException(this.getName(), e);
break;
} catch (InterruptedException e) {
handleException(this.getName(), e);
break;
}
}

LOG.info("LearnerSender exited");
}

public void queuePacket(QuorumPacket pp) throws IOException {
if (pp == null) {
learner.bufferedOutput.flush();
} else {
queuedPackets.add(pp);
}
}

public void shutdown() {
LOG.info("Shutting down LearnerSender");
queuedPackets.clear();
queuedPackets.add(proposalOfDeath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.hamcrest.core.Is.is;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
Expand All @@ -31,14 +33,46 @@
import org.apache.zookeeper.test.ClientBase;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class LearnerMetricsTest extends QuorumPeerTestBase {

private static final int TIMEOUT_SECONDS = 30;
private static final int SERVER_COUNT = 4; // 1 observer, 3 participants
private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
private ZooKeeper zk_client;
private boolean asyncSending;
private static boolean bakAsyncSending;

public LearnerMetricsTest(boolean asyncSending) {
this.asyncSending = asyncSending;
}

@Parameterized.Parameters
public static Collection sendingModes() {
return Arrays.asList(new Object[][]{{true}, {false}});
}

@Before
public void setAsyncSendingFlag() {
Learner.setAsyncSending(asyncSending);
}

@BeforeClass
public static void saveAsyncSendingFlag() {
bakAsyncSending = Learner.getAsyncSending();
}

@AfterClass
public static void resetAsyncSendingFlag() {
Learner.setAsyncSending(bakAsyncSending);
}

@Test
public void testLearnerMetricsTest() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
Expand All @@ -38,23 +40,51 @@
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {

protected static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
private static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
private static int SERVER_COUNT = 3;
private MainThread[] mt;
private static boolean bakAsyncSending;

private boolean asyncSending;

public ReconfigDuringLeaderSyncTest(boolean asyncSending) {
this.asyncSending = asyncSending;
}

@Parameterized.Parameters
public static Collection sendingModes() {
return Arrays.asList(new Object[][]{{true}, {false}});
}

@Before
public void setup() {
System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
Learner.setAsyncSending(asyncSending);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please reset the value in a @before method
it is a static flag, so it will affect other tests

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reset in @afterclass

QuorumPeerConfig.setReconfigEnabled(true);
}

@BeforeClass
public static void saveAsyncSendingFlag() {
bakAsyncSending = Learner.getAsyncSending();
}

@AfterClass
public static void resetAsyncSendingFlag() {
Learner.setAsyncSending(bakAsyncSending);
}

/**
* <pre>
* Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.
Expand Down