diff --git a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java index f2f225c81ec..b7a6471910a 100644 --- a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java @@ -667,21 +667,17 @@ public static void closeSock(SocketChannel sock) { * org.apache.jute.Record, java.lang.String) */ @Override - public void sendResponse(ReplyHeader h, Record r, String tag) { + public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); // Make space for length BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); - try { - baos.write(fourBytes); - bos.writeRecord(h, "header"); - if (r != null) { - bos.writeRecord(r, tag); - } - baos.close(); - } catch (IOException e) { - LOG.error("Error serializing response"); + baos.write(fourBytes); + bos.writeRecord(h, "header"); + if (r != null) { + bos.writeRecord(r, tag); } + baos.close(); byte b[] = baos.toByteArray(); ByteBuffer bb = ByteBuffer.wrap(b); bb.putInt(b.length - 4).rewind(); @@ -694,7 +690,7 @@ public void sendResponse(ReplyHeader h, Record r, String tag) { } } } catch(Exception e) { - LOG.warn("Unexpected exception. Destruction averted.", e); + throw new IOException(e); } } @@ -716,7 +712,11 @@ public void process(WatchedEvent event) { // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); - sendResponse(h, e, "notification"); + try { + sendResponse(h, e, "notification"); + } catch (IOException ex) { + LOG.debug("Problem sending to " + getRemoteSocketAddress(), ex); + } } /* diff --git a/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java b/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java index 7a72757860b..c370cb5a072 100644 --- a/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java +++ b/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.io.PrintWriter; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketException; @@ -113,6 +115,14 @@ public void uncaughtException(Thread t, Throwable e) { ZOOKEEPER_NIO_DIRECT_BUFFER_BYTES, 64 * 1024); } + /** + * serverCnxnClassCtr is introduced to improve testability of NIOServerCnxn + * as creation of NIOServerCnxn instance is buried deep in the factory code. + * It will come into play only if ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN property is set + * otherwise default behavior will be preserved + */ + private Constructor serverCnxnClassCtr = null; + /** * AbstractSelectThread is an abstract base class containing a few bits * of code shared by the AcceptThread (which selects on the listen socket) @@ -630,6 +640,16 @@ public static ByteBuffer getDirectBuffer() { * limits of the operating system). startup(zks) must be called subsequently. */ public NIOServerCnxnFactory() { + String serverCnxnClassName = System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN, "org.apache.zookeeper.server.NIOServerCnxn"); + try { + Class serverCnxnClass = Class.forName(serverCnxnClassName).asSubclass(NIOServerCnxn.class); + serverCnxnClassCtr = + serverCnxnClass.getConstructor(ZooKeeperServer.class, SocketChannel.class, + SelectionKey.class, NIOServerCnxnFactory.class, + SelectorThread.class); + } catch (Throwable t) { + throw new RuntimeException("Exception when trying to get constructor for " + serverCnxnClassName, t); + } } private volatile boolean stopped = true; @@ -842,7 +862,12 @@ private void addCnxn(NIOServerCnxn cnxn) { protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException { - return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread); + + try { + return serverCnxnClassCtr.newInstance(zkServer, sock, sk, this, selectorThread); + } catch (Throwable t) { + throw new IOException("Can not instantiate class for " + serverCnxnClassCtr.getName(), t); + } } private int getClientCnxnCount(InetAddress cl) { diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java index c48f6b1afe4..5c4c6191bd7 100644 --- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java @@ -71,7 +71,7 @@ public class NettyServerCnxn extends ServerCnxn { NettyServerCnxnFactory factory; boolean initialized; - NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) { + public NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) { this.channel = channel; this.zkServer = zks; this.factory = factory; @@ -139,9 +139,7 @@ public void process(WatchedEvent event) { try { sendResponse(h, e, "notification"); } catch (IOException e1) { - if (LOG.isDebugEnabled()) { - LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1); - } + LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1); close(); } } @@ -165,31 +163,31 @@ static class ResumeMessageEvent implements MessageEvent { @Override public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { - if (!channel.isOpen()) { - return; - } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - // Make space for length - BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); try { + if (!channel.isOpen()) { + return; + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + // Make space for length + BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); baos.write(fourBytes); bos.writeRecord(h, "header"); if (r != null) { bos.writeRecord(r, tag); } baos.close(); - } catch (IOException e) { - LOG.error("Error serializing response"); - } - byte b[] = baos.toByteArray(); - ByteBuffer bb = ByteBuffer.wrap(b); - bb.putInt(b.length - 4).rewind(); - sendBuffer(bb); - if (h.getXid() > 0) { - // zks cannot be null otherwise we would not have gotten here! - if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) { - enableRecv(); + byte b[] = baos.toByteArray(); + ByteBuffer bb = ByteBuffer.wrap(b); + bb.putInt(b.length - 4).rewind(); + sendBuffer(bb); + if (h.getXid() > 0) { + // zks cannot be null otherwise we would not have gotten here! + if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) { + enableRecv(); + } } + } catch (Exception e) { + throw new IOException(e); } } diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java index 25b682b8633..ff24243cf81 100644 --- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java +++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java @@ -21,6 +21,8 @@ import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.security.KeyManagementException; @@ -79,6 +81,14 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { InetSocketAddress localAddress; int maxClientCnxns = 60; + /** + * serverCnxnClassCtr is introduced to improve testability of NettyServerCnxn + * as creation of NettyServerCnxn instance is buried deep in the factory code. + * It will come into play only if ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN property is set + * otherwise default behavior will be preserved + */ + private Constructor serverCnxnClassCtr = null; + /** * This is an inner class since we need to extend SimpleChannelHandler, but * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner @@ -105,8 +115,14 @@ public void channelConnected(ChannelHandlerContext ctx, LOG.trace("Channel connected " + e); } - NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(), - zkServer, NettyServerCnxnFactory.this); + NettyServerCnxn cnxn = null; + try { + cnxn = serverCnxnClassCtr.newInstance(ctx.getChannel(), + zkServer, NettyServerCnxnFactory.this); + } catch (Throwable t) { + throw new IOException("Exception while trying to instantiate " + serverCnxnClassCtr.getName(), t); + } + ctx.setAttachment(cnxn); if (secure) { @@ -326,6 +342,15 @@ public void operationComplete(ChannelFuture future) CnxnChannelHandler channelHandler = new CnxnChannelHandler(); NettyServerCnxnFactory() { + String serverCnxnClassName = System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN, "org.apache.zookeeper.server.NettyServerCnxn"); + try { + Class serverCnxnClass = Class.forName(serverCnxnClassName).asSubclass(NettyServerCnxn.class); + serverCnxnClassCtr = + serverCnxnClass.getConstructor(Channel.class, ZooKeeperServer.class, NettyServerCnxnFactory.class); + } catch (Throwable t) { + throw new RuntimeException("Exception when trying to get constructor for " + serverCnxnClassName, t); + } + bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), diff --git a/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java b/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java index fc6766cdc13..4627b68a674 100644 --- a/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java +++ b/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java @@ -43,7 +43,12 @@ public abstract class ServerCnxnFactory { public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory"; - + /** + * property to be able to overwrite NIO/NettyServerCnxn with it's subclass if needed. + * Used for unit testing + */ + public static final String ZOOKEEPER_SERVER_CNXN = "zookeeper.serverCnxn"; + private static final Logger LOG = LoggerFactory.getLogger(ServerCnxnFactory.class); // Tells whether SSL is enabled on this ServerCnxnFactory diff --git a/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java b/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java index 40eab0a8178..11e58cb2193 100644 --- a/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java +++ b/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java @@ -18,9 +18,14 @@ package org.apache.zookeeper.server; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.io.IOException; + +import org.apache.jute.Record; +import org.apache.zookeeper.proto.ReplyHeader; import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread; public class MockNIOServerCnxn extends NIOServerCnxn { @@ -40,6 +45,41 @@ public void doIO(SelectionKey k) throws InterruptedException { @Override protected boolean isSocketOpen() { + // trying to reuse this class for different tests. + // problem with Java, that I can not create a class that inherits from some other - like + if (System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN) != null) { + return super.isSocketOpen(); + } return true; } + + @Override + public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { + String exceptionType = System.getProperty("exception.type", "NoException"); + switch(exceptionType) { + case "IOException": + throw new IOException("test IOException"); + case "NoException": + super.sendResponse(h,r,tag); + break; + case "RunTimeException": + try { + Field zkServerField = NIOServerCnxn.class.getDeclaredField("zkServer"); + zkServerField.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(zkServerField, zkServerField.getModifiers() & ~Modifier.FINAL); + zkServerField.set((NIOServerCnxn)this, null); + super.sendResponse(h,r,tag); + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + break; + default: + break; + } + } + } diff --git a/src/java/test/org/apache/zookeeper/server/MockNettyServerCnxn.java b/src/java/test/org/apache/zookeeper/server/MockNettyServerCnxn.java new file mode 100644 index 00000000000..9abbe2e8b51 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/MockNettyServerCnxn.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server; + +import org.apache.jute.Record; +import org.apache.zookeeper.proto.ReplyHeader; +import org.jboss.netty.channel.Channel; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +/** + * Helper class to test different scenarios in NettyServerCnxn + */ +public class MockNettyServerCnxn extends NettyServerCnxn { + + public MockNettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) { + super(channel, zks, factory); + } + + @Override + public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { + String exceptionType = System.getProperty("exception.type", "IOException"); + switch(exceptionType) { + case "IOException": + throw new IOException("test IOException"); + case "NoException": + super.sendResponse(h,r,tag); + break; + case "RunTimeException": + try { + Field zkServerField = NettyServerCnxn.class.getDeclaredField("zkServer"); + zkServerField.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(zkServerField, zkServerField.getModifiers() & ~Modifier.FINAL); + zkServerField.set((NettyServerCnxn)this, null); + super.sendResponse(h,r,tag); + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + break; + default: + break; + } + } +} diff --git a/src/java/test/org/apache/zookeeper/server/ServerCxnExceptionsTest.java b/src/java/test/org/apache/zookeeper/server/ServerCxnExceptionsTest.java new file mode 100644 index 00000000000..f43307e2951 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/ServerCxnExceptionsTest.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.test.ClientBase; +import org.junit.AfterClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.fail; + +/** + * Unit tests to test different exceptions scenarious in sendResponse + */ +public class ServerCxnExceptionsTest extends ClientBase { + + private static final Logger LOG = LoggerFactory.getLogger(ServerCxnExceptionsTest.class); + + private String exceptionType; + + private void NettySetup() throws Exception { + System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, + "org.apache.zookeeper.server.NettyServerCnxnFactory"); + System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN, "org.apache.zookeeper.server.MockNettyServerCnxn"); + System.setProperty("exception.type", "NoException"); + } + + private void NIOSetup() throws Exception { + System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, + "org.apache.zookeeper.server.NIOServerCnxnFactory"); + System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN, "org.apache.zookeeper.server.MockNIOServerCnxn"); + System.setProperty("exception.type", "NoException"); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY); + System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN); + System.clearProperty("exception.type"); + } + + @Test (timeout = 60000) + public void testNettyIOException() throws Exception { + tearDown(); + NettySetup(); + testIOExceptionHelper(); + } + + @Test (timeout = 60000) + public void testNIOIOException() throws Exception { + tearDown(); + NIOSetup(); + testIOExceptionHelper(); + } + + private void testIOExceptionHelper() throws Exception { + System.setProperty("exception.type", "IOException"); + super.setUp(); + final ZooKeeper zk = createClient(); + final String path = "/a"; + try { + // make sure zkclient works + zk.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + fail("Should not come here"); + Stat stats = zk.exists(path, false); + if (stats != null) { + int length = stats.getDataLength(); + } + } catch (KeeperException.ConnectionLossException cle) { + LOG.info("ConnectionLossException: {}", cle); + } finally { + zk.close(); + } + } + + @Test (timeout = 10000) + public void testNettyNoException() throws Exception { + tearDown(); + NettySetup(); + testZKNoExceptionHelper(); + } + + @Test (timeout = 10000) + public void testNIONoException() throws Exception { + tearDown(); + NIOSetup(); + testZKNoExceptionHelper(); + } + + private void testZKNoExceptionHelper() throws Exception { + System.setProperty("exception.type", "NoException"); + super.setUp(); + final ZooKeeper zk = createClient(); + final String path = "/a"; + try { + // make sure zkclient works + zk.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + Stat stats = zk.exists(path, false); + if ( stats != null ) { + int length = stats.getDataLength(); + } + } catch (KeeperException.ConnectionLossException cle) { + LOG.error("ConnectionLossException: {}", cle); + fail("No exception should be thrown"); + } catch (Throwable t) { + // error + LOG.error("Throwable {}", t); + fail("No exception should be thrown"); + } finally { + zk.close(); + } + } + @Test (timeout = 10000) + public void testNettyRunTimeException() throws Exception { + tearDown(); + NettySetup(); + testZKRunTimeExceptionHelper(); + } + + @Test (timeout = 10000) + public void testNIORunTimeException() throws Exception { + tearDown(); + NIOSetup(); + testZKRunTimeExceptionHelper(); + } + + private void testZKRunTimeExceptionHelper() throws Exception { + System.setProperty("exception.type", "RunTimeException"); + super.setUp(); + final ZooKeeper zk = createClient(); + final String path = "/a"; + try { + // make sure zkclient works + String returnPath = zk.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + Stat stats = zk.exists(returnPath, false); + if ( stats != null ) { + int length = stats.getDataLength(); + } + fail("should not reach here"); + } catch (KeeperException.ConnectionLossException cle) { + LOG.info("ConnectionLossException: {}", cle); + } finally { + zk.close(); + } + } +}