2222import org .apache .logging .log4j .LogManager ;
2323import org .apache .logging .log4j .Logger ;
2424import org .apache .logging .log4j .message .ParameterizedMessage ;
25+ import org .elasticsearch .Build ;
2526import org .elasticsearch .Version ;
2627import org .elasticsearch .action .ActionListener ;
2728import org .elasticsearch .action .ActionListenerResponseHandler ;
3738import org .elasticsearch .common .io .stream .StreamOutput ;
3839import org .elasticsearch .common .io .stream .Writeable ;
3940import org .elasticsearch .common .lease .Releasable ;
41+ import org .elasticsearch .common .logging .DeprecationLogger ;
4042import org .elasticsearch .common .logging .Loggers ;
4143import org .elasticsearch .common .regex .Regex ;
4244import org .elasticsearch .common .settings .ClusterSettings ;
7375import java .util .function .Predicate ;
7476import java .util .function .Supplier ;
7577
76- public class TransportService extends AbstractLifecycleComponent implements ReportingService <TransportInfo >, TransportMessageListener ,
77- TransportConnectionListener {
78+ public class TransportService extends AbstractLifecycleComponent
79+ implements ReportingService <TransportInfo >, TransportMessageListener , TransportConnectionListener {
80+
7881 private static final Logger logger = LogManager .getLogger (TransportService .class );
7982
83+ private static final String PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY = "es.unsafely_permit_handshake_from_incompatible_builds" ;
84+ private static final boolean PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS
85+ = Boolean .parseBoolean (System .getProperty (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY , "false" ));
86+
8087 public static final String DIRECT_RESPONSE_PROFILE = ".direct" ;
8188 public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake" ;
8289
@@ -115,6 +122,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) {
115122 private final RemoteClusterService remoteClusterService ;
116123
117124 private final boolean validateConnections ;
125+ private final boolean requireCompatibleBuild ;
118126
119127 /** if set will call requests sent to this id to shortcut and executed locally */
120128 volatile DiscoveryNode localNode = null ;
@@ -160,9 +168,15 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
160168 public TransportService (Settings settings , Transport transport , ThreadPool threadPool , TransportInterceptor transportInterceptor ,
161169 Function <BoundTransportAddress , DiscoveryNode > localNodeFactory , @ Nullable ClusterSettings clusterSettings ,
162170 Set <String > taskHeaders , ConnectionManager connectionManager ) {
171+
172+ final boolean isTransportClient = TransportClient .CLIENT_TYPE .equals (settings .get (Client .CLIENT_TYPE_SETTING_S .getKey ()));
173+
174+ // If we are a transport client then we skip the check that the remote node has a compatible build hash
175+ this .requireCompatibleBuild = isTransportClient == false ;
176+
163177 // The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
164- this .validateConnections = TransportClient . CLIENT_TYPE . equals ( settings . get ( Client . CLIENT_TYPE_SETTING_S . getKey ())) == false ||
165- TransportClient . CLIENT_TRANSPORT_SNIFF . get ( settings );
178+ this .validateConnections = isTransportClient == false || TransportClient . CLIENT_TRANSPORT_SNIFF . get ( settings );
179+
166180 this .transport = transport ;
167181 transport .setSlowLogThreshold (TransportSettings .SLOW_OPERATION_THRESHOLD_SETTING .get (settings ));
168182 this .threadPool = threadPool ;
@@ -192,7 +206,14 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
192206 false , false ,
193207 HandshakeRequest ::new ,
194208 (request , channel , task ) -> channel .sendResponse (
195- new HandshakeResponse (localNode , clusterName , localNode .getVersion ())));
209+ new HandshakeResponse (localNode .getVersion (), Build .CURRENT .hash (), localNode , clusterName )));
210+
211+ if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS ) {
212+ logger .warn ("transport handshakes from incompatible builds are unsafely permitted on this node; remove system property [" +
213+ PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] to resolve this warning" );
214+ DeprecationLogger .getLogger (TransportService .class ).deprecate ("permit_handshake_from_incompatible_builds" ,
215+ "system property [" + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] is deprecated and should be removed" );
216+ }
196217 }
197218
198219 public RemoteClusterService getRemoteClusterService () {
@@ -482,7 +503,7 @@ public void onFailure(Exception e) {
482503 listener .onFailure (e );
483504 }
484505 }
485- , HandshakeResponse :: new , ThreadPool .Names .GENERIC
506+ , in -> new HandshakeResponse ( in , requireCompatibleBuild ) , ThreadPool .Names .GENERIC
486507 ));
487508 }
488509
@@ -504,28 +525,89 @@ private HandshakeRequest() {
504525 }
505526
506527 public static class HandshakeResponse extends TransportResponse {
528+
529+ private static final Version BUILD_HASH_HANDSHAKE_VERSION = Version .V_7_11_0 ;
530+
531+ private final Version version ;
532+
533+ @ Nullable // if version < BUILD_HASH_HANDSHAKE_VERSION
534+ private final String buildHash ;
535+
507536 private final DiscoveryNode discoveryNode ;
537+
508538 private final ClusterName clusterName ;
509- private final Version version ;
510539
511- public HandshakeResponse (DiscoveryNode discoveryNode , ClusterName clusterName , Version version ) {
512- this .discoveryNode = discoveryNode ;
513- this .version = version ;
514- this .clusterName = clusterName ;
540+ public HandshakeResponse (Version version , String buildHash , DiscoveryNode discoveryNode , ClusterName clusterName ) {
541+ this .buildHash = Objects .requireNonNull (buildHash );
542+ this .discoveryNode = Objects .requireNonNull (discoveryNode );
543+ this .version = Objects .requireNonNull (version );
544+ this .clusterName = Objects .requireNonNull (clusterName );
515545 }
516546
517- public HandshakeResponse (StreamInput in ) throws IOException {
547+ public HandshakeResponse (StreamInput in , boolean requireCompatibleBuild ) throws IOException {
518548 super (in );
519- discoveryNode = in .readOptionalWriteable (DiscoveryNode ::new );
520- clusterName = new ClusterName (in );
521- version = Version .readVersion (in );
549+ if (in .getVersion ().onOrAfter (BUILD_HASH_HANDSHAKE_VERSION )) {
550+ // the first two fields need only VInts and raw (ASCII) characters, so we cross our fingers and hope that they appear
551+ // on the wire as we expect them to even if this turns out to be an incompatible build
552+ version = Version .readVersion (in );
553+ buildHash = in .readString ();
554+
555+ try {
556+ // If the remote node is incompatible then make an effort to identify it anyway, so we can mention it in the exception
557+ // message, but recognise that this may fail
558+ discoveryNode = new DiscoveryNode (in );
559+ } catch (Exception e ) {
560+ if (isIncompatibleBuild (version , buildHash , requireCompatibleBuild )) {
561+ throw new IllegalArgumentException ("unidentifiable remote node is build [" + buildHash +
562+ "] of version [" + version + "] but this node is build [" + Build .CURRENT .hash () +
563+ "] of version [" + Version .CURRENT + "] which has an incompatible wire format" , e );
564+ } else {
565+ throw e ;
566+ }
567+ }
568+
569+ if (isIncompatibleBuild (version , buildHash , requireCompatibleBuild )) {
570+ if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS ) {
571+ logger .warn ("remote node [{}] is build [{}] of version [{}] but this node is build [{}] of version [{}] " +
572+ "which may not be compatible; remove system property [{}] to resolve this warning" ,
573+ discoveryNode , buildHash , version , Build .CURRENT .hash (), Version .CURRENT ,
574+ PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY );
575+ } else {
576+ throw new IllegalArgumentException ("remote node [" + discoveryNode + "] is build [" + buildHash +
577+ "] of version [" + version + "] but this node is build [" + Build .CURRENT .hash () +
578+ "] of version [" + Version .CURRENT + "] which has an incompatible wire format" );
579+ }
580+ }
581+
582+ clusterName = new ClusterName (in );
583+ } else {
584+ discoveryNode = in .readOptionalWriteable (DiscoveryNode ::new );
585+ clusterName = new ClusterName (in );
586+ version = Version .readVersion (in );
587+ buildHash = null ;
588+ }
522589 }
523590
524591 @ Override
525592 public void writeTo (StreamOutput out ) throws IOException {
526- out .writeOptionalWriteable (discoveryNode );
527- clusterName .writeTo (out );
528- Version .writeVersion (version , out );
593+ if (out .getVersion ().onOrAfter (BUILD_HASH_HANDSHAKE_VERSION )) {
594+ Version .writeVersion (version , out );
595+ out .writeString (buildHash );
596+ discoveryNode .writeTo (out );
597+ clusterName .writeTo (out );
598+ } else {
599+ out .writeOptionalWriteable (discoveryNode );
600+ clusterName .writeTo (out );
601+ Version .writeVersion (version , out );
602+ }
603+ }
604+
605+ public Version getVersion () {
606+ return version ;
607+ }
608+
609+ public String getBuildHash () {
610+ return buildHash ;
529611 }
530612
531613 public DiscoveryNode getDiscoveryNode () {
@@ -535,6 +617,10 @@ public DiscoveryNode getDiscoveryNode() {
535617 public ClusterName getClusterName () {
536618 return clusterName ;
537619 }
620+
621+ private static boolean isIncompatibleBuild (Version version , String buildHash , boolean requireCompatibleBuild ) {
622+ return requireCompatibleBuild && version == Version .CURRENT && Build .CURRENT .hash ().equals (buildHash ) == false ;
623+ }
538624 }
539625
540626 public void disconnectFromNode (DiscoveryNode node ) {
0 commit comments