diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java b/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java index d5e03a301f..32f854359f 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java @@ -28,12 +28,16 @@ import uk.co.real_logic.artio.messages.*; import uk.co.real_logic.artio.messages.ControlNotificationEncoder.SessionsEncoder; +import java.lang.reflect.Field; import java.util.List; +import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT; import static io.aeron.protocol.DataHeaderFlyweight.BEGIN_FLAG; import static io.aeron.protocol.DataHeaderFlyweight.END_FLAG; import static java.nio.ByteOrder.LITTLE_ENDIAN; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.agrona.BitUtil.align; +import static org.agrona.UnsafeAccess.UNSAFE; import static uk.co.real_logic.artio.DebugLogger.logSbeMessage; import static uk.co.real_logic.artio.LogTag.*; import static uk.co.real_logic.artio.messages.ErrorDecoder.messageHeaderLength; @@ -79,6 +83,21 @@ public class GatewayPublication extends ClaimablePublication private static final int END_OF_DAY_LENGTH = HEADER_LENGTH + EndOfDayEncoder.BLOCK_LENGTH; + + private static final long TERM_OFFSET_OFFSET; + static + { + try + { + final Field termOffsetField = ExclusivePublication.class.getDeclaredField("termOffset"); + TERM_OFFSET_OFFSET = UNSAFE.objectFieldOffset(termOffsetField); + } + catch (final NoSuchFieldException e) + { + throw new Error(e); + } + } + private final ManageSessionEncoder manageSessionEncoder = new ManageSessionEncoder(); private final InitiateConnectionEncoder initiateConnection = new InitiateConnectionEncoder(); private final RequestDisconnectEncoder requestDisconnect = new RequestDisconnectEncoder(); @@ -168,6 +187,30 @@ public long saveMessage( int srcFragmentLength = fragmented ? maxInitialBodyLength : srcLength; int srcFragmentOffset = srcOffset; + if (fragmented) + { + // Add a padding message at the end of the term buffer if needed. + final int length = framedLength; + final int numMaxPayloads = length / maxPayloadLength; + final int remainingPayload = length % maxPayloadLength; + final int lastFrameLength = remainingPayload > 0 ? + align(remainingPayload + HEADER_LENGTH, FRAME_ALIGNMENT) : 0; + final int requiredLength = (numMaxPayloads * (maxPayloadLength + HEADER_LENGTH)) + lastFrameLength; + final int termLength = dataPublication.termBufferLength(); + final int termOffset = UNSAFE.getInt(dataPublication, TERM_OFFSET_OFFSET); + final int resultingOffset = termOffset + requiredLength; + + if (resultingOffset > termLength) + { + final long paddingPosition = dataPublication.appendPadding(termLength - resultingOffset); + if (paddingPosition < 0) + { + System.out.println("PADDING FAILED"); + return paddingPosition; + } + } + } + long position = claim(claimLength); if (position < 0) {