@@ -357,37 +357,35 @@ public void handleException(TransportException exp) {
357357 });
358358 } else {
359359 setPhase (replicationTask , "primary" );
360- createReplicatedOperation (primaryRequest .getRequest (),
361- ActionListener .wrap (result -> result .respond (
362- new ActionListener <>() {
363- @ Override
364- public void onResponse (Response response ) {
365- if (syncGlobalCheckpointAfterOperation ) {
366- final IndexShard shard = primaryShardReference .indexShard ;
367- try {
368- shard .maybeSyncGlobalCheckpoint ("post-operation" );
369- } catch (final Exception e ) {
370- // only log non-closed exceptions
371- if (ExceptionsHelper .unwrap (
372- e , AlreadyClosedException .class , IndexShardClosedException .class ) == null ) {
373- // intentionally swallow, a missed global checkpoint sync should not fail this operation
374- logger .info (
375- new ParameterizedMessage (
376- "{} failed to execute post-operation global checkpoint sync" , shard .shardId ()), e );
377- }
378- }
379- }
380- primaryShardReference .close (); // release shard operation lock before responding to caller
381- setPhase (replicationTask , "finished" );
382- onCompletionListener .onResponse (response );
383- }
384360
385- @ Override
386- public void onFailure (Exception e ) {
387- handleException (primaryShardReference , e );
361+ final ActionListener <Response > referenceClosingListener = ActionListener .wrap (response -> {
362+ primaryShardReference .close (); // release shard operation lock before responding to caller
363+ setPhase (replicationTask , "finished" );
364+ onCompletionListener .onResponse (response );
365+ }, e -> handleException (primaryShardReference , e ));
366+
367+ final ActionListener <Response > globalCheckpointSyncingListener = ActionListener .wrap (response -> {
368+ if (syncGlobalCheckpointAfterOperation ) {
369+ final IndexShard shard = primaryShardReference .indexShard ;
370+ try {
371+ shard .maybeSyncGlobalCheckpoint ("post-operation" );
372+ } catch (final Exception e ) {
373+ // only log non-closed exceptions
374+ if (ExceptionsHelper .unwrap (
375+ e , AlreadyClosedException .class , IndexShardClosedException .class ) == null ) {
376+ // intentionally swallow, a missed global checkpoint sync should not fail this operation
377+ logger .info (
378+ new ParameterizedMessage (
379+ "{} failed to execute post-operation global checkpoint sync" , shard .shardId ()), e );
388380 }
389- }), e -> handleException (primaryShardReference , e )
390- ), primaryShardReference ).execute ();
381+ }
382+ }
383+ referenceClosingListener .onResponse (response );
384+ }, referenceClosingListener ::onFailure );
385+
386+ new ReplicationOperation <>(primaryRequest .getRequest (), primaryShardReference ,
387+ ActionListener .wrap (result -> result .respond (globalCheckpointSyncingListener ), referenceClosingListener ::onFailure ),
388+ newReplicasProxy (), logger , actionName , primaryRequest .getPrimaryTerm ()).execute ();
391389 }
392390 } catch (Exception e ) {
393391 handleException (primaryShardReference , e );
@@ -405,12 +403,6 @@ public void onFailure(Exception e) {
405403 onCompletionListener .onFailure (e );
406404 }
407405
408- protected ReplicationOperation <Request , ReplicaRequest , PrimaryResult <ReplicaRequest , Response >> createReplicatedOperation (
409- Request request , ActionListener <PrimaryResult <ReplicaRequest , Response >> listener ,
410- PrimaryShardReference primaryShardReference ) {
411- return new ReplicationOperation <>(request , primaryShardReference , listener ,
412- newReplicasProxy (), logger , actionName , primaryRequest .getPrimaryTerm ());
413- }
414406 }
415407
416408 public static class PrimaryResult <ReplicaRequest extends ReplicationRequest <ReplicaRequest >,
0 commit comments