Skip to content

Commit

Permalink
Generally processing of exception with serial execution.
Browse files Browse the repository at this point in the history
Signed-off-by: Achim Kraus <[email protected]>
  • Loading branch information
Achim Kraus committed Jan 11, 2019
1 parent a90366c commit a94f267
Show file tree
Hide file tree
Showing 12 changed files with 468 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
* was started.
* Achim Kraus (Bosch Software Innovations GmbH) - cancel pending messages on stop().
* Achim Kraus (Bosch Software Innovations GmbH) - add support for multicast
* Achim Kraus (Bosch Software Innovations GmbH) - move response retransmission
* setup to BaseCoapStack
******************************************************************************/
package org.eclipse.californium.core.network;

Expand Down Expand Up @@ -547,12 +549,6 @@ public void sendResponse(final Exchange exchange, final Response response) {
exchange.execute(new Runnable() {
@Override
public void run() {
if (exchange.getRequest().getOptions().hasObserve()) {
// observe- or cancel-observe-requests may have multiple responses
// when observes are finished, the last response has no longer an
// observe option. Therefore check the request for it.
exchange.retransmitResponse();
}
coapstack.sendResponse(exchange, response);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,12 @@ public void receiveRequest(final Request request, final EndpointReceiver receive

@Override
public void run() {
receiver.receiveRequest(previous, request);
try {
receiver.receiveRequest(previous, request);
} catch (Exception ex) {
LOGGER.debug("error receiving request {} again!", request, ex);
receiver.reject(request);
}
}
});
} else {
Expand All @@ -242,7 +247,12 @@ public void run() {

@Override
public void run() {
receiver.receiveRequest(exchange, request);
try {
receiver.receiveRequest(exchange, request);
} catch (Exception ex) {
LOGGER.debug("error receiving request {}", request, ex);
receiver.reject(request);
}
}
});
}
Expand Down Expand Up @@ -295,7 +305,7 @@ public void run() {
return;
}
} catch (Exception ex) {
LOGGER.error("error receiving response {} for {}", response, prev, ex);
LOGGER.debug("error receiving response {} for {}", response, prev, ex);
}
reject(response, receiver);
}
Expand Down Expand Up @@ -389,7 +399,7 @@ public void run() {
idByToken);
}
} catch (Exception ex) {
LOGGER.error("error receiving response {} for {}", response, exchange, ex);
LOGGER.debug("error receiving response {} for {}", response, exchange, ex);
}
reject(response, receiver);
}
Expand Down Expand Up @@ -434,7 +444,7 @@ public void run() {
idByMID);
}
} catch (Exception ex) {
LOGGER.error("error receiving empty message {} for {}", message, exchange, ex);
LOGGER.debug("error receiving empty message {} for {}", message, exchange, ex);
}
}
});
Expand All @@ -444,11 +454,11 @@ private void reject(final Response response, final EndpointReceiver receiver) {

if (response.getType() != Type.ACK && response.hasMID()) {
// reject only messages with MID, ignore for TCP
LOGGER.debug("rejecting unmatchable response from {}", response.getSourceContext());
LOGGER.debug("rejecting response from {}", response.getSourceContext());
receiver.reject(response);
}
}

private class RemoveHandlerImpl implements RemoveHandler {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,39 @@ protected final void setLayers(final Layer specificLayers[]) {
@Override
public void sendRequest(final Exchange exchange, final Request request) {
// delegate to top
top.sendRequest(exchange, request);
try {
top.sendRequest(exchange, request);
} catch (Exception ex) {
request.setSendError(ex);
}
}

@Override
public void sendResponse(final Exchange exchange, final Response response) {
// delegate to top
top.sendResponse(exchange, response);
try {
if (exchange.getRequest().getOptions().hasObserve()) {
// observe- or cancel-observe-requests may have
// multiple responses.
// when observes are finished, the last response has
// no longer an observe option. Therefore check the
// request for it.
exchange.retransmitResponse();
}
top.sendResponse(exchange, response);
} catch (Exception ex) {
response.setSendError(ex);
}
}

@Override
public void sendEmptyMessage(final Exchange exchange, final EmptyMessage message) {
// delegate to top
top.sendEmptyMessage(exchange, message);
try {
top.sendEmptyMessage(exchange, message);
} catch (Exception ex) {
message.setSendError(ex);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ public synchronized Request getNextRequestBlock() {
block.setDestinationContext(request.getDestinationContext());
// copy options
block.setOptions(new OptionSet(request.getOptions()));
// copy message observers so that a failing blockwise request also notifies observers registered with
// the original request
// copy message observers so that a failing blockwise request
// also notifies observers registered with the original request
block.addMessageObservers(request.getMessageObservers());
block.addMessageObserver(new FailureForwardingMessageObserver(request));

if (num == 0) {
// indicate overall body size to peer
block.getOptions().setSize1(request.getPayloadSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,15 +808,25 @@ private void sendNextBlock(final Exchange exchange, final Response response, fin
}
int nextNum = status.getCurrentNum() + currentSize / newSize;
LOGGER.debug("sending next Block1 num={}", nextNum);
Request nextBlock = status.getNextRequestBlock(nextNum, newSzx);
// we use the same token to ease traceability
nextBlock.setToken(response.getToken());
nextBlock.setDestinationContext(response.getSourceContext());
addBlock1CleanUpObserver(nextBlock, key, status);

exchange.setCurrentRequest(nextBlock);
prepareBlock1Cleanup(status, key);
lower().sendRequest(exchange, nextBlock);
Request nextBlock = null;
try {
nextBlock = status.getNextRequestBlock(nextNum, newSzx);
// we use the same token to ease traceability
nextBlock.setToken(response.getToken());
nextBlock.setDestinationContext(response.getSourceContext());
addBlock1CleanUpObserver(nextBlock, key, status);

exchange.setCurrentRequest(nextBlock);
prepareBlock1Cleanup(status, key);
lower().sendRequest(exchange, nextBlock);
} catch (Exception ex) {
LOGGER.debug("cannot process next block request, aborting request!", ex);
if (nextBlock != null) {
nextBlock.setSendError(ex);
} else {
exchange.getRequest().setSendError(ex);
}
}
}

/**
Expand Down Expand Up @@ -894,45 +904,51 @@ private void handleBlock2Response(final Exchange exchange, final Response respon
Request request = exchange.getRequest();

Request block = new Request(request.getCode());
// do not enforce CON, since NON could make sense over SMS or similar transports
block.setType(request.getType());
block.setDestinationContext(response.getSourceContext());

/*
* WARNING:
*
* For Observe, the Matcher then will store the same
* exchange under a different KeyToken in exchangesByToken,
* which is cleaned up in the else case below.
*/
if (!response.isNotification()) {
block.setToken(response.getToken());
} else if (exchange.isNotification()) {
// Recreate cleanup message observer
request.addMessageObserver(new CleanupMessageObserver(exchange));
}

// copy options
block.setOptions(new OptionSet(request.getOptions()));
block.getOptions().setBlock2(newSzx, false, nextNum);

// make sure NOT to use Observe for block retrieval
block.getOptions().removeObserve();

// copy message observers from original request so that they will be notified
// if something goes wrong with this blockwise request, e.g. if it times out
block.addMessageObservers(request.getMessageObservers());
// add an observer that cleans up the block2 transfer tracker if the
// block request fails
addBlock2CleanUpObserver(block, key, status);


status.setCurrentNum(nextNum);
block.addMessageObserver(new FailureForwardingMessageObserver(request));
try {

// do not enforce CON, since NON could make sense over SMS or similar transports
block.setType(request.getType());
block.setDestinationContext(response.getSourceContext());

/*
* WARNING:
*
* For Observe, the Matcher then will store the same
* exchange under a different KeyToken in exchangesByToken,
* which is cleaned up in the else case below.
*/
if (!response.isNotification()) {
block.setToken(response.getToken());
} else if (exchange.isNotification()) {
// Recreate cleanup message observer
request.addMessageObserver(new CleanupMessageObserver(exchange));
}

LOGGER.debug("requesting next Block2 [num={}]: {}", nextNum, block);
exchange.setCurrentRequest(block);
prepareBlock2Cleanup(status, key);
lower().sendRequest(exchange, block);
// copy options
block.setOptions(new OptionSet(request.getOptions()));
block.getOptions().setBlock2(newSzx, false, nextNum);

// make sure NOT to use Observe for block retrieval
block.getOptions().removeObserve();

// copy message observers from original request so that they will be notified
// if something goes wrong with this blockwise request, e.g. if it times out
block.addMessageObservers(request.getMessageObservers());
// add an observer that cleans up the block2 transfer tracker if the
// block request fails
addBlock2CleanUpObserver(block, key, status);

status.setCurrentNum(nextNum);

LOGGER.debug("requesting next Block2 [num={}]: {}", nextNum, block);
exchange.setCurrentRequest(block);
prepareBlock2Cleanup(status, key);
lower().sendRequest(exchange, block);
} catch (Exception ex) {
LOGGER.debug("cannot process next block request, aborting request!", ex);
block.setSendError(ex);
}

} else {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*******************************************************************************
* Copyright (c) 2019 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.html.
*
* Contributors:
* Bosch Software Innovations GmbH - initial creation
******************************************************************************/
package org.eclipse.californium.core.network.stack;

import org.eclipse.californium.core.coap.Message;
import org.eclipse.californium.core.coap.MessageObserverAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Forward failures to other messages.
*
* Forward {@link #onCancel()}, {@link #onReject()}, {@link #onTimeout()}, and
* {@link #onSendError(Throwable)} to provided message. Intended to be used for
* blockwise transfers
*/
public class FailureForwardingMessageObserver extends MessageObserverAdapter {

protected static final Logger LOGGER = LoggerFactory.getLogger(FailureForwardingMessageObserver.class.getName());

protected final Message message;

protected FailureForwardingMessageObserver(final Message message) {
this.message = message;
}

@Override
public void onCancel() {
message.cancel();
}

@Override
public void onReject() {
message.setRejected(true);
}

@Override
public void onTimeout() {
message.setTimedOut(true);
}

@Override
public void onSendError(Throwable error) {
message.setSendError(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.californium.core.coap.Message;
import org.eclipse.californium.core.coap.MessageObserverAdapter;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
Expand Down Expand Up @@ -45,46 +44,34 @@ public void setErrorOnReadyToSend() {

@Override
public void sendRequest(final Request request) {
request.addMessageObserver(new ErrorInjectorMessageObserver(request));
request.addMessageObserver(new ErrorInjectorMessageObserver());
}

@Override
public void sendResponse(final Response response) {
response.addMessageObserver(new ErrorInjectorMessageObserver(response));
response.addMessageObserver(new ErrorInjectorMessageObserver());
}

public class ErrorInjectorMessageObserver extends MessageObserverAdapter {

private Message message;

public ErrorInjectorMessageObserver(Message message) {
this.message = message;
}

@Override
public void onReadyToSend() {
if (errorOnReadyToSend.getAndSet(false)) {
RuntimeException exception = new IntendedTestException("Simulate error before to sent");
message.setSendError(exception);
throw exception;
if (errorOnReadyToSend.compareAndSet(true, false)) {
throw new IntendedTestException("Simulate error before to sent");
}
}

@Override
public void onSent() {
if (errorOnReadyToSend.getAndSet(false)) {
RuntimeException exception = new IntendedTestException("Simulate error on sent");
message.setSendError(exception);
throw exception;
if (errorOnReadyToSend.compareAndSet(true, false)) {
throw new IntendedTestException("Simulate error on sent");
}
}

@Override
public void onContextEstablished(EndpointContext endpointContext) {
if (errorOnEstablishedContext.getAndSet(false)) {
RuntimeException exception = new IntendedTestException("Simulate error on context established");
message.setSendError(exception);
throw exception;
if (errorOnEstablishedContext.compareAndSet(true, false)) {
throw new IntendedTestException("Simulate error on context established");
}
}
}
Expand Down
Loading

0 comments on commit a94f267

Please sign in to comment.