Skip to content

Commit

Permalink
#834: UDPConnector now drops truncated payload
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Jan 10, 2019
1 parent 532c03b commit ed2951a
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ private class Receiver extends NetworkStageThread {

private Receiver(String name) {
super(name);
this.size = receiverPacketSize;
// we add one byte to be able to detect potential truncation.
this.size = receiverPacketSize + 1;
this.datagram = new DatagramPacket(new byte[size], size);
}

Expand All @@ -339,16 +340,23 @@ protected void work() throws IOException {
DatagramSocket currentSocket = getSocket();
if (currentSocket != null) {
currentSocket.receive(datagram);
LOGGER.debug("UDPConnector ({}) received {} bytes from {}:{}", effectiveAddr, datagram.getLength(),
datagram.getAddress(), datagram.getPort());
byte[] bytes = Arrays.copyOfRange(datagram.getData(), datagram.getOffset(), datagram.getLength());
RawData msg = RawData.inbound(bytes,
new UdpEndpointContext(new InetSocketAddress(datagram.getAddress(), datagram.getPort())),
false);
receiver.receiveData(msg);
if (datagram.getLength() >= size) {
// too large datagram for our buffer! data could have been
// truncated, so we discard it.
LOGGER.debug(
"UDPConnector ({}) received truncated UDP datagram from {}:{}. Maximum size allowed {}. Discarding ...",
effectiveAddr, datagram.getAddress(), datagram.getPort(), size - 1);
} else {
LOGGER.debug("UDPConnector ({}) received {} bytes from {}:{}", effectiveAddr, datagram.getLength(),
datagram.getAddress(), datagram.getPort());
byte[] bytes = Arrays.copyOfRange(datagram.getData(), datagram.getOffset(), datagram.getLength());
RawData msg = RawData.inbound(bytes,
new UdpEndpointContext(new InetSocketAddress(datagram.getAddress(), datagram.getPort())),
false);
receiver.receiveData(msg);
}
}
}

}

private class Sender extends NetworkStageThread {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@
package org.eclipse.californium.elements;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.californium.elements.rule.NetworkRule;
Expand All @@ -48,6 +53,7 @@ public class UDPConnectorTest {

UDPConnector connector;
UDPConnector destination;
LinkedBlockingQueue<RawData> incoming = new LinkedBlockingQueue<>();
TestEndpointContextMatcher matcher;

@Before
Expand All @@ -61,6 +67,7 @@ public void setup() throws IOException {

@Override
public void receiveData(RawData raw) {
incoming.offer(raw);
}
});
destination.start();
Expand Down Expand Up @@ -114,6 +121,49 @@ public void testMessageCallbackOnSent() throws InterruptedException {
assertThat(callback.toString(), callback.isSent(), is(true));
}

@Test
public void testTooLargeDatagramIsDropped() throws InterruptedException {
matcher.setMatches(2);

// ensure too large datagram is dropped
byte[] data = new byte[destination.getReceiverPacketSize()+1];
Arrays.fill(data, (byte) 1);
InetSocketAddress dest = destination.getAddress();
EndpointContext context = new UdpEndpointContext(dest);

RawData message = RawData.outbound(data, context, null, false);
connector.send(message);

RawData receivedData = incoming.poll(100, TimeUnit.MILLISECONDS);
assertThat("first received data:", receivedData, is(nullValue())); // null means packet is dropped

// ensure next datagram is correctly received
data = new byte[5];
Arrays.fill(data, (byte) 2);
context = new UdpEndpointContext(dest);
message = RawData.outbound(data, context, null, false);
connector.send(message);

receivedData = incoming.poll(1000000000, TimeUnit.MILLISECONDS);
assertThat("second received data:", receivedData, is(notNullValue()));
assertThat("bytes received:", receivedData.bytes, is(equalTo(data)));
}

@Test
public void testLargestDatagramIsReceived() throws InterruptedException {
byte[] data = new byte[destination.getReceiverPacketSize()];
Arrays.fill(data, (byte) 1);
InetSocketAddress dest = destination.getAddress();
EndpointContext context = new UdpEndpointContext(dest);

RawData message = RawData.outbound(data, context, null, false);
connector.send(message);

RawData receivedData = incoming.poll(100, TimeUnit.MILLISECONDS);
assertThat("second received data:", receivedData, is(notNullValue()));
assertThat("bytes received:", receivedData.bytes, is(equalTo(data)));
}

@Test
public void testMessageCallbackOnError() throws InterruptedException {
byte[] data = { 0, 1, 2 };
Expand Down Expand Up @@ -176,6 +226,13 @@ public TestEndpointContextMatcher(int count, int matches) {
this.latchSendMatcher = new CountDownLatch(count);
this.matches = new AtomicInteger(matches);
}

/**
* set the number of matches allowed
*/
public void setMatches(int matches) {
this.matches.set(matches);
}

public synchronized EndpointContext getMessageEndpointContext() {
return messageContext;
Expand Down

0 comments on commit ed2951a

Please sign in to comment.