diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/CoapEndpoint.java b/californium-core/src/main/java/org/eclipse/californium/core/network/CoapEndpoint.java index 46cd6648e0..c98d019ad5 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/CoapEndpoint.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/CoapEndpoint.java @@ -67,6 +67,9 @@ * was started. * Achim Kraus (Bosch Software Innovations GmbH) - cancel pending messages on stop(). * Achim Kraus (Bosch Software Innovations GmbH) - add support for multicast + * Achim Kraus (Bosch Software Innovations GmbH) - move response retransmission + * setup to BaseCoapStack to include + * it also in a try-catch ******************************************************************************/ package org.eclipse.californium.core.network; @@ -547,12 +550,6 @@ public void sendResponse(final Exchange exchange, final Response response) { exchange.execute(new Runnable() { @Override public void run() { - if (exchange.getRequest().getOptions().hasObserve()) { - // observe- or cancel-observe-requests may have multiple responses - // when observes are finished, the last response has no longer an - // observe option. Therefore check the request for it. - exchange.retransmitResponse(); - } coapstack.sendResponse(exchange, response); } }); diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java b/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java index 04b15fc513..f8a64cb211 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java @@ -233,7 +233,12 @@ public void receiveRequest(final Request request, final EndpointReceiver receive @Override public void run() { - receiver.receiveRequest(previous, request); + try { + receiver.receiveRequest(previous, request); + } catch (RuntimeException ex) { + LOGGER.warn("error receiving request {} again!", request, ex); + receiver.reject(request); + } } }); } else { @@ -242,7 +247,12 @@ public void run() { @Override public void run() { - receiver.receiveRequest(exchange, request); + try { + receiver.receiveRequest(exchange, request); + } catch (RuntimeException ex) { + LOGGER.warn("error receiving request {}", request, ex); + receiver.reject(request); + } } }); } @@ -294,8 +304,8 @@ public void run() { receiver.receiveResponse(prev, response); return; } - } catch (Exception ex) { - LOGGER.error("error receiving response {} for {}", response, prev, ex); + } catch (RuntimeException ex) { + LOGGER.warn("error receiving response {} for {}", response, prev, ex); } reject(response, receiver); } @@ -388,8 +398,8 @@ public void run() { LOGGER.debug("ignoring potentially forged response for token {} with non-matching endpoint context", idByToken); } - } catch (Exception ex) { - LOGGER.error("error receiving response {} for {}", response, exchange, ex); + } catch (RuntimeException ex) { + LOGGER.warn("error receiving response {} for {}", response, exchange, ex); } reject(response, receiver); } @@ -433,8 +443,8 @@ public void run() { "ignoring potentially forged reply for message {} with non-matching endpoint context", idByMID); } - } catch (Exception ex) { - LOGGER.error("error receiving empty message {} for {}", message, exchange, ex); + } catch (RuntimeException ex) { + LOGGER.warn("error receiving empty message {} for {}", message, exchange, ex); } } }); @@ -444,11 +454,11 @@ private void reject(final Response response, final EndpointReceiver receiver) { if (response.getType() != Type.ACK && response.hasMID()) { // reject only messages with MID, ignore for TCP - LOGGER.debug("rejecting unmatchable response from {}", response.getSourceContext()); + LOGGER.debug("rejecting response from {}", response.getSourceContext()); receiver.reject(response); } } - + private class RemoveHandlerImpl implements RemoveHandler { @Override diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java index 0e19cf1570..02c623b155 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java @@ -81,19 +81,39 @@ protected final void setLayers(final Layer specificLayers[]) { @Override public void sendRequest(final Exchange exchange, final Request request) { // delegate to top - top.sendRequest(exchange, request); + try { + top.sendRequest(exchange, request); + } catch (RuntimeException ex) { + request.setSendError(ex); + } } @Override public void sendResponse(final Exchange exchange, final Response response) { // delegate to top - top.sendResponse(exchange, response); + try { + if (exchange.getRequest().getOptions().hasObserve()) { + // observe- or cancel-observe-requests may have + // multiple responses. + // when observes are finished, the last response has + // no longer an observe option. Therefore check the + // request for it. + exchange.retransmitResponse(); + } + top.sendResponse(exchange, response); + } catch (RuntimeException ex) { + response.setSendError(ex); + } } @Override public void sendEmptyMessage(final Exchange exchange, final EmptyMessage message) { // delegate to top - top.sendEmptyMessage(exchange, message); + try { + top.sendEmptyMessage(exchange, message); + } catch (RuntimeException ex) { + message.setSendError(ex); + } } @Override diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/Block1BlockwiseStatus.java b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/Block1BlockwiseStatus.java index 1c594056f1..e7052d715c 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/Block1BlockwiseStatus.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/Block1BlockwiseStatus.java @@ -120,9 +120,11 @@ public synchronized Request getNextRequestBlock() { block.setDestinationContext(request.getDestinationContext()); // copy options block.setOptions(new OptionSet(request.getOptions())); - // copy message observers so that a failing blockwise request also notifies observers registered with - // the original request + // copy message observers so that a failing blockwise request + // also notifies observers registered with the original request block.addMessageObservers(request.getMessageObservers()); + block.addMessageObserver(new FailureForwardingMessageObserver(request)); + if (num == 0) { // indicate overall body size to peer block.getOptions().setSize1(request.getPayloadSize()); diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BlockwiseLayer.java b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BlockwiseLayer.java index 95939d3885..91e1d60405 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BlockwiseLayer.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BlockwiseLayer.java @@ -808,15 +808,25 @@ private void sendNextBlock(final Exchange exchange, final Response response, fin } int nextNum = status.getCurrentNum() + currentSize / newSize; LOGGER.debug("sending next Block1 num={}", nextNum); - Request nextBlock = status.getNextRequestBlock(nextNum, newSzx); - // we use the same token to ease traceability - nextBlock.setToken(response.getToken()); - nextBlock.setDestinationContext(response.getSourceContext()); - addBlock1CleanUpObserver(nextBlock, key, status); - - exchange.setCurrentRequest(nextBlock); - prepareBlock1Cleanup(status, key); - lower().sendRequest(exchange, nextBlock); + Request nextBlock = null; + try { + nextBlock = status.getNextRequestBlock(nextNum, newSzx); + // we use the same token to ease traceability + nextBlock.setToken(response.getToken()); + nextBlock.setDestinationContext(response.getSourceContext()); + addBlock1CleanUpObserver(nextBlock, key, status); + + exchange.setCurrentRequest(nextBlock); + prepareBlock1Cleanup(status, key); + lower().sendRequest(exchange, nextBlock); + } catch (RuntimeException ex) { + LOGGER.warn("cannot process next block request, aborting request!", ex); + if (nextBlock != null) { + nextBlock.setSendError(ex); + } else { + exchange.getRequest().setSendError(ex); + } + } } /** @@ -894,45 +904,51 @@ private void handleBlock2Response(final Exchange exchange, final Response respon Request request = exchange.getRequest(); Request block = new Request(request.getCode()); - // do not enforce CON, since NON could make sense over SMS or similar transports - block.setType(request.getType()); - block.setDestinationContext(response.getSourceContext()); - - /* - * WARNING: - * - * For Observe, the Matcher then will store the same - * exchange under a different KeyToken in exchangesByToken, - * which is cleaned up in the else case below. - */ - if (!response.isNotification()) { - block.setToken(response.getToken()); - } else if (exchange.isNotification()) { - // Recreate cleanup message observer - request.addMessageObserver(new CleanupMessageObserver(exchange)); - } - - // copy options - block.setOptions(new OptionSet(request.getOptions())); - block.getOptions().setBlock2(newSzx, false, nextNum); - - // make sure NOT to use Observe for block retrieval - block.getOptions().removeObserve(); - - // copy message observers from original request so that they will be notified - // if something goes wrong with this blockwise request, e.g. if it times out - block.addMessageObservers(request.getMessageObservers()); - // add an observer that cleans up the block2 transfer tracker if the - // block request fails - addBlock2CleanUpObserver(block, key, status); - - - status.setCurrentNum(nextNum); + block.addMessageObserver(new FailureForwardingMessageObserver(request)); + try { + + // do not enforce CON, since NON could make sense over SMS or similar transports + block.setType(request.getType()); + block.setDestinationContext(response.getSourceContext()); + + /* + * WARNING: + * + * For Observe, the Matcher then will store the same + * exchange under a different KeyToken in exchangesByToken, + * which is cleaned up in the else case below. + */ + if (!response.isNotification()) { + block.setToken(response.getToken()); + } else if (exchange.isNotification()) { + // Recreate cleanup message observer + request.addMessageObserver(new CleanupMessageObserver(exchange)); + } - LOGGER.debug("requesting next Block2 [num={}]: {}", nextNum, block); - exchange.setCurrentRequest(block); - prepareBlock2Cleanup(status, key); - lower().sendRequest(exchange, block); + // copy options + block.setOptions(new OptionSet(request.getOptions())); + block.getOptions().setBlock2(newSzx, false, nextNum); + + // make sure NOT to use Observe for block retrieval + block.getOptions().removeObserve(); + + // copy message observers from original request so that they will be notified + // if something goes wrong with this blockwise request, e.g. if it times out + block.addMessageObservers(request.getMessageObservers()); + // add an observer that cleans up the block2 transfer tracker if the + // block request fails + addBlock2CleanUpObserver(block, key, status); + + status.setCurrentNum(nextNum); + + LOGGER.debug("requesting next Block2 [num={}]: {}", nextNum, block); + exchange.setCurrentRequest(block); + prepareBlock2Cleanup(status, key); + lower().sendRequest(exchange, block); + } catch (RuntimeException ex) { + LOGGER.warn("cannot process next block request, aborting request!", ex); + block.setSendError(ex); + } } else { diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/FailureForwardingMessageObserver.java b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/FailureForwardingMessageObserver.java new file mode 100644 index 0000000000..ae63443985 --- /dev/null +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/FailureForwardingMessageObserver.java @@ -0,0 +1,59 @@ +/******************************************************************************* + * Copyright (c) 2019 Bosch Software Innovations GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.html. + * + * Contributors: + * Bosch Software Innovations GmbH - initial creation + ******************************************************************************/ +package org.eclipse.californium.core.network.stack; + +import org.eclipse.californium.core.coap.Message; +import org.eclipse.californium.core.coap.MessageObserverAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Forward failures to other messages. + * + * Forward {@link #onCancel()}, {@link #onReject()}, {@link #onTimeout()}, and + * {@link #onSendError(Throwable)} to provided message. Intended to be used for + * blockwise transfers + */ +public class FailureForwardingMessageObserver extends MessageObserverAdapter { + + protected static final Logger LOGGER = LoggerFactory.getLogger(FailureForwardingMessageObserver.class.getName()); + + protected final Message message; + + protected FailureForwardingMessageObserver(final Message message) { + this.message = message; + } + + @Override + public void onCancel() { + message.cancel(); + } + + @Override + public void onReject() { + message.setRejected(true); + } + + @Override + public void onTimeout() { + message.setTimedOut(true); + } + + @Override + public void onSendError(Throwable error) { + message.setSendError(error); + } +} diff --git a/californium-core/src/test/java/org/eclipse/californium/core/test/ErrorInjector.java b/californium-core/src/test/java/org/eclipse/californium/core/test/ErrorInjector.java index 210d9ed69c..551d2f818c 100644 --- a/californium-core/src/test/java/org/eclipse/californium/core/test/ErrorInjector.java +++ b/californium-core/src/test/java/org/eclipse/californium/core/test/ErrorInjector.java @@ -17,7 +17,6 @@ import java.util.concurrent.atomic.AtomicBoolean; -import org.eclipse.californium.core.coap.Message; import org.eclipse.californium.core.coap.MessageObserverAdapter; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; @@ -45,46 +44,34 @@ public void setErrorOnReadyToSend() { @Override public void sendRequest(final Request request) { - request.addMessageObserver(new ErrorInjectorMessageObserver(request)); + request.addMessageObserver(new ErrorInjectorMessageObserver()); } @Override public void sendResponse(final Response response) { - response.addMessageObserver(new ErrorInjectorMessageObserver(response)); + response.addMessageObserver(new ErrorInjectorMessageObserver()); } public class ErrorInjectorMessageObserver extends MessageObserverAdapter { - private Message message; - - public ErrorInjectorMessageObserver(Message message) { - this.message = message; - } - @Override public void onReadyToSend() { - if (errorOnReadyToSend.getAndSet(false)) { - RuntimeException exception = new IntendedTestException("Simulate error before to sent"); - message.setSendError(exception); - throw exception; + if (errorOnReadyToSend.compareAndSet(true, false)) { + throw new IntendedTestException("Simulate error before to sent"); } } @Override public void onSent() { - if (errorOnReadyToSend.getAndSet(false)) { - RuntimeException exception = new IntendedTestException("Simulate error on sent"); - message.setSendError(exception); - throw exception; + if (errorOnReadyToSend.compareAndSet(true, false)) { + throw new IntendedTestException("Simulate error on sent"); } } @Override public void onContextEstablished(EndpointContext endpointContext) { - if (errorOnEstablishedContext.getAndSet(false)) { - RuntimeException exception = new IntendedTestException("Simulate error on context established"); - message.setSendError(exception); - throw exception; + if (errorOnEstablishedContext.compareAndSet(true, false)) { + throw new IntendedTestException("Simulate error on context established"); } } } diff --git a/californium-core/src/test/java/org/eclipse/californium/core/test/Issue834Test.java b/californium-core/src/test/java/org/eclipse/californium/core/test/Issue834Test.java new file mode 100644 index 0000000000..6e5c37d3dd --- /dev/null +++ b/californium-core/src/test/java/org/eclipse/californium/core/test/Issue834Test.java @@ -0,0 +1,281 @@ +package org.eclipse.californium.core.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.eclipse.californium.core.CoapClient; +import org.eclipse.californium.core.CoapResource; +import org.eclipse.californium.core.CoapResponse; +import org.eclipse.californium.core.CoapServer; +import org.eclipse.californium.core.coap.CoAP.Code; +import org.eclipse.californium.core.coap.CoAP.ResponseCode; +import org.eclipse.californium.core.coap.Request; +import org.eclipse.californium.core.network.config.NetworkConfig; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.eclipse.californium.rule.CoapNetworkRule; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +/** + * Test to clarify Californium issue #834 + * + * @author Rogier Cobben + * + * Results : Failed tests: + * testGetWithLargePayload(nl.teslanet.test.cf.Issue834Test): wrong + * content length returned: expected:<8192> but was:<2012> + * testDeleteWithLargePayload(nl.teslanet.test.cf.Issue834Test): wrong + * content length returned: expected:<8192> but was:<2012> + * testPatchWithLargePayload(nl.teslanet.test.cf.Issue834Test): wrong + * content length returned: expected:<8192> but was:<2012> + * testFetchWithLargePayload(nl.teslanet.test.cf.Issue834Test): wrong + * content length returned: expected:<8192> but was:<2012> Tests run: + * 12, Failures: 4, Errors: 0, Skipped: 0 + */ +public class Issue834Test { + @ClassRule + public static CoapNetworkRule network = new CoapNetworkRule(CoapNetworkRule.Mode.DIRECT, CoapNetworkRule.Mode.NATIVE); + + private static final int SMALL_CONTENT_SIZE = 10; + private static final int LARGE_CONTENT_SIZE = 8192; + private static CoapServer server = null; + private CoapClient client = null; + + /** + * Start server + */ + @BeforeClass + public static void setupServer() { + NetworkConfig config = network.getStandardTestConfig(); + config.set(NetworkConfig.Keys.UDP_CONNECTOR_DATAGRAM_SIZE, LARGE_CONTENT_SIZE * 2); + server = new CoapServer(); + server.add(new PayloadLengthResource("return_payload_length")); + server.start(); + } + + /** + * Stop server + */ + @AfterClass + public static void tearDownServer() { + if (server != null) { + server.stop(); + server.destroy(); + server = null; + } + } + + /** + * create client + */ + @Before + public void setupClient() { + client = new CoapClient("coap://127.0.0.1/return_payload_length"); + } + + /** + * destroy client + */ + @After + public void tearDownClient() { + if (client != null) { + client.shutdown(); + client = null; + } + } + + /** + * test service using given payload and assert returned responsecode and + * payload length + * + * @param code is the request code to use + * @param payload to use in the request + * @param expect is the expected response code + */ + public void runTestCase(Code code, byte[] payload, ResponseCode expect) { + Request request = new Request(code); + if (!request.isIntendedPayload()) { + request.setUnintendedPayload(); + } + request.setPayload(payload); + + CoapResponse response = client.advanced(request); + + assertNotNull("no response: ", response); + assertEquals("wrong responsecode: ", expect, response.getCode()); + assertEquals("wrong content length returned: ", payload.length, Integer.parseInt(response.getResponseText())); + } + + /** + * test get with small payload + */ + @Test + public void testGetWithSmallPayload() { + runTestCase(Code.GET, getSmallContent(), ResponseCode.CONTENT); + } + + /** + * test post with small payload + */ + @Test + public void testPostWithSmallPayload() { + runTestCase(Code.POST, getSmallContent(), ResponseCode.CREATED); + } + + /** + * test put with small payload + */ + @Test + public void testPutWithSmallPayload() { + runTestCase(Code.PUT, getSmallContent(), ResponseCode.CHANGED); + } + + /** + * test delete with small payload + */ + @Test + public void testDeleteWithSmallPayload() { + runTestCase(Code.DELETE, getSmallContent(), ResponseCode.DELETED); + } + + /** + * test fetch with small payload + */ + @Test + public void testFetchWithSmallPayload() { + runTestCase(Code.FETCH, getSmallContent(), ResponseCode.CONTENT); + } + + /** + * test patch with small payload + */ + @Test + public void testPatchWithSmallPayload() { + runTestCase(Code.PATCH, getSmallContent(), ResponseCode.CHANGED); + } + + /** + * test get with large payload + */ + @Test + public void testGetWithLargePayload() { + runTestCase(Code.GET, getLargeContent(), ResponseCode.CONTENT); + } + + /** + * test post with large payload + */ + @Test + public void testPostWithLargePayload() { + runTestCase(Code.POST, getLargeContent(), ResponseCode.CREATED); + } + + /** + * test put with large payload + */ + @Test + public void testPutWithLargePayload() { + runTestCase(Code.PUT, getLargeContent(), ResponseCode.CHANGED); + } + + /** + * test delete with large payload + */ + @Test + public void testDeleteWithLargePayload() { + runTestCase(Code.DELETE, getLargeContent(), ResponseCode.DELETED); + } + + /** + * test fetch with larege payload + */ + @Test + public void testFetchWithLargePayload() { + runTestCase(Code.FETCH, getLargeContent(), ResponseCode.CONTENT); + } + + /** + * test patch with large payload + */ + @Test + public void testPatchWithLargePayload() { + runTestCase(Code.PATCH, getLargeContent(), ResponseCode.CHANGED); + } + + /** + * Create small test content + * + * @return the test content + */ + public static byte[] getSmallContent() { + byte[] content = new byte[SMALL_CONTENT_SIZE]; + for (int i = 0; i < SMALL_CONTENT_SIZE; i++) { + content[i] = (byte) (i % (Byte.MAX_VALUE + 1)); + } + return content; + } + + /** + * Create large test content + * + * @return the test content + */ + public static byte[] getLargeContent() { + byte[] content = new byte[LARGE_CONTENT_SIZE]; + for (int i = 0; i < LARGE_CONTENT_SIZE; i++) { + content[i] = (byte) (i % (Byte.MAX_VALUE + 1)); + } + return content; + } + + /** + * Service resource + * + */ + public static class PayloadLengthResource extends CoapResource { + + public PayloadLengthResource(String name) { + super(name); + } + + @Override + public void handleGET(CoapExchange exchange) { + byte[] requestPayload = exchange.getRequestPayload(); + exchange.respond(ResponseCode.CONTENT, Integer.toString(requestPayload.length)); + } + + @Override + public void handlePOST(CoapExchange exchange) { + byte[] requestPayload = exchange.getRequestPayload(); + exchange.respond(ResponseCode.CREATED, Integer.toString(requestPayload.length)); + } + + @Override + public void handlePUT(CoapExchange exchange) { + byte[] requestPayload = exchange.getRequestPayload(); + exchange.respond(ResponseCode.CHANGED, Integer.toString(requestPayload.length)); + } + + @Override + public void handleDELETE(CoapExchange exchange) { + byte[] requestPayload = exchange.getRequestPayload(); + exchange.respond(ResponseCode.DELETED, Integer.toString(requestPayload.length)); + } + + @Override + public void handleFETCH(CoapExchange exchange) { + byte[] requestPayload = exchange.getRequestPayload(); + exchange.respond(ResponseCode.CONTENT, Integer.toString(requestPayload.length)); + } + + @Override + public void handlePATCH(CoapExchange exchange) { + byte[] requestPayload = exchange.getRequestPayload(); + exchange.respond(ResponseCode.CHANGED, Integer.toString(requestPayload.length)); + } + } +} diff --git a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/BlockwiseInterceptor.java b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/BlockwiseInterceptor.java index b251ba66dd..d096bb6330 100644 --- a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/BlockwiseInterceptor.java +++ b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/BlockwiseInterceptor.java @@ -23,7 +23,6 @@ import org.eclipse.californium.core.Utils; import org.eclipse.californium.core.coap.BlockOption; import org.eclipse.californium.core.coap.EmptyMessage; -import org.eclipse.californium.core.coap.Message; import org.eclipse.californium.core.coap.MessageObserver; import org.eclipse.californium.core.coap.MessageObserverAdapter; import org.eclipse.californium.core.coap.OptionSet; @@ -189,8 +188,8 @@ protected abstract class LoggingMessageObserver extends MessageObserverAdapter { private final MessageObserver errorInjectorObserver; - protected LoggingMessageObserver(final ErrorInjector errorInjector, final Message message) { - this.errorInjectorObserver = errorInjector.new ErrorInjectorMessageObserver(message); + protected LoggingMessageObserver(final ErrorInjector errorInjector) { + this.errorInjectorObserver = errorInjector.new ErrorInjectorMessageObserver(); } private void countDown() { diff --git a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ClientBlockwiseInterceptor.java b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ClientBlockwiseInterceptor.java index c3228d28c1..c0e723f49b 100644 --- a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ClientBlockwiseInterceptor.java +++ b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ClientBlockwiseInterceptor.java @@ -38,7 +38,7 @@ public synchronized void sendRequest(final Request request) { appendRequestDetails(request); if (errorInjector != null) { buffer.append(" (should be dropped by error)"); - request.addMessageObserver(new LoggingMessageObserver(errorInjector, request) { + request.addMessageObserver(new LoggingMessageObserver(errorInjector) { @Override public void log(IntendedTestException exception) { diff --git a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ServerBlockwiseInterceptor.java b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ServerBlockwiseInterceptor.java index b835d4d2cc..38f7304404 100644 --- a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ServerBlockwiseInterceptor.java +++ b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ServerBlockwiseInterceptor.java @@ -45,7 +45,7 @@ public synchronized void sendResponse(final Response response) { if (errorInjector != null) { logNewLine("(should be dropped by error) "); appendResponseDetails(response); - response.addMessageObserver(new LoggingMessageObserver(errorInjector, response) { + response.addMessageObserver(new LoggingMessageObserver(errorInjector) { @Override public void log(IntendedTestException exception) { diff --git a/element-connector/src/main/java/org/eclipse/californium/elements/util/SerialExecutor.java b/element-connector/src/main/java/org/eclipse/californium/elements/util/SerialExecutor.java index 2812253eec..f6436e00a6 100644 --- a/element-connector/src/main/java/org/eclipse/californium/elements/util/SerialExecutor.java +++ b/element-connector/src/main/java/org/eclipse/californium/elements/util/SerialExecutor.java @@ -29,6 +29,9 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Serial executor. * @@ -36,6 +39,8 @@ */ public class SerialExecutor extends AbstractExecutorService { + private static final Logger LOGGER = LoggerFactory.getLogger(SerialExecutor.class.getName()); + /** * Target executor to execute job serially. */ @@ -270,6 +275,8 @@ public void run() { setOwner(); try { command.run(); + } catch (Throwable t) { + LOGGER.error("unexpected error occurred:", t); } finally { clearOwner(); }