Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ out/
*.ipr
*.iws
*.iml
*.eml

# NetBeans
*~.nib
Expand Down
5 changes: 5 additions & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,11 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
<path id="junit.classpath">
<path refid="test.java.classpath" />
<pathelement path="${test.java.classes}" />
<!-- Ant & Byteman need tools.jar classpath specified.
Maven & Byteman don't require this.
With jdk9 tools.jar will be retired so this line should be removed!
-->
<pathelement location="${java.home}/../lib/tools.jar"/>
</path>
<fileset id="quicktest.files" dir="${test.src.dir}">
<include name="**/*${test.category}Test.java" />
Expand Down
3 changes: 3 additions & 0 deletions ivy.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,8 @@
conf="optional->default"/>
<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="1.9.11"
conf="optional->default"/>
<dependency org="org.jboss.byteman" name="byteman" rev="3.0.6" conf="test->default"/>
<dependency org="org.jboss.byteman" name="byteman-bmunit" rev="3.0.6" conf="test->default"/>
<dependency org="org.jboss.byteman" name="byteman-submit" rev="3.0.6" conf="test->default"/>
</dependencies>
</ivy-module>
24 changes: 12 additions & 12 deletions src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -667,21 +667,17 @@ public static void closeSock(SocketChannel sock) {
* org.apache.jute.Record, java.lang.String)
*/
@Override
public void sendResponse(ReplyHeader h, Record r, String tag) {
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Make space for length
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
try {
baos.write(fourBytes);
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
} catch (IOException e) {
LOG.error("Error serializing response");
baos.write(fourBytes);
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
byte b[] = baos.toByteArray();
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
Expand All @@ -694,7 +690,7 @@ public void sendResponse(ReplyHeader h, Record r, String tag) {
}
}
} catch(Exception e) {
LOG.warn("Unexpected exception. Destruction averted.", e);
throw new IOException(e);
}
}

Expand All @@ -716,7 +712,11 @@ public void process(WatchedEvent event) {
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();

sendResponse(h, e, "notification");
try {
sendResponse(h, e, "notification");
} catch (IOException ex) {
LOG.debug("Problem sending to " + getRemoteSocketAddress(), ex);
}
}

/*
Expand Down
38 changes: 18 additions & 20 deletions src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ public void process(WatchedEvent event) {
try {
sendResponse(h, e, "notification");
} catch (IOException e1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
}
LOG.debug("Problem sending to {}", getRemoteSocketAddress(), e1);
close();
}
}
Expand All @@ -165,31 +163,31 @@ static class ResumeMessageEvent implements MessageEvent {
@Override
public void sendResponse(ReplyHeader h, Record r, String tag)
throws IOException {
if (!channel.isOpen()) {
return;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Make space for length
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
try {
if (!channel.isOpen()) {
return;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Make space for length
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
baos.write(fourBytes);
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
} catch (IOException e) {
LOG.error("Error serializing response");
}
byte b[] = baos.toByteArray();
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
sendBuffer(bb);
if (h.getXid() > 0) {
// zks cannot be null otherwise we would not have gotten here!
if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
enableRecv();
byte b[] = baos.toByteArray();
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
sendBuffer(bb);
if (h.getXid() > 0) {
// zks cannot be null otherwise we would not have gotten here!
if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
enableRecv();
}
}
} catch (Exception e) {
throw new IOException(e);
}
}

Expand Down
160 changes: 160 additions & 0 deletions src/java/test/org/apache/zookeeper/server/ServerCxnExceptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.test.ClientBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.fail;

/**
* Unit tests to test different exceptions scenarios in sendResponse
*/
@RunWith(BMUnitRunner.class)
public class ServerCxnExceptionsTest extends ClientBase {

private static final Logger LOG = LoggerFactory.getLogger(ServerCxnExceptionsTest.class);
private static String previousFactory = null;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
previousFactory = System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
if (previousFactory != null) {
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, previousFactory);
}
}

@Test(timeout = 60000, expected = KeeperException.ConnectionLossException.class)
@BMRule(name = "IOExceptionNetty",
targetClass = "org.apache.zookeeper.server.NettyServerCnxn",
targetMethod = "sendResponse",
action = "throw new IOException(\"Test IOException from ServerCxnExceptionsTest with Netty\");",
targetLocation = "AT ENTRY"
)
public void testIOExceptionNetty() throws Exception {
tearDown();
nettySetup();
testZKHelper(true);
}

@Test(timeout = 60000, expected = KeeperException.ConnectionLossException.class)
@BMRule(name = "IOExceptionNIO",
targetClass = "org.apache.zookeeper.server.NIOServerCnxn",
targetMethod = "sendResponse",
action = "throw new IOException(\"Test IOException from ServerCxnExceptionsTest with NIO\");",
targetLocation = "AT ENTRY"
)
public void testIOExceptionNIO() throws Exception {
tearDown();
nioSetup();
testZKHelper(true);
}

@Test(timeout = 60000)
public void testNoExceptionNetty() throws Exception {
tearDown();
nettySetup();
testZKHelper(false);
}

@Test(timeout = 60000)
public void testNoExceptionNIO() throws Exception {
tearDown();
nioSetup();
testZKHelper(false);
}

@Test(timeout = 60000, expected = KeeperException.ConnectionLossException.class)
@BMRule(name = "RuntimeException Netty",
targetClass = "org.apache.zookeeper.server.NettyServerCnxn",
targetMethod = "sendResponse",
action = "throw new RuntimeException(\"Test RuntimeException from ServerCxnExceptionsTest\")",
targetLocation = "AT ENTRY"
)
public void testNettyRunTimeException() throws Exception {
tearDown();
nettySetup();
testZKHelper(true);
}

@Test(timeout = 60000, expected = KeeperException.ConnectionLossException.class)
@BMRule(name = "RuntimeException Netty",
targetClass = "org.apache.zookeeper.server.NIOServerCnxn",
targetMethod = "sendResponse",
action = "throw new RuntimeException(\"Test RuntimeException from ServerCxnExceptionsTest\")",
targetLocation = "AT ENTRY"
)
public void testNIORunTimeException() throws Exception {
tearDown();
nioSetup();
testZKHelper(true);
}

private void nettySetup() throws Exception {
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
"org.apache.zookeeper.server.NettyServerCnxnFactory");
}

private void nioSetup() throws Exception {
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
"org.apache.zookeeper.server.NIOServerCnxnFactory");
}

private void testZKHelper(boolean shouldFail) throws Exception {
super.setUp();
final ZooKeeper zk = createClient();
final String path = "/a";
try {
// make sure zkclient works
zk.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
Stat stats = zk.exists(path, false);
if (stats != null) {
int length = stats.getDataLength();
}
if (shouldFail) {
fail("sendResponse() should have thrown IOException");
}
}
finally {
try {
zk.close();
}
catch (Exception e) {
// IGNORE any exception during close
LOG.debug("Exception during close: {}", e);
}
}
}
}