Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generally processing of exception with serial execution. #848

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
* was started.
* Achim Kraus (Bosch Software Innovations GmbH) - cancel pending messages on stop().
* Achim Kraus (Bosch Software Innovations GmbH) - add support for multicast
* Achim Kraus (Bosch Software Innovations GmbH) - move response retransmission
* setup to BaseCoapStack to include
* it also in a try-catch
******************************************************************************/
package org.eclipse.californium.core.network;

Expand Down Expand Up @@ -547,12 +550,6 @@ public void sendResponse(final Exchange exchange, final Response response) {
exchange.execute(new Runnable() {
@Override
public void run() {
if (exchange.getRequest().getOptions().hasObserve()) {
// observe- or cancel-observe-requests may have multiple responses
// when observes are finished, the last response has no longer an
// observe option. Therefore check the request for it.
exchange.retransmitResponse();
}
coapstack.sendResponse(exchange, response);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,12 @@ public void receiveRequest(final Request request, final EndpointReceiver receive

@Override
public void run() {
receiver.receiveRequest(previous, request);
try {
receiver.receiveRequest(previous, request);
} catch (RuntimeException ex) {
LOGGER.warn("error receiving request {} again!", request, ex);
receiver.reject(request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if sending RST is a good idea... 🤔

Did you consider to send an Internal Server Error Response ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added those try-catch, because without, nothing happens.
For a "Internal Server Error Response" I would try to create more speaky ones.
If a Exception reaches that, I hope the log will help to fix the bug or create a speaky response.

}
}
});
} else {
Expand All @@ -242,7 +247,12 @@ public void run() {

@Override
public void run() {
receiver.receiveRequest(exchange, request);
try {
receiver.receiveRequest(exchange, request);
} catch (RuntimeException ex) {
LOGGER.warn("error receiving request {}", request, ex);
receiver.reject(request);
}
}
});
}
Expand Down Expand Up @@ -294,8 +304,8 @@ public void run() {
receiver.receiveResponse(prev, response);
return;
}
} catch (Exception ex) {
LOGGER.error("error receiving response {} for {}", response, prev, ex);
} catch (RuntimeException ex) {
LOGGER.warn("error receiving response {} for {}", response, prev, ex);
}
reject(response, receiver);
}
Expand Down Expand Up @@ -388,8 +398,8 @@ public void run() {
LOGGER.debug("ignoring potentially forged response for token {} with non-matching endpoint context",
idByToken);
}
} catch (Exception ex) {
LOGGER.error("error receiving response {} for {}", response, exchange, ex);
} catch (RuntimeException ex) {
LOGGER.warn("error receiving response {} for {}", response, exchange, ex);
}
reject(response, receiver);
}
Expand Down Expand Up @@ -433,8 +443,8 @@ public void run() {
"ignoring potentially forged reply for message {} with non-matching endpoint context",
idByMID);
}
} catch (Exception ex) {
LOGGER.error("error receiving empty message {} for {}", message, exchange, ex);
} catch (RuntimeException ex) {
LOGGER.warn("error receiving empty message {} for {}", message, exchange, ex);
}
}
});
Expand All @@ -444,11 +454,11 @@ private void reject(final Response response, final EndpointReceiver receiver) {

if (response.getType() != Type.ACK && response.hasMID()) {
// reject only messages with MID, ignore for TCP
LOGGER.debug("rejecting unmatchable response from {}", response.getSourceContext());
LOGGER.debug("rejecting response from {}", response.getSourceContext());
receiver.reject(response);
}
}

private class RemoveHandlerImpl implements RemoveHandler {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,39 @@ protected final void setLayers(final Layer specificLayers[]) {
@Override
public void sendRequest(final Exchange exchange, final Request request) {
// delegate to top
top.sendRequest(exchange, request);
try {
top.sendRequest(exchange, request);
} catch (RuntimeException ex) {
request.setSendError(ex);
}
}

@Override
public void sendResponse(final Exchange exchange, final Response response) {
// delegate to top
top.sendResponse(exchange, response);
try {
if (exchange.getRequest().getOptions().hasObserve()) {
// observe- or cancel-observe-requests may have
// multiple responses.
// when observes are finished, the last response has
// no longer an observe option. Therefore check the
// request for it.
exchange.retransmitResponse();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is totally out of topic because you just move this if block from CoapEndpoint to BaseCoapStack but I can not get the point of it.

Copy link
Contributor Author

@boaks boaks Jan 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I move it into the "try - catch". The CoapEndpoint, for some reasons, use

		if (exchange.checkOwner()) {
			// send response while processing exchange.
			coapstack.sendResponse(exchange, response);
		} else {
			exchange.execute(new Runnable() {
				@Override
				public void run() {
					coapstack.sendResponse(exchange, response);
				}
			});
		}

In my very first variant, I used two try-catch, but moving it reduces that. And moving the if into the try-catch makes it somehow more complete.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question was more about the code itself (exchange.retransmitResponse) instead of the move. (that's why I notice that my question was totally out of topic :p)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. A server-side-observe-exchange gets completed with the current response. Before sending the next notify, the Exchange must be prepared to accept that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok It's a bit clearer
(even If I do not really understand how we do that as the function just set the exchange as "not complete".

At first sight, I was confused because I thought that retransmitResponse does "retransmit the previous response)

top.sendResponse(exchange, response);
} catch (RuntimeException ex) {
response.setSendError(ex);
}
}

@Override
public void sendEmptyMessage(final Exchange exchange, final EmptyMessage message) {
// delegate to top
top.sendEmptyMessage(exchange, message);
try {
top.sendEmptyMessage(exchange, message);
} catch (RuntimeException ex) {
message.setSendError(ex);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ public synchronized Request getNextRequestBlock() {
block.setDestinationContext(request.getDestinationContext());
// copy options
block.setOptions(new OptionSet(request.getOptions()));
// copy message observers so that a failing blockwise request also notifies observers registered with
// the original request
// copy message observers so that a failing blockwise request
// also notifies observers registered with the original request
block.addMessageObservers(request.getMessageObservers());

if (num == 0) {
// indicate overall body size to peer
block.getOptions().setSize1(request.getPayloadSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,15 +808,25 @@ private void sendNextBlock(final Exchange exchange, final Response response, fin
}
int nextNum = status.getCurrentNum() + currentSize / newSize;
LOGGER.debug("sending next Block1 num={}", nextNum);
Request nextBlock = status.getNextRequestBlock(nextNum, newSzx);
// we use the same token to ease traceability
nextBlock.setToken(response.getToken());
nextBlock.setDestinationContext(response.getSourceContext());
addBlock1CleanUpObserver(nextBlock, key, status);

exchange.setCurrentRequest(nextBlock);
prepareBlock1Cleanup(status, key);
lower().sendRequest(exchange, nextBlock);
Request nextBlock = null;
try {
nextBlock = status.getNextRequestBlock(nextNum, newSzx);
// we use the same token to ease traceability
nextBlock.setToken(response.getToken());
nextBlock.setDestinationContext(response.getSourceContext());
addBlock1CleanUpObserver(nextBlock, key, status);

exchange.setCurrentRequest(nextBlock);
prepareBlock1Cleanup(status, key);
lower().sendRequest(exchange, nextBlock);
} catch (RuntimeException ex) {
LOGGER.warn("cannot process next block request, aborting request!", ex);
if (nextBlock != null) {
nextBlock.setSendError(ex);
} else {
exchange.getRequest().setSendError(ex);
}
}
}

/**
Expand Down Expand Up @@ -894,45 +904,50 @@ private void handleBlock2Response(final Exchange exchange, final Response respon
Request request = exchange.getRequest();

Request block = new Request(request.getCode());
// do not enforce CON, since NON could make sense over SMS or similar transports
block.setType(request.getType());
block.setDestinationContext(response.getSourceContext());

/*
* WARNING:
*
* For Observe, the Matcher then will store the same
* exchange under a different KeyToken in exchangesByToken,
* which is cleaned up in the else case below.
*/
if (!response.isNotification()) {
block.setToken(response.getToken());
} else if (exchange.isNotification()) {
// Recreate cleanup message observer
request.addMessageObserver(new CleanupMessageObserver(exchange));
}

// copy options
block.setOptions(new OptionSet(request.getOptions()));
block.getOptions().setBlock2(newSzx, false, nextNum);

// make sure NOT to use Observe for block retrieval
block.getOptions().removeObserve();

// copy message observers from original request so that they will be notified
// if something goes wrong with this blockwise request, e.g. if it times out
block.addMessageObservers(request.getMessageObservers());
// add an observer that cleans up the block2 transfer tracker if the
// block request fails
addBlock2CleanUpObserver(block, key, status);


status.setCurrentNum(nextNum);
try {

// do not enforce CON, since NON could make sense over SMS or similar transports
block.setType(request.getType());
block.setDestinationContext(response.getSourceContext());

/*
* WARNING:
*
* For Observe, the Matcher then will store the same
* exchange under a different KeyToken in exchangesByToken,
* which is cleaned up in the else case below.
*/
if (!response.isNotification()) {
block.setToken(response.getToken());
} else if (exchange.isNotification()) {
// Recreate cleanup message observer
request.addMessageObserver(new CleanupMessageObserver(exchange));
}

LOGGER.debug("requesting next Block2 [num={}]: {}", nextNum, block);
exchange.setCurrentRequest(block);
prepareBlock2Cleanup(status, key);
lower().sendRequest(exchange, block);
// copy options
block.setOptions(new OptionSet(request.getOptions()));
block.getOptions().setBlock2(newSzx, false, nextNum);

// make sure NOT to use Observe for block retrieval
block.getOptions().removeObserve();

// copy message observers from original request so that they will be notified
// if something goes wrong with this blockwise request, e.g. if it times out
block.addMessageObservers(request.getMessageObservers());
// add an observer that cleans up the block2 transfer tracker if the
// block request fails
addBlock2CleanUpObserver(block, key, status);

status.setCurrentNum(nextNum);

LOGGER.debug("requesting next Block2 [num={}]: {}", nextNum, block);
exchange.setCurrentRequest(block);
prepareBlock2Cleanup(status, key);
lower().sendRequest(exchange, block);
} catch (RuntimeException ex) {
LOGGER.warn("cannot process next block request, aborting request!", ex);
block.setSendError(ex);
}

} else {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.californium.core.coap.Message;
import org.eclipse.californium.core.coap.MessageObserverAdapter;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
Expand Down Expand Up @@ -45,46 +44,34 @@ public void setErrorOnReadyToSend() {

@Override
public void sendRequest(final Request request) {
request.addMessageObserver(new ErrorInjectorMessageObserver(request));
request.addMessageObserver(new ErrorInjectorMessageObserver());
}

@Override
public void sendResponse(final Response response) {
response.addMessageObserver(new ErrorInjectorMessageObserver(response));
response.addMessageObserver(new ErrorInjectorMessageObserver());
}

public class ErrorInjectorMessageObserver extends MessageObserverAdapter {

private Message message;

public ErrorInjectorMessageObserver(Message message) {
this.message = message;
}

@Override
public void onReadyToSend() {
if (errorOnReadyToSend.getAndSet(false)) {
RuntimeException exception = new IntendedTestException("Simulate error before to sent");
message.setSendError(exception);
throw exception;
if (errorOnReadyToSend.compareAndSet(true, false)) {
throw new IntendedTestException("Simulate error before to sent");
}
}

@Override
public void onSent() {
if (errorOnReadyToSend.getAndSet(false)) {
RuntimeException exception = new IntendedTestException("Simulate error on sent");
message.setSendError(exception);
throw exception;
if (errorOnReadyToSend.compareAndSet(true, false)) {
throw new IntendedTestException("Simulate error on sent");
}
}

@Override
public void onContextEstablished(EndpointContext endpointContext) {
if (errorOnEstablishedContext.getAndSet(false)) {
RuntimeException exception = new IntendedTestException("Simulate error on context established");
message.setSendError(exception);
throw exception;
if (errorOnEstablishedContext.compareAndSet(true, false)) {
throw new IntendedTestException("Simulate error on context established");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.eclipse.californium.core.Utils;
import org.eclipse.californium.core.coap.BlockOption;
import org.eclipse.californium.core.coap.EmptyMessage;
import org.eclipse.californium.core.coap.Message;
import org.eclipse.californium.core.coap.MessageObserver;
import org.eclipse.californium.core.coap.MessageObserverAdapter;
import org.eclipse.californium.core.coap.OptionSet;
Expand Down Expand Up @@ -189,8 +188,8 @@ protected abstract class LoggingMessageObserver extends MessageObserverAdapter {

private final MessageObserver errorInjectorObserver;

protected LoggingMessageObserver(final ErrorInjector errorInjector, final Message message) {
this.errorInjectorObserver = errorInjector.new ErrorInjectorMessageObserver(message);
protected LoggingMessageObserver(final ErrorInjector errorInjector) {
this.errorInjectorObserver = errorInjector.new ErrorInjectorMessageObserver();
}

private void countDown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public synchronized void sendRequest(final Request request) {
appendRequestDetails(request);
if (errorInjector != null) {
buffer.append(" (should be dropped by error)");
request.addMessageObserver(new LoggingMessageObserver(errorInjector, request) {
request.addMessageObserver(new LoggingMessageObserver(errorInjector) {

@Override
public void log(IntendedTestException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public synchronized void sendResponse(final Response response) {
if (errorInjector != null) {
logNewLine("(should be dropped by error) ");
appendResponseDetails(response);
response.addMessageObserver(new LoggingMessageObserver(errorInjector, response) {
response.addMessageObserver(new LoggingMessageObserver(errorInjector) {

@Override
public void log(IntendedTestException exception) {
Expand Down
Loading