@@ -184,8 +184,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
184184 public static final Setting .AffixSetting <Integer > PUBLISH_PORT_PROFILE = affixKeySetting ("transport.profiles." , "publish_port" ,
185185 key -> intSetting (key , -1 , -1 , Setting .Property .NodeScope ));
186186
187- private static final long NINETY_PER_HEAP_SIZE = (long ) (JvmInfo .jvmInfo ().getMem ().getHeapMax ().getBytes () * 0.9 );
187+ // This is the number of bytes necessary to read the message size
188+ public static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader .MARKER_BYTES_SIZE + TcpHeader .MESSAGE_LENGTH_SIZE ;
188189 public static final int PING_DATA_SIZE = -1 ;
190+ private static final long NINETY_PER_HEAP_SIZE = (long ) (JvmInfo .jvmInfo ().getMem ().getHeapMax ().getBytes () * 0.9 );
191+ private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray (new byte [0 ]);
192+
189193 private final CircuitBreakerService circuitBreakerService ;
190194 // package visibility for tests
191195 protected final ScheduledPing scheduledPing ;
@@ -317,8 +321,7 @@ public String executor() {
317321 public class ScheduledPing extends AbstractLifecycleRunnable {
318322
319323 /**
320- * The magic number (must be lower than 0) for a ping message. This is handled
321- * specifically in {@link TcpTransport#validateMessageHeader}.
324+ * The magic number (must be lower than 0) for a ping message.
322325 */
323326 private final BytesReference pingHeader ;
324327 final CounterMetric successfulPings = new CounterMetric ();
@@ -1210,7 +1213,7 @@ private void sendResponse(Version nodeVersion, TcpChannel channel, final Transpo
12101213 * @param length the payload length in bytes
12111214 * @see TcpHeader
12121215 */
1213- final BytesReference buildHeader (long requestId , byte status , Version protocolVersion , int length ) throws IOException {
1216+ private BytesReference buildHeader (long requestId , byte status , Version protocolVersion , int length ) throws IOException {
12141217 try (BytesStreamOutput headerOutput = new BytesStreamOutput (TcpHeader .HEADER_SIZE )) {
12151218 headerOutput .setVersion (protocolVersion );
12161219 TcpHeader .writeHeader (headerOutput , requestId , status , protocolVersion , length );
@@ -1247,76 +1250,135 @@ private BytesReference buildMessage(long requestId, byte status, Version nodeVer
12471250 }
12481251
12491252 /**
1250- * Validates the first N bytes of the message header and returns <code>false</code> if the message is
1251- * a ping message and has no payload ie. isn't a real user level message .
1253+ * Consumes bytes that are available from network reads. This method returns the number of bytes consumed
1254+ * in this call .
12521255 *
1253- * @throws IllegalStateException if the message is too short, less than the header or less that the header plus the message size
1254- * @throws HttpOnTransportException if the message has no valid header and appears to be a HTTP message
1255- * @throws IllegalArgumentException if the message is greater that the maximum allowed frame size. This is dependent on the available
1256- * memory.
1256+ * @param channel the channel read from
1257+ * @param bytesReference the bytes available to consume
1258+ * @return the number of bytes consumed
1259+ * @throws StreamCorruptedException if the message header format is not recognized
1260+ * @throws TcpTransport.HttpOnTransportException if the message header appears to be a HTTP message
1261+ * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size.
1262+ * This is dependent on the available memory.
12571263 */
1258- public static boolean validateMessageHeader (BytesReference buffer ) throws IOException {
1259- final int sizeHeaderLength = TcpHeader .MARKER_BYTES_SIZE + TcpHeader .MESSAGE_LENGTH_SIZE ;
1260- if (buffer .length () < sizeHeaderLength ) {
1261- throw new IllegalStateException ("message size must be >= to the header size" );
1262- }
1263- int offset = 0 ;
1264- if (buffer .get (offset ) != 'E' || buffer .get (offset + 1 ) != 'S' ) {
1265- // special handling for what is probably HTTP
1266- if (bufferStartsWith (buffer , offset , "GET " ) ||
1267- bufferStartsWith (buffer , offset , "POST " ) ||
1268- bufferStartsWith (buffer , offset , "PUT " ) ||
1269- bufferStartsWith (buffer , offset , "HEAD " ) ||
1270- bufferStartsWith (buffer , offset , "DELETE " ) ||
1271- bufferStartsWith (buffer , offset , "OPTIONS " ) ||
1272- bufferStartsWith (buffer , offset , "PATCH " ) ||
1273- bufferStartsWith (buffer , offset , "TRACE " )) {
1274-
1275- throw new HttpOnTransportException ("This is not a HTTP port" );
1264+ public int consumeNetworkReads (TcpChannel channel , BytesReference bytesReference ) throws IOException {
1265+ BytesReference message = decodeFrame (bytesReference );
1266+
1267+ if (message == null ) {
1268+ return 0 ;
1269+ } else if (message .length () == 0 ) {
1270+ // This is a ping and should not be handled.
1271+ return BYTES_NEEDED_FOR_MESSAGE_SIZE ;
1272+ } else {
1273+ try {
1274+ messageReceived (message , channel );
1275+ } catch (Exception e ) {
1276+ onException (channel , e );
12761277 }
1278+ return message .length () + BYTES_NEEDED_FOR_MESSAGE_SIZE ;
1279+ }
1280+ }
12771281
1278- // we have 6 readable bytes, show 4 (should be enough)
1279- throw new StreamCorruptedException ("invalid internal transport message format, got ("
1280- + Integer .toHexString (buffer .get (offset ) & 0xFF ) + ","
1281- + Integer .toHexString (buffer .get (offset + 1 ) & 0xFF ) + ","
1282- + Integer .toHexString (buffer .get (offset + 2 ) & 0xFF ) + ","
1283- + Integer .toHexString (buffer .get (offset + 3 ) & 0xFF ) + ")" );
1282+ /**
1283+ * Attempts to a decode a message from the provided bytes. If a full message is not available, null is
1284+ * returned. If the message is a ping, an empty {@link BytesReference} will be returned.
1285+ *
1286+ * @param networkBytes the will be read
1287+ * @return the message decoded
1288+ * @throws StreamCorruptedException if the message header format is not recognized
1289+ * @throws TcpTransport.HttpOnTransportException if the message header appears to be a HTTP message
1290+ * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size.
1291+ * This is dependent on the available memory.
1292+ */
1293+ public static BytesReference decodeFrame (BytesReference networkBytes ) throws IOException {
1294+ int messageLength = readMessageLength (networkBytes );
1295+ if (messageLength == -1 ) {
1296+ return null ;
1297+ } else {
1298+ int totalLength = messageLength + BYTES_NEEDED_FOR_MESSAGE_SIZE ;
1299+ if (totalLength > networkBytes .length ()) {
1300+ return null ;
1301+ } else if (totalLength == 6 ) {
1302+ return EMPTY_BYTES_REFERENCE ;
1303+ } else {
1304+ return networkBytes .slice (BYTES_NEEDED_FOR_MESSAGE_SIZE , messageLength );
1305+ }
12841306 }
1307+ }
12851308
1286- final int dataLen ;
1287- try (StreamInput input = buffer .streamInput ()) {
1288- input .skip (TcpHeader .MARKER_BYTES_SIZE );
1289- dataLen = input .readInt ();
1290- if (dataLen == PING_DATA_SIZE ) {
1291- // discard the messages we read and continue, this is achieved by skipping the bytes
1292- // and returning null
1293- return false ;
1309+ /**
1310+ * Validates the first 6 bytes of the message header and returns the length of the message. If 6 bytes
1311+ * are not available, it returns -1.
1312+ *
1313+ * @param networkBytes the will be read
1314+ * @return the length of the message
1315+ * @throws StreamCorruptedException if the message header format is not recognized
1316+ * @throws TcpTransport.HttpOnTransportException if the message header appears to be a HTTP message
1317+ * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size.
1318+ * This is dependent on the available memory.
1319+ */
1320+ public static int readMessageLength (BytesReference networkBytes ) throws IOException {
1321+ if (networkBytes .length () < BYTES_NEEDED_FOR_MESSAGE_SIZE ) {
1322+ return -1 ;
1323+ } else {
1324+ return readHeaderBuffer (networkBytes );
1325+ }
1326+ }
1327+
1328+ private static int readHeaderBuffer (BytesReference headerBuffer ) throws IOException {
1329+ if (headerBuffer .get (0 ) != 'E' || headerBuffer .get (1 ) != 'S' ) {
1330+ if (appearsToBeHTTP (headerBuffer )) {
1331+ throw new TcpTransport .HttpOnTransportException ("This is not a HTTP port" );
12941332 }
1333+
1334+ throw new StreamCorruptedException ("invalid internal transport message format, got ("
1335+ + Integer .toHexString (headerBuffer .get (0 ) & 0xFF ) + ","
1336+ + Integer .toHexString (headerBuffer .get (1 ) & 0xFF ) + ","
1337+ + Integer .toHexString (headerBuffer .get (2 ) & 0xFF ) + ","
1338+ + Integer .toHexString (headerBuffer .get (3 ) & 0xFF ) + ")" );
1339+ }
1340+ final int messageLength ;
1341+ try (StreamInput input = headerBuffer .streamInput ()) {
1342+ input .skip (TcpHeader .MARKER_BYTES_SIZE );
1343+ messageLength = input .readInt ();
12951344 }
12961345
1297- if (dataLen <= 0 ) {
1298- throw new StreamCorruptedException ("invalid data length: " + dataLen );
1346+ if (messageLength == TcpTransport .PING_DATA_SIZE ) {
1347+ // This is a ping
1348+ return 0 ;
12991349 }
1300- // safety against too large frames being sent
1301- if (dataLen > NINETY_PER_HEAP_SIZE ) {
1302- throw new IllegalArgumentException ("transport content length received [" + new ByteSizeValue (dataLen ) + "] exceeded ["
1303- + new ByteSizeValue (NINETY_PER_HEAP_SIZE ) + "]" );
1350+
1351+ if (messageLength <= 0 ) {
1352+ throw new StreamCorruptedException ("invalid data length: " + messageLength );
13041353 }
13051354
1306- if (buffer .length () < dataLen + sizeHeaderLength ) {
1307- throw new IllegalStateException ("buffer must be >= to the message size but wasn't" );
1355+ if (messageLength > NINETY_PER_HEAP_SIZE ) {
1356+ throw new IllegalArgumentException ("transport content length received [" + new ByteSizeValue (messageLength ) + "] exceeded ["
1357+ + new ByteSizeValue (NINETY_PER_HEAP_SIZE ) + "]" );
13081358 }
1309- return true ;
1359+
1360+ return messageLength ;
1361+ }
1362+
1363+ private static boolean appearsToBeHTTP (BytesReference headerBuffer ) {
1364+ return bufferStartsWith (headerBuffer , "GET" ) ||
1365+ bufferStartsWith (headerBuffer , "POST" ) ||
1366+ bufferStartsWith (headerBuffer , "PUT" ) ||
1367+ bufferStartsWith (headerBuffer , "HEAD" ) ||
1368+ bufferStartsWith (headerBuffer , "DELETE" ) ||
1369+ // Actually 'OPTIONS'. But we are only guaranteed to have read six bytes at this point.
1370+ bufferStartsWith (headerBuffer , "OPTION" ) ||
1371+ bufferStartsWith (headerBuffer , "PATCH" ) ||
1372+ bufferStartsWith (headerBuffer , "TRACE" );
13101373 }
13111374
1312- private static boolean bufferStartsWith (BytesReference buffer , int offset , String method ) {
1375+ private static boolean bufferStartsWith (BytesReference buffer , String method ) {
13131376 char [] chars = method .toCharArray ();
13141377 for (int i = 0 ; i < chars .length ; i ++) {
1315- if (buffer .get (offset + i ) != chars [i ]) {
1378+ if (buffer .get (i ) != chars [i ]) {
13161379 return false ;
13171380 }
13181381 }
1319-
13201382 return true ;
13211383 }
13221384
@@ -1343,8 +1405,10 @@ public HttpOnTransportException(StreamInput in) throws IOException {
13431405 /**
13441406 * This method handles the message receive part for both request and responses
13451407 */
1346- public final void messageReceived (BytesReference reference , TcpChannel channel , String profileName ,
1347- InetSocketAddress remoteAddress , int messageLengthBytes ) throws IOException {
1408+ public final void messageReceived (BytesReference reference , TcpChannel channel ) throws IOException {
1409+ String profileName = channel .getProfile ();
1410+ InetSocketAddress remoteAddress = channel .getRemoteAddress ();
1411+ int messageLengthBytes = reference .length ();
13481412 final int totalMessageSize = messageLengthBytes + TcpHeader .MARKER_BYTES_SIZE + TcpHeader .MESSAGE_LENGTH_SIZE ;
13491413 readBytesMetric .inc (totalMessageSize );
13501414 // we have additional bytes to read, outside of the header
0 commit comments