diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/CoapEndpoint.java b/californium-core/src/main/java/org/eclipse/californium/core/network/CoapEndpoint.java index 46cd6648e0..c98d019ad5 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/CoapEndpoint.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/CoapEndpoint.java @@ -67,6 +67,9 @@ * was started. * Achim Kraus (Bosch Software Innovations GmbH) - cancel pending messages on stop(). * Achim Kraus (Bosch Software Innovations GmbH) - add support for multicast + * Achim Kraus (Bosch Software Innovations GmbH) - move response retransmission + * setup to BaseCoapStack to include + * it also in a try-catch ******************************************************************************/ package org.eclipse.californium.core.network; @@ -547,12 +550,6 @@ public void sendResponse(final Exchange exchange, final Response response) { exchange.execute(new Runnable() { @Override public void run() { - if (exchange.getRequest().getOptions().hasObserve()) { - // observe- or cancel-observe-requests may have multiple responses - // when observes are finished, the last response has no longer an - // observe option. Therefore check the request for it. - exchange.retransmitResponse(); - } coapstack.sendResponse(exchange, response); } }); diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java b/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java index 04b15fc513..f8a64cb211 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java @@ -233,7 +233,12 @@ public void receiveRequest(final Request request, final EndpointReceiver receive @Override public void run() { - receiver.receiveRequest(previous, request); + try { + receiver.receiveRequest(previous, request); + } catch (RuntimeException ex) { + LOGGER.warn("error receiving request {} again!", request, ex); + receiver.reject(request); + } } }); } else { @@ -242,7 +247,12 @@ public void run() { @Override public void run() { - receiver.receiveRequest(exchange, request); + try { + receiver.receiveRequest(exchange, request); + } catch (RuntimeException ex) { + LOGGER.warn("error receiving request {}", request, ex); + receiver.reject(request); + } } }); } @@ -294,8 +304,8 @@ public void run() { receiver.receiveResponse(prev, response); return; } - } catch (Exception ex) { - LOGGER.error("error receiving response {} for {}", response, prev, ex); + } catch (RuntimeException ex) { + LOGGER.warn("error receiving response {} for {}", response, prev, ex); } reject(response, receiver); } @@ -388,8 +398,8 @@ public void run() { LOGGER.debug("ignoring potentially forged response for token {} with non-matching endpoint context", idByToken); } - } catch (Exception ex) { - LOGGER.error("error receiving response {} for {}", response, exchange, ex); + } catch (RuntimeException ex) { + LOGGER.warn("error receiving response {} for {}", response, exchange, ex); } reject(response, receiver); } @@ -433,8 +443,8 @@ public void run() { "ignoring potentially forged reply for message {} with non-matching endpoint context", idByMID); } - } catch (Exception ex) { - LOGGER.error("error receiving empty message {} for {}", message, exchange, ex); + } catch (RuntimeException ex) { + LOGGER.warn("error receiving empty message {} for {}", message, exchange, ex); } } }); @@ -444,11 +454,11 @@ private void reject(final Response response, final EndpointReceiver receiver) { if (response.getType() != Type.ACK && response.hasMID()) { // reject only messages with MID, ignore for TCP - LOGGER.debug("rejecting unmatchable response from {}", response.getSourceContext()); + LOGGER.debug("rejecting response from {}", response.getSourceContext()); receiver.reject(response); } } - + private class RemoveHandlerImpl implements RemoveHandler { @Override diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java index 0e19cf1570..02c623b155 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java @@ -81,19 +81,39 @@ protected final void setLayers(final Layer specificLayers[]) { @Override public void sendRequest(final Exchange exchange, final Request request) { // delegate to top - top.sendRequest(exchange, request); + try { + top.sendRequest(exchange, request); + } catch (RuntimeException ex) { + request.setSendError(ex); + } } @Override public void sendResponse(final Exchange exchange, final Response response) { // delegate to top - top.sendResponse(exchange, response); + try { + if (exchange.getRequest().getOptions().hasObserve()) { + // observe- or cancel-observe-requests may have + // multiple responses. + // when observes are finished, the last response has + // no longer an observe option. Therefore check the + // request for it. + exchange.retransmitResponse(); + } + top.sendResponse(exchange, response); + } catch (RuntimeException ex) { + response.setSendError(ex); + } } @Override public void sendEmptyMessage(final Exchange exchange, final EmptyMessage message) { // delegate to top - top.sendEmptyMessage(exchange, message); + try { + top.sendEmptyMessage(exchange, message); + } catch (RuntimeException ex) { + message.setSendError(ex); + } } @Override diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/Block1BlockwiseStatus.java b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/Block1BlockwiseStatus.java index 1c594056f1..b92385ec08 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/Block1BlockwiseStatus.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/Block1BlockwiseStatus.java @@ -120,9 +120,10 @@ public synchronized Request getNextRequestBlock() { block.setDestinationContext(request.getDestinationContext()); // copy options block.setOptions(new OptionSet(request.getOptions())); - // copy message observers so that a failing blockwise request also notifies observers registered with - // the original request + // copy message observers so that a failing blockwise request + // also notifies observers registered with the original request block.addMessageObservers(request.getMessageObservers()); + if (num == 0) { // indicate overall body size to peer block.getOptions().setSize1(request.getPayloadSize()); diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BlockwiseLayer.java b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BlockwiseLayer.java index 95939d3885..919c16e29b 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BlockwiseLayer.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BlockwiseLayer.java @@ -808,15 +808,25 @@ private void sendNextBlock(final Exchange exchange, final Response response, fin } int nextNum = status.getCurrentNum() + currentSize / newSize; LOGGER.debug("sending next Block1 num={}", nextNum); - Request nextBlock = status.getNextRequestBlock(nextNum, newSzx); - // we use the same token to ease traceability - nextBlock.setToken(response.getToken()); - nextBlock.setDestinationContext(response.getSourceContext()); - addBlock1CleanUpObserver(nextBlock, key, status); - - exchange.setCurrentRequest(nextBlock); - prepareBlock1Cleanup(status, key); - lower().sendRequest(exchange, nextBlock); + Request nextBlock = null; + try { + nextBlock = status.getNextRequestBlock(nextNum, newSzx); + // we use the same token to ease traceability + nextBlock.setToken(response.getToken()); + nextBlock.setDestinationContext(response.getSourceContext()); + addBlock1CleanUpObserver(nextBlock, key, status); + + exchange.setCurrentRequest(nextBlock); + prepareBlock1Cleanup(status, key); + lower().sendRequest(exchange, nextBlock); + } catch (RuntimeException ex) { + LOGGER.warn("cannot process next block request, aborting request!", ex); + if (nextBlock != null) { + nextBlock.setSendError(ex); + } else { + exchange.getRequest().setSendError(ex); + } + } } /** @@ -894,45 +904,50 @@ private void handleBlock2Response(final Exchange exchange, final Response respon Request request = exchange.getRequest(); Request block = new Request(request.getCode()); - // do not enforce CON, since NON could make sense over SMS or similar transports - block.setType(request.getType()); - block.setDestinationContext(response.getSourceContext()); - - /* - * WARNING: - * - * For Observe, the Matcher then will store the same - * exchange under a different KeyToken in exchangesByToken, - * which is cleaned up in the else case below. - */ - if (!response.isNotification()) { - block.setToken(response.getToken()); - } else if (exchange.isNotification()) { - // Recreate cleanup message observer - request.addMessageObserver(new CleanupMessageObserver(exchange)); - } - - // copy options - block.setOptions(new OptionSet(request.getOptions())); - block.getOptions().setBlock2(newSzx, false, nextNum); - - // make sure NOT to use Observe for block retrieval - block.getOptions().removeObserve(); - - // copy message observers from original request so that they will be notified - // if something goes wrong with this blockwise request, e.g. if it times out - block.addMessageObservers(request.getMessageObservers()); - // add an observer that cleans up the block2 transfer tracker if the - // block request fails - addBlock2CleanUpObserver(block, key, status); - - - status.setCurrentNum(nextNum); + try { + + // do not enforce CON, since NON could make sense over SMS or similar transports + block.setType(request.getType()); + block.setDestinationContext(response.getSourceContext()); + + /* + * WARNING: + * + * For Observe, the Matcher then will store the same + * exchange under a different KeyToken in exchangesByToken, + * which is cleaned up in the else case below. + */ + if (!response.isNotification()) { + block.setToken(response.getToken()); + } else if (exchange.isNotification()) { + // Recreate cleanup message observer + request.addMessageObserver(new CleanupMessageObserver(exchange)); + } - LOGGER.debug("requesting next Block2 [num={}]: {}", nextNum, block); - exchange.setCurrentRequest(block); - prepareBlock2Cleanup(status, key); - lower().sendRequest(exchange, block); + // copy options + block.setOptions(new OptionSet(request.getOptions())); + block.getOptions().setBlock2(newSzx, false, nextNum); + + // make sure NOT to use Observe for block retrieval + block.getOptions().removeObserve(); + + // copy message observers from original request so that they will be notified + // if something goes wrong with this blockwise request, e.g. if it times out + block.addMessageObservers(request.getMessageObservers()); + // add an observer that cleans up the block2 transfer tracker if the + // block request fails + addBlock2CleanUpObserver(block, key, status); + + status.setCurrentNum(nextNum); + + LOGGER.debug("requesting next Block2 [num={}]: {}", nextNum, block); + exchange.setCurrentRequest(block); + prepareBlock2Cleanup(status, key); + lower().sendRequest(exchange, block); + } catch (RuntimeException ex) { + LOGGER.warn("cannot process next block request, aborting request!", ex); + block.setSendError(ex); + } } else { diff --git a/californium-core/src/test/java/org/eclipse/californium/core/test/ErrorInjector.java b/californium-core/src/test/java/org/eclipse/californium/core/test/ErrorInjector.java index 210d9ed69c..551d2f818c 100644 --- a/californium-core/src/test/java/org/eclipse/californium/core/test/ErrorInjector.java +++ b/californium-core/src/test/java/org/eclipse/californium/core/test/ErrorInjector.java @@ -17,7 +17,6 @@ import java.util.concurrent.atomic.AtomicBoolean; -import org.eclipse.californium.core.coap.Message; import org.eclipse.californium.core.coap.MessageObserverAdapter; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; @@ -45,46 +44,34 @@ public void setErrorOnReadyToSend() { @Override public void sendRequest(final Request request) { - request.addMessageObserver(new ErrorInjectorMessageObserver(request)); + request.addMessageObserver(new ErrorInjectorMessageObserver()); } @Override public void sendResponse(final Response response) { - response.addMessageObserver(new ErrorInjectorMessageObserver(response)); + response.addMessageObserver(new ErrorInjectorMessageObserver()); } public class ErrorInjectorMessageObserver extends MessageObserverAdapter { - private Message message; - - public ErrorInjectorMessageObserver(Message message) { - this.message = message; - } - @Override public void onReadyToSend() { - if (errorOnReadyToSend.getAndSet(false)) { - RuntimeException exception = new IntendedTestException("Simulate error before to sent"); - message.setSendError(exception); - throw exception; + if (errorOnReadyToSend.compareAndSet(true, false)) { + throw new IntendedTestException("Simulate error before to sent"); } } @Override public void onSent() { - if (errorOnReadyToSend.getAndSet(false)) { - RuntimeException exception = new IntendedTestException("Simulate error on sent"); - message.setSendError(exception); - throw exception; + if (errorOnReadyToSend.compareAndSet(true, false)) { + throw new IntendedTestException("Simulate error on sent"); } } @Override public void onContextEstablished(EndpointContext endpointContext) { - if (errorOnEstablishedContext.getAndSet(false)) { - RuntimeException exception = new IntendedTestException("Simulate error on context established"); - message.setSendError(exception); - throw exception; + if (errorOnEstablishedContext.compareAndSet(true, false)) { + throw new IntendedTestException("Simulate error on context established"); } } } diff --git a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/BlockwiseInterceptor.java b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/BlockwiseInterceptor.java index b251ba66dd..d096bb6330 100644 --- a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/BlockwiseInterceptor.java +++ b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/BlockwiseInterceptor.java @@ -23,7 +23,6 @@ import org.eclipse.californium.core.Utils; import org.eclipse.californium.core.coap.BlockOption; import org.eclipse.californium.core.coap.EmptyMessage; -import org.eclipse.californium.core.coap.Message; import org.eclipse.californium.core.coap.MessageObserver; import org.eclipse.californium.core.coap.MessageObserverAdapter; import org.eclipse.californium.core.coap.OptionSet; @@ -189,8 +188,8 @@ protected abstract class LoggingMessageObserver extends MessageObserverAdapter { private final MessageObserver errorInjectorObserver; - protected LoggingMessageObserver(final ErrorInjector errorInjector, final Message message) { - this.errorInjectorObserver = errorInjector.new ErrorInjectorMessageObserver(message); + protected LoggingMessageObserver(final ErrorInjector errorInjector) { + this.errorInjectorObserver = errorInjector.new ErrorInjectorMessageObserver(); } private void countDown() { diff --git a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ClientBlockwiseInterceptor.java b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ClientBlockwiseInterceptor.java index c3228d28c1..c0e723f49b 100644 --- a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ClientBlockwiseInterceptor.java +++ b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ClientBlockwiseInterceptor.java @@ -38,7 +38,7 @@ public synchronized void sendRequest(final Request request) { appendRequestDetails(request); if (errorInjector != null) { buffer.append(" (should be dropped by error)"); - request.addMessageObserver(new LoggingMessageObserver(errorInjector, request) { + request.addMessageObserver(new LoggingMessageObserver(errorInjector) { @Override public void log(IntendedTestException exception) { diff --git a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ServerBlockwiseInterceptor.java b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ServerBlockwiseInterceptor.java index b835d4d2cc..38f7304404 100644 --- a/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ServerBlockwiseInterceptor.java +++ b/californium-core/src/test/java/org/eclipse/californium/core/test/lockstep/ServerBlockwiseInterceptor.java @@ -45,7 +45,7 @@ public synchronized void sendResponse(final Response response) { if (errorInjector != null) { logNewLine("(should be dropped by error) "); appendResponseDetails(response); - response.addMessageObserver(new LoggingMessageObserver(errorInjector, response) { + response.addMessageObserver(new LoggingMessageObserver(errorInjector) { @Override public void log(IntendedTestException exception) { diff --git a/element-connector/src/main/java/org/eclipse/californium/elements/util/SerialExecutor.java b/element-connector/src/main/java/org/eclipse/californium/elements/util/SerialExecutor.java index 2812253eec..f6436e00a6 100644 --- a/element-connector/src/main/java/org/eclipse/californium/elements/util/SerialExecutor.java +++ b/element-connector/src/main/java/org/eclipse/californium/elements/util/SerialExecutor.java @@ -29,6 +29,9 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Serial executor. * @@ -36,6 +39,8 @@ */ public class SerialExecutor extends AbstractExecutorService { + private static final Logger LOGGER = LoggerFactory.getLogger(SerialExecutor.class.getName()); + /** * Target executor to execute job serially. */ @@ -270,6 +275,8 @@ public void run() { setOwner(); try { command.run(); + } catch (Throwable t) { + LOGGER.error("unexpected error occurred:", t); } finally { clearOwner(); }