diff --git a/.gitignore b/.gitignore index 0cbf40d712d..28502876854 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,7 @@ out/ *.ipr *.iws *.iml +*.eml # NetBeans *~.nib diff --git a/build.xml b/build.xml index aacf55aa192..395a3de5b67 100644 --- a/build.xml +++ b/build.xml @@ -1166,6 +1166,11 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant"> + + diff --git a/ivy.xml b/ivy.xml index 4ecb3a60a02..6feec4f7ed5 100644 --- a/ivy.xml +++ b/ivy.xml @@ -83,5 +83,8 @@ conf="optional->default"/> + + + diff --git a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java index f2f225c81ec..b7a6471910a 100644 --- a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java @@ -667,21 +667,17 @@ public static void closeSock(SocketChannel sock) { * org.apache.jute.Record, java.lang.String) */ @Override - public void sendResponse(ReplyHeader h, Record r, String tag) { + public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); // Make space for length BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); - try { - baos.write(fourBytes); - bos.writeRecord(h, "header"); - if (r != null) { - bos.writeRecord(r, tag); - } - baos.close(); - } catch (IOException e) { - LOG.error("Error serializing response"); + baos.write(fourBytes); + bos.writeRecord(h, "header"); + if (r != null) { + bos.writeRecord(r, tag); } + baos.close(); byte b[] = baos.toByteArray(); ByteBuffer bb = ByteBuffer.wrap(b); bb.putInt(b.length - 4).rewind(); @@ -694,7 +690,7 @@ public void sendResponse(ReplyHeader h, Record r, String tag) { } } } catch(Exception e) { - LOG.warn("Unexpected exception. Destruction averted.", e); + throw new IOException(e); } } @@ -716,7 +712,11 @@ public void process(WatchedEvent event) { // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); - sendResponse(h, e, "notification"); + try { + sendResponse(h, e, "notification"); + } catch (IOException ex) { + LOG.debug("Problem sending to " + getRemoteSocketAddress(), ex); + } } /* diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java index c48f6b1afe4..93c92a727f3 100644 --- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java @@ -139,9 +139,7 @@ public void process(WatchedEvent event) { try { sendResponse(h, e, "notification"); } catch (IOException e1) { - if (LOG.isDebugEnabled()) { - LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1); - } + LOG.debug("Problem sending to {}", getRemoteSocketAddress(), e1); close(); } } @@ -165,31 +163,31 @@ static class ResumeMessageEvent implements MessageEvent { @Override public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { - if (!channel.isOpen()) { - return; - } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - // Make space for length - BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); try { + if (!channel.isOpen()) { + return; + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + // Make space for length + BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); baos.write(fourBytes); bos.writeRecord(h, "header"); if (r != null) { bos.writeRecord(r, tag); } baos.close(); - } catch (IOException e) { - LOG.error("Error serializing response"); - } - byte b[] = baos.toByteArray(); - ByteBuffer bb = ByteBuffer.wrap(b); - bb.putInt(b.length - 4).rewind(); - sendBuffer(bb); - if (h.getXid() > 0) { - // zks cannot be null otherwise we would not have gotten here! - if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) { - enableRecv(); + byte b[] = baos.toByteArray(); + ByteBuffer bb = ByteBuffer.wrap(b); + bb.putInt(b.length - 4).rewind(); + sendBuffer(bb); + if (h.getXid() > 0) { + // zks cannot be null otherwise we would not have gotten here! + if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) { + enableRecv(); + } } + } catch (Exception e) { + throw new IOException(e); } } diff --git a/src/java/test/org/apache/zookeeper/server/ServerCxnExceptionsTest.java b/src/java/test/org/apache/zookeeper/server/ServerCxnExceptionsTest.java new file mode 100644 index 00000000000..b5ca113fd62 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/ServerCxnExceptionsTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.test.ClientBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.fail; + +/** + * Unit tests to test different exceptions scenarios in sendResponse + */ +@RunWith(BMUnitRunner.class) +public class ServerCxnExceptionsTest extends ClientBase { + + private static final Logger LOG = LoggerFactory.getLogger(ServerCxnExceptionsTest.class); + private static String previousFactory = null; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + previousFactory = System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (previousFactory != null) { + System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, previousFactory); + } + } + + @Test(timeout = 60000, expected = KeeperException.ConnectionLossException.class) + @BMRule(name = "IOExceptionNetty", + targetClass = "org.apache.zookeeper.server.NettyServerCnxn", + targetMethod = "sendResponse", + action = "throw new IOException(\"Test IOException from ServerCxnExceptionsTest with Netty\");", + targetLocation = "AT ENTRY" + ) + public void testIOExceptionNetty() throws Exception { + tearDown(); + nettySetup(); + testZKHelper(true); + } + + @Test(timeout = 60000, expected = KeeperException.ConnectionLossException.class) + @BMRule(name = "IOExceptionNIO", + targetClass = "org.apache.zookeeper.server.NIOServerCnxn", + targetMethod = "sendResponse", + action = "throw new IOException(\"Test IOException from ServerCxnExceptionsTest with NIO\");", + targetLocation = "AT ENTRY" + ) + public void testIOExceptionNIO() throws Exception { + tearDown(); + nioSetup(); + testZKHelper(true); + } + + @Test(timeout = 60000) + public void testNoExceptionNetty() throws Exception { + tearDown(); + nettySetup(); + testZKHelper(false); + } + + @Test(timeout = 60000) + public void testNoExceptionNIO() throws Exception { + tearDown(); + nioSetup(); + testZKHelper(false); + } + + @Test(timeout = 60000, expected = KeeperException.ConnectionLossException.class) + @BMRule(name = "RuntimeException Netty", + targetClass = "org.apache.zookeeper.server.NettyServerCnxn", + targetMethod = "sendResponse", + action = "throw new RuntimeException(\"Test RuntimeException from ServerCxnExceptionsTest\")", + targetLocation = "AT ENTRY" + ) + public void testNettyRunTimeException() throws Exception { + tearDown(); + nettySetup(); + testZKHelper(true); + } + + @Test(timeout = 60000, expected = KeeperException.ConnectionLossException.class) + @BMRule(name = "RuntimeException Netty", + targetClass = "org.apache.zookeeper.server.NIOServerCnxn", + targetMethod = "sendResponse", + action = "throw new RuntimeException(\"Test RuntimeException from ServerCxnExceptionsTest\")", + targetLocation = "AT ENTRY" + ) + public void testNIORunTimeException() throws Exception { + tearDown(); + nioSetup(); + testZKHelper(true); + } + + private void nettySetup() throws Exception { + System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, + "org.apache.zookeeper.server.NettyServerCnxnFactory"); + } + + private void nioSetup() throws Exception { + System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, + "org.apache.zookeeper.server.NIOServerCnxnFactory"); + } + + private void testZKHelper(boolean shouldFail) throws Exception { + super.setUp(); + final ZooKeeper zk = createClient(); + final String path = "/a"; + try { + // make sure zkclient works + zk.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + Stat stats = zk.exists(path, false); + if (stats != null) { + int length = stats.getDataLength(); + } + if (shouldFail) { + fail("sendResponse() should have thrown IOException"); + } + } + finally { + try { + zk.close(); + } + catch (Exception e) { + // IGNORE any exception during close + LOG.debug("Exception during close: {}", e); + } + } + } +}