62
62
import java .io .IOException ;
63
63
import java .util .HashMap ;
64
64
import java .util .Map ;
65
+ import java .util .concurrent .atomic .AtomicBoolean ;
65
66
import java .util .concurrent .atomic .AtomicLong ;
66
67
import java .util .concurrent .atomic .AtomicReference ;
67
68
import java .util .function .BiConsumer ;
@@ -97,6 +98,7 @@ public class PublicationTransportHandler {
97
98
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong ();
98
99
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong ();
99
100
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong ();
101
+ private final AtomicBoolean allNodesRemotePublicationEnabled = new AtomicBoolean ();
100
102
// -> no need to put a timeout on the options here, because we want the response to eventually be received
101
103
// and not log an error if it arrives after the timeout
102
104
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions .builder ()
@@ -332,11 +334,18 @@ public PublicationContext newPublicationContext(
332
334
boolean isRemotePublicationEnabled ,
333
335
PersistedStateRegistry persistedStateRegistry
334
336
) {
335
- final PublicationContext publicationContext = new PublicationContext (
336
- clusterChangedEvent ,
337
- isRemotePublicationEnabled ,
338
- persistedStateRegistry
339
- );
337
+ if (isRemotePublicationEnabled == true ) {
338
+ if (allNodesRemotePublicationEnabled .get () == false ) {
339
+ if (validateRemotePublicationOnAllNodes (clusterChangedEvent .state ().nodes ()) == true ) {
340
+ allNodesRemotePublicationEnabled .set (true );
341
+ }
342
+ }
343
+ if (allNodesRemotePublicationEnabled .get () == true ) {
344
+ // if all nodes are remote then create remote publication context
345
+ return new RemotePublicationContext (clusterChangedEvent , persistedStateRegistry );
346
+ }
347
+ }
348
+ final PublicationContext publicationContext = new PublicationContext (clusterChangedEvent , persistedStateRegistry );
340
349
341
350
// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
342
351
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
@@ -345,6 +354,17 @@ public PublicationContext newPublicationContext(
345
354
return publicationContext ;
346
355
}
347
356
357
+ private boolean validateRemotePublicationOnAllNodes (DiscoveryNodes discoveryNodes ) {
358
+ assert ClusterMetadataManifest .getCodecForVersion (discoveryNodes .getMinNodeVersion ()) >= ClusterMetadataManifest .CODEC_V0 ;
359
+ for (DiscoveryNode node : discoveryNodes .getNodes ().values ()) {
360
+ // if a node is non-remote then created local publication context
361
+ if (node .isRemoteStatePublicationEnabled () == false ) {
362
+ return false ;
363
+ }
364
+ }
365
+ return true ;
366
+ }
367
+
348
368
// package private for testing
349
369
void setCurrentPublishRequestToSelf (PublishRequest publishRequest ) {
350
370
this .currentPublishRequestToSelf .set (publishRequest );
@@ -385,25 +405,19 @@ private static BytesReference serializeDiffClusterState(Diff<ClusterState> diff,
385
405
*/
386
406
public class PublicationContext {
387
407
388
- private final DiscoveryNodes discoveryNodes ;
389
- private final ClusterState newState ;
390
- private final ClusterState previousState ;
391
- private final boolean sendFullVersion ;
408
+ protected final DiscoveryNodes discoveryNodes ;
409
+ protected final ClusterState newState ;
410
+ protected final ClusterState previousState ;
411
+ protected final boolean sendFullVersion ;
392
412
private final Map <Version , BytesReference > serializedStates = new HashMap <>();
393
413
private final Map <Version , BytesReference > serializedDiffs = new HashMap <>();
394
- private final boolean sendRemoteState ;
395
- private final PersistedStateRegistry persistedStateRegistry ;
414
+ protected final PersistedStateRegistry persistedStateRegistry ;
396
415
397
- PublicationContext (
398
- ClusterChangedEvent clusterChangedEvent ,
399
- boolean isRemotePublicationEnabled ,
400
- PersistedStateRegistry persistedStateRegistry
401
- ) {
416
+ PublicationContext (ClusterChangedEvent clusterChangedEvent , PersistedStateRegistry persistedStateRegistry ) {
402
417
discoveryNodes = clusterChangedEvent .state ().nodes ();
403
418
newState = clusterChangedEvent .state ();
404
419
previousState = clusterChangedEvent .previousState ();
405
420
sendFullVersion = previousState .getBlocks ().disableStatePersistence ();
406
- sendRemoteState = isRemotePublicationEnabled ;
407
421
this .persistedStateRegistry = persistedStateRegistry ;
408
422
}
409
423
@@ -468,17 +482,7 @@ public void onFailure(Exception e) {
468
482
} else {
469
483
responseActionListener = listener ;
470
484
}
471
- // TODO Decide to send remote state before starting publication by checking remote publication on all nodes
472
- if (sendRemoteState && destination .isRemoteStatePublicationEnabled ()) {
473
- logger .trace ("sending remote cluster state version [{}] to [{}]" , newState .version (), destination );
474
- sendRemoteClusterState (destination , publishRequest .getAcceptedState (), responseActionListener );
475
- } else if (sendFullVersion || previousState .nodes ().nodeExists (destination ) == false ) {
476
- logger .trace ("sending full cluster state version [{}] to [{}]" , newState .version (), destination );
477
- sendFullClusterState (destination , responseActionListener );
478
- } else {
479
- logger .trace ("sending cluster state diff for version [{}] to [{}]" , newState .version (), destination );
480
- sendClusterStateDiff (destination , responseActionListener );
481
- }
485
+ sendClusterState (destination , responseActionListener );
482
486
}
483
487
484
488
public void sendApplyCommit (
@@ -517,58 +521,14 @@ public String executor() {
517
521
);
518
522
}
519
523
520
- private void sendRemoteClusterState (
521
- final DiscoveryNode destination ,
522
- final ClusterState clusterState ,
523
- final ActionListener <PublishWithJoinResponse > listener
524
- ) {
525
- try {
526
- final String manifestFileName = ((RemotePersistedState ) persistedStateRegistry .getPersistedState (PersistedStateType .REMOTE ))
527
- .getLastUploadedManifestFile ();
528
- final RemotePublishRequest remotePublishRequest = new RemotePublishRequest (
529
- discoveryNodes .getLocalNode (),
530
- clusterState .term (),
531
- clusterState .getVersion (),
532
- clusterState .getClusterName ().value (),
533
- clusterState .metadata ().clusterUUID (),
534
- manifestFileName
535
- );
536
- final Consumer <TransportException > transportExceptionHandler = exp -> {
537
- logger .debug (() -> new ParameterizedMessage ("failed to send remote cluster state to {}" , destination ), exp );
538
- listener .onFailure (exp );
539
- };
540
- final TransportResponseHandler <PublishWithJoinResponse > responseHandler = new TransportResponseHandler <>() {
541
-
542
- @ Override
543
- public PublishWithJoinResponse read (StreamInput in ) throws IOException {
544
- return new PublishWithJoinResponse (in );
545
- }
546
-
547
- @ Override
548
- public void handleResponse (PublishWithJoinResponse response ) {
549
- listener .onResponse (response );
550
- }
551
-
552
- @ Override
553
- public void handleException (TransportException exp ) {
554
- transportExceptionHandler .accept (exp );
555
- }
556
-
557
- @ Override
558
- public String executor () {
559
- return ThreadPool .Names .GENERIC ;
560
- }
561
- };
562
- transportService .sendRequest (
563
- destination ,
564
- PUBLISH_REMOTE_STATE_ACTION_NAME ,
565
- remotePublishRequest ,
566
- stateRequestOptions ,
567
- responseHandler
568
- );
569
- } catch (Exception e ) {
570
- logger .warn (() -> new ParameterizedMessage ("error sending remote cluster state to {}" , destination ), e );
571
- listener .onFailure (e );
524
+ public void sendClusterState (DiscoveryNode destination , ActionListener <PublishWithJoinResponse > listener ) {
525
+ logger .info ("sending cluster state over transport to node: {}" , destination .getName ());
526
+ if (sendFullVersion || previousState .nodes ().nodeExists (destination ) == false ) {
527
+ logger .trace ("sending full cluster state version [{}] to [{}]" , newState .version (), destination );
528
+ sendFullClusterState (destination , listener );
529
+ } else {
530
+ logger .trace ("sending cluster state diff for version [{}] to [{}]" , newState .version (), destination );
531
+ sendClusterStateDiff (destination , listener );
572
532
}
573
533
}
574
534
@@ -648,4 +608,69 @@ public String executor() {
648
608
}
649
609
}
650
610
611
+ /**
612
+ * An extension of {@code PublicationContext} to support remote cluster state publication
613
+ *
614
+ * @opensearch.internal
615
+ */
616
+ public class RemotePublicationContext extends PublicationContext {
617
+
618
+ RemotePublicationContext (ClusterChangedEvent clusterChangedEvent , PersistedStateRegistry persistedStateRegistry ) {
619
+ super (clusterChangedEvent , persistedStateRegistry );
620
+ }
621
+
622
+ @ Override
623
+ public void sendClusterState (final DiscoveryNode destination , final ActionListener <PublishWithJoinResponse > listener ) {
624
+ try {
625
+ logger .info ("sending remote cluster state to node: {}" , destination .getName ());
626
+ final String manifestFileName = ((RemotePersistedState ) persistedStateRegistry .getPersistedState (PersistedStateType .REMOTE ))
627
+ .getLastUploadedManifestFile ();
628
+ final RemotePublishRequest remotePublishRequest = new RemotePublishRequest (
629
+ discoveryNodes .getLocalNode (),
630
+ newState .term (),
631
+ newState .getVersion (),
632
+ newState .getClusterName ().value (),
633
+ newState .metadata ().clusterUUID (),
634
+ manifestFileName
635
+ );
636
+ final Consumer <TransportException > transportExceptionHandler = exp -> {
637
+ logger .debug (() -> new ParameterizedMessage ("failed to send remote cluster state to {}" , destination ), exp );
638
+ listener .onFailure (exp );
639
+ };
640
+ final TransportResponseHandler <PublishWithJoinResponse > responseHandler = new TransportResponseHandler <>() {
641
+
642
+ @ Override
643
+ public PublishWithJoinResponse read (StreamInput in ) throws IOException {
644
+ return new PublishWithJoinResponse (in );
645
+ }
646
+
647
+ @ Override
648
+ public void handleResponse (PublishWithJoinResponse response ) {
649
+ listener .onResponse (response );
650
+ }
651
+
652
+ @ Override
653
+ public void handleException (TransportException exp ) {
654
+ transportExceptionHandler .accept (exp );
655
+ }
656
+
657
+ @ Override
658
+ public String executor () {
659
+ return ThreadPool .Names .GENERIC ;
660
+ }
661
+ };
662
+ transportService .sendRequest (
663
+ destination ,
664
+ PUBLISH_REMOTE_STATE_ACTION_NAME ,
665
+ remotePublishRequest ,
666
+ stateRequestOptions ,
667
+ responseHandler
668
+ );
669
+ } catch (Exception e ) {
670
+ logger .warn (() -> new ParameterizedMessage ("error sending remote cluster state to {}" , destination ), e );
671
+ listener .onFailure (e );
672
+ }
673
+ }
674
+ }
675
+
651
676
}
0 commit comments