Skip to content

Commit

Permalink
[Java] Handle fragment assemble when unreliable streams are enabled a…
Browse files Browse the repository at this point in the history
…nd loss can stretch to the middle of two fragmented messages.
  • Loading branch information
mjpt777 committed Jul 2, 2022
1 parent 2307c57 commit 54e8c73
Show file tree
Hide file tree
Showing 27 changed files with 384 additions and 299 deletions.
23 changes: 22 additions & 1 deletion aeron-client/src/main/java/io/aeron/BufferBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public final class BufferBuilder
static final int INIT_MIN_CAPACITY = 4096;

private final boolean isDirect;
private int limit = 0;
private int limit;
private int nextTermOffset;
private final UnsafeBuffer buffer;

/**
Expand Down Expand Up @@ -122,6 +123,26 @@ public void limit(final int limit)
this.limit = limit;
}

/**
* Get the value which the next term offset for a fragment to be assembled should begin at.
*
* @return the value which the next term offset for a fragment to be assembled should begin at.
*/
public int nextTermOffset()
{
return nextTermOffset;
}

/**
* Set the value which the next term offset for a fragment to be assembled should begin at.
*
* @param offset which the next term offset for a fragment to be assembled should begin at.
*/
public void nextTermOffset(final int offset)
{
nextTermOffset = offset;
}

/**
* The {@link MutableDirectBuffer} that encapsulates the internal buffer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;

import static io.aeron.logbuffer.FrameDescriptor.*;
import static io.aeron.protocol.DataHeaderFlyweight.HEADER_LENGTH;

/**
* A {@link ControlledFragmentHandler} that sits in a chain-of-responsibility pattern that reassembles fragmented
Expand Down Expand Up @@ -126,7 +128,9 @@ public Action onFragment(final DirectBuffer buffer, final int offset, final int
if ((flags & BEGIN_FRAG_FLAG) == BEGIN_FRAG_FLAG)
{
final BufferBuilder builder = getBufferBuilder(header.sessionId());
builder.reset().append(buffer, offset, length);
builder.reset()
.append(buffer, offset, length)
.nextTermOffset(BitUtil.align(offset + length + HEADER_LENGTH, FRAME_ALIGNMENT));
}
else
{
Expand All @@ -136,22 +140,31 @@ public Action onFragment(final DirectBuffer buffer, final int offset, final int
final int limit = builder.limit();
if (limit > 0)
{
builder.append(buffer, offset, length);

if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
if (offset == builder.nextTermOffset())
{
final int msgLength = builder.limit();
action = delegate.onFragment(builder.buffer(), 0, msgLength, header);
builder
.append(buffer, offset, length)
.nextTermOffset(BitUtil.align(offset + length + HEADER_LENGTH, FRAME_ALIGNMENT));

if (Action.ABORT == action)
{
builder.limit(limit);
}
else
if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
{
builder.reset();
final int msgLength = builder.limit();
action = delegate.onFragment(builder.buffer(), 0, msgLength, header);

if (Action.ABORT == action)
{
builder.limit(limit);
}
else
{
builder.reset();
}
}
}
else
{
builder.reset();
}
}
}
}
Expand Down
25 changes: 19 additions & 6 deletions aeron-client/src/main/java/io/aeron/FragmentAssembler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;

import static io.aeron.logbuffer.FrameDescriptor.*;
import static io.aeron.protocol.DataHeaderFlyweight.HEADER_LENGTH;

/**
* A {@link FragmentHandler} that sits in a chain-of-responsibility pattern that reassembles fragmented messages
Expand Down Expand Up @@ -129,19 +131,30 @@ private void handleFragment(
if ((flags & BEGIN_FRAG_FLAG) == BEGIN_FRAG_FLAG)
{
final BufferBuilder builder = getBufferBuilder(header.sessionId());
builder.reset().append(buffer, offset, length);
builder.reset()
.append(buffer, offset, length)
.nextTermOffset(BitUtil.align(offset + length + HEADER_LENGTH, FRAME_ALIGNMENT));
}
else
{
final BufferBuilder builder = builderBySessionIdMap.get(header.sessionId());
if (null != builder && builder.limit() > 0)
{
builder.append(buffer, offset, length);

if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
if (offset == builder.nextTermOffset())
{
builder
.append(buffer, offset, length)
.nextTermOffset(BitUtil.align(offset + length + HEADER_LENGTH, FRAME_ALIGNMENT));

if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
{
final int msgLength = builder.limit();
delegate.onFragment(builder.buffer(), 0, msgLength, header);
builder.reset();
}
}
else
{
final int msgLength = builder.limit();
delegate.onFragment(builder.buffer(), 0, msgLength, header);
builder.reset();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;

import static io.aeron.logbuffer.FrameDescriptor.*;
import static io.aeron.protocol.DataHeaderFlyweight.HEADER_LENGTH;

/**
* A {@link ControlledFragmentHandler} that sits in a chain-of-responsibility pattern that reassembles fragmented
Expand Down Expand Up @@ -116,28 +118,40 @@ public Action onFragment(final DirectBuffer buffer, final int offset, final int
{
if ((flags & BEGIN_FRAG_FLAG) == BEGIN_FRAG_FLAG)
{
builder.reset().append(buffer, offset, length);
builder.reset()
.append(buffer, offset, length)
.nextTermOffset(BitUtil.align(offset + length + HEADER_LENGTH, FRAME_ALIGNMENT));
}
else
{
final int limit = builder.limit();
if (limit > 0)
{
builder.append(buffer, offset, length);

if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
if (offset == builder.nextTermOffset())
{
action = delegate.onFragment(builder.buffer(), 0, builder.limit(), header);
builder
.append(buffer, offset, length)
.nextTermOffset(BitUtil.align(offset + length + HEADER_LENGTH, FRAME_ALIGNMENT));

if (Action.ABORT == action)
{
builder.limit(limit);
}
else
if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
{
builder.reset();
final int msgLength = builder.limit();
action = delegate.onFragment(builder.buffer(), 0, msgLength, header);

if (Action.ABORT == action)
{
builder.limit(limit);
}
else
{
builder.reset();
}
}
}
else
{
builder.reset();
}
}
}
}
Expand Down
23 changes: 18 additions & 5 deletions aeron-client/src/main/java/io/aeron/ImageFragmentAssembler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;

import static io.aeron.logbuffer.FrameDescriptor.*;
import static io.aeron.protocol.DataHeaderFlyweight.HEADER_LENGTH;

/**
* A {@link FragmentHandler} that sits in a chain-of-responsibility pattern that reassembles fragmented messages
Expand Down Expand Up @@ -119,16 +121,27 @@ private void handleFragment(
{
if ((flags & BEGIN_FRAG_FLAG) == BEGIN_FRAG_FLAG)
{
builder.reset().append(buffer, offset, length);
builder.reset()
.append(buffer, offset, length)
.nextTermOffset(BitUtil.align(offset + length + HEADER_LENGTH, FRAME_ALIGNMENT));
}
else if (builder.limit() > 0)
{
builder.append(buffer, offset, length);
if (offset == builder.nextTermOffset())
{
builder
.append(buffer, offset, length)
.nextTermOffset(BitUtil.align(offset + length + HEADER_LENGTH, FRAME_ALIGNMENT));

if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
{
final int msgLength = builder.limit();
delegate.onFragment(builder.buffer(), 0, msgLength, header);
builder.reset();
}
}
else
{
final int msgLength = builder.limit();
delegate.onFragment(builder.buffer(), 0, msgLength, header);
builder.reset();
}
}
Expand Down
26 changes: 13 additions & 13 deletions aeron-client/src/test/java/io/aeron/BufferBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,28 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class BufferBuilderTest
class BufferBuilderTest
{
private final BufferBuilder bufferBuilder = new BufferBuilder();

@Test
public void shouldFindMaxCapacityWhenRequested()
void shouldFindMaxCapacityWhenRequested()
{
assertEquals(
BufferBuilder.MAX_CAPACITY,
BufferBuilder.findSuitableCapacity(0, BufferBuilder.MAX_CAPACITY));
}

@Test
public void shouldInitialiseToDefaultValues()
void shouldInitialiseToDefaultValues()
{
assertEquals(0, bufferBuilder.capacity());
assertEquals(0, bufferBuilder.buffer().capacity());
assertEquals(0, bufferBuilder.limit());
}

@Test
public void shouldGrowDirectBuffer()
void shouldGrowDirectBuffer()
{
final BufferBuilder builder = new BufferBuilder(0, true);
assertEquals(0, builder.capacity());
Expand All @@ -67,7 +67,7 @@ public void shouldGrowDirectBuffer()
}

@Test
public void shouldAppendNothingForZeroLength()
void shouldAppendNothingForZeroLength()
{
final UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[INIT_MIN_CAPACITY]);

Expand All @@ -77,7 +77,7 @@ public void shouldAppendNothingForZeroLength()
}

@Test
public void shouldGrowToMultipleOfInitialCapacity()
void shouldGrowToMultipleOfInitialCapacity()
{
final int srcCapacity = INIT_MIN_CAPACITY * 5;
final UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[srcCapacity]);
Expand All @@ -89,7 +89,7 @@ public void shouldGrowToMultipleOfInitialCapacity()
}

@Test
public void shouldAppendThenReset()
void shouldAppendThenReset()
{
final UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[INIT_MIN_CAPACITY]);

Expand All @@ -103,7 +103,7 @@ public void shouldAppendThenReset()
}

@Test
public void shouldAppendOneBufferWithoutResizing()
void shouldAppendOneBufferWithoutResizing()
{
final UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[INIT_MIN_CAPACITY]);
final byte[] bytes = "Hello World".getBytes(StandardCharsets.UTF_8);
Expand All @@ -120,7 +120,7 @@ public void shouldAppendOneBufferWithoutResizing()
}

@Test
public void shouldAppendTwoBuffersWithoutResizing()
void shouldAppendTwoBuffersWithoutResizing()
{
final UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[INIT_MIN_CAPACITY]);
final byte[] bytes = "1111111122222222".getBytes(StandardCharsets.UTF_8);
Expand All @@ -138,7 +138,7 @@ public void shouldAppendTwoBuffersWithoutResizing()
}

@Test
public void shouldFillBufferWithoutResizing()
void shouldFillBufferWithoutResizing()
{
final int bufferLength = 128;
final byte[] buffer = new byte[bufferLength];
Expand All @@ -158,7 +158,7 @@ public void shouldFillBufferWithoutResizing()
}

@Test
public void shouldResizeWhenBufferJustDoesNotFit()
void shouldResizeWhenBufferJustDoesNotFit()
{
final int bufferLength = 128;
final byte[] buffer = new byte[bufferLength + 1];
Expand All @@ -178,7 +178,7 @@ public void shouldResizeWhenBufferJustDoesNotFit()
}

@Test
public void shouldAppendTwoBuffersAndResize()
void shouldAppendTwoBuffersAndResize()
{
final int bufferLength = 128;
final byte[] buffer = new byte[bufferLength];
Expand All @@ -201,7 +201,7 @@ public void shouldAppendTwoBuffersAndResize()
}

@Test
public void shouldCompactBufferToLowerLimit()
void shouldCompactBufferToLowerLimit()
{
final int bufferLength = INIT_MIN_CAPACITY / 2;
final byte[] buffer = new byte[bufferLength];
Expand Down
Loading

0 comments on commit 54e8c73

Please sign in to comment.