Skip to content

Commit

Permalink
Generally processing of exception with serial execution.
Browse files Browse the repository at this point in the history
Signed-off-by: Achim Kraus <[email protected]>
  • Loading branch information
Achim Kraus committed Jan 21, 2019
1 parent 439397a commit a87c5e8
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
* was started.
* Achim Kraus (Bosch Software Innovations GmbH) - cancel pending messages on stop().
* Achim Kraus (Bosch Software Innovations GmbH) - add support for multicast
* Achim Kraus (Bosch Software Innovations GmbH) - move response retransmission
* setup to BaseCoapStack to include
* it also in a try-catch
******************************************************************************/
package org.eclipse.californium.core.network;

Expand Down Expand Up @@ -547,12 +550,6 @@ public void sendResponse(final Exchange exchange, final Response response) {
exchange.execute(new Runnable() {
@Override
public void run() {
if (exchange.getRequest().getOptions().hasObserve()) {
// observe- or cancel-observe-requests may have multiple responses
// when observes are finished, the last response has no longer an
// observe option. Therefore check the request for it.
exchange.retransmitResponse();
}
coapstack.sendResponse(exchange, response);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,12 @@ public void receiveRequest(final Request request, final EndpointReceiver receive

@Override
public void run() {
receiver.receiveRequest(previous, request);
try {
receiver.receiveRequest(previous, request);
} catch (RuntimeException ex) {
LOGGER.warn("error receiving request {} again!", request, ex);
receiver.reject(request);
}
}
});
} else {
Expand All @@ -242,7 +247,12 @@ public void run() {

@Override
public void run() {
receiver.receiveRequest(exchange, request);
try {
receiver.receiveRequest(exchange, request);
} catch (RuntimeException ex) {
LOGGER.warn("error receiving request {}", request, ex);
receiver.reject(request);
}
}
});
}
Expand Down Expand Up @@ -294,8 +304,8 @@ public void run() {
receiver.receiveResponse(prev, response);
return;
}
} catch (Exception ex) {
LOGGER.error("error receiving response {} for {}", response, prev, ex);
} catch (RuntimeException ex) {
LOGGER.warn("error receiving response {} for {}", response, prev, ex);
}
reject(response, receiver);
}
Expand Down Expand Up @@ -388,8 +398,8 @@ public void run() {
LOGGER.debug("ignoring potentially forged response for token {} with non-matching endpoint context",
idByToken);
}
} catch (Exception ex) {
LOGGER.error("error receiving response {} for {}", response, exchange, ex);
} catch (RuntimeException ex) {
LOGGER.warn("error receiving response {} for {}", response, exchange, ex);
}
reject(response, receiver);
}
Expand Down Expand Up @@ -433,8 +443,8 @@ public void run() {
"ignoring potentially forged reply for message {} with non-matching endpoint context",
idByMID);
}
} catch (Exception ex) {
LOGGER.error("error receiving empty message {} for {}", message, exchange, ex);
} catch (RuntimeException ex) {
LOGGER.warn("error receiving empty message {} for {}", message, exchange, ex);
}
}
});
Expand All @@ -444,11 +454,11 @@ private void reject(final Response response, final EndpointReceiver receiver) {

if (response.getType() != Type.ACK && response.hasMID()) {
// reject only messages with MID, ignore for TCP
LOGGER.debug("rejecting unmatchable response from {}", response.getSourceContext());
LOGGER.debug("rejecting response from {}", response.getSourceContext());
receiver.reject(response);
}
}

private class RemoveHandlerImpl implements RemoveHandler {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,39 @@ protected final void setLayers(final Layer specificLayers[]) {
@Override
public void sendRequest(final Exchange exchange, final Request request) {
// delegate to top
top.sendRequest(exchange, request);
try {
top.sendRequest(exchange, request);
} catch (RuntimeException ex) {
request.setSendError(ex);
}
}

@Override
public void sendResponse(final Exchange exchange, final Response response) {
// delegate to top
top.sendResponse(exchange, response);
try {
if (exchange.getRequest().getOptions().hasObserve()) {
// observe- or cancel-observe-requests may have
// multiple responses.
// when observes are finished, the last response has
// no longer an observe option. Therefore check the
// request for it.
exchange.retransmitResponse();
}
top.sendResponse(exchange, response);
} catch (RuntimeException ex) {
response.setSendError(ex);
}
}

@Override
public void sendEmptyMessage(final Exchange exchange, final EmptyMessage message) {
// delegate to top
top.sendEmptyMessage(exchange, message);
try {
top.sendEmptyMessage(exchange, message);
} catch (RuntimeException ex) {
message.setSendError(ex);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ public synchronized Request getNextRequestBlock() {
block.setDestinationContext(request.getDestinationContext());
// copy options
block.setOptions(new OptionSet(request.getOptions()));
// copy message observers so that a failing blockwise request also notifies observers registered with
// the original request
// copy message observers so that a failing blockwise request
// also notifies observers registered with the original request
block.addMessageObservers(request.getMessageObservers());

if (num == 0) {
// indicate overall body size to peer
block.getOptions().setSize1(request.getPayloadSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,15 +808,25 @@ private void sendNextBlock(final Exchange exchange, final Response response, fin
}
int nextNum = status.getCurrentNum() + currentSize / newSize;
LOGGER.debug("sending next Block1 num={}", nextNum);
Request nextBlock = status.getNextRequestBlock(nextNum, newSzx);
// we use the same token to ease traceability
nextBlock.setToken(response.getToken());
nextBlock.setDestinationContext(response.getSourceContext());
addBlock1CleanUpObserver(nextBlock, key, status);

exchange.setCurrentRequest(nextBlock);
prepareBlock1Cleanup(status, key);
lower().sendRequest(exchange, nextBlock);
Request nextBlock = null;
try {
nextBlock = status.getNextRequestBlock(nextNum, newSzx);
// we use the same token to ease traceability
nextBlock.setToken(response.getToken());
nextBlock.setDestinationContext(response.getSourceContext());
addBlock1CleanUpObserver(nextBlock, key, status);

exchange.setCurrentRequest(nextBlock);
prepareBlock1Cleanup(status, key);
lower().sendRequest(exchange, nextBlock);
} catch (RuntimeException ex) {
LOGGER.warn("cannot process next block request, aborting request!", ex);
if (nextBlock != null) {
nextBlock.setSendError(ex);
} else {
exchange.getRequest().setSendError(ex);
}
}
}

/**
Expand Down Expand Up @@ -894,45 +904,50 @@ private void handleBlock2Response(final Exchange exchange, final Response respon
Request request = exchange.getRequest();

Request block = new Request(request.getCode());
// do not enforce CON, since NON could make sense over SMS or similar transports
block.setType(request.getType());
block.setDestinationContext(response.getSourceContext());

/*
* WARNING:
*
* For Observe, the Matcher then will store the same
* exchange under a different KeyToken in exchangesByToken,
* which is cleaned up in the else case below.
*/
if (!response.isNotification()) {
block.setToken(response.getToken());
} else if (exchange.isNotification()) {
// Recreate cleanup message observer
request.addMessageObserver(new CleanupMessageObserver(exchange));
}

// copy options
block.setOptions(new OptionSet(request.getOptions()));
block.getOptions().setBlock2(newSzx, false, nextNum);

// make sure NOT to use Observe for block retrieval
block.getOptions().removeObserve();

// copy message observers from original request so that they will be notified
// if something goes wrong with this blockwise request, e.g. if it times out
block.addMessageObservers(request.getMessageObservers());
// add an observer that cleans up the block2 transfer tracker if the
// block request fails
addBlock2CleanUpObserver(block, key, status);


status.setCurrentNum(nextNum);
try {

// do not enforce CON, since NON could make sense over SMS or similar transports
block.setType(request.getType());
block.setDestinationContext(response.getSourceContext());

/*
* WARNING:
*
* For Observe, the Matcher then will store the same
* exchange under a different KeyToken in exchangesByToken,
* which is cleaned up in the else case below.
*/
if (!response.isNotification()) {
block.setToken(response.getToken());
} else if (exchange.isNotification()) {
// Recreate cleanup message observer
request.addMessageObserver(new CleanupMessageObserver(exchange));
}

LOGGER.debug("requesting next Block2 [num={}]: {}", nextNum, block);
exchange.setCurrentRequest(block);
prepareBlock2Cleanup(status, key);
lower().sendRequest(exchange, block);
// copy options
block.setOptions(new OptionSet(request.getOptions()));
block.getOptions().setBlock2(newSzx, false, nextNum);

// make sure NOT to use Observe for block retrieval
block.getOptions().removeObserve();

// copy message observers from original request so that they will be notified
// if something goes wrong with this blockwise request, e.g. if it times out
block.addMessageObservers(request.getMessageObservers());
// add an observer that cleans up the block2 transfer tracker if the
// block request fails
addBlock2CleanUpObserver(block, key, status);

status.setCurrentNum(nextNum);

LOGGER.debug("requesting next Block2 [num={}]: {}", nextNum, block);
exchange.setCurrentRequest(block);
prepareBlock2Cleanup(status, key);
lower().sendRequest(exchange, block);
} catch (RuntimeException ex) {
LOGGER.warn("cannot process next block request, aborting request!", ex);
block.setSendError(ex);
}

} else {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.californium.core.coap.Message;
import org.eclipse.californium.core.coap.MessageObserverAdapter;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
Expand Down Expand Up @@ -45,46 +44,34 @@ public void setErrorOnReadyToSend() {

@Override
public void sendRequest(final Request request) {
request.addMessageObserver(new ErrorInjectorMessageObserver(request));
request.addMessageObserver(new ErrorInjectorMessageObserver());
}

@Override
public void sendResponse(final Response response) {
response.addMessageObserver(new ErrorInjectorMessageObserver(response));
response.addMessageObserver(new ErrorInjectorMessageObserver());
}

public class ErrorInjectorMessageObserver extends MessageObserverAdapter {

private Message message;

public ErrorInjectorMessageObserver(Message message) {
this.message = message;
}

@Override
public void onReadyToSend() {
if (errorOnReadyToSend.getAndSet(false)) {
RuntimeException exception = new IntendedTestException("Simulate error before to sent");
message.setSendError(exception);
throw exception;
if (errorOnReadyToSend.compareAndSet(true, false)) {
throw new IntendedTestException("Simulate error before to sent");
}
}

@Override
public void onSent() {
if (errorOnReadyToSend.getAndSet(false)) {
RuntimeException exception = new IntendedTestException("Simulate error on sent");
message.setSendError(exception);
throw exception;
if (errorOnReadyToSend.compareAndSet(true, false)) {
throw new IntendedTestException("Simulate error on sent");
}
}

@Override
public void onContextEstablished(EndpointContext endpointContext) {
if (errorOnEstablishedContext.getAndSet(false)) {
RuntimeException exception = new IntendedTestException("Simulate error on context established");
message.setSendError(exception);
throw exception;
if (errorOnEstablishedContext.compareAndSet(true, false)) {
throw new IntendedTestException("Simulate error on context established");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.eclipse.californium.core.Utils;
import org.eclipse.californium.core.coap.BlockOption;
import org.eclipse.californium.core.coap.EmptyMessage;
import org.eclipse.californium.core.coap.Message;
import org.eclipse.californium.core.coap.MessageObserver;
import org.eclipse.californium.core.coap.MessageObserverAdapter;
import org.eclipse.californium.core.coap.OptionSet;
Expand Down Expand Up @@ -189,8 +188,8 @@ protected abstract class LoggingMessageObserver extends MessageObserverAdapter {

private final MessageObserver errorInjectorObserver;

protected LoggingMessageObserver(final ErrorInjector errorInjector, final Message message) {
this.errorInjectorObserver = errorInjector.new ErrorInjectorMessageObserver(message);
protected LoggingMessageObserver(final ErrorInjector errorInjector) {
this.errorInjectorObserver = errorInjector.new ErrorInjectorMessageObserver();
}

private void countDown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public synchronized void sendRequest(final Request request) {
appendRequestDetails(request);
if (errorInjector != null) {
buffer.append(" (should be dropped by error)");
request.addMessageObserver(new LoggingMessageObserver(errorInjector, request) {
request.addMessageObserver(new LoggingMessageObserver(errorInjector) {

@Override
public void log(IntendedTestException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public synchronized void sendResponse(final Response response) {
if (errorInjector != null) {
logNewLine("(should be dropped by error) ");
appendResponseDetails(response);
response.addMessageObserver(new LoggingMessageObserver(errorInjector, response) {
response.addMessageObserver(new LoggingMessageObserver(errorInjector) {

@Override
public void log(IntendedTestException exception) {
Expand Down
Loading

0 comments on commit a87c5e8

Please sign in to comment.