Skip to content

Commit

Permalink
fix term padding at the end of term buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
RichardWarburton committed Nov 12, 2019
1 parent 1ffe03f commit 75b9fde
Showing 1 changed file with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@
import uk.co.real_logic.artio.messages.*;
import uk.co.real_logic.artio.messages.ControlNotificationEncoder.SessionsEncoder;

import java.lang.reflect.Field;
import java.util.List;

import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT;
import static io.aeron.protocol.DataHeaderFlyweight.BEGIN_FLAG;
import static io.aeron.protocol.DataHeaderFlyweight.END_FLAG;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.agrona.BitUtil.align;
import static org.agrona.UnsafeAccess.UNSAFE;
import static uk.co.real_logic.artio.DebugLogger.logSbeMessage;
import static uk.co.real_logic.artio.LogTag.*;
import static uk.co.real_logic.artio.messages.ErrorDecoder.messageHeaderLength;
Expand Down Expand Up @@ -79,6 +83,21 @@ public class GatewayPublication extends ClaimablePublication
private static final int END_OF_DAY_LENGTH =
HEADER_LENGTH + EndOfDayEncoder.BLOCK_LENGTH;


private static final long TERM_OFFSET_OFFSET;
static
{
try
{
final Field termOffsetField = ExclusivePublication.class.getDeclaredField("termOffset");
TERM_OFFSET_OFFSET = UNSAFE.objectFieldOffset(termOffsetField);
}
catch (final NoSuchFieldException e)
{
throw new Error(e);
}
}

private final ManageSessionEncoder manageSessionEncoder = new ManageSessionEncoder();
private final InitiateConnectionEncoder initiateConnection = new InitiateConnectionEncoder();
private final RequestDisconnectEncoder requestDisconnect = new RequestDisconnectEncoder();
Expand Down Expand Up @@ -168,6 +187,30 @@ public long saveMessage(
int srcFragmentLength = fragmented ? maxInitialBodyLength : srcLength;
int srcFragmentOffset = srcOffset;

if (fragmented)
{
// Add a padding message at the end of the term buffer if needed.
final int length = framedLength;
final int numMaxPayloads = length / maxPayloadLength;
final int remainingPayload = length % maxPayloadLength;
final int lastFrameLength = remainingPayload > 0 ?
align(remainingPayload + HEADER_LENGTH, FRAME_ALIGNMENT) : 0;
final int requiredLength = (numMaxPayloads * (maxPayloadLength + HEADER_LENGTH)) + lastFrameLength;
final int termLength = dataPublication.termBufferLength();
final int termOffset = UNSAFE.getInt(dataPublication, TERM_OFFSET_OFFSET);
final int resultingOffset = termOffset + requiredLength;

if (resultingOffset > termLength)
{
final long paddingPosition = dataPublication.appendPadding(termLength - resultingOffset);
if (paddingPosition < 0)
{
System.out.println("PADDING FAILED");
return paddingPosition;
}
}
}

long position = claim(claimLength);
if (position < 0)
{
Expand Down

0 comments on commit 75b9fde

Please sign in to comment.