Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -667,21 +667,17 @@ public static void closeSock(SocketChannel sock) {
* org.apache.jute.Record, java.lang.String)
*/
@Override
public void sendResponse(ReplyHeader h, Record r, String tag) {
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Make space for length
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
try {
baos.write(fourBytes);
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
} catch (IOException e) {
LOG.error("Error serializing response");
baos.write(fourBytes);
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
byte b[] = baos.toByteArray();
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
Expand All @@ -694,7 +690,7 @@ public void sendResponse(ReplyHeader h, Record r, String tag) {
}
}
} catch(Exception e) {
LOG.warn("Unexpected exception. Destruction averted.", e);
throw new IOException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a suggestion: wdyt about bubbling up a more custom message with the IOException instead of just encapsulate the Exception? I mean, something like:

throw new IOException("sendMessage exception: blah blah", e);

}
}

Expand All @@ -716,7 +712,11 @@ public void process(WatchedEvent event) {
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();

sendResponse(h, e, "notification");
try {
sendResponse(h, e, "notification");
} catch (IOException ex) {
LOG.debug("Problem sending to " + getRemoteSocketAddress(), ex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have mixed feelings with concatenating strings in a hot path (IOException happening here is a hot path when, for instance, a network blip happens).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestion here? Not to getRemoteSocketAddress() at all?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're using LOG.debug, so it shouldn't be an issue on prod.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would use a modern debug format:

LOG.debug("Problem sending to {}", getRemoteSocketAddress(), ex);

}
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
Expand Down Expand Up @@ -113,6 +115,14 @@ public void uncaughtException(Thread t, Throwable e) {
ZOOKEEPER_NIO_DIRECT_BUFFER_BYTES, 64 * 1024);
}

/**
* serverCnxnClassCtr is introduced to improve testability of NIOServerCnxn
* as creation of NIOServerCnxn instance is buried deep in the factory code.
* It will come into play only if ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN property is set
* otherwise default behavior will be preserved
*/
private Constructor<? extends NIOServerCnxn> serverCnxnClassCtr = null;

/**
* AbstractSelectThread is an abstract base class containing a few bits
* of code shared by the AcceptThread (which selects on the listen socket)
Expand Down Expand Up @@ -630,6 +640,16 @@ public static ByteBuffer getDirectBuffer() {
* limits of the operating system). startup(zks) must be called subsequently.
*/
public NIOServerCnxnFactory() {
String serverCnxnClassName = System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN, "org.apache.zookeeper.server.NIOServerCnxn");
try {
Class<? extends NIOServerCnxn> serverCnxnClass = Class.forName(serverCnxnClassName).asSubclass(NIOServerCnxn.class);
serverCnxnClassCtr =
serverCnxnClass.getConstructor(ZooKeeperServer.class, SocketChannel.class,
SelectionKey.class, NIOServerCnxnFactory.class,
SelectorThread.class);
} catch (Throwable t) {
throw new RuntimeException("Exception when trying to get constructor for " + serverCnxnClassName, t);
}
}

private volatile boolean stopped = true;
Expand Down Expand Up @@ -842,7 +862,12 @@ private void addCnxn(NIOServerCnxn cnxn) {

protected NIOServerCnxn createConnection(SocketChannel sock,
SelectionKey sk, SelectorThread selectorThread) throws IOException {
return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);

try {
return serverCnxnClassCtr.newInstance(zkServer, sock, sk, this, selectorThread);
} catch (Throwable t) {
throw new IOException("Can not instantiate class for " + serverCnxnClassCtr.getName(), t);
}
}

private int getClientCnxnCount(InetAddress cl) {
Expand Down
40 changes: 19 additions & 21 deletions src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class NettyServerCnxn extends ServerCnxn {
NettyServerCnxnFactory factory;
boolean initialized;

NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
public NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not obvious to me why the access specifier of NettyServerCnxn should be changed public here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I did this to match NIOServerCnxn constructor. It can be kept package level, as my test class is in the same package namespace. I can change it back, but it will be inconsistent with NIO

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take my words back. I need it to be public as I use reflection to create it in NettyServerCnxnFactory and if it is not I would have to do couple of more steps during init to set access to public which is unnecessary

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yufeldman Thanks for explaining, makes sense to me. It is required to run the tests, however I do find another problem with tests: it looks like the Netty tests (testNetty*) never run with current configuration. Proof: remove the public access specifier appertain to NettyServerCnxn and all tests of ServerCxnExceptionsTest still pass. We expect Netty related tests fail here without public access specifier, right? Now put back the public for NettyServerCnxn but remove the public access specifier appertains to NIOServercCnxn, now all tests failed while we expect only NIO tests fail but Netty tests pass.

It's likely caused by the intervening of the java system properties that controls the Netty vs NIO server selection. One solution is to split the ServerCxnExceptionsTest into Netty and NIO specific tests.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hanm - there is nothing wrong with the tests, they do run fine. They do not use ctor from NettyServerCnxn, but from MockNettyServerCnxn - that has public ctor.
You can make MockNettyServerCnxn not public and you will have the same issue. And BTW
org.apache.zookeeper.server.NettyServerCnxnTest fails with not public ctor in NettyServerCnxn

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yufeldman Yeah the tests are fine. What I mentioned that leads to NettyServerCnxnx not get instantiated only happens in erroneous cases when ZOOKEEPER_SERVER_CNXN_FACTORY is not initialized properly - this happens when the public specifier was removed so all tests fall back to create NIOServerCnxnx instead. That's not a real alarm.

Copy link
Author

@yufeldman yufeldman Dec 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hanm - >>> so all tests fall back to create NIOServerCnxnx instead

It is not the case, they don't fall back to NIO - they fail.
Here is printout from running tests with both Netty and MockNettyServerCnxn not having public ctor:
[junit] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.159 sec
[junit] Test org.apache.zookeeper.server.NettyServerCnxnTest FAILED
[junit] Running org.apache.zookeeper.server.PrepRequestProcessorTest
[junit] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.279 sec
[junit] Running org.apache.zookeeper.server.PurgeTxnTest
[junit] Tests run: 8, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.431 sec
[junit] Running org.apache.zookeeper.server.ReferenceCountedACLCacheTest
[junit] Tests run: 10, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.089 sec
[junit] Running org.apache.zookeeper.server.SerializationPerfTest
[junit] Tests run: 8, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.114 sec
[junit] Running org.apache.zookeeper.server.ServerCxnExceptionsTest
[junit] Tests run: 6, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 0.284 sec
[junit] Test org.apache.zookeeper.server.ServerCxnExceptionsTest FAILED

Copy link
Contributor

@hanm hanm Dec 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not the case, they don't fall back to NIO - they fail.

The fallback I was referring to is https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java#L130. It gets hit when public was removed from NIOServerCnxn for Netty* tests. An example call stack (note that a Netty test complaining about NIOServerCnxnFactory):
` INFO [main:ZKTestCase$1@70] - FAILED testNettyRunTimeException
java.io.IOException: Couldn't instantiate org.apache.zookeeper.server.NIOServerCnxnFactory

-------at org.apache.zookeeper.server.ServerCnxnFactory.createFactory(ServerCnxnFactory.java:142)
-------at org.apache.zookeeper.server.ServerCnxnFactory.createFactory(ServerCnxnFactory.java:158)
-------at org.apache.zookeeper.server.ServerCnxnFactory.createFactory(ServerCnxnFactory.java:152)`

Thanks for persisting on this, but I don't think this erroneous case need to be investigated further as it would not happen when real test cases were running.

Copy link
Author

@yufeldman yufeldman Dec 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. For me it actually is:

2016-12-05 11:11:02,523 [myid:] - INFO [Time-limited test:JUnit4ZKTestRunner$LoggedInvokeMethod@98] - TEST METHOD FAILED testNettyRunTimeException
java.io.IOException: Couldn't instantiate org.apache.zookeeper.server.NettyServerCnxnFactory at org.apache.zookeeper.server.ServerCnxnFactory.createFactory(ServerCnxnFactory.java:141) at org.apache.zookeeper.server.ServerCnxnFactory.createFactory(ServerCnxnFactory.java:157)
at org.apache.zookeeper.server.ServerCnxnFactory.createFactory(ServerCnxnFactory.java:151)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hanm - thank you for all the reviews and feedback. Really appreciate it.

this.channel = channel;
this.zkServer = zks;
this.factory = factory;
Expand Down Expand Up @@ -139,9 +139,7 @@ public void process(WatchedEvent event) {
try {
sendResponse(h, e, "notification");
} catch (IOException e1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
}
LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
close();
}
}
Expand All @@ -165,31 +163,31 @@ static class ResumeMessageEvent implements MessageEvent {
@Override
public void sendResponse(ReplyHeader h, Record r, String tag)
throws IOException {
if (!channel.isOpen()) {
return;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Make space for length
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
try {
if (!channel.isOpen()) {
return;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Make space for length
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
baos.write(fourBytes);
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
} catch (IOException e) {
LOG.error("Error serializing response");
}
byte b[] = baos.toByteArray();
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
sendBuffer(bb);
if (h.getXid() > 0) {
// zks cannot be null otherwise we would not have gotten here!
if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
enableRecv();
byte b[] = baos.toByteArray();
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
sendBuffer(bb);
if (h.getXid() > 0) {
// zks cannot be null otherwise we would not have gotten here!
if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
enableRecv();
}
}
} catch (Exception e) {
throw new IOException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.KeyManagementException;
Expand Down Expand Up @@ -79,6 +81,14 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
InetSocketAddress localAddress;
int maxClientCnxns = 60;

/**
* serverCnxnClassCtr is introduced to improve testability of NettyServerCnxn
* as creation of NettyServerCnxn instance is buried deep in the factory code.
* It will come into play only if ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN property is set
* otherwise default behavior will be preserved
*/
private Constructor<? extends NettyServerCnxn> serverCnxnClassCtr = null;

/**
* This is an inner class since we need to extend SimpleChannelHandler, but
* NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
Expand All @@ -105,8 +115,14 @@ public void channelConnected(ChannelHandlerContext ctx,
LOG.trace("Channel connected " + e);
}

NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(),
zkServer, NettyServerCnxnFactory.this);
NettyServerCnxn cnxn = null;
try {
cnxn = serverCnxnClassCtr.newInstance(ctx.getChannel(),
zkServer, NettyServerCnxnFactory.this);
} catch (Throwable t) {
throw new IOException("Exception while trying to instantiate " + serverCnxnClassCtr.getName(), t);
}

ctx.setAttachment(cnxn);

if (secure) {
Expand Down Expand Up @@ -326,6 +342,15 @@ public void operationComplete(ChannelFuture future)
CnxnChannelHandler channelHandler = new CnxnChannelHandler();

NettyServerCnxnFactory() {
String serverCnxnClassName = System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN, "org.apache.zookeeper.server.NettyServerCnxn");
try {
Class<? extends NettyServerCnxn> serverCnxnClass = Class.forName(serverCnxnClassName).asSubclass(NettyServerCnxn.class);
serverCnxnClassCtr =
serverCnxnClass.getConstructor(Channel.class, ZooKeeperServer.class, NettyServerCnxnFactory.class);
} catch (Throwable t) {
throw new RuntimeException("Exception when trying to get constructor for " + serverCnxnClassName, t);
}

bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@
public abstract class ServerCnxnFactory {

public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

/**
* property to be able to overwrite NIO/NettyServerCnxn with it's subclass if needed.
* Used for unit testing
*/
public static final String ZOOKEEPER_SERVER_CNXN = "zookeeper.serverCnxn";

private static final Logger LOG = LoggerFactory.getLogger(ServerCnxnFactory.class);

// Tells whether SSL is enabled on this ServerCnxnFactory
Expand Down
40 changes: 40 additions & 0 deletions src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@

package org.apache.zookeeper.server;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.io.IOException;

import org.apache.jute.Record;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread;

public class MockNIOServerCnxn extends NIOServerCnxn {
Expand All @@ -40,6 +45,41 @@ public void doIO(SelectionKey k) throws InterruptedException {

@Override
protected boolean isSocketOpen() {
// trying to reuse this class for different tests.
// problem with Java, that I can not create a class that inherits from some other - like <T extends ServerCnxn>
if (System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN) != null) {
return super.isSocketOpen();
}
return true;
}

@Override
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
String exceptionType = System.getProperty("exception.type", "NoException");
switch(exceptionType) {
case "IOException":
throw new IOException("test IOException");
case "NoException":
super.sendResponse(h,r,tag);
break;
case "RunTimeException":
try {
Field zkServerField = NIOServerCnxn.class.getDeclaredField("zkServer");
zkServerField.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(zkServerField, zkServerField.getModifiers() & ~Modifier.FINAL);
zkServerField.set((NIOServerCnxn)this, null);
super.sendResponse(h,r,tag);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
break;
default:
break;
}
}

}
65 changes: 65 additions & 0 deletions src/java/test/org/apache/zookeeper/server/MockNettyServerCnxn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;

import org.apache.jute.Record;
import org.apache.zookeeper.proto.ReplyHeader;
import org.jboss.netty.channel.Channel;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/**
* Helper class to test different scenarios in NettyServerCnxn
*/
public class MockNettyServerCnxn extends NettyServerCnxn {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this file, tab is indented with 2 spaces while the rest of the ZooKeeper files use 4 spaces (I only discovered this 'cause my IDE complained about it).


public MockNettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
super(channel, zks, factory);
}

@Override
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
String exceptionType = System.getProperty("exception.type", "IOException");
switch(exceptionType) {
case "IOException":
throw new IOException("test IOException");
case "NoException":
super.sendResponse(h,r,tag);
break;
case "RunTimeException":
try {
Field zkServerField = NettyServerCnxn.class.getDeclaredField("zkServer");
zkServerField.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(zkServerField, zkServerField.getModifiers() & ~Modifier.FINAL);
zkServerField.set((NettyServerCnxn)this, null);
super.sendResponse(h,r,tag);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
break;
default:
break;
}
}
}
Loading